001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Map;
026import java.util.Optional;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.Delete;
035import org.apache.hadoop.hbase.client.Durability;
036import org.apache.hadoop.hbase.client.Get;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.Result;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
045import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
046import org.apache.hadoop.hbase.wal.WALEdit;
047import org.junit.jupiter.api.AfterAll;
048import org.junit.jupiter.api.BeforeAll;
049import org.junit.jupiter.api.BeforeEach;
050import org.junit.jupiter.api.Tag;
051import org.junit.jupiter.api.Test;
052
053@Tag(CoprocessorTests.TAG)
054@Tag(MediumTests.TAG)
055public class TestRegionObserverBypass {
056
057  private static HBaseTestingUtil util;
058  private static final TableName tableName = TableName.valueOf("test");
059  private static final byte[] dummy = Bytes.toBytes("dummy");
060  private static final byte[] row1 = Bytes.toBytes("r1");
061  private static final byte[] row2 = Bytes.toBytes("r2");
062  private static final byte[] row3 = Bytes.toBytes("r3");
063  private static final byte[] test = Bytes.toBytes("test");
064
065  @BeforeAll
066  public static void setUpBeforeClass() throws Exception {
067    // Stack up three coprocessors just so I can check bypass skips subsequent calls.
068    Configuration conf = HBaseConfiguration.create();
069    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
070      new String[] { TestCoprocessor.class.getName(), TestCoprocessor2.class.getName(),
071        TestCoprocessor3.class.getName() });
072    util = new HBaseTestingUtil(conf);
073    util.startMiniCluster();
074  }
075
076  @AfterAll
077  public static void tearDownAfterClass() throws Exception {
078    util.shutdownMiniCluster();
079  }
080
081  @BeforeEach
082  public void setUp() throws Exception {
083    Admin admin = util.getAdmin();
084    if (admin.tableExists(tableName)) {
085      if (admin.isTableEnabled(tableName)) {
086        admin.disableTable(tableName);
087      }
088      admin.deleteTable(tableName);
089    }
090    util.createTable(tableName, new byte[][] { dummy, test });
091    TestCoprocessor.PREPUT_BYPASSES.set(0);
092    TestCoprocessor.PREPUT_INVOCATIONS.set(0);
093  }
094
095  /**
096   * do a single put that is bypassed by a RegionObserver
097   */
098  @Test
099  public void testSimple() throws Exception {
100    Table t = util.getConnection().getTable(tableName);
101    Put p = new Put(row1);
102    p.addColumn(test, dummy, dummy);
103    // before HBASE-4331, this would throw an exception
104    t.put(p);
105    checkRowAndDelete(t, row1, 0);
106    t.close();
107  }
108
109  /**
110   * Test various multiput operations. If the column family is 'test', then bypass is invoked.
111   */
112  @Test
113  public void testMulti() throws Exception {
114    // ensure that server time increments every time we do an operation, otherwise
115    // previous deletes will eclipse successive puts having the same timestamp
116    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
117
118    Table t = util.getConnection().getTable(tableName);
119    List<Put> puts = new ArrayList<>();
120    Put p = new Put(row1);
121    p.addColumn(dummy, dummy, dummy);
122    puts.add(p);
123    p = new Put(row2);
124    p.addColumn(test, dummy, dummy);
125    puts.add(p);
126    p = new Put(row3);
127    p.addColumn(test, dummy, dummy);
128    puts.add(p);
129    // before HBASE-4331, this would throw an exception
130    t.put(puts);
131    checkRowAndDelete(t, row1, 1);
132    checkRowAndDelete(t, row2, 0);
133    checkRowAndDelete(t, row3, 0);
134
135    puts.clear();
136    p = new Put(row1);
137    p.addColumn(test, dummy, dummy);
138    puts.add(p);
139    p = new Put(row2);
140    p.addColumn(test, dummy, dummy);
141    puts.add(p);
142    p = new Put(row3);
143    p.addColumn(test, dummy, dummy);
144    puts.add(p);
145    // before HBASE-4331, this would throw an exception
146    t.put(puts);
147    checkRowAndDelete(t, row1, 0);
148    checkRowAndDelete(t, row2, 0);
149    checkRowAndDelete(t, row3, 0);
150
151    puts.clear();
152    p = new Put(row1);
153    p.addColumn(test, dummy, dummy);
154    puts.add(p);
155    p = new Put(row2);
156    p.addColumn(test, dummy, dummy);
157    puts.add(p);
158    p = new Put(row3);
159    p.addColumn(dummy, dummy, dummy);
160    puts.add(p);
161    // this worked fine even before HBASE-4331
162    t.put(puts);
163    checkRowAndDelete(t, row1, 0);
164    checkRowAndDelete(t, row2, 0);
165    checkRowAndDelete(t, row3, 1);
166
167    puts.clear();
168    p = new Put(row1);
169    p.addColumn(dummy, dummy, dummy);
170    puts.add(p);
171    p = new Put(row2);
172    p.addColumn(test, dummy, dummy);
173    puts.add(p);
174    p = new Put(row3);
175    p.addColumn(dummy, dummy, dummy);
176    puts.add(p);
177    // this worked fine even before HBASE-4331
178    t.put(puts);
179    checkRowAndDelete(t, row1, 1);
180    checkRowAndDelete(t, row2, 0);
181    checkRowAndDelete(t, row3, 1);
182
183    puts.clear();
184    p = new Put(row1);
185    p.addColumn(test, dummy, dummy);
186    puts.add(p);
187    p = new Put(row2);
188    p.addColumn(dummy, dummy, dummy);
189    puts.add(p);
190    p = new Put(row3);
191    p.addColumn(test, dummy, dummy);
192    puts.add(p);
193    // before HBASE-4331, this would throw an exception
194    t.put(puts);
195    checkRowAndDelete(t, row1, 0);
196    checkRowAndDelete(t, row2, 1);
197    checkRowAndDelete(t, row3, 0);
198    t.close();
199
200    EnvironmentEdgeManager.reset();
201  }
202
203  private void checkRowAndDelete(Table t, byte[] row, int count) throws IOException {
204    Get g = new Get(row);
205    Result r = t.get(g);
206    assertEquals(count, r.size());
207    Delete d = new Delete(row);
208    t.delete(d);
209  }
210
211  /**
212   * Test that when bypass is called, we skip out calling any other coprocessors stacked up method,
213   * in this case, a prePut. If the column family is 'test', then bypass is invoked.
214   */
215  @Test
216  public void testBypassAlsoCompletes() throws IOException {
217    // ensure that server time increments every time we do an operation, otherwise
218    // previous deletes will eclipse successive puts having the same timestamp
219    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
220
221    Table t = util.getConnection().getTable(tableName);
222    List<Put> puts = new ArrayList<>();
223    Put p = new Put(row1);
224    p.addColumn(dummy, dummy, dummy);
225    puts.add(p);
226    p = new Put(row2);
227    p.addColumn(test, dummy, dummy);
228    puts.add(p);
229    p = new Put(row3);
230    p.addColumn(test, dummy, dummy);
231    puts.add(p);
232    t.put(puts);
233    // Ensure expected result.
234    checkRowAndDelete(t, row1, 1);
235    checkRowAndDelete(t, row2, 0);
236    checkRowAndDelete(t, row3, 0);
237    // We have three Coprocessors stacked up on the prePut. See the beforeClass setup. We did three
238    // puts above two of which bypassed. A bypass means do not call the other coprocessors in the
239    // stack so for the two 'test' calls in the above, we should not have call through to all all
240    // three coprocessors in the chain. So we should have:
241    // 3 invocations for first put + 1 invocation + 1 bypass for second put + 1 invocation +
242    // 1 bypass for the last put. Assert.
243    assertEquals(5, TestCoprocessor.PREPUT_INVOCATIONS.get(), "Total CP invocation count");
244    assertEquals(2, TestCoprocessor.PREPUT_BYPASSES.get(), "Total CP bypasses");
245  }
246
247  public static class TestCoprocessor implements RegionCoprocessor, RegionObserver {
248    static AtomicInteger PREPUT_INVOCATIONS = new AtomicInteger(0);
249    static AtomicInteger PREPUT_BYPASSES = new AtomicInteger(0);
250
251    @Override
252    public Optional<RegionObserver> getRegionObserver() {
253      return Optional.of(this);
254    }
255
256    @Override
257    public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
258      final Put put, final WALEdit edit, final Durability durability) throws IOException {
259      PREPUT_INVOCATIONS.incrementAndGet();
260      Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
261      if (familyMap.containsKey(test)) {
262        PREPUT_BYPASSES.incrementAndGet();
263        e.bypass();
264      }
265    }
266  }
267
268  /**
269   * Calls through to TestCoprocessor.
270   */
271  public static class TestCoprocessor2 extends TestRegionObserverBypass.TestCoprocessor {
272  }
273
274  /**
275   * Calls through to TestCoprocessor.
276   */
277  public static class TestCoprocessor3 extends TestRegionObserverBypass.TestCoprocessor {
278  }
279}