001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.lang.reflect.Method;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.Optional;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.CellUtil;
035import org.apache.hadoop.hbase.CompareOperator;
036import org.apache.hadoop.hbase.Coprocessor;
037import org.apache.hadoop.hbase.ExtendedCell;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.Admin;
045import org.apache.hadoop.hbase.client.Append;
046import org.apache.hadoop.hbase.client.CheckAndMutate;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
048import org.apache.hadoop.hbase.client.Delete;
049import org.apache.hadoop.hbase.client.Durability;
050import org.apache.hadoop.hbase.client.Get;
051import org.apache.hadoop.hbase.client.Increment;
052import org.apache.hadoop.hbase.client.Mutation;
053import org.apache.hadoop.hbase.client.Put;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.client.RegionLocator;
056import org.apache.hadoop.hbase.client.Result;
057import org.apache.hadoop.hbase.client.ResultScanner;
058import org.apache.hadoop.hbase.client.RowMutations;
059import org.apache.hadoop.hbase.client.Scan;
060import org.apache.hadoop.hbase.client.Table;
061import org.apache.hadoop.hbase.client.TableDescriptor;
062import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
063import org.apache.hadoop.hbase.filter.FilterAllFilter;
064import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
065import org.apache.hadoop.hbase.io.hfile.CacheConfig;
066import org.apache.hadoop.hbase.io.hfile.HFile;
067import org.apache.hadoop.hbase.io.hfile.HFileContext;
068import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
069import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
070import org.apache.hadoop.hbase.regionserver.HRegion;
071import org.apache.hadoop.hbase.regionserver.InternalScanner;
072import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
073import org.apache.hadoop.hbase.regionserver.ScanType;
074import org.apache.hadoop.hbase.regionserver.ScannerContext;
075import org.apache.hadoop.hbase.regionserver.Store;
076import org.apache.hadoop.hbase.regionserver.StoreFile;
077import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
078import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
079import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
080import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
081import org.apache.hadoop.hbase.testclassification.LargeTests;
082import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
083import org.apache.hadoop.hbase.util.Bytes;
084import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
085import org.apache.hadoop.hbase.util.JVMClusterUtil;
086import org.apache.hadoop.hbase.util.Threads;
087import org.apache.hadoop.hbase.wal.WALEdit;
088import org.apache.hadoop.hbase.wal.WALKey;
089import org.apache.hadoop.hbase.wal.WALKeyImpl;
090import org.junit.AfterClass;
091import org.junit.Assert;
092import org.junit.BeforeClass;
093import org.junit.ClassRule;
094import org.junit.Rule;
095import org.junit.Test;
096import org.junit.experimental.categories.Category;
097import org.junit.rules.TestName;
098import org.mockito.Mockito;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
103
104@Category({ CoprocessorTests.class, LargeTests.class })
105public class TestRegionObserverInterface {
106
107  @ClassRule
108  public static final HBaseClassTestRule CLASS_RULE =
109    HBaseClassTestRule.forClass(TestRegionObserverInterface.class);
110
111  private static final Logger LOG = LoggerFactory.getLogger(TestRegionObserverInterface.class);
112
113  public static final TableName TEST_TABLE = TableName.valueOf("TestTable");
114  public static final byte[] FAMILY = Bytes.toBytes("f");
115  public final static byte[] A = Bytes.toBytes("a");
116  public final static byte[] B = Bytes.toBytes("b");
117  public final static byte[] C = Bytes.toBytes("c");
118  public final static byte[] ROW = Bytes.toBytes("testrow");
119
120  private static HBaseTestingUtil util = new HBaseTestingUtil();
121  private static SingleProcessHBaseCluster cluster = null;
122
123  @Rule
124  public TestName name = new TestName();
125
126  @BeforeClass
127  public static void setupBeforeClass() throws Exception {
128    // set configure to indicate which cp should be loaded
129    Configuration conf = util.getConfiguration();
130    conf.setBoolean("hbase.master.distributed.log.replay", true);
131    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
132      SimpleRegionObserver.class.getName());
133
134    util.startMiniCluster();
135    cluster = util.getMiniHBaseCluster();
136  }
137
138  @AfterClass
139  public static void tearDownAfterClass() throws Exception {
140    util.shutdownMiniCluster();
141  }
142
143  @Test
144  public void testRegionObserver() throws IOException {
145    final TableName tableName =
146      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
147    // recreate table every time in order to reset the status of the
148    // coprocessor.
149    Table table = util.createTable(tableName, new byte[][] { A, B, C });
150    try {
151      verifyMethodResult(SimpleRegionObserver.class,
152        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDelete",
153          "hadPostStartRegionOperation", "hadPostCloseRegionOperation",
154          "hadPostBatchMutateIndispensably" },
155        tableName, new Boolean[] { false, false, false, false, false, false, false, false });
156
157      Put put = new Put(ROW);
158      put.addColumn(A, A, A);
159      put.addColumn(B, B, B);
160      put.addColumn(C, C, C);
161      table.put(put);
162
163      verifyMethodResult(SimpleRegionObserver.class,
164        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
165          "hadPostBatchMutate", "hadDelete", "hadPostStartRegionOperation",
166          "hadPostCloseRegionOperation", "hadPostBatchMutateIndispensably" },
167        TEST_TABLE,
168        new Boolean[] { false, false, true, true, true, true, false, true, true, true });
169
170      verifyMethodResult(SimpleRegionObserver.class,
171        new String[] { "getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose" },
172        tableName, new Integer[] { 1, 1, 0, 0 });
173
174      Get get = new Get(ROW);
175      get.addColumn(A, A);
176      get.addColumn(B, B);
177      get.addColumn(C, C);
178      table.get(get);
179
180      verifyMethodResult(SimpleRegionObserver.class,
181        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDelete",
182          "hadPrePreparedDeleteTS" },
183        tableName, new Boolean[] { true, true, true, true, false, false });
184
185      Delete delete = new Delete(ROW);
186      delete.addColumn(A, A);
187      delete.addColumn(B, B);
188      delete.addColumn(C, C);
189      table.delete(delete);
190
191      verifyMethodResult(SimpleRegionObserver.class,
192        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
193          "hadPostBatchMutate", "hadDelete", "hadPrePreparedDeleteTS" },
194        tableName, new Boolean[] { true, true, true, true, true, true, true, true });
195    } finally {
196      util.deleteTable(tableName);
197      table.close();
198    }
199    verifyMethodResult(SimpleRegionObserver.class,
200      new String[] { "getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose" },
201      tableName, new Integer[] { 1, 1, 1, 1 });
202  }
203
204  @Test
205  public void testRowMutation() throws IOException {
206    final TableName tableName =
207      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
208    Table table = util.createTable(tableName, new byte[][] { A, B, C });
209    try {
210      verifyMethodResult(SimpleRegionObserver.class,
211        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDeleted" },
212        tableName, new Boolean[] { false, false, false, false, false });
213      Put put = new Put(ROW);
214      put.addColumn(A, A, A);
215      put.addColumn(B, B, B);
216      put.addColumn(C, C, C);
217
218      Delete delete = new Delete(ROW);
219      delete.addColumn(A, A);
220      delete.addColumn(B, B);
221      delete.addColumn(C, C);
222
223      RowMutations arm = new RowMutations(ROW);
224      arm.add(put);
225      arm.add(delete);
226      table.mutateRow(arm);
227
228      verifyMethodResult(SimpleRegionObserver.class,
229        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDeleted" },
230        tableName, new Boolean[] { false, false, true, true, true });
231    } finally {
232      util.deleteTable(tableName);
233      table.close();
234    }
235  }
236
237  @Test
238  public void testIncrementHook() throws IOException {
239    final TableName tableName =
240      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
241    Table table = util.createTable(tableName, new byte[][] { A, B, C });
242    try {
243      Increment inc = new Increment(Bytes.toBytes(0));
244      inc.addColumn(A, A, 1);
245
246      verifyMethodResult(SimpleRegionObserver.class,
247        new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock",
248          "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" },
249        tableName, new Boolean[] { false, false, false, false, false, false });
250
251      table.increment(inc);
252
253      verifyMethodResult(SimpleRegionObserver.class,
254        new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock",
255          "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" },
256        tableName, new Boolean[] { true, true, true, true, true, true });
257    } finally {
258      util.deleteTable(tableName);
259      table.close();
260    }
261  }
262
263  @Test
264  public void testCheckAndPutHooks() throws IOException {
265    final TableName tableName =
266      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
267    try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) {
268      Put p = new Put(Bytes.toBytes(0));
269      p.addColumn(A, A, A);
270      table.put(p);
271      p = new Put(Bytes.toBytes(0));
272      p.addColumn(A, A, A);
273      verifyMethodResult(SimpleRegionObserver.class,
274        new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
275          "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
276          "getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
277          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
278        tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 });
279
280      table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p);
281      verifyMethodResult(SimpleRegionObserver.class,
282        new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
283          "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
284          "getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
285          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
286        tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 });
287
288      table.checkAndMutate(Bytes.toBytes(0),
289        new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A)).thenPut(p);
290      verifyMethodResult(SimpleRegionObserver.class,
291        new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
292          "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
293          "getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
294          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
295        tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 });
296    } finally {
297      util.deleteTable(tableName);
298    }
299  }
300
301  @Test
302  public void testCheckAndDeleteHooks() throws IOException {
303    final TableName tableName =
304      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
305    Table table = util.createTable(tableName, new byte[][] { A, B, C });
306    try {
307      Put p = new Put(Bytes.toBytes(0));
308      p.addColumn(A, A, A);
309      table.put(p);
310      Delete d = new Delete(Bytes.toBytes(0));
311      table.delete(d);
312      verifyMethodResult(SimpleRegionObserver.class,
313        new String[] { "getPreCheckAndDelete", "getPreCheckAndDeleteAfterRowLock",
314          "getPostCheckAndDelete", "getPreCheckAndDeleteWithFilter",
315          "getPreCheckAndDeleteWithFilterAfterRowLock", "getPostCheckAndDeleteWithFilter",
316          "getPreCheckAndMutate", "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
317        tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 });
318
319      table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d);
320      verifyMethodResult(SimpleRegionObserver.class,
321        new String[] { "getPreCheckAndDelete", "getPreCheckAndDeleteAfterRowLock",
322          "getPostCheckAndDelete", "getPreCheckAndDeleteWithFilter",
323          "getPreCheckAndDeleteWithFilterAfterRowLock", "getPostCheckAndDeleteWithFilter",
324          "getPreCheckAndMutate", "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
325        tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 });
326
327      table.checkAndMutate(Bytes.toBytes(0),
328        new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A)).thenDelete(d);
329      verifyMethodResult(SimpleRegionObserver.class,
330        new String[] { "getPreCheckAndDelete", "getPreCheckAndDeleteAfterRowLock",
331          "getPostCheckAndDelete", "getPreCheckAndDeleteWithFilter",
332          "getPreCheckAndDeleteWithFilterAfterRowLock", "getPostCheckAndDeleteWithFilter",
333          "getPreCheckAndMutate", "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
334        tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 });
335    } finally {
336      util.deleteTable(tableName);
337      table.close();
338    }
339  }
340
341  @Test
342  public void testCheckAndIncrementHooks() throws Exception {
343    final TableName tableName =
344      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
345    Table table = util.createTable(tableName, new byte[][] { A, B, C });
346    try {
347      byte[] row = Bytes.toBytes(0);
348
349      verifyMethodResult(
350        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
351          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
352        tableName, new Integer[] { 0, 0, 0 });
353
354      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(A, A)
355        .build(new Increment(row).addColumn(A, A, 1)));
356      verifyMethodResult(
357        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
358          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
359        tableName, new Integer[] { 1, 1, 1 });
360
361      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(A, A, Bytes.toBytes(1L))
362        .build(new Increment(row).addColumn(A, A, 1)));
363      verifyMethodResult(
364        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
365          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
366        tableName, new Integer[] { 2, 2, 2 });
367    } finally {
368      util.deleteTable(tableName);
369      table.close();
370    }
371  }
372
373  @Test
374  public void testCheckAndAppendHooks() throws Exception {
375    final TableName tableName =
376      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
377    Table table = util.createTable(tableName, new byte[][] { A, B, C });
378    try {
379      byte[] row = Bytes.toBytes(0);
380
381      verifyMethodResult(
382        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
383          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
384        tableName, new Integer[] { 0, 0, 0 });
385
386      table.checkAndMutate(
387        CheckAndMutate.newBuilder(row).ifNotExists(A, A).build(new Append(row).addColumn(A, A, A)));
388      verifyMethodResult(
389        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
390          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
391        tableName, new Integer[] { 1, 1, 1 });
392
393      table.checkAndMutate(
394        CheckAndMutate.newBuilder(row).ifEquals(A, A, A).build(new Append(row).addColumn(A, A, A)));
395      verifyMethodResult(
396        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
397          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
398        tableName, new Integer[] { 2, 2, 2 });
399    } finally {
400      util.deleteTable(tableName);
401      table.close();
402    }
403  }
404
405  @Test
406  public void testCheckAndRowMutationsHooks() throws Exception {
407    final TableName tableName =
408      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
409    Table table = util.createTable(tableName, new byte[][] { A, B, C });
410    try {
411      byte[] row = Bytes.toBytes(0);
412
413      Put p = new Put(row).addColumn(A, A, A);
414      table.put(p);
415      verifyMethodResult(
416        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
417          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
418        tableName, new Integer[] { 0, 0, 0 });
419
420      table
421        .checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(A, A, A).build(new RowMutations(row)
422          .add((Mutation) new Put(row).addColumn(B, B, B)).add((Mutation) new Delete(row))));
423      verifyMethodResult(
424        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
425          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
426        tableName, new Integer[] { 1, 1, 1 });
427
428      Object[] result = new Object[2];
429      table.batch(
430        Arrays.asList(p,
431          CheckAndMutate.newBuilder(row).ifEquals(A, A, A).build(new RowMutations(row)
432            .add((Mutation) new Put(row).addColumn(B, B, B)).add((Mutation) new Delete(row)))),
433        result);
434      verifyMethodResult(
435        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
436          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
437        tableName, new Integer[] { 2, 2, 2 });
438    } finally {
439      util.deleteTable(tableName);
440      table.close();
441    }
442  }
443
444  @Test
445  public void testAppendHook() throws IOException {
446    final TableName tableName =
447      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
448    Table table = util.createTable(tableName, new byte[][] { A, B, C });
449    try {
450      Append app = new Append(Bytes.toBytes(0));
451      app.addColumn(A, A, A);
452
453      verifyMethodResult(SimpleRegionObserver.class,
454        new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock",
455          "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" },
456        tableName, new Boolean[] { false, false, false, false, false, false });
457
458      table.append(app);
459
460      verifyMethodResult(SimpleRegionObserver.class,
461        new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock",
462          "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" },
463        tableName, new Boolean[] { true, true, true, true, true, true });
464    } finally {
465      util.deleteTable(tableName);
466      table.close();
467    }
468  }
469
470  @Test
471  // HBase-3583
472  public void testHBase3583() throws IOException {
473    final TableName tableName = TableName.valueOf(name.getMethodName());
474    util.createTable(tableName, new byte[][] { A, B, C });
475    util.waitUntilAllRegionsAssigned(tableName);
476
477    verifyMethodResult(SimpleRegionObserver.class,
478      new String[] { "hadPreGet", "hadPostGet", "wasScannerNextCalled", "wasScannerCloseCalled" },
479      tableName, new Boolean[] { false, false, false, false });
480
481    Table table = util.getConnection().getTable(tableName);
482    Put put = new Put(ROW);
483    put.addColumn(A, A, A);
484    table.put(put);
485
486    Get get = new Get(ROW);
487    get.addColumn(A, A);
488    table.get(get);
489
490    // verify that scannerNext and scannerClose upcalls won't be invoked
491    // when we perform get().
492    verifyMethodResult(SimpleRegionObserver.class,
493      new String[] { "hadPreGet", "hadPostGet", "wasScannerNextCalled", "wasScannerCloseCalled" },
494      tableName, new Boolean[] { true, true, false, false });
495
496    Scan s = new Scan();
497    ResultScanner scanner = table.getScanner(s);
498    try {
499      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
500      }
501    } finally {
502      scanner.close();
503    }
504
505    // now scanner hooks should be invoked.
506    verifyMethodResult(SimpleRegionObserver.class,
507      new String[] { "wasScannerNextCalled", "wasScannerCloseCalled" }, tableName,
508      new Boolean[] { true, true });
509    util.deleteTable(tableName);
510    table.close();
511  }
512
513  @Test
514  public void testHBASE14489() throws IOException {
515    final TableName tableName = TableName.valueOf(name.getMethodName());
516    Table table = util.createTable(tableName, new byte[][] { A });
517    Put put = new Put(ROW);
518    put.addColumn(A, A, A);
519    table.put(put);
520
521    Scan s = new Scan();
522    s.setFilter(new FilterAllFilter());
523    ResultScanner scanner = table.getScanner(s);
524    try {
525      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
526      }
527    } finally {
528      scanner.close();
529    }
530    verifyMethodResult(SimpleRegionObserver.class, new String[] { "wasScannerFilterRowCalled" },
531      tableName, new Boolean[] { true });
532    util.deleteTable(tableName);
533    table.close();
534
535  }
536
537  @Test
538  // HBase-3758
539  public void testHBase3758() throws IOException {
540    final TableName tableName = TableName.valueOf(name.getMethodName());
541    util.createTable(tableName, new byte[][] { A, B, C });
542
543    verifyMethodResult(SimpleRegionObserver.class,
544      new String[] { "hadDeleted", "wasScannerOpenCalled" }, tableName,
545      new Boolean[] { false, false });
546
547    Table table = util.getConnection().getTable(tableName);
548    Put put = new Put(ROW);
549    put.addColumn(A, A, A);
550    table.put(put);
551
552    Delete delete = new Delete(ROW);
553    table.delete(delete);
554
555    verifyMethodResult(SimpleRegionObserver.class,
556      new String[] { "hadDeleted", "wasScannerOpenCalled" }, tableName,
557      new Boolean[] { true, false });
558
559    Scan s = new Scan();
560    ResultScanner scanner = table.getScanner(s);
561    try {
562      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
563      }
564    } finally {
565      scanner.close();
566    }
567
568    // now scanner hooks should be invoked.
569    verifyMethodResult(SimpleRegionObserver.class, new String[] { "wasScannerOpenCalled" },
570      tableName, new Boolean[] { true });
571    util.deleteTable(tableName);
572    table.close();
573  }
574
575  /* Overrides compaction to only output rows with keys that are even numbers */
576  public static class EvenOnlyCompactor implements RegionCoprocessor, RegionObserver {
577    long lastCompaction;
578    long lastFlush;
579
580    @Override
581    public Optional<RegionObserver> getRegionObserver() {
582      return Optional.of(this);
583    }
584
585    @Override
586    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> e,
587      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
588      CompactionRequest request) {
589      return new InternalScanner() {
590
591        @Override
592        public boolean next(List<? super ExtendedCell> results, ScannerContext scannerContext)
593          throws IOException {
594          List<ExtendedCell> internalResults = new ArrayList<>();
595          boolean hasMore;
596          do {
597            hasMore = scanner.next(internalResults, scannerContext);
598            if (!internalResults.isEmpty()) {
599              long row = Bytes.toLong(CellUtil.cloneValue(internalResults.get(0)));
600              if (row % 2 == 0) {
601                // return this row
602                break;
603              }
604              // clear and continue
605              internalResults.clear();
606            }
607          } while (hasMore);
608
609          if (!internalResults.isEmpty()) {
610            results.addAll(internalResults);
611          }
612          return hasMore;
613        }
614
615        @Override
616        public void close() throws IOException {
617          scanner.close();
618        }
619      };
620    }
621
622    @Override
623    public void postCompact(ObserverContext<? extends RegionCoprocessorEnvironment> e, Store store,
624      StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) {
625      lastCompaction = EnvironmentEdgeManager.currentTime();
626    }
627
628    @Override
629    public void postFlush(ObserverContext<? extends RegionCoprocessorEnvironment> e,
630      FlushLifeCycleTracker tracker) {
631      lastFlush = EnvironmentEdgeManager.currentTime();
632    }
633  }
634
635  /**
636   * Tests overriding compaction handling via coprocessor hooks
637   */
638  @Test
639  public void testCompactionOverride() throws Exception {
640    final TableName compactTable = TableName.valueOf(name.getMethodName());
641    Admin admin = util.getAdmin();
642    if (admin.tableExists(compactTable)) {
643      admin.disableTable(compactTable);
644      admin.deleteTable(compactTable);
645    }
646
647    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(compactTable)
648      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(A))
649      .setCoprocessor(EvenOnlyCompactor.class.getName()).build();
650    admin.createTable(tableDescriptor);
651
652    Table table = util.getConnection().getTable(compactTable);
653    for (long i = 1; i <= 10; i++) {
654      byte[] iBytes = Bytes.toBytes(i);
655      Put put = new Put(iBytes);
656      put.setDurability(Durability.SKIP_WAL);
657      put.addColumn(A, A, iBytes);
658      table.put(put);
659    }
660
661    HRegion firstRegion = cluster.getRegions(compactTable).get(0);
662    Coprocessor cp = firstRegion.getCoprocessorHost().findCoprocessor(EvenOnlyCompactor.class);
663    assertNotNull("EvenOnlyCompactor coprocessor should be loaded", cp);
664    EvenOnlyCompactor compactor = (EvenOnlyCompactor) cp;
665
666    // force a compaction
667    long ts = EnvironmentEdgeManager.currentTime();
668    admin.flush(compactTable);
669    // wait for flush
670    for (int i = 0; i < 10; i++) {
671      if (compactor.lastFlush >= ts) {
672        break;
673      }
674      Thread.sleep(1000);
675    }
676    assertTrue("Flush didn't complete", compactor.lastFlush >= ts);
677    LOG.debug("Flush complete");
678
679    ts = compactor.lastFlush;
680    admin.majorCompact(compactTable);
681    // wait for compaction
682    for (int i = 0; i < 30; i++) {
683      if (compactor.lastCompaction >= ts) {
684        break;
685      }
686      Thread.sleep(1000);
687    }
688    LOG.debug("Last compaction was at " + compactor.lastCompaction);
689    assertTrue("Compaction didn't complete", compactor.lastCompaction >= ts);
690
691    // only even rows should remain
692    ResultScanner scanner = table.getScanner(new Scan());
693    try {
694      for (long i = 2; i <= 10; i += 2) {
695        Result r = scanner.next();
696        assertNotNull(r);
697        assertFalse(r.isEmpty());
698        byte[] iBytes = Bytes.toBytes(i);
699        assertArrayEquals("Row should be " + i, r.getRow(), iBytes);
700        assertArrayEquals("Value should be " + i, r.getValue(A, A), iBytes);
701      }
702    } finally {
703      scanner.close();
704    }
705    table.close();
706  }
707
708  @Test
709  public void bulkLoadHFileTest() throws Exception {
710    final String testName =
711      TestRegionObserverInterface.class.getName() + "." + name.getMethodName();
712    final TableName tableName =
713      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
714    Configuration conf = util.getConfiguration();
715    Table table = util.createTable(tableName, new byte[][] { A, B, C });
716    try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
717      verifyMethodResult(SimpleRegionObserver.class,
718        new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" }, tableName,
719        new Boolean[] { false, false });
720
721      FileSystem fs = util.getTestFileSystem();
722      final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs);
723      Path familyDir = new Path(dir, Bytes.toString(A));
724
725      createHFile(util.getConfiguration(), fs, new Path(familyDir, Bytes.toString(A)), A, A);
726
727      // Bulk load
728      BulkLoadHFiles.create(conf).bulkLoad(tableName, dir);
729
730      verifyMethodResult(SimpleRegionObserver.class,
731        new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" }, tableName,
732        new Boolean[] { true, true });
733    } finally {
734      util.deleteTable(tableName);
735      table.close();
736    }
737  }
738
739  @Test
740  public void testRecovery() throws Exception {
741    LOG.info(TestRegionObserverInterface.class.getName() + "." + name.getMethodName());
742    final TableName tableName =
743      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
744    Table table = util.createTable(tableName, new byte[][] { A, B, C });
745    try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
746
747      JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
748      ServerName sn2 = rs1.getRegionServer().getServerName();
749      String regEN = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
750
751      util.getAdmin().move(Bytes.toBytes(regEN), sn2);
752      while (!sn2.equals(locator.getAllRegionLocations().get(0).getServerName())) {
753        Thread.sleep(100);
754      }
755
756      Put put = new Put(ROW);
757      put.addColumn(A, A, A);
758      put.addColumn(B, B, B);
759      put.addColumn(C, C, C);
760      table.put(put);
761
762      // put two times
763      table.put(put);
764
765      verifyMethodResult(SimpleRegionObserver.class,
766        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
767          "hadPostBatchMutate", "hadDelete" },
768        tableName, new Boolean[] { false, false, true, true, true, true, false });
769
770      verifyMethodResult(SimpleRegionObserver.class,
771        new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPrePut", "getCtPostPut" },
772        tableName, new Integer[] { 0, 0, 2, 2 });
773
774      cluster.killRegionServer(rs1.getRegionServer().getServerName());
775      Threads.sleep(1000); // Let the kill soak in.
776      util.waitUntilAllRegionsAssigned(tableName);
777      LOG.info("All regions assigned");
778
779      verifyMethodResult(SimpleRegionObserver.class,
780        new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPrePut", "getCtPostPut" },
781        tableName, new Integer[] { 1, 1, 0, 0 });
782    } finally {
783      util.deleteTable(tableName);
784      table.close();
785    }
786  }
787
788  // called from testPreWALAppendIsWrittenToWAL
789  private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
790    int expectedCalls = 0;
791    String[] methodArray = new String[1];
792    methodArray[0] = "getCtPreWALAppend";
793    Object[] resultArray = new Object[1];
794
795    Put p = new Put(ROW);
796    p.addColumn(A, A, A);
797    table.put(p);
798    resultArray[0] = ++expectedCalls;
799    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
800
801    Append a = new Append(ROW);
802    a.addColumn(B, B, B);
803    table.append(a);
804    resultArray[0] = ++expectedCalls;
805    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
806
807    Increment i = new Increment(ROW);
808    i.addColumn(C, C, 1);
809    table.increment(i);
810    resultArray[0] = ++expectedCalls;
811    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
812
813    Delete d = new Delete(ROW);
814    table.delete(d);
815    resultArray[0] = ++expectedCalls;
816    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
817  }
818
819  @Test
820  public void testPreWALAppend() throws Exception {
821    SimpleRegionObserver sro = new SimpleRegionObserver();
822    ObserverContext ctx = Mockito.mock(ObserverContext.class);
823    WALKey key =
824      new WALKeyImpl(Bytes.toBytes("region"), TEST_TABLE, EnvironmentEdgeManager.currentTime());
825    WALEdit edit = new WALEdit();
826    sro.preWALAppend(ctx, key, edit);
827    Assert.assertEquals(1, key.getExtendedAttributes().size());
828    Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
829      key.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend())));
830  }
831
832  @Test
833  public void testPreWALAppendIsWrittenToWAL() throws Exception {
834    final TableName tableName =
835      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
836    Table table = util.createTable(tableName, new byte[][] { A, B, C });
837
838    PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
839    List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
840    // should be only one region
841    HRegion region = regions.get(0);
842    region.getWAL().registerWALActionsListener(listener);
843    testPreWALAppendHook(table, tableName);
844    boolean[] expectedResults = { true, true, true, true };
845    Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray());
846
847  }
848
849  @Test
850  public void testPreWALAppendNotCalledOnMetaEdit() throws Exception {
851    final TableName tableName =
852      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
853    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
854    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
855    tdBuilder.setColumnFamily(cfBuilder.build());
856    tdBuilder.setCoprocessor(SimpleRegionObserver.class.getName());
857    TableDescriptor td = tdBuilder.build();
858    Table table = util.createTable(td, new byte[][] { A, B, C });
859
860    PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
861    List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
862    // should be only one region
863    HRegion region = regions.get(0);
864
865    region.getWAL().registerWALActionsListener(listener);
866    // flushing should write to the WAL
867    region.flush(true);
868    // so should compaction
869    region.compact(false);
870    // and so should closing the region
871    region.close();
872
873    // but we still shouldn't have triggered preWALAppend because no user data was written
874    String[] methods = new String[] { "getCtPreWALAppend" };
875    Object[] expectedResult = new Integer[] { 0 };
876    verifyMethodResult(SimpleRegionObserver.class, methods, tableName, expectedResult);
877  }
878
879  // check each region whether the coprocessor upcalls are called or not.
880  private void verifyMethodResult(Class<?> coprocessor, String methodName[], TableName tableName,
881    Object value[]) throws IOException {
882    try {
883      for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
884        if (!t.isAlive() || t.getRegionServer().isAborted() || t.getRegionServer().isStopping()) {
885          continue;
886        }
887        for (RegionInfo r : ProtobufUtil.getOnlineRegions(t.getRegionServer().getRSRpcServices())) {
888          if (!r.getTable().equals(tableName)) {
889            continue;
890          }
891          RegionCoprocessorHost cph =
892            t.getRegionServer().getOnlineRegion(r.getRegionName()).getCoprocessorHost();
893
894          Coprocessor cp = cph.findCoprocessor(coprocessor.getName());
895          assertNotNull(cp);
896          for (int i = 0; i < methodName.length; ++i) {
897            Method m = coprocessor.getMethod(methodName[i]);
898            Object o = m.invoke(cp);
899            assertTrue("Result of " + coprocessor.getName() + "." + methodName[i]
900              + " is expected to be " + value[i].toString() + ", while we get " + o.toString(),
901              o.equals(value[i]));
902          }
903        }
904      }
905    } catch (Exception e) {
906      throw new IOException(e.toString());
907    }
908  }
909
910  private static void createHFile(Configuration conf, FileSystem fs, Path path, byte[] family,
911    byte[] qualifier) throws IOException {
912    HFileContext context = new HFileContextBuilder().build();
913    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
914      .withFileContext(context).create();
915    long now = EnvironmentEdgeManager.currentTime();
916    try {
917      for (int i = 1; i <= 9; i++) {
918        KeyValue kv =
919          new KeyValue(Bytes.toBytes(i + ""), family, qualifier, now, Bytes.toBytes(i + ""));
920        writer.append(kv);
921      }
922    } finally {
923      writer.close();
924    }
925  }
926
927  private static class PreWALAppendWALActionsListener implements WALActionsListener {
928    boolean[] walKeysCorrect = { false, false, false, false };
929
930    @Override
931    public void postAppend(long entryLen, long elapsedTimeMillis, WALKey logKey, WALEdit logEdit)
932      throws IOException {
933      for (int k = 0; k < 4; k++) {
934        if (!walKeysCorrect[k]) {
935          walKeysCorrect[k] = Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
936            logKey.getExtendedAttribute(Integer.toString(k + 1)));
937        }
938      }
939    }
940
941    boolean[] getWalKeysCorrectArray() {
942      return walKeysCorrect;
943    }
944  }
945}