001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.lang.reflect.Method;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.Optional;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.CompareOperator;
037import org.apache.hadoop.hbase.Coprocessor;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtility;
040import org.apache.hadoop.hbase.HColumnDescriptor;
041import org.apache.hadoop.hbase.HTableDescriptor;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.MiniHBaseCluster;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.client.Admin;
047import org.apache.hadoop.hbase.client.Append;
048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
049import org.apache.hadoop.hbase.client.Delete;
050import org.apache.hadoop.hbase.client.Durability;
051import org.apache.hadoop.hbase.client.Get;
052import org.apache.hadoop.hbase.client.Increment;
053import org.apache.hadoop.hbase.client.Put;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.client.RegionLocator;
056import org.apache.hadoop.hbase.client.Result;
057import org.apache.hadoop.hbase.client.ResultScanner;
058import org.apache.hadoop.hbase.client.RowMutations;
059import org.apache.hadoop.hbase.client.Scan;
060import org.apache.hadoop.hbase.client.Table;
061import org.apache.hadoop.hbase.client.TableDescriptor;
062import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
063import org.apache.hadoop.hbase.filter.FilterAllFilter;
064import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
065import org.apache.hadoop.hbase.io.hfile.CacheConfig;
066import org.apache.hadoop.hbase.io.hfile.HFile;
067import org.apache.hadoop.hbase.io.hfile.HFileContext;
068import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
069import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
070import org.apache.hadoop.hbase.regionserver.HRegion;
071import org.apache.hadoop.hbase.regionserver.InternalScanner;
072import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
073import org.apache.hadoop.hbase.regionserver.ScanType;
074import org.apache.hadoop.hbase.regionserver.ScannerContext;
075import org.apache.hadoop.hbase.regionserver.Store;
076import org.apache.hadoop.hbase.regionserver.StoreFile;
077import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
078import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
079import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
080import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
081import org.apache.hadoop.hbase.testclassification.LargeTests;
082import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
083import org.apache.hadoop.hbase.util.Bytes;
084import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
085import org.apache.hadoop.hbase.util.JVMClusterUtil;
086import org.apache.hadoop.hbase.util.Threads;
087import org.apache.hadoop.hbase.wal.WALEdit;
088import org.apache.hadoop.hbase.wal.WALKey;
089import org.apache.hadoop.hbase.wal.WALKeyImpl;
090import org.junit.AfterClass;
091import org.junit.Assert;
092import org.junit.BeforeClass;
093import org.junit.ClassRule;
094import org.junit.Rule;
095import org.junit.Test;
096import org.junit.experimental.categories.Category;
097import org.junit.rules.TestName;
098import org.mockito.Mockito;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
103
104@Category({ CoprocessorTests.class, LargeTests.class })
105public class TestRegionObserverInterface {
106
  /** Class-level JUnit rule created for this test class. */
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRegionObserverInterface.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionObserverInterface.class);

  /** Base table name; most tests append "." plus their method name to get a per-test table. */
  public static final TableName TEST_TABLE = TableName.valueOf("TestTable");
  /** Column family used when a test builds its own table descriptor. */
  public static final byte[] FAMILY = Bytes.toBytes("f");
  // A, B and C are reused as column family names, qualifiers and cell values by the tests.
  public final static byte[] A = Bytes.toBytes("a");
  public final static byte[] B = Bytes.toBytes("b");
  public final static byte[] C = Bytes.toBytes("c");
  /** Row key shared by most put/get/delete operations in this class. */
  public final static byte[] ROW = Bytes.toBytes("testrow");

  // Shared test harness; the mini cluster is started in setupBeforeClass() and its handle is
  // stored in 'cluster'.
  private static HBaseTestingUtility util = new HBaseTestingUtility();
  private static MiniHBaseCluster cluster = null;

  /** Exposes the name of the running test method, used to derive per-test table names. */
  @Rule
  public TestName name = new TestName();
125
126  @BeforeClass
127  public static void setupBeforeClass() throws Exception {
128    // set configure to indicate which cp should be loaded
129    Configuration conf = util.getConfiguration();
130    conf.setBoolean("hbase.master.distributed.log.replay", true);
131    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
132      SimpleRegionObserver.class.getName());
133
134    util.startMiniCluster();
135    cluster = util.getMiniHBaseCluster();
136  }
137
  /** Shuts down the mini cluster started in {@link #setupBeforeClass()}. */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }
142
143  @Test
144  public void testRegionObserver() throws IOException {
145    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
146    // recreate table every time in order to reset the status of the
147    // coprocessor.
148    Table table = util.createTable(tableName, new byte[][] { A, B, C });
149    try {
150      verifyMethodResult(SimpleRegionObserver.class,
151        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDelete",
152            "hadPostStartRegionOperation", "hadPostCloseRegionOperation",
153            "hadPostBatchMutateIndispensably" },
154        tableName, new Boolean[] { false, false, false, false, false, false, false, false });
155
156      Put put = new Put(ROW);
157      put.addColumn(A, A, A);
158      put.addColumn(B, B, B);
159      put.addColumn(C, C, C);
160      table.put(put);
161
162      verifyMethodResult(SimpleRegionObserver.class,
163        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
164            "hadPostBatchMutate", "hadDelete", "hadPostStartRegionOperation",
165            "hadPostCloseRegionOperation", "hadPostBatchMutateIndispensably" },
166        TEST_TABLE,
167        new Boolean[] { false, false, true, true, true, true, false, true, true, true });
168
169      verifyMethodResult(SimpleRegionObserver.class,
170        new String[] { "getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose" },
171        tableName, new Integer[] { 1, 1, 0, 0 });
172
173      Get get = new Get(ROW);
174      get.addColumn(A, A);
175      get.addColumn(B, B);
176      get.addColumn(C, C);
177      table.get(get);
178
179      verifyMethodResult(SimpleRegionObserver.class,
180        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDelete",
181            "hadPrePreparedDeleteTS" },
182        tableName, new Boolean[] { true, true, true, true, false, false });
183
184      Delete delete = new Delete(ROW);
185      delete.addColumn(A, A);
186      delete.addColumn(B, B);
187      delete.addColumn(C, C);
188      table.delete(delete);
189
190      verifyMethodResult(SimpleRegionObserver.class,
191        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
192            "hadPostBatchMutate", "hadDelete", "hadPrePreparedDeleteTS" },
193        tableName, new Boolean[] { true, true, true, true, true, true, true, true });
194    } finally {
195      util.deleteTable(tableName);
196      table.close();
197    }
198    verifyMethodResult(SimpleRegionObserver.class,
199      new String[] { "getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose" },
200      tableName, new Integer[] { 1, 1, 1, 1 });
201  }
202
203  @Test
204  public void testRowMutation() throws IOException {
205    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
206    Table table = util.createTable(tableName, new byte[][] { A, B, C });
207    try {
208      verifyMethodResult(SimpleRegionObserver.class,
209        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDeleted" },
210        tableName, new Boolean[] { false, false, false, false, false });
211      Put put = new Put(ROW);
212      put.addColumn(A, A, A);
213      put.addColumn(B, B, B);
214      put.addColumn(C, C, C);
215
216      Delete delete = new Delete(ROW);
217      delete.addColumn(A, A);
218      delete.addColumn(B, B);
219      delete.addColumn(C, C);
220
221      RowMutations arm = new RowMutations(ROW);
222      arm.add(put);
223      arm.add(delete);
224      table.mutateRow(arm);
225
226      verifyMethodResult(SimpleRegionObserver.class,
227        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDeleted" },
228        tableName, new Boolean[] { false, false, true, true, true });
229    } finally {
230      util.deleteTable(tableName);
231      table.close();
232    }
233  }
234
235  @Test
236  public void testIncrementHook() throws IOException {
237    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
238    Table table = util.createTable(tableName, new byte[][] { A, B, C });
239    try {
240      Increment inc = new Increment(Bytes.toBytes(0));
241      inc.addColumn(A, A, 1);
242
243      verifyMethodResult(SimpleRegionObserver.class,
244        new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock" },
245        tableName, new Boolean[] { false, false, false });
246
247      table.increment(inc);
248
249      verifyMethodResult(SimpleRegionObserver.class,
250        new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock" },
251        tableName, new Boolean[] { true, true, true });
252    } finally {
253      util.deleteTable(tableName);
254      table.close();
255    }
256  }
257
258  @Test
259  public void testCheckAndPutHooks() throws IOException {
260    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
261    try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) {
262      Put p = new Put(Bytes.toBytes(0));
263      p.addColumn(A, A, A);
264      table.put(p);
265      p = new Put(Bytes.toBytes(0));
266      p.addColumn(A, A, A);
267      verifyMethodResult(SimpleRegionObserver.class,
268        new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
269          "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
270          "getPostCheckAndPutWithFilter" },
271        tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
272
273      table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p);
274      verifyMethodResult(SimpleRegionObserver.class,
275        new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
276          "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
277          "getPostCheckAndPutWithFilter" },
278        tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
279
280      table.checkAndMutate(Bytes.toBytes(0),
281          new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
282        .thenPut(p);
283      verifyMethodResult(SimpleRegionObserver.class,
284        new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
285          "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
286          "getPostCheckAndPutWithFilter" },
287        tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
288    } finally {
289      util.deleteTable(tableName);
290    }
291  }
292
293  @Test
294  public void testCheckAndDeleteHooks() throws IOException {
295    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
296    Table table = util.createTable(tableName, new byte[][] { A, B, C });
297    try {
298      Put p = new Put(Bytes.toBytes(0));
299      p.addColumn(A, A, A);
300      table.put(p);
301      Delete d = new Delete(Bytes.toBytes(0));
302      table.delete(d);
303      verifyMethodResult(
304        SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
305          "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
306          "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
307          "getPostCheckAndDeleteWithFilter" },
308        tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
309
310      table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d);
311      verifyMethodResult(
312        SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
313          "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
314          "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
315          "getPostCheckAndDeleteWithFilter" },
316        tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
317
318      table.checkAndMutate(Bytes.toBytes(0),
319          new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
320        .thenDelete(d);
321      verifyMethodResult(
322        SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
323          "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
324          "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
325          "getPostCheckAndDeleteWithFilter" },
326        tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
327    } finally {
328      util.deleteTable(tableName);
329      table.close();
330    }
331  }
332
333  @Test
334  public void testAppendHook() throws IOException {
335    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
336    Table table = util.createTable(tableName, new byte[][] { A, B, C });
337    try {
338      Append app = new Append(Bytes.toBytes(0));
339      app.addColumn(A, A, A);
340
341      verifyMethodResult(SimpleRegionObserver.class,
342        new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock" }, tableName,
343        new Boolean[] { false, false, false });
344
345      table.append(app);
346
347      verifyMethodResult(SimpleRegionObserver.class,
348        new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock" }, tableName,
349        new Boolean[] { true, true, true });
350    } finally {
351      util.deleteTable(tableName);
352      table.close();
353    }
354  }
355
356  @Test
357  // HBase-3583
358  public void testHBase3583() throws IOException {
359    final TableName tableName = TableName.valueOf(name.getMethodName());
360    util.createTable(tableName, new byte[][] { A, B, C });
361    util.waitUntilAllRegionsAssigned(tableName);
362
363    verifyMethodResult(SimpleRegionObserver.class,
364      new String[] { "hadPreGet", "hadPostGet", "wasScannerNextCalled", "wasScannerCloseCalled" },
365      tableName, new Boolean[] { false, false, false, false });
366
367    Table table = util.getConnection().getTable(tableName);
368    Put put = new Put(ROW);
369    put.addColumn(A, A, A);
370    table.put(put);
371
372    Get get = new Get(ROW);
373    get.addColumn(A, A);
374    table.get(get);
375
376    // verify that scannerNext and scannerClose upcalls won't be invoked
377    // when we perform get().
378    verifyMethodResult(SimpleRegionObserver.class,
379      new String[] { "hadPreGet", "hadPostGet", "wasScannerNextCalled", "wasScannerCloseCalled" },
380      tableName, new Boolean[] { true, true, false, false });
381
382    Scan s = new Scan();
383    ResultScanner scanner = table.getScanner(s);
384    try {
385      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
386      }
387    } finally {
388      scanner.close();
389    }
390
391    // now scanner hooks should be invoked.
392    verifyMethodResult(SimpleRegionObserver.class,
393      new String[] { "wasScannerNextCalled", "wasScannerCloseCalled" }, tableName,
394      new Boolean[] { true, true });
395    util.deleteTable(tableName);
396    table.close();
397  }
398
399  @Test
400  public void testHBASE14489() throws IOException {
401    final TableName tableName = TableName.valueOf(name.getMethodName());
402    Table table = util.createTable(tableName, new byte[][] { A });
403    Put put = new Put(ROW);
404    put.addColumn(A, A, A);
405    table.put(put);
406
407    Scan s = new Scan();
408    s.setFilter(new FilterAllFilter());
409    ResultScanner scanner = table.getScanner(s);
410    try {
411      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
412      }
413    } finally {
414      scanner.close();
415    }
416    verifyMethodResult(SimpleRegionObserver.class, new String[] { "wasScannerFilterRowCalled" },
417      tableName, new Boolean[] { true });
418    util.deleteTable(tableName);
419    table.close();
420
421  }
422
423  @Test
424  // HBase-3758
425  public void testHBase3758() throws IOException {
426    final TableName tableName = TableName.valueOf(name.getMethodName());
427    util.createTable(tableName, new byte[][] { A, B, C });
428
429    verifyMethodResult(SimpleRegionObserver.class,
430      new String[] { "hadDeleted", "wasScannerOpenCalled" }, tableName,
431      new Boolean[] { false, false });
432
433    Table table = util.getConnection().getTable(tableName);
434    Put put = new Put(ROW);
435    put.addColumn(A, A, A);
436    table.put(put);
437
438    Delete delete = new Delete(ROW);
439    table.delete(delete);
440
441    verifyMethodResult(SimpleRegionObserver.class,
442      new String[] { "hadDeleted", "wasScannerOpenCalled" }, tableName,
443      new Boolean[] { true, false });
444
445    Scan s = new Scan();
446    ResultScanner scanner = table.getScanner(s);
447    try {
448      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
449      }
450    } finally {
451      scanner.close();
452    }
453
454    // now scanner hooks should be invoked.
455    verifyMethodResult(SimpleRegionObserver.class, new String[] { "wasScannerOpenCalled" },
456      tableName, new Boolean[] { true });
457    util.deleteTable(tableName);
458    table.close();
459  }
460
461  /* Overrides compaction to only output rows with keys that are even numbers */
462  public static class EvenOnlyCompactor implements RegionCoprocessor, RegionObserver {
463    long lastCompaction;
464    long lastFlush;
465
466    @Override
467    public Optional<RegionObserver> getRegionObserver() {
468      return Optional.of(this);
469    }
470
471    @Override
472    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
473        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
474        CompactionRequest request) {
475      return new InternalScanner() {
476
477        @Override
478        public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
479          List<Cell> internalResults = new ArrayList<>();
480          boolean hasMore;
481          do {
482            hasMore = scanner.next(internalResults, scannerContext);
483            if (!internalResults.isEmpty()) {
484              long row = Bytes.toLong(CellUtil.cloneValue(internalResults.get(0)));
485              if (row % 2 == 0) {
486                // return this row
487                break;
488              }
489              // clear and continue
490              internalResults.clear();
491            }
492          } while (hasMore);
493
494          if (!internalResults.isEmpty()) {
495            results.addAll(internalResults);
496          }
497          return hasMore;
498        }
499
500        @Override
501        public void close() throws IOException {
502          scanner.close();
503        }
504      };
505    }
506
507    @Override
508    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
509        StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) {
510      lastCompaction = EnvironmentEdgeManager.currentTime();
511    }
512
513    @Override
514    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
515        FlushLifeCycleTracker tracker) {
516      lastFlush = EnvironmentEdgeManager.currentTime();
517    }
518  }
519
520  /**
521   * Tests overriding compaction handling via coprocessor hooks
522   * @throws Exception
523   */
524  @Test
525  public void testCompactionOverride() throws Exception {
526    final TableName compactTable = TableName.valueOf(name.getMethodName());
527    Admin admin = util.getAdmin();
528    if (admin.tableExists(compactTable)) {
529      admin.disableTable(compactTable);
530      admin.deleteTable(compactTable);
531    }
532
533    HTableDescriptor htd = new HTableDescriptor(compactTable);
534    htd.addFamily(new HColumnDescriptor(A));
535    htd.addCoprocessor(EvenOnlyCompactor.class.getName());
536    admin.createTable(htd);
537
538    Table table = util.getConnection().getTable(compactTable);
539    for (long i = 1; i <= 10; i++) {
540      byte[] iBytes = Bytes.toBytes(i);
541      Put put = new Put(iBytes);
542      put.setDurability(Durability.SKIP_WAL);
543      put.addColumn(A, A, iBytes);
544      table.put(put);
545    }
546
547    HRegion firstRegion = cluster.getRegions(compactTable).get(0);
548    Coprocessor cp = firstRegion.getCoprocessorHost().findCoprocessor(EvenOnlyCompactor.class);
549    assertNotNull("EvenOnlyCompactor coprocessor should be loaded", cp);
550    EvenOnlyCompactor compactor = (EvenOnlyCompactor) cp;
551
552    // force a compaction
553    long ts = System.currentTimeMillis();
554    admin.flush(compactTable);
555    // wait for flush
556    for (int i = 0; i < 10; i++) {
557      if (compactor.lastFlush >= ts) {
558        break;
559      }
560      Thread.sleep(1000);
561    }
562    assertTrue("Flush didn't complete", compactor.lastFlush >= ts);
563    LOG.debug("Flush complete");
564
565    ts = compactor.lastFlush;
566    admin.majorCompact(compactTable);
567    // wait for compaction
568    for (int i = 0; i < 30; i++) {
569      if (compactor.lastCompaction >= ts) {
570        break;
571      }
572      Thread.sleep(1000);
573    }
574    LOG.debug("Last compaction was at " + compactor.lastCompaction);
575    assertTrue("Compaction didn't complete", compactor.lastCompaction >= ts);
576
577    // only even rows should remain
578    ResultScanner scanner = table.getScanner(new Scan());
579    try {
580      for (long i = 2; i <= 10; i += 2) {
581        Result r = scanner.next();
582        assertNotNull(r);
583        assertFalse(r.isEmpty());
584        byte[] iBytes = Bytes.toBytes(i);
585        assertArrayEquals("Row should be " + i, r.getRow(), iBytes);
586        assertArrayEquals("Value should be " + i, r.getValue(A, A), iBytes);
587      }
588    } finally {
589      scanner.close();
590    }
591    table.close();
592  }
593
594  @Test
595  public void bulkLoadHFileTest() throws Exception {
596    final String testName = TestRegionObserverInterface.class.getName() + "." + name.getMethodName();
597    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
598    Configuration conf = util.getConfiguration();
599    Table table = util.createTable(tableName, new byte[][] { A, B, C });
600    try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
601      verifyMethodResult(SimpleRegionObserver.class,
602        new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" }, tableName,
603        new Boolean[] { false, false });
604
605      FileSystem fs = util.getTestFileSystem();
606      final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs);
607      Path familyDir = new Path(dir, Bytes.toString(A));
608
609      createHFile(util.getConfiguration(), fs, new Path(familyDir, Bytes.toString(A)), A, A);
610
611      // Bulk load
612      new LoadIncrementalHFiles(conf).doBulkLoad(dir, util.getAdmin(), table, locator);
613
614      verifyMethodResult(SimpleRegionObserver.class,
615        new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" }, tableName,
616        new Boolean[] { true, true });
617    } finally {
618      util.deleteTable(tableName);
619      table.close();
620    }
621  }
622
623  @Test
624  public void testRecovery() throws Exception {
625    LOG.info(TestRegionObserverInterface.class.getName() + "." + name.getMethodName());
626    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
627    Table table = util.createTable(tableName, new byte[][] { A, B, C });
628    try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
629
630      JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
631      ServerName sn2 = rs1.getRegionServer().getServerName();
632      String regEN = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
633
634      util.getAdmin().move(Bytes.toBytes(regEN), sn2);
635      while (!sn2.equals(locator.getAllRegionLocations().get(0).getServerName())) {
636        Thread.sleep(100);
637      }
638
639      Put put = new Put(ROW);
640      put.addColumn(A, A, A);
641      put.addColumn(B, B, B);
642      put.addColumn(C, C, C);
643      table.put(put);
644
645      // put two times
646      table.put(put);
647
648      verifyMethodResult(SimpleRegionObserver.class,
649        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
650            "hadPostBatchMutate", "hadDelete" },
651        tableName, new Boolean[] { false, false, true, true, true, true, false });
652
653      verifyMethodResult(SimpleRegionObserver.class,
654        new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore",
655            "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
656        tableName, new Integer[] { 0, 0, 0, 0, 2, 2 });
657
658      cluster.killRegionServer(rs1.getRegionServer().getServerName());
659      Threads.sleep(1000); // Let the kill soak in.
660      util.waitUntilAllRegionsAssigned(tableName);
661      LOG.info("All regions assigned");
662
663      verifyMethodResult(SimpleRegionObserver.class,
664        new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore",
665            "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
666        tableName, new Integer[] { 1, 1, 2, 2, 0, 0 });
667    } finally {
668      util.deleteTable(tableName);
669      table.close();
670    }
671  }
672
673  @Test
674  public void testPreWALRestoreSkip() throws Exception {
675    LOG.info(TestRegionObserverInterface.class.getName() + "." + name.getMethodName());
676    TableName tableName = TableName.valueOf(SimpleRegionObserver.TABLE_SKIPPED);
677    Table table = util.createTable(tableName, new byte[][] { A, B, C });
678
679    try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
680      JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
681      ServerName sn2 = rs1.getRegionServer().getServerName();
682      String regEN = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
683
684      util.getAdmin().move(Bytes.toBytes(regEN), sn2);
685      while (!sn2.equals(locator.getAllRegionLocations().get(0).getServerName())) {
686        Thread.sleep(100);
687      }
688
689      Put put = new Put(ROW);
690      put.addColumn(A, A, A);
691      put.addColumn(B, B, B);
692      put.addColumn(C, C, C);
693      table.put(put);
694
695      cluster.killRegionServer(rs1.getRegionServer().getServerName());
696      Threads.sleep(20000); // just to be sure that the kill has fully started.
697      util.waitUntilAllRegionsAssigned(tableName);
698    }
699
700    verifyMethodResult(SimpleRegionObserver.class,
701      new String[] { "getCtPreWALRestore", "getCtPostWALRestore", }, tableName,
702      new Integer[] { 0, 0 });
703
704    util.deleteTable(tableName);
705    table.close();
706  }
707
708  //called from testPreWALAppendIsWrittenToWAL
709  private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
710    int expectedCalls = 0;
711    String [] methodArray = new String[1];
712    methodArray[0] = "getCtPreWALAppend";
713    Object[] resultArray = new Object[1];
714
715    Put p = new Put(ROW);
716    p.addColumn(A, A, A);
717    table.put(p);
718    resultArray[0] = ++expectedCalls;
719    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
720
721    Append a = new Append(ROW);
722    a.addColumn(B, B, B);
723    table.append(a);
724    resultArray[0] = ++expectedCalls;
725    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
726
727    Increment i = new Increment(ROW);
728    i.addColumn(C, C, 1);
729    table.increment(i);
730    resultArray[0] = ++expectedCalls;
731    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
732
733    Delete d = new Delete(ROW);
734    table.delete(d);
735    resultArray[0] = ++expectedCalls;
736    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
737  }
738
739  @Test
740  public void testPreWALAppend() throws Exception {
741    SimpleRegionObserver sro = new SimpleRegionObserver();
742    ObserverContext ctx = Mockito.mock(ObserverContext.class);
743    WALKey key = new WALKeyImpl(Bytes.toBytes("region"), TEST_TABLE,
744        EnvironmentEdgeManager.currentTime());
745    WALEdit edit = new WALEdit();
746    sro.preWALAppend(ctx, key, edit);
747    Assert.assertEquals(1, key.getExtendedAttributes().size());
748    Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
749        key.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend())));
750  }
751
752  @Test
753  public void testPreWALAppendIsWrittenToWAL() throws Exception {
754    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
755        "." + name.getMethodName());
756    Table table = util.createTable(tableName, new byte[][] { A, B, C });
757
758    PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
759    List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
760    //should be only one region
761    HRegion region = regions.get(0);
762    region.getWAL().registerWALActionsListener(listener);
763    testPreWALAppendHook(table, tableName);
764    boolean[] expectedResults = {true, true, true, true};
765    Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray());
766
767  }
768
769  @Test
770  public void testPreWALAppendNotCalledOnMetaEdit() throws Exception {
771    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
772        "." + name.getMethodName());
773    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
774    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
775    tdBuilder.setColumnFamily(cfBuilder.build());
776    tdBuilder.setCoprocessor(SimpleRegionObserver.class.getName());
777    TableDescriptor td = tdBuilder.build();
778    Table table = util.createTable(td, new byte[][] { A, B, C });
779
780    PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
781    List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
782    //should be only one region
783    HRegion region = regions.get(0);
784
785    region.getWAL().registerWALActionsListener(listener);
786    //flushing should write to the WAL
787    region.flush(true);
788    //so should compaction
789    region.compact(false);
790    //and so should closing the region
791    region.close();
792
793    //but we still shouldn't have triggered preWALAppend because no user data was written
794    String[] methods = new String[] {"getCtPreWALAppend"};
795    Object[] expectedResult = new Integer[]{0};
796    verifyMethodResult(SimpleRegionObserver.class, methods, tableName, expectedResult);
797  }
798
799  // check each region whether the coprocessor upcalls are called or not.
800  private void verifyMethodResult(Class<?> coprocessor, String methodName[], TableName tableName,
801      Object value[]) throws IOException {
802    try {
803      for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
804        if (!t.isAlive() || t.getRegionServer().isAborted() || t.getRegionServer().isStopping()) {
805          continue;
806        }
807        for (RegionInfo r : ProtobufUtil
808            .getOnlineRegions(t.getRegionServer().getRSRpcServices())) {
809          if (!r.getTable().equals(tableName)) {
810            continue;
811          }
812          RegionCoprocessorHost cph =
813              t.getRegionServer().getOnlineRegion(r.getRegionName()).getCoprocessorHost();
814
815          Coprocessor cp = cph.findCoprocessor(coprocessor.getName());
816          assertNotNull(cp);
817          for (int i = 0; i < methodName.length; ++i) {
818            Method m = coprocessor.getMethod(methodName[i]);
819            Object o = m.invoke(cp);
820            assertTrue("Result of " + coprocessor.getName() + "." + methodName[i]
821                    + " is expected to be " + value[i].toString() + ", while we get "
822                    + o.toString(), o.equals(value[i]));
823          }
824        }
825      }
826    } catch (Exception e) {
827      throw new IOException(e.toString());
828    }
829  }
830
831  private static void createHFile(Configuration conf, FileSystem fs, Path path, byte[] family,
832      byte[] qualifier) throws IOException {
833    HFileContext context = new HFileContextBuilder().build();
834    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
835        .withFileContext(context).create();
836    long now = System.currentTimeMillis();
837    try {
838      for (int i = 1; i <= 9; i++) {
839        KeyValue kv =
840            new KeyValue(Bytes.toBytes(i + ""), family, qualifier, now, Bytes.toBytes(i + ""));
841        writer.append(kv);
842      }
843    } finally {
844      writer.close();
845    }
846  }
847
848  private static class PreWALAppendWALActionsListener implements WALActionsListener {
849    boolean[] walKeysCorrect = {false, false, false, false};
850
851    @Override
852    public void postAppend(long entryLen, long elapsedTimeMillis,
853                           WALKey logKey, WALEdit logEdit) throws IOException {
854      for (int k = 0; k < 4; k++) {
855        if (!walKeysCorrect[k]) {
856          walKeysCorrect[k] = Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
857              logKey.getExtendedAttribute(Integer.toString(k + 1)));
858        }
859      }
860    }
861
862    boolean[] getWalKeysCorrectArray() {
863      return walKeysCorrect;
864    }
865  }
866}