001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static junit.framework.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Map;
026import java.util.Optional;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Admin;
035import org.apache.hadoop.hbase.client.Delete;
036import org.apache.hadoop.hbase.client.Durability;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.Result;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
046import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
047import org.apache.hadoop.hbase.wal.WALEdit;
048import org.junit.AfterClass;
049import org.junit.Before;
050import org.junit.BeforeClass;
051import org.junit.ClassRule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054
055@Category({CoprocessorTests.class, MediumTests.class})
056public class TestRegionObserverBypass {
057
058  @ClassRule
059  public static final HBaseClassTestRule CLASS_RULE =
060      HBaseClassTestRule.forClass(TestRegionObserverBypass.class);
061
062  private static HBaseTestingUtil util;
063  private static final TableName tableName = TableName.valueOf("test");
064  private static final byte[] dummy = Bytes.toBytes("dummy");
065  private static final byte[] row1 = Bytes.toBytes("r1");
066  private static final byte[] row2 = Bytes.toBytes("r2");
067  private static final byte[] row3 = Bytes.toBytes("r3");
068  private static final byte[] test = Bytes.toBytes("test");
069
070  @BeforeClass
071  public static void setUpBeforeClass() throws Exception {
072    // Stack up three coprocessors just so I can check bypass skips subsequent calls.
073    Configuration conf = HBaseConfiguration.create();
074    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
075        new String [] {TestCoprocessor.class.getName(),
076          TestCoprocessor2.class.getName(),
077          TestCoprocessor3.class.getName()});
078    util = new HBaseTestingUtil(conf);
079    util.startMiniCluster();
080  }
081
082  @AfterClass
083  public static void tearDownAfterClass() throws Exception {
084    util.shutdownMiniCluster();
085  }
086
087  @Before
088  public void setUp() throws Exception {
089    Admin admin = util.getAdmin();
090    if (admin.tableExists(tableName)) {
091      if (admin.isTableEnabled(tableName)) {
092        admin.disableTable(tableName);
093      }
094      admin.deleteTable(tableName);
095    }
096    util.createTable(tableName, new byte[][] {dummy, test});
097    TestCoprocessor.PREPUT_BYPASSES.set(0);
098    TestCoprocessor.PREPUT_INVOCATIONS.set(0);
099  }
100
101  /**
102   * do a single put that is bypassed by a RegionObserver
103   * @throws Exception
104   */
105  @Test
106  public void testSimple() throws Exception {
107    Table t = util.getConnection().getTable(tableName);
108    Put p = new Put(row1);
109    p.addColumn(test, dummy, dummy);
110    // before HBASE-4331, this would throw an exception
111    t.put(p);
112    checkRowAndDelete(t,row1,0);
113    t.close();
114  }
115
116  /**
117   * Test various multiput operations.
118   * If the column family is 'test', then bypass is invoked.
119   * @throws Exception
120   */
121  @Test
122  public void testMulti() throws Exception {
123    //ensure that server time increments every time we do an operation, otherwise
124    //previous deletes will eclipse successive puts having the same timestamp
125    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
126
127    Table t = util.getConnection().getTable(tableName);
128    List<Put> puts = new ArrayList<>();
129    Put p = new Put(row1);
130    p.addColumn(dummy, dummy, dummy);
131    puts.add(p);
132    p = new Put(row2);
133    p.addColumn(test, dummy, dummy);
134    puts.add(p);
135    p = new Put(row3);
136    p.addColumn(test, dummy, dummy);
137    puts.add(p);
138    // before HBASE-4331, this would throw an exception
139    t.put(puts);
140    checkRowAndDelete(t,row1,1);
141    checkRowAndDelete(t,row2,0);
142    checkRowAndDelete(t,row3,0);
143
144    puts.clear();
145    p = new Put(row1);
146    p.addColumn(test, dummy, dummy);
147    puts.add(p);
148    p = new Put(row2);
149    p.addColumn(test, dummy, dummy);
150    puts.add(p);
151    p = new Put(row3);
152    p.addColumn(test, dummy, dummy);
153    puts.add(p);
154    // before HBASE-4331, this would throw an exception
155    t.put(puts);
156    checkRowAndDelete(t,row1,0);
157    checkRowAndDelete(t,row2,0);
158    checkRowAndDelete(t,row3,0);
159
160    puts.clear();
161    p = new Put(row1);
162    p.addColumn(test, dummy, dummy);
163    puts.add(p);
164    p = new Put(row2);
165    p.addColumn(test, dummy, dummy);
166    puts.add(p);
167    p = new Put(row3);
168    p.addColumn(dummy, dummy, dummy);
169    puts.add(p);
170    // this worked fine even before HBASE-4331
171    t.put(puts);
172    checkRowAndDelete(t,row1,0);
173    checkRowAndDelete(t,row2,0);
174    checkRowAndDelete(t,row3,1);
175
176    puts.clear();
177    p = new Put(row1);
178    p.addColumn(dummy, dummy, dummy);
179    puts.add(p);
180    p = new Put(row2);
181    p.addColumn(test, dummy, dummy);
182    puts.add(p);
183    p = new Put(row3);
184    p.addColumn(dummy, dummy, dummy);
185    puts.add(p);
186    // this worked fine even before HBASE-4331
187    t.put(puts);
188    checkRowAndDelete(t,row1,1);
189    checkRowAndDelete(t,row2,0);
190    checkRowAndDelete(t,row3,1);
191
192    puts.clear();
193    p = new Put(row1);
194    p.addColumn(test, dummy, dummy);
195    puts.add(p);
196    p = new Put(row2);
197    p.addColumn(dummy, dummy, dummy);
198    puts.add(p);
199    p = new Put(row3);
200    p.addColumn(test, dummy, dummy);
201    puts.add(p);
202    // before HBASE-4331, this would throw an exception
203    t.put(puts);
204    checkRowAndDelete(t,row1,0);
205    checkRowAndDelete(t,row2,1);
206    checkRowAndDelete(t,row3,0);
207    t.close();
208
209    EnvironmentEdgeManager.reset();
210  }
211
212  private void checkRowAndDelete(Table t, byte[] row, int count) throws IOException {
213    Get g = new Get(row);
214    Result r = t.get(g);
215    assertEquals(count, r.size());
216    Delete d = new Delete(row);
217    t.delete(d);
218  }
219
220  /**
221   * Test that when bypass is called, we skip out calling any other coprocessors stacked up method,
222   * in this case, a prePut.
223   * If the column family is 'test', then bypass is invoked.
224   */
225  @Test
226  public void testBypassAlsoCompletes() throws IOException {
227    //ensure that server time increments every time we do an operation, otherwise
228    //previous deletes will eclipse successive puts having the same timestamp
229    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
230
231    Table t = util.getConnection().getTable(tableName);
232    List<Put> puts = new ArrayList<>();
233    Put p = new Put(row1);
234    p.addColumn(dummy, dummy, dummy);
235    puts.add(p);
236    p = new Put(row2);
237    p.addColumn(test, dummy, dummy);
238    puts.add(p);
239    p = new Put(row3);
240    p.addColumn(test, dummy, dummy);
241    puts.add(p);
242    t.put(puts);
243    // Ensure expected result.
244    checkRowAndDelete(t,row1,1);
245    checkRowAndDelete(t,row2,0);
246    checkRowAndDelete(t,row3,0);
247    // We have three Coprocessors stacked up on the prePut. See the beforeClass setup. We did three
248    // puts above two of which bypassed. A bypass means do not call the other coprocessors in the
249    // stack so for the two 'test' calls in the above, we should not have call through to all all
250    // three coprocessors in the chain. So we should have:
251    // 3 invocations for first put + 1 invocation + 1 bypass for second put + 1 invocation +
252    // 1 bypass for the last put. Assert.
253    assertEquals("Total CP invocation count", 5, TestCoprocessor.PREPUT_INVOCATIONS.get());
254    assertEquals("Total CP bypasses", 2, TestCoprocessor.PREPUT_BYPASSES.get());
255  }
256
257
258  public static class TestCoprocessor implements RegionCoprocessor, RegionObserver {
259    static AtomicInteger PREPUT_INVOCATIONS = new AtomicInteger(0);
260    static AtomicInteger PREPUT_BYPASSES = new AtomicInteger(0);
261
262    @Override
263    public Optional<RegionObserver> getRegionObserver() {
264      return Optional.of(this);
265    }
266
267    @Override
268    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
269        final Put put, final WALEdit edit, final Durability durability)
270        throws IOException {
271      PREPUT_INVOCATIONS.incrementAndGet();
272      Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
273      if (familyMap.containsKey(test)) {
274        PREPUT_BYPASSES.incrementAndGet();
275        e.bypass();
276      }
277    }
278  }
279
280  /**
281   * Calls through to TestCoprocessor.
282   */
283  public static class TestCoprocessor2 extends TestRegionObserverBypass.TestCoprocessor {}
284
285  /**
286   * Calls through to TestCoprocessor.
287   */
288  public static class TestCoprocessor3 extends TestRegionObserverBypass.TestCoprocessor {}
289}