001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.lang.reflect.Method;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Optional;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellUtil;
035import org.apache.hadoop.hbase.Coprocessor;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.HColumnDescriptor;
039import org.apache.hadoop.hbase.HTableDescriptor;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.MiniHBaseCluster;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.Admin;
045import org.apache.hadoop.hbase.client.Append;
046import org.apache.hadoop.hbase.client.Delete;
047import org.apache.hadoop.hbase.client.Durability;
048import org.apache.hadoop.hbase.client.Get;
049import org.apache.hadoop.hbase.client.Increment;
050import org.apache.hadoop.hbase.client.Put;
051import org.apache.hadoop.hbase.client.RegionInfo;
052import org.apache.hadoop.hbase.client.RegionLocator;
053import org.apache.hadoop.hbase.client.Result;
054import org.apache.hadoop.hbase.client.ResultScanner;
055import org.apache.hadoop.hbase.client.RowMutations;
056import org.apache.hadoop.hbase.client.Scan;
057import org.apache.hadoop.hbase.client.Table;
058import org.apache.hadoop.hbase.filter.FilterAllFilter;
059import org.apache.hadoop.hbase.io.hfile.CacheConfig;
060import org.apache.hadoop.hbase.io.hfile.HFile;
061import org.apache.hadoop.hbase.io.hfile.HFileContext;
062import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
063import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
064import org.apache.hadoop.hbase.regionserver.HRegion;
065import org.apache.hadoop.hbase.regionserver.InternalScanner;
066import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
067import org.apache.hadoop.hbase.regionserver.ScanType;
068import org.apache.hadoop.hbase.regionserver.ScannerContext;
069import org.apache.hadoop.hbase.regionserver.Store;
070import org.apache.hadoop.hbase.regionserver.StoreFile;
071import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
072import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
073import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
074import org.apache.hadoop.hbase.testclassification.MediumTests;
075import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
076import org.apache.hadoop.hbase.util.Bytes;
077import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
078import org.apache.hadoop.hbase.util.JVMClusterUtil;
079import org.apache.hadoop.hbase.util.Threads;
080import org.junit.AfterClass;
081import org.junit.BeforeClass;
082import org.junit.ClassRule;
083import org.junit.Rule;
084import org.junit.Test;
085import org.junit.experimental.categories.Category;
086import org.junit.rules.TestName;
087import org.slf4j.Logger;
088import org.slf4j.LoggerFactory;
089
090import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
091
092@Category({ CoprocessorTests.class, MediumTests.class })
093public class TestRegionObserverInterface {
094
095  @ClassRule
096  public static final HBaseClassTestRule CLASS_RULE =
097      HBaseClassTestRule.forClass(TestRegionObserverInterface.class);
098
099  private static final Logger LOG = LoggerFactory.getLogger(TestRegionObserverInterface.class);
100
101  public static final TableName TEST_TABLE = TableName.valueOf("TestTable");
102  public final static byte[] A = Bytes.toBytes("a");
103  public final static byte[] B = Bytes.toBytes("b");
104  public final static byte[] C = Bytes.toBytes("c");
105  public final static byte[] ROW = Bytes.toBytes("testrow");
106
107  private static HBaseTestingUtility util = new HBaseTestingUtility();
108  private static MiniHBaseCluster cluster = null;
109
110  @Rule
111  public TestName name = new TestName();
112
113  @BeforeClass
114  public static void setupBeforeClass() throws Exception {
115    // set configure to indicate which cp should be loaded
116    Configuration conf = util.getConfiguration();
117    conf.setBoolean("hbase.master.distributed.log.replay", true);
118    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
119      SimpleRegionObserver.class.getName());
120
121    util.startMiniCluster();
122    cluster = util.getMiniHBaseCluster();
123  }
124
125  @AfterClass
126  public static void tearDownAfterClass() throws Exception {
127    util.shutdownMiniCluster();
128  }
129
130  @Test
131  public void testRegionObserver() throws IOException {
132    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
133    // recreate table every time in order to reset the status of the
134    // coprocessor.
135    Table table = util.createTable(tableName, new byte[][] { A, B, C });
136    try {
137      verifyMethodResult(SimpleRegionObserver.class,
138        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDelete",
139            "hadPostStartRegionOperation", "hadPostCloseRegionOperation",
140            "hadPostBatchMutateIndispensably" },
141        tableName, new Boolean[] { false, false, false, false, false, false, false, false });
142
143      Put put = new Put(ROW);
144      put.addColumn(A, A, A);
145      put.addColumn(B, B, B);
146      put.addColumn(C, C, C);
147      table.put(put);
148
149      verifyMethodResult(SimpleRegionObserver.class,
150        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
151            "hadPostBatchMutate", "hadDelete", "hadPostStartRegionOperation",
152            "hadPostCloseRegionOperation", "hadPostBatchMutateIndispensably" },
153        TEST_TABLE,
154        new Boolean[] { false, false, true, true, true, true, false, true, true, true });
155
156      verifyMethodResult(SimpleRegionObserver.class,
157        new String[] { "getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose" },
158        tableName, new Integer[] { 1, 1, 0, 0 });
159
160      Get get = new Get(ROW);
161      get.addColumn(A, A);
162      get.addColumn(B, B);
163      get.addColumn(C, C);
164      table.get(get);
165
166      verifyMethodResult(SimpleRegionObserver.class,
167        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDelete",
168            "hadPrePreparedDeleteTS" },
169        tableName, new Boolean[] { true, true, true, true, false, false });
170
171      Delete delete = new Delete(ROW);
172      delete.addColumn(A, A);
173      delete.addColumn(B, B);
174      delete.addColumn(C, C);
175      table.delete(delete);
176
177      verifyMethodResult(SimpleRegionObserver.class,
178        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
179            "hadPostBatchMutate", "hadDelete", "hadPrePreparedDeleteTS" },
180        tableName, new Boolean[] { true, true, true, true, true, true, true, true });
181    } finally {
182      util.deleteTable(tableName);
183      table.close();
184    }
185    verifyMethodResult(SimpleRegionObserver.class,
186      new String[] { "getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose" },
187      tableName, new Integer[] { 1, 1, 1, 1 });
188  }
189
190  @Test
191  public void testRowMutation() throws IOException {
192    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
193    Table table = util.createTable(tableName, new byte[][] { A, B, C });
194    try {
195      verifyMethodResult(SimpleRegionObserver.class,
196        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDeleted" },
197        tableName, new Boolean[] { false, false, false, false, false });
198      Put put = new Put(ROW);
199      put.addColumn(A, A, A);
200      put.addColumn(B, B, B);
201      put.addColumn(C, C, C);
202
203      Delete delete = new Delete(ROW);
204      delete.addColumn(A, A);
205      delete.addColumn(B, B);
206      delete.addColumn(C, C);
207
208      RowMutations arm = new RowMutations(ROW);
209      arm.add(put);
210      arm.add(delete);
211      table.mutateRow(arm);
212
213      verifyMethodResult(SimpleRegionObserver.class,
214        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDeleted" },
215        tableName, new Boolean[] { false, false, true, true, true });
216    } finally {
217      util.deleteTable(tableName);
218      table.close();
219    }
220  }
221
222  @Test
223  public void testIncrementHook() throws IOException {
224    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
225    Table table = util.createTable(tableName, new byte[][] { A, B, C });
226    try {
227      Increment inc = new Increment(Bytes.toBytes(0));
228      inc.addColumn(A, A, 1);
229
230      verifyMethodResult(SimpleRegionObserver.class,
231        new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock" },
232        tableName, new Boolean[] { false, false, false });
233
234      table.increment(inc);
235
236      verifyMethodResult(SimpleRegionObserver.class,
237        new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock" },
238        tableName, new Boolean[] { true, true, true });
239    } finally {
240      util.deleteTable(tableName);
241      table.close();
242    }
243  }
244
245  @Test
246  public void testCheckAndPutHooks() throws IOException {
247    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
248    try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) {
249      Put p = new Put(Bytes.toBytes(0));
250      p.addColumn(A, A, A);
251      table.put(p);
252      p = new Put(Bytes.toBytes(0));
253      p.addColumn(A, A, A);
254      verifyMethodResult(SimpleRegionObserver.class,
255        new String[] { "hadPreCheckAndPut", "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut" },
256        tableName, new Boolean[] { false, false, false });
257      table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p);
258      verifyMethodResult(SimpleRegionObserver.class,
259        new String[] { "hadPreCheckAndPut", "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut" },
260        tableName, new Boolean[] { true, true, true });
261    } finally {
262      util.deleteTable(tableName);
263    }
264  }
265
266  @Test
267  public void testCheckAndDeleteHooks() throws IOException {
268    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
269    Table table = util.createTable(tableName, new byte[][] { A, B, C });
270    try {
271      Put p = new Put(Bytes.toBytes(0));
272      p.addColumn(A, A, A);
273      table.put(p);
274      Delete d = new Delete(Bytes.toBytes(0));
275      table.delete(d);
276      verifyMethodResult(
277        SimpleRegionObserver.class, new String[] { "hadPreCheckAndDelete",
278            "hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete" },
279        tableName, new Boolean[] { false, false, false });
280      table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d);
281      verifyMethodResult(
282        SimpleRegionObserver.class, new String[] { "hadPreCheckAndDelete",
283            "hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete" },
284        tableName, new Boolean[] { true, true, true });
285    } finally {
286      util.deleteTable(tableName);
287      table.close();
288    }
289  }
290
291  @Test
292  public void testAppendHook() throws IOException {
293    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
294    Table table = util.createTable(tableName, new byte[][] { A, B, C });
295    try {
296      Append app = new Append(Bytes.toBytes(0));
297      app.addColumn(A, A, A);
298
299      verifyMethodResult(SimpleRegionObserver.class,
300        new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock" }, tableName,
301        new Boolean[] { false, false, false });
302
303      table.append(app);
304
305      verifyMethodResult(SimpleRegionObserver.class,
306        new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock" }, tableName,
307        new Boolean[] { true, true, true });
308    } finally {
309      util.deleteTable(tableName);
310      table.close();
311    }
312  }
313
314  @Test
315  // HBase-3583
316  public void testHBase3583() throws IOException {
317    final TableName tableName = TableName.valueOf(name.getMethodName());
318    util.createTable(tableName, new byte[][] { A, B, C });
319    util.waitUntilAllRegionsAssigned(tableName);
320
321    verifyMethodResult(SimpleRegionObserver.class,
322      new String[] { "hadPreGet", "hadPostGet", "wasScannerNextCalled", "wasScannerCloseCalled" },
323      tableName, new Boolean[] { false, false, false, false });
324
325    Table table = util.getConnection().getTable(tableName);
326    Put put = new Put(ROW);
327    put.addColumn(A, A, A);
328    table.put(put);
329
330    Get get = new Get(ROW);
331    get.addColumn(A, A);
332    table.get(get);
333
334    // verify that scannerNext and scannerClose upcalls won't be invoked
335    // when we perform get().
336    verifyMethodResult(SimpleRegionObserver.class,
337      new String[] { "hadPreGet", "hadPostGet", "wasScannerNextCalled", "wasScannerCloseCalled" },
338      tableName, new Boolean[] { true, true, false, false });
339
340    Scan s = new Scan();
341    ResultScanner scanner = table.getScanner(s);
342    try {
343      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
344      }
345    } finally {
346      scanner.close();
347    }
348
349    // now scanner hooks should be invoked.
350    verifyMethodResult(SimpleRegionObserver.class,
351      new String[] { "wasScannerNextCalled", "wasScannerCloseCalled" }, tableName,
352      new Boolean[] { true, true });
353    util.deleteTable(tableName);
354    table.close();
355  }
356
357  @Test
358  public void testHBASE14489() throws IOException {
359    final TableName tableName = TableName.valueOf(name.getMethodName());
360    Table table = util.createTable(tableName, new byte[][] { A });
361    Put put = new Put(ROW);
362    put.addColumn(A, A, A);
363    table.put(put);
364
365    Scan s = new Scan();
366    s.setFilter(new FilterAllFilter());
367    ResultScanner scanner = table.getScanner(s);
368    try {
369      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
370      }
371    } finally {
372      scanner.close();
373    }
374    verifyMethodResult(SimpleRegionObserver.class, new String[] { "wasScannerFilterRowCalled" },
375      tableName, new Boolean[] { true });
376    util.deleteTable(tableName);
377    table.close();
378
379  }
380
381  @Test
382  // HBase-3758
383  public void testHBase3758() throws IOException {
384    final TableName tableName = TableName.valueOf(name.getMethodName());
385    util.createTable(tableName, new byte[][] { A, B, C });
386
387    verifyMethodResult(SimpleRegionObserver.class,
388      new String[] { "hadDeleted", "wasScannerOpenCalled" }, tableName,
389      new Boolean[] { false, false });
390
391    Table table = util.getConnection().getTable(tableName);
392    Put put = new Put(ROW);
393    put.addColumn(A, A, A);
394    table.put(put);
395
396    Delete delete = new Delete(ROW);
397    table.delete(delete);
398
399    verifyMethodResult(SimpleRegionObserver.class,
400      new String[] { "hadDeleted", "wasScannerOpenCalled" }, tableName,
401      new Boolean[] { true, false });
402
403    Scan s = new Scan();
404    ResultScanner scanner = table.getScanner(s);
405    try {
406      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
407      }
408    } finally {
409      scanner.close();
410    }
411
412    // now scanner hooks should be invoked.
413    verifyMethodResult(SimpleRegionObserver.class, new String[] { "wasScannerOpenCalled" },
414      tableName, new Boolean[] { true });
415    util.deleteTable(tableName);
416    table.close();
417  }
418
419  /* Overrides compaction to only output rows with keys that are even numbers */
420  public static class EvenOnlyCompactor implements RegionCoprocessor, RegionObserver {
421    long lastCompaction;
422    long lastFlush;
423
424    @Override
425    public Optional<RegionObserver> getRegionObserver() {
426      return Optional.of(this);
427    }
428
429    @Override
430    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
431        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
432        CompactionRequest request) {
433      return new InternalScanner() {
434
435        @Override
436        public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
437          List<Cell> internalResults = new ArrayList<>();
438          boolean hasMore;
439          do {
440            hasMore = scanner.next(internalResults, scannerContext);
441            if (!internalResults.isEmpty()) {
442              long row = Bytes.toLong(CellUtil.cloneValue(internalResults.get(0)));
443              if (row % 2 == 0) {
444                // return this row
445                break;
446              }
447              // clear and continue
448              internalResults.clear();
449            }
450          } while (hasMore);
451
452          if (!internalResults.isEmpty()) {
453            results.addAll(internalResults);
454          }
455          return hasMore;
456        }
457
458        @Override
459        public void close() throws IOException {
460          scanner.close();
461        }
462      };
463    }
464
465    @Override
466    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
467        StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) {
468      lastCompaction = EnvironmentEdgeManager.currentTime();
469    }
470
471    @Override
472    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
473        FlushLifeCycleTracker tracker) {
474      lastFlush = EnvironmentEdgeManager.currentTime();
475    }
476  }
477
478  /**
479   * Tests overriding compaction handling via coprocessor hooks
480   * @throws Exception
481   */
482  @Test
483  public void testCompactionOverride() throws Exception {
484    final TableName compactTable = TableName.valueOf(name.getMethodName());
485    Admin admin = util.getAdmin();
486    if (admin.tableExists(compactTable)) {
487      admin.disableTable(compactTable);
488      admin.deleteTable(compactTable);
489    }
490
491    HTableDescriptor htd = new HTableDescriptor(compactTable);
492    htd.addFamily(new HColumnDescriptor(A));
493    htd.addCoprocessor(EvenOnlyCompactor.class.getName());
494    admin.createTable(htd);
495
496    Table table = util.getConnection().getTable(compactTable);
497    for (long i = 1; i <= 10; i++) {
498      byte[] iBytes = Bytes.toBytes(i);
499      Put put = new Put(iBytes);
500      put.setDurability(Durability.SKIP_WAL);
501      put.addColumn(A, A, iBytes);
502      table.put(put);
503    }
504
505    HRegion firstRegion = cluster.getRegions(compactTable).get(0);
506    Coprocessor cp = firstRegion.getCoprocessorHost().findCoprocessor(EvenOnlyCompactor.class);
507    assertNotNull("EvenOnlyCompactor coprocessor should be loaded", cp);
508    EvenOnlyCompactor compactor = (EvenOnlyCompactor) cp;
509
510    // force a compaction
511    long ts = System.currentTimeMillis();
512    admin.flush(compactTable);
513    // wait for flush
514    for (int i = 0; i < 10; i++) {
515      if (compactor.lastFlush >= ts) {
516        break;
517      }
518      Thread.sleep(1000);
519    }
520    assertTrue("Flush didn't complete", compactor.lastFlush >= ts);
521    LOG.debug("Flush complete");
522
523    ts = compactor.lastFlush;
524    admin.majorCompact(compactTable);
525    // wait for compaction
526    for (int i = 0; i < 30; i++) {
527      if (compactor.lastCompaction >= ts) {
528        break;
529      }
530      Thread.sleep(1000);
531    }
532    LOG.debug("Last compaction was at " + compactor.lastCompaction);
533    assertTrue("Compaction didn't complete", compactor.lastCompaction >= ts);
534
535    // only even rows should remain
536    ResultScanner scanner = table.getScanner(new Scan());
537    try {
538      for (long i = 2; i <= 10; i += 2) {
539        Result r = scanner.next();
540        assertNotNull(r);
541        assertFalse(r.isEmpty());
542        byte[] iBytes = Bytes.toBytes(i);
543        assertArrayEquals("Row should be " + i, r.getRow(), iBytes);
544        assertArrayEquals("Value should be " + i, r.getValue(A, A), iBytes);
545      }
546    } finally {
547      scanner.close();
548    }
549    table.close();
550  }
551
552  @Test
553  public void bulkLoadHFileTest() throws Exception {
554    final String testName = TestRegionObserverInterface.class.getName() + "." + name.getMethodName();
555    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
556    Configuration conf = util.getConfiguration();
557    Table table = util.createTable(tableName, new byte[][] { A, B, C });
558    try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
559      verifyMethodResult(SimpleRegionObserver.class,
560        new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" }, tableName,
561        new Boolean[] { false, false });
562
563      FileSystem fs = util.getTestFileSystem();
564      final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs);
565      Path familyDir = new Path(dir, Bytes.toString(A));
566
567      createHFile(util.getConfiguration(), fs, new Path(familyDir, Bytes.toString(A)), A, A);
568
569      // Bulk load
570      new LoadIncrementalHFiles(conf).doBulkLoad(dir, util.getAdmin(), table, locator);
571
572      verifyMethodResult(SimpleRegionObserver.class,
573        new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" }, tableName,
574        new Boolean[] { true, true });
575    } finally {
576      util.deleteTable(tableName);
577      table.close();
578    }
579  }
580
581  @Test
582  public void testRecovery() throws Exception {
583    LOG.info(TestRegionObserverInterface.class.getName() + "." + name.getMethodName());
584    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
585    Table table = util.createTable(tableName, new byte[][] { A, B, C });
586    try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
587
588      JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
589      ServerName sn2 = rs1.getRegionServer().getServerName();
590      String regEN = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
591
592      util.getAdmin().move(Bytes.toBytes(regEN), sn2);
593      while (!sn2.equals(locator.getAllRegionLocations().get(0).getServerName())) {
594        Thread.sleep(100);
595      }
596
597      Put put = new Put(ROW);
598      put.addColumn(A, A, A);
599      put.addColumn(B, B, B);
600      put.addColumn(C, C, C);
601      table.put(put);
602
603      // put two times
604      table.put(put);
605
606      verifyMethodResult(SimpleRegionObserver.class,
607        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
608            "hadPostBatchMutate", "hadDelete" },
609        tableName, new Boolean[] { false, false, true, true, true, true, false });
610
611      verifyMethodResult(SimpleRegionObserver.class,
612        new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore",
613            "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
614        tableName, new Integer[] { 0, 0, 0, 0, 2, 2 });
615
616      cluster.killRegionServer(rs1.getRegionServer().getServerName());
617      Threads.sleep(1000); // Let the kill soak in.
618      util.waitUntilAllRegionsAssigned(tableName);
619      LOG.info("All regions assigned");
620
621      verifyMethodResult(SimpleRegionObserver.class,
622        new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore",
623            "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
624        tableName, new Integer[] { 1, 1, 2, 2, 0, 0 });
625    } finally {
626      util.deleteTable(tableName);
627      table.close();
628    }
629  }
630
631  @Test
632  public void testPreWALRestoreSkip() throws Exception {
633    LOG.info(TestRegionObserverInterface.class.getName() + "." + name.getMethodName());
634    TableName tableName = TableName.valueOf(SimpleRegionObserver.TABLE_SKIPPED);
635    Table table = util.createTable(tableName, new byte[][] { A, B, C });
636
637    try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
638      JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
639      ServerName sn2 = rs1.getRegionServer().getServerName();
640      String regEN = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
641
642      util.getAdmin().move(Bytes.toBytes(regEN), sn2);
643      while (!sn2.equals(locator.getAllRegionLocations().get(0).getServerName())) {
644        Thread.sleep(100);
645      }
646
647      Put put = new Put(ROW);
648      put.addColumn(A, A, A);
649      put.addColumn(B, B, B);
650      put.addColumn(C, C, C);
651      table.put(put);
652
653      cluster.killRegionServer(rs1.getRegionServer().getServerName());
654      Threads.sleep(20000); // just to be sure that the kill has fully started.
655      util.waitUntilAllRegionsAssigned(tableName);
656    }
657
658    verifyMethodResult(SimpleRegionObserver.class,
659      new String[] { "getCtPreWALRestore", "getCtPostWALRestore", }, tableName,
660      new Integer[] { 0, 0 });
661
662    util.deleteTable(tableName);
663    table.close();
664  }
665
666  // check each region whether the coprocessor upcalls are called or not.
667  private void verifyMethodResult(Class<?> coprocessor, String methodName[], TableName tableName,
668      Object value[]) throws IOException {
669    try {
670      for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
671        if (!t.isAlive() || t.getRegionServer().isAborted() || t.getRegionServer().isStopping()) {
672          continue;
673        }
674        for (RegionInfo r : ProtobufUtil
675            .getOnlineRegions(t.getRegionServer().getRSRpcServices())) {
676          if (!r.getTable().equals(tableName)) {
677            continue;
678          }
679          RegionCoprocessorHost cph =
680              t.getRegionServer().getOnlineRegion(r.getRegionName()).getCoprocessorHost();
681
682          Coprocessor cp = cph.findCoprocessor(coprocessor.getName());
683          assertNotNull(cp);
684          for (int i = 0; i < methodName.length; ++i) {
685            Method m = coprocessor.getMethod(methodName[i]);
686            Object o = m.invoke(cp);
687            assertTrue("Result of " + coprocessor.getName() + "." + methodName[i]
688                    + " is expected to be " + value[i].toString() + ", while we get "
689                    + o.toString(), o.equals(value[i]));
690          }
691        }
692      }
693    } catch (Exception e) {
694      throw new IOException(e.toString());
695    }
696  }
697
698  private static void createHFile(Configuration conf, FileSystem fs, Path path, byte[] family,
699      byte[] qualifier) throws IOException {
700    HFileContext context = new HFileContextBuilder().build();
701    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
702        .withFileContext(context).create();
703    long now = System.currentTimeMillis();
704    try {
705      for (int i = 1; i <= 9; i++) {
706        KeyValue kv =
707            new KeyValue(Bytes.toBytes(i + ""), family, qualifier, now, Bytes.toBytes(i + ""));
708        writer.append(kv);
709      }
710    } finally {
711      writer.close();
712    }
713  }
714}