001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.lang.reflect.Method;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.Optional;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.CompareOperator;
037import org.apache.hadoop.hbase.Coprocessor;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.Admin;
045import org.apache.hadoop.hbase.client.Append;
046import org.apache.hadoop.hbase.client.CheckAndMutate;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
048import org.apache.hadoop.hbase.client.Delete;
049import org.apache.hadoop.hbase.client.Durability;
050import org.apache.hadoop.hbase.client.Get;
051import org.apache.hadoop.hbase.client.Increment;
052import org.apache.hadoop.hbase.client.Mutation;
053import org.apache.hadoop.hbase.client.Put;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.client.RegionLocator;
056import org.apache.hadoop.hbase.client.Result;
057import org.apache.hadoop.hbase.client.ResultScanner;
058import org.apache.hadoop.hbase.client.RowMutations;
059import org.apache.hadoop.hbase.client.Scan;
060import org.apache.hadoop.hbase.client.Table;
061import org.apache.hadoop.hbase.client.TableDescriptor;
062import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
063import org.apache.hadoop.hbase.filter.FilterAllFilter;
064import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
065import org.apache.hadoop.hbase.io.hfile.CacheConfig;
066import org.apache.hadoop.hbase.io.hfile.HFile;
067import org.apache.hadoop.hbase.io.hfile.HFileContext;
068import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
069import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
070import org.apache.hadoop.hbase.regionserver.HRegion;
071import org.apache.hadoop.hbase.regionserver.InternalScanner;
072import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
073import org.apache.hadoop.hbase.regionserver.ScanType;
074import org.apache.hadoop.hbase.regionserver.ScannerContext;
075import org.apache.hadoop.hbase.regionserver.Store;
076import org.apache.hadoop.hbase.regionserver.StoreFile;
077import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
078import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
079import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
080import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
081import org.apache.hadoop.hbase.testclassification.LargeTests;
082import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
083import org.apache.hadoop.hbase.util.Bytes;
084import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
085import org.apache.hadoop.hbase.util.JVMClusterUtil;
086import org.apache.hadoop.hbase.util.Threads;
087import org.apache.hadoop.hbase.wal.WALEdit;
088import org.apache.hadoop.hbase.wal.WALKey;
089import org.apache.hadoop.hbase.wal.WALKeyImpl;
090import org.junit.AfterClass;
091import org.junit.Assert;
092import org.junit.BeforeClass;
093import org.junit.ClassRule;
094import org.junit.Rule;
095import org.junit.Test;
096import org.junit.experimental.categories.Category;
097import org.junit.rules.TestName;
098import org.mockito.Mockito;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
103
104@Category({ CoprocessorTests.class, LargeTests.class })
105public class TestRegionObserverInterface {
106
107  @ClassRule
108  public static final HBaseClassTestRule CLASS_RULE =
109    HBaseClassTestRule.forClass(TestRegionObserverInterface.class);
110
111  private static final Logger LOG = LoggerFactory.getLogger(TestRegionObserverInterface.class);
112
113  public static final TableName TEST_TABLE = TableName.valueOf("TestTable");
114  public static final byte[] FAMILY = Bytes.toBytes("f");
115  public final static byte[] A = Bytes.toBytes("a");
116  public final static byte[] B = Bytes.toBytes("b");
117  public final static byte[] C = Bytes.toBytes("c");
118  public final static byte[] ROW = Bytes.toBytes("testrow");
119
120  private static HBaseTestingUtil util = new HBaseTestingUtil();
121  private static SingleProcessHBaseCluster cluster = null;
122
123  @Rule
124  public TestName name = new TestName();
125
126  @BeforeClass
127  public static void setupBeforeClass() throws Exception {
128    // set configure to indicate which cp should be loaded
129    Configuration conf = util.getConfiguration();
130    conf.setBoolean("hbase.master.distributed.log.replay", true);
131    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
132      SimpleRegionObserver.class.getName());
133
134    util.startMiniCluster();
135    cluster = util.getMiniHBaseCluster();
136  }
137
138  @AfterClass
139  public static void tearDownAfterClass() throws Exception {
140    util.shutdownMiniCluster();
141  }
142
143  @Test
144  public void testRegionObserver() throws IOException {
145    final TableName tableName =
146      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
147    // recreate table every time in order to reset the status of the
148    // coprocessor.
149    Table table = util.createTable(tableName, new byte[][] { A, B, C });
150    try {
151      verifyMethodResult(SimpleRegionObserver.class,
152        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDelete",
153          "hadPostStartRegionOperation", "hadPostCloseRegionOperation",
154          "hadPostBatchMutateIndispensably" },
155        tableName, new Boolean[] { false, false, false, false, false, false, false, false });
156
157      Put put = new Put(ROW);
158      put.addColumn(A, A, A);
159      put.addColumn(B, B, B);
160      put.addColumn(C, C, C);
161      table.put(put);
162
163      verifyMethodResult(SimpleRegionObserver.class,
164        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
165          "hadPostBatchMutate", "hadDelete", "hadPostStartRegionOperation",
166          "hadPostCloseRegionOperation", "hadPostBatchMutateIndispensably" },
167        TEST_TABLE,
168        new Boolean[] { false, false, true, true, true, true, false, true, true, true });
169
170      verifyMethodResult(SimpleRegionObserver.class,
171        new String[] { "getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose" },
172        tableName, new Integer[] { 1, 1, 0, 0 });
173
174      Get get = new Get(ROW);
175      get.addColumn(A, A);
176      get.addColumn(B, B);
177      get.addColumn(C, C);
178      table.get(get);
179
180      verifyMethodResult(SimpleRegionObserver.class,
181        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDelete",
182          "hadPrePreparedDeleteTS" },
183        tableName, new Boolean[] { true, true, true, true, false, false });
184
185      Delete delete = new Delete(ROW);
186      delete.addColumn(A, A);
187      delete.addColumn(B, B);
188      delete.addColumn(C, C);
189      table.delete(delete);
190
191      verifyMethodResult(SimpleRegionObserver.class,
192        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
193          "hadPostBatchMutate", "hadDelete", "hadPrePreparedDeleteTS" },
194        tableName, new Boolean[] { true, true, true, true, true, true, true, true });
195    } finally {
196      util.deleteTable(tableName);
197      table.close();
198    }
199    verifyMethodResult(SimpleRegionObserver.class,
200      new String[] { "getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose" },
201      tableName, new Integer[] { 1, 1, 1, 1 });
202  }
203
204  @Test
205  public void testRowMutation() throws IOException {
206    final TableName tableName =
207      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
208    Table table = util.createTable(tableName, new byte[][] { A, B, C });
209    try {
210      verifyMethodResult(SimpleRegionObserver.class,
211        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDeleted" },
212        tableName, new Boolean[] { false, false, false, false, false });
213      Put put = new Put(ROW);
214      put.addColumn(A, A, A);
215      put.addColumn(B, B, B);
216      put.addColumn(C, C, C);
217
218      Delete delete = new Delete(ROW);
219      delete.addColumn(A, A);
220      delete.addColumn(B, B);
221      delete.addColumn(C, C);
222
223      RowMutations arm = new RowMutations(ROW);
224      arm.add(put);
225      arm.add(delete);
226      table.mutateRow(arm);
227
228      verifyMethodResult(SimpleRegionObserver.class,
229        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDeleted" },
230        tableName, new Boolean[] { false, false, true, true, true });
231    } finally {
232      util.deleteTable(tableName);
233      table.close();
234    }
235  }
236
237  @Test
238  public void testIncrementHook() throws IOException {
239    final TableName tableName =
240      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
241    Table table = util.createTable(tableName, new byte[][] { A, B, C });
242    try {
243      Increment inc = new Increment(Bytes.toBytes(0));
244      inc.addColumn(A, A, 1);
245
246      verifyMethodResult(SimpleRegionObserver.class,
247        new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock",
248          "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" },
249        tableName, new Boolean[] { false, false, false, false, false, false });
250
251      table.increment(inc);
252
253      verifyMethodResult(SimpleRegionObserver.class,
254        new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock",
255          "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" },
256        tableName, new Boolean[] { true, true, true, true, true, true });
257    } finally {
258      util.deleteTable(tableName);
259      table.close();
260    }
261  }
262
263  @Test
264  public void testCheckAndPutHooks() throws IOException {
265    final TableName tableName =
266      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
267    try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) {
268      Put p = new Put(Bytes.toBytes(0));
269      p.addColumn(A, A, A);
270      table.put(p);
271      p = new Put(Bytes.toBytes(0));
272      p.addColumn(A, A, A);
273      verifyMethodResult(SimpleRegionObserver.class,
274        new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
275          "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
276          "getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
277          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
278        tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 });
279
280      table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p);
281      verifyMethodResult(SimpleRegionObserver.class,
282        new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
283          "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
284          "getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
285          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
286        tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 });
287
288      table.checkAndMutate(Bytes.toBytes(0),
289        new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A)).thenPut(p);
290      verifyMethodResult(SimpleRegionObserver.class,
291        new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
292          "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
293          "getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
294          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
295        tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 });
296    } finally {
297      util.deleteTable(tableName);
298    }
299  }
300
301  @Test
302  public void testCheckAndDeleteHooks() throws IOException {
303    final TableName tableName =
304      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
305    Table table = util.createTable(tableName, new byte[][] { A, B, C });
306    try {
307      Put p = new Put(Bytes.toBytes(0));
308      p.addColumn(A, A, A);
309      table.put(p);
310      Delete d = new Delete(Bytes.toBytes(0));
311      table.delete(d);
312      verifyMethodResult(SimpleRegionObserver.class,
313        new String[] { "getPreCheckAndDelete", "getPreCheckAndDeleteAfterRowLock",
314          "getPostCheckAndDelete", "getPreCheckAndDeleteWithFilter",
315          "getPreCheckAndDeleteWithFilterAfterRowLock", "getPostCheckAndDeleteWithFilter",
316          "getPreCheckAndMutate", "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
317        tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 });
318
319      table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d);
320      verifyMethodResult(SimpleRegionObserver.class,
321        new String[] { "getPreCheckAndDelete", "getPreCheckAndDeleteAfterRowLock",
322          "getPostCheckAndDelete", "getPreCheckAndDeleteWithFilter",
323          "getPreCheckAndDeleteWithFilterAfterRowLock", "getPostCheckAndDeleteWithFilter",
324          "getPreCheckAndMutate", "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
325        tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 });
326
327      table.checkAndMutate(Bytes.toBytes(0),
328        new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A)).thenDelete(d);
329      verifyMethodResult(SimpleRegionObserver.class,
330        new String[] { "getPreCheckAndDelete", "getPreCheckAndDeleteAfterRowLock",
331          "getPostCheckAndDelete", "getPreCheckAndDeleteWithFilter",
332          "getPreCheckAndDeleteWithFilterAfterRowLock", "getPostCheckAndDeleteWithFilter",
333          "getPreCheckAndMutate", "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
334        tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 });
335    } finally {
336      util.deleteTable(tableName);
337      table.close();
338    }
339  }
340
341  @Test
342  public void testCheckAndIncrementHooks() throws Exception {
343    final TableName tableName =
344      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
345    Table table = util.createTable(tableName, new byte[][] { A, B, C });
346    try {
347      byte[] row = Bytes.toBytes(0);
348
349      verifyMethodResult(
350        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
351          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
352        tableName, new Integer[] { 0, 0, 0 });
353
354      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(A, A)
355        .build(new Increment(row).addColumn(A, A, 1)));
356      verifyMethodResult(
357        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
358          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
359        tableName, new Integer[] { 1, 1, 1 });
360
361      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(A, A, Bytes.toBytes(1L))
362        .build(new Increment(row).addColumn(A, A, 1)));
363      verifyMethodResult(
364        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
365          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
366        tableName, new Integer[] { 2, 2, 2 });
367    } finally {
368      util.deleteTable(tableName);
369      table.close();
370    }
371  }
372
373  @Test
374  public void testCheckAndAppendHooks() throws Exception {
375    final TableName tableName =
376      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
377    Table table = util.createTable(tableName, new byte[][] { A, B, C });
378    try {
379      byte[] row = Bytes.toBytes(0);
380
381      verifyMethodResult(
382        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
383          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
384        tableName, new Integer[] { 0, 0, 0 });
385
386      table.checkAndMutate(
387        CheckAndMutate.newBuilder(row).ifNotExists(A, A).build(new Append(row).addColumn(A, A, A)));
388      verifyMethodResult(
389        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
390          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
391        tableName, new Integer[] { 1, 1, 1 });
392
393      table.checkAndMutate(
394        CheckAndMutate.newBuilder(row).ifEquals(A, A, A).build(new Append(row).addColumn(A, A, A)));
395      verifyMethodResult(
396        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
397          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
398        tableName, new Integer[] { 2, 2, 2 });
399    } finally {
400      util.deleteTable(tableName);
401      table.close();
402    }
403  }
404
405  @Test
406  public void testCheckAndRowMutationsHooks() throws Exception {
407    final TableName tableName =
408      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
409    Table table = util.createTable(tableName, new byte[][] { A, B, C });
410    try {
411      byte[] row = Bytes.toBytes(0);
412
413      Put p = new Put(row).addColumn(A, A, A);
414      table.put(p);
415      verifyMethodResult(
416        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
417          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
418        tableName, new Integer[] { 0, 0, 0 });
419
420      table
421        .checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(A, A, A).build(new RowMutations(row)
422          .add((Mutation) new Put(row).addColumn(B, B, B)).add((Mutation) new Delete(row))));
423      verifyMethodResult(
424        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
425          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
426        tableName, new Integer[] { 1, 1, 1 });
427
428      Object[] result = new Object[2];
429      table.batch(
430        Arrays.asList(p,
431          CheckAndMutate.newBuilder(row).ifEquals(A, A, A).build(new RowMutations(row)
432            .add((Mutation) new Put(row).addColumn(B, B, B)).add((Mutation) new Delete(row)))),
433        result);
434      verifyMethodResult(
435        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
436          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
437        tableName, new Integer[] { 2, 2, 2 });
438    } finally {
439      util.deleteTable(tableName);
440      table.close();
441    }
442  }
443
444  @Test
445  public void testAppendHook() throws IOException {
446    final TableName tableName =
447      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
448    Table table = util.createTable(tableName, new byte[][] { A, B, C });
449    try {
450      Append app = new Append(Bytes.toBytes(0));
451      app.addColumn(A, A, A);
452
453      verifyMethodResult(SimpleRegionObserver.class,
454        new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock",
455          "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" },
456        tableName, new Boolean[] { false, false, false, false, false, false });
457
458      table.append(app);
459
460      verifyMethodResult(SimpleRegionObserver.class,
461        new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock",
462          "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" },
463        tableName, new Boolean[] { true, true, true, true, true, true });
464    } finally {
465      util.deleteTable(tableName);
466      table.close();
467    }
468  }
469
470  @Test
471  // HBase-3583
472  public void testHBase3583() throws IOException {
473    final TableName tableName = TableName.valueOf(name.getMethodName());
474    util.createTable(tableName, new byte[][] { A, B, C });
475    util.waitUntilAllRegionsAssigned(tableName);
476
477    verifyMethodResult(SimpleRegionObserver.class,
478      new String[] { "hadPreGet", "hadPostGet", "wasScannerNextCalled", "wasScannerCloseCalled" },
479      tableName, new Boolean[] { false, false, false, false });
480
481    Table table = util.getConnection().getTable(tableName);
482    Put put = new Put(ROW);
483    put.addColumn(A, A, A);
484    table.put(put);
485
486    Get get = new Get(ROW);
487    get.addColumn(A, A);
488    table.get(get);
489
490    // verify that scannerNext and scannerClose upcalls won't be invoked
491    // when we perform get().
492    verifyMethodResult(SimpleRegionObserver.class,
493      new String[] { "hadPreGet", "hadPostGet", "wasScannerNextCalled", "wasScannerCloseCalled" },
494      tableName, new Boolean[] { true, true, false, false });
495
496    Scan s = new Scan();
497    ResultScanner scanner = table.getScanner(s);
498    try {
499      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
500      }
501    } finally {
502      scanner.close();
503    }
504
505    // now scanner hooks should be invoked.
506    verifyMethodResult(SimpleRegionObserver.class,
507      new String[] { "wasScannerNextCalled", "wasScannerCloseCalled" }, tableName,
508      new Boolean[] { true, true });
509    util.deleteTable(tableName);
510    table.close();
511  }
512
513  @Test
514  public void testHBASE14489() throws IOException {
515    final TableName tableName = TableName.valueOf(name.getMethodName());
516    Table table = util.createTable(tableName, new byte[][] { A });
517    Put put = new Put(ROW);
518    put.addColumn(A, A, A);
519    table.put(put);
520
521    Scan s = new Scan();
522    s.setFilter(new FilterAllFilter());
523    ResultScanner scanner = table.getScanner(s);
524    try {
525      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
526      }
527    } finally {
528      scanner.close();
529    }
530    verifyMethodResult(SimpleRegionObserver.class, new String[] { "wasScannerFilterRowCalled" },
531      tableName, new Boolean[] { true });
532    util.deleteTable(tableName);
533    table.close();
534
535  }
536
537  @Test
538  // HBase-3758
539  public void testHBase3758() throws IOException {
540    final TableName tableName = TableName.valueOf(name.getMethodName());
541    util.createTable(tableName, new byte[][] { A, B, C });
542
543    verifyMethodResult(SimpleRegionObserver.class,
544      new String[] { "hadDeleted", "wasScannerOpenCalled" }, tableName,
545      new Boolean[] { false, false });
546
547    Table table = util.getConnection().getTable(tableName);
548    Put put = new Put(ROW);
549    put.addColumn(A, A, A);
550    table.put(put);
551
552    Delete delete = new Delete(ROW);
553    table.delete(delete);
554
555    verifyMethodResult(SimpleRegionObserver.class,
556      new String[] { "hadDeleted", "wasScannerOpenCalled" }, tableName,
557      new Boolean[] { true, false });
558
559    Scan s = new Scan();
560    ResultScanner scanner = table.getScanner(s);
561    try {
562      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
563      }
564    } finally {
565      scanner.close();
566    }
567
568    // now scanner hooks should be invoked.
569    verifyMethodResult(SimpleRegionObserver.class, new String[] { "wasScannerOpenCalled" },
570      tableName, new Boolean[] { true });
571    util.deleteTable(tableName);
572    table.close();
573  }
574
575  /* Overrides compaction to only output rows with keys that are even numbers */
576  public static class EvenOnlyCompactor implements RegionCoprocessor, RegionObserver {
577    long lastCompaction;
578    long lastFlush;
579
580    @Override
581    public Optional<RegionObserver> getRegionObserver() {
582      return Optional.of(this);
583    }
584
585    @Override
586    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
587      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
588      CompactionRequest request) {
589      return new InternalScanner() {
590
591        @Override
592        public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
593          List<Cell> internalResults = new ArrayList<>();
594          boolean hasMore;
595          do {
596            hasMore = scanner.next(internalResults, scannerContext);
597            if (!internalResults.isEmpty()) {
598              long row = Bytes.toLong(CellUtil.cloneValue(internalResults.get(0)));
599              if (row % 2 == 0) {
600                // return this row
601                break;
602              }
603              // clear and continue
604              internalResults.clear();
605            }
606          } while (hasMore);
607
608          if (!internalResults.isEmpty()) {
609            results.addAll(internalResults);
610          }
611          return hasMore;
612        }
613
614        @Override
615        public void close() throws IOException {
616          scanner.close();
617        }
618      };
619    }
620
621    @Override
622    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
623      StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) {
624      lastCompaction = EnvironmentEdgeManager.currentTime();
625    }
626
627    @Override
628    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
629      FlushLifeCycleTracker tracker) {
630      lastFlush = EnvironmentEdgeManager.currentTime();
631    }
632  }
633
634  /**
635   * Tests overriding compaction handling via coprocessor hooks
636   */
637  @Test
638  public void testCompactionOverride() throws Exception {
639    final TableName compactTable = TableName.valueOf(name.getMethodName());
640    Admin admin = util.getAdmin();
641    if (admin.tableExists(compactTable)) {
642      admin.disableTable(compactTable);
643      admin.deleteTable(compactTable);
644    }
645
646    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(compactTable)
647      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(A))
648      .setCoprocessor(EvenOnlyCompactor.class.getName()).build();
649    admin.createTable(tableDescriptor);
650
651    Table table = util.getConnection().getTable(compactTable);
652    for (long i = 1; i <= 10; i++) {
653      byte[] iBytes = Bytes.toBytes(i);
654      Put put = new Put(iBytes);
655      put.setDurability(Durability.SKIP_WAL);
656      put.addColumn(A, A, iBytes);
657      table.put(put);
658    }
659
660    HRegion firstRegion = cluster.getRegions(compactTable).get(0);
661    Coprocessor cp = firstRegion.getCoprocessorHost().findCoprocessor(EvenOnlyCompactor.class);
662    assertNotNull("EvenOnlyCompactor coprocessor should be loaded", cp);
663    EvenOnlyCompactor compactor = (EvenOnlyCompactor) cp;
664
665    // force a compaction
666    long ts = EnvironmentEdgeManager.currentTime();
667    admin.flush(compactTable);
668    // wait for flush
669    for (int i = 0; i < 10; i++) {
670      if (compactor.lastFlush >= ts) {
671        break;
672      }
673      Thread.sleep(1000);
674    }
675    assertTrue("Flush didn't complete", compactor.lastFlush >= ts);
676    LOG.debug("Flush complete");
677
678    ts = compactor.lastFlush;
679    admin.majorCompact(compactTable);
680    // wait for compaction
681    for (int i = 0; i < 30; i++) {
682      if (compactor.lastCompaction >= ts) {
683        break;
684      }
685      Thread.sleep(1000);
686    }
687    LOG.debug("Last compaction was at " + compactor.lastCompaction);
688    assertTrue("Compaction didn't complete", compactor.lastCompaction >= ts);
689
690    // only even rows should remain
691    ResultScanner scanner = table.getScanner(new Scan());
692    try {
693      for (long i = 2; i <= 10; i += 2) {
694        Result r = scanner.next();
695        assertNotNull(r);
696        assertFalse(r.isEmpty());
697        byte[] iBytes = Bytes.toBytes(i);
698        assertArrayEquals("Row should be " + i, r.getRow(), iBytes);
699        assertArrayEquals("Value should be " + i, r.getValue(A, A), iBytes);
700      }
701    } finally {
702      scanner.close();
703    }
704    table.close();
705  }
706
707  @Test
708  public void bulkLoadHFileTest() throws Exception {
709    final String testName =
710      TestRegionObserverInterface.class.getName() + "." + name.getMethodName();
711    final TableName tableName =
712      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
713    Configuration conf = util.getConfiguration();
714    Table table = util.createTable(tableName, new byte[][] { A, B, C });
715    try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
716      verifyMethodResult(SimpleRegionObserver.class,
717        new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" }, tableName,
718        new Boolean[] { false, false });
719
720      FileSystem fs = util.getTestFileSystem();
721      final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs);
722      Path familyDir = new Path(dir, Bytes.toString(A));
723
724      createHFile(util.getConfiguration(), fs, new Path(familyDir, Bytes.toString(A)), A, A);
725
726      // Bulk load
727      BulkLoadHFiles.create(conf).bulkLoad(tableName, dir);
728
729      verifyMethodResult(SimpleRegionObserver.class,
730        new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" }, tableName,
731        new Boolean[] { true, true });
732    } finally {
733      util.deleteTable(tableName);
734      table.close();
735    }
736  }
737
738  @Test
739  public void testRecovery() throws Exception {
740    LOG.info(TestRegionObserverInterface.class.getName() + "." + name.getMethodName());
741    final TableName tableName =
742      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
743    Table table = util.createTable(tableName, new byte[][] { A, B, C });
744    try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
745
746      JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
747      ServerName sn2 = rs1.getRegionServer().getServerName();
748      String regEN = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
749
750      util.getAdmin().move(Bytes.toBytes(regEN), sn2);
751      while (!sn2.equals(locator.getAllRegionLocations().get(0).getServerName())) {
752        Thread.sleep(100);
753      }
754
755      Put put = new Put(ROW);
756      put.addColumn(A, A, A);
757      put.addColumn(B, B, B);
758      put.addColumn(C, C, C);
759      table.put(put);
760
761      // put two times
762      table.put(put);
763
764      verifyMethodResult(SimpleRegionObserver.class,
765        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
766          "hadPostBatchMutate", "hadDelete" },
767        tableName, new Boolean[] { false, false, true, true, true, true, false });
768
769      verifyMethodResult(SimpleRegionObserver.class,
770        new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore",
771          "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
772        tableName, new Integer[] { 0, 0, 0, 0, 2, 2 });
773
774      cluster.killRegionServer(rs1.getRegionServer().getServerName());
775      Threads.sleep(1000); // Let the kill soak in.
776      util.waitUntilAllRegionsAssigned(tableName);
777      LOG.info("All regions assigned");
778
779      verifyMethodResult(SimpleRegionObserver.class,
780        new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore",
781          "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
782        tableName, new Integer[] { 1, 1, 2, 2, 0, 0 });
783    } finally {
784      util.deleteTable(tableName);
785      table.close();
786    }
787  }
788
789  @Test
790  public void testPreWALRestoreSkip() throws Exception {
791    LOG.info(TestRegionObserverInterface.class.getName() + "." + name.getMethodName());
792    TableName tableName = TableName.valueOf(SimpleRegionObserver.TABLE_SKIPPED);
793    Table table = util.createTable(tableName, new byte[][] { A, B, C });
794
795    try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
796      JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
797      ServerName sn2 = rs1.getRegionServer().getServerName();
798      String regEN = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
799
800      util.getAdmin().move(Bytes.toBytes(regEN), sn2);
801      while (!sn2.equals(locator.getAllRegionLocations().get(0).getServerName())) {
802        Thread.sleep(100);
803      }
804
805      Put put = new Put(ROW);
806      put.addColumn(A, A, A);
807      put.addColumn(B, B, B);
808      put.addColumn(C, C, C);
809      table.put(put);
810
811      cluster.killRegionServer(rs1.getRegionServer().getServerName());
812      Threads.sleep(20000); // just to be sure that the kill has fully started.
813      util.waitUntilAllRegionsAssigned(tableName);
814    }
815
816    verifyMethodResult(SimpleRegionObserver.class,
817      new String[] { "getCtPreWALRestore", "getCtPostWALRestore", }, tableName,
818      new Integer[] { 0, 0 });
819
820    util.deleteTable(tableName);
821    table.close();
822  }
823
824  // called from testPreWALAppendIsWrittenToWAL
825  private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
826    int expectedCalls = 0;
827    String[] methodArray = new String[1];
828    methodArray[0] = "getCtPreWALAppend";
829    Object[] resultArray = new Object[1];
830
831    Put p = new Put(ROW);
832    p.addColumn(A, A, A);
833    table.put(p);
834    resultArray[0] = ++expectedCalls;
835    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
836
837    Append a = new Append(ROW);
838    a.addColumn(B, B, B);
839    table.append(a);
840    resultArray[0] = ++expectedCalls;
841    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
842
843    Increment i = new Increment(ROW);
844    i.addColumn(C, C, 1);
845    table.increment(i);
846    resultArray[0] = ++expectedCalls;
847    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
848
849    Delete d = new Delete(ROW);
850    table.delete(d);
851    resultArray[0] = ++expectedCalls;
852    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
853  }
854
855  @Test
856  public void testPreWALAppend() throws Exception {
857    SimpleRegionObserver sro = new SimpleRegionObserver();
858    ObserverContext ctx = Mockito.mock(ObserverContext.class);
859    WALKey key =
860      new WALKeyImpl(Bytes.toBytes("region"), TEST_TABLE, EnvironmentEdgeManager.currentTime());
861    WALEdit edit = new WALEdit();
862    sro.preWALAppend(ctx, key, edit);
863    Assert.assertEquals(1, key.getExtendedAttributes().size());
864    Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
865      key.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend())));
866  }
867
868  @Test
869  public void testPreWALAppendIsWrittenToWAL() throws Exception {
870    final TableName tableName =
871      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
872    Table table = util.createTable(tableName, new byte[][] { A, B, C });
873
874    PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
875    List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
876    // should be only one region
877    HRegion region = regions.get(0);
878    region.getWAL().registerWALActionsListener(listener);
879    testPreWALAppendHook(table, tableName);
880    boolean[] expectedResults = { true, true, true, true };
881    Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray());
882
883  }
884
885  @Test
886  public void testPreWALAppendNotCalledOnMetaEdit() throws Exception {
887    final TableName tableName =
888      TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
889    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
890    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
891    tdBuilder.setColumnFamily(cfBuilder.build());
892    tdBuilder.setCoprocessor(SimpleRegionObserver.class.getName());
893    TableDescriptor td = tdBuilder.build();
894    Table table = util.createTable(td, new byte[][] { A, B, C });
895
896    PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
897    List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
898    // should be only one region
899    HRegion region = regions.get(0);
900
901    region.getWAL().registerWALActionsListener(listener);
902    // flushing should write to the WAL
903    region.flush(true);
904    // so should compaction
905    region.compact(false);
906    // and so should closing the region
907    region.close();
908
909    // but we still shouldn't have triggered preWALAppend because no user data was written
910    String[] methods = new String[] { "getCtPreWALAppend" };
911    Object[] expectedResult = new Integer[] { 0 };
912    verifyMethodResult(SimpleRegionObserver.class, methods, tableName, expectedResult);
913  }
914
915  // check each region whether the coprocessor upcalls are called or not.
916  private void verifyMethodResult(Class<?> coprocessor, String methodName[], TableName tableName,
917    Object value[]) throws IOException {
918    try {
919      for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
920        if (!t.isAlive() || t.getRegionServer().isAborted() || t.getRegionServer().isStopping()) {
921          continue;
922        }
923        for (RegionInfo r : ProtobufUtil.getOnlineRegions(t.getRegionServer().getRSRpcServices())) {
924          if (!r.getTable().equals(tableName)) {
925            continue;
926          }
927          RegionCoprocessorHost cph =
928            t.getRegionServer().getOnlineRegion(r.getRegionName()).getCoprocessorHost();
929
930          Coprocessor cp = cph.findCoprocessor(coprocessor.getName());
931          assertNotNull(cp);
932          for (int i = 0; i < methodName.length; ++i) {
933            Method m = coprocessor.getMethod(methodName[i]);
934            Object o = m.invoke(cp);
935            assertTrue("Result of " + coprocessor.getName() + "." + methodName[i]
936              + " is expected to be " + value[i].toString() + ", while we get " + o.toString(),
937              o.equals(value[i]));
938          }
939        }
940      }
941    } catch (Exception e) {
942      throw new IOException(e.toString());
943    }
944  }
945
946  private static void createHFile(Configuration conf, FileSystem fs, Path path, byte[] family,
947    byte[] qualifier) throws IOException {
948    HFileContext context = new HFileContextBuilder().build();
949    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
950      .withFileContext(context).create();
951    long now = EnvironmentEdgeManager.currentTime();
952    try {
953      for (int i = 1; i <= 9; i++) {
954        KeyValue kv =
955          new KeyValue(Bytes.toBytes(i + ""), family, qualifier, now, Bytes.toBytes(i + ""));
956        writer.append(kv);
957      }
958    } finally {
959      writer.close();
960    }
961  }
962
963  private static class PreWALAppendWALActionsListener implements WALActionsListener {
964    boolean[] walKeysCorrect = { false, false, false, false };
965
966    @Override
967    public void postAppend(long entryLen, long elapsedTimeMillis, WALKey logKey, WALEdit logEdit)
968      throws IOException {
969      for (int k = 0; k < 4; k++) {
970        if (!walKeysCorrect[k]) {
971          walKeysCorrect[k] = Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
972            logKey.getExtendedAttribute(Integer.toString(k + 1)));
973        }
974      }
975    }
976
977    boolean[] getWalKeysCorrectArray() {
978      return walKeysCorrect;
979    }
980  }
981}