001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.lang.reflect.Method; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.Optional; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellUtil; 035import org.apache.hadoop.hbase.Coprocessor; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.HColumnDescriptor; 039import org.apache.hadoop.hbase.HTableDescriptor; 040import org.apache.hadoop.hbase.KeyValue; 041import org.apache.hadoop.hbase.MiniHBaseCluster; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.client.Admin; 045import org.apache.hadoop.hbase.client.Append; 046import org.apache.hadoop.hbase.client.Delete; 047import org.apache.hadoop.hbase.client.Durability; 048import org.apache.hadoop.hbase.client.Get; 049import org.apache.hadoop.hbase.client.Increment; 050import org.apache.hadoop.hbase.client.Put; 051import org.apache.hadoop.hbase.client.RegionInfo; 052import org.apache.hadoop.hbase.client.RegionLocator; 053import org.apache.hadoop.hbase.client.Result; 054import org.apache.hadoop.hbase.client.ResultScanner; 055import org.apache.hadoop.hbase.client.RowMutations; 056import org.apache.hadoop.hbase.client.Scan; 057import org.apache.hadoop.hbase.client.Table; 058import org.apache.hadoop.hbase.filter.FilterAllFilter; 059import org.apache.hadoop.hbase.io.hfile.CacheConfig; 060import org.apache.hadoop.hbase.io.hfile.HFile; 061import org.apache.hadoop.hbase.io.hfile.HFileContext; 062import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 063import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 064import org.apache.hadoop.hbase.regionserver.HRegion; 065import org.apache.hadoop.hbase.regionserver.InternalScanner; 066import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; 067import org.apache.hadoop.hbase.regionserver.ScanType; 068import org.apache.hadoop.hbase.regionserver.ScannerContext; 069import org.apache.hadoop.hbase.regionserver.Store; 070import org.apache.hadoop.hbase.regionserver.StoreFile; 071import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 072import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 073import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 074import org.apache.hadoop.hbase.testclassification.MediumTests; 075import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 076import org.apache.hadoop.hbase.util.Bytes; 077import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 078import org.apache.hadoop.hbase.util.JVMClusterUtil; 079import org.apache.hadoop.hbase.util.Threads; 080import org.junit.AfterClass; 081import org.junit.BeforeClass; 082import org.junit.ClassRule; 083import org.junit.Rule; 084import org.junit.Test; 085import org.junit.experimental.categories.Category; 086import org.junit.rules.TestName; 087import org.slf4j.Logger; 088import org.slf4j.LoggerFactory; 089 090import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 091 092@Category({ CoprocessorTests.class, MediumTests.class }) 093public class TestRegionObserverInterface { 094 095 @ClassRule 096 public static final HBaseClassTestRule CLASS_RULE = 097 HBaseClassTestRule.forClass(TestRegionObserverInterface.class); 098 099 private static final Logger LOG = LoggerFactory.getLogger(TestRegionObserverInterface.class); 100 101 public static final TableName TEST_TABLE = TableName.valueOf("TestTable"); 102 public final static byte[] A = Bytes.toBytes("a"); 103 public final static byte[] B = Bytes.toBytes("b"); 104 public final static byte[] C = Bytes.toBytes("c"); 105 public final static byte[] ROW = Bytes.toBytes("testrow"); 106 107 private static HBaseTestingUtility util = new HBaseTestingUtility(); 108 private static MiniHBaseCluster cluster = null; 109 110 @Rule 111 public TestName name = new TestName(); 112 113 @BeforeClass 114 public static void setupBeforeClass() throws Exception { 115 // set configure to indicate which cp should be loaded 116 Configuration conf = util.getConfiguration(); 117 conf.setBoolean("hbase.master.distributed.log.replay", true); 118 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 119 SimpleRegionObserver.class.getName()); 120 121 util.startMiniCluster(); 122 cluster = util.getMiniHBaseCluster(); 123 } 124 125 @AfterClass 126 public static void tearDownAfterClass() throws Exception { 127 util.shutdownMiniCluster(); 128 } 129 130 @Test 131 public void testRegionObserver() throws IOException { 132 final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName()); 133 // recreate table every time in order to reset the status of the 134 // coprocessor. 135 Table table = util.createTable(tableName, new byte[][] { A, B, C }); 136 try { 137 verifyMethodResult(SimpleRegionObserver.class, 138 new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDelete", 139 "hadPostStartRegionOperation", "hadPostCloseRegionOperation", 140 "hadPostBatchMutateIndispensably" }, 141 tableName, new Boolean[] { false, false, false, false, false, false, false, false }); 142 143 Put put = new Put(ROW); 144 put.addColumn(A, A, A); 145 put.addColumn(B, B, B); 146 put.addColumn(C, C, C); 147 table.put(put); 148 149 verifyMethodResult(SimpleRegionObserver.class, 150 new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate", 151 "hadPostBatchMutate", "hadDelete", "hadPostStartRegionOperation", 152 "hadPostCloseRegionOperation", "hadPostBatchMutateIndispensably" }, 153 TEST_TABLE, 154 new Boolean[] { false, false, true, true, true, true, false, true, true, true }); 155 156 verifyMethodResult(SimpleRegionObserver.class, 157 new String[] { "getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose" }, 158 tableName, new Integer[] { 1, 1, 0, 0 }); 159 160 Get get = new Get(ROW); 161 get.addColumn(A, A); 162 get.addColumn(B, B); 163 get.addColumn(C, C); 164 table.get(get); 165 166 verifyMethodResult(SimpleRegionObserver.class, 167 new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDelete", 168 "hadPrePreparedDeleteTS" }, 169 tableName, new Boolean[] { true, true, true, true, false, false }); 170 171 Delete delete = new Delete(ROW); 172 delete.addColumn(A, A); 173 delete.addColumn(B, B); 174 delete.addColumn(C, C); 175 table.delete(delete); 176 177 verifyMethodResult(SimpleRegionObserver.class, 178 new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate", 179 "hadPostBatchMutate", "hadDelete", "hadPrePreparedDeleteTS" }, 180 tableName, new Boolean[] { true, true, true, true, true, true, true, true }); 181 } finally { 182 util.deleteTable(tableName); 183 table.close(); 184 } 185 verifyMethodResult(SimpleRegionObserver.class, 186 new String[] { "getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose" }, 187 tableName, new Integer[] { 1, 1, 1, 1 }); 188 } 189 190 @Test 191 public void testRowMutation() throws IOException { 192 final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName()); 193 Table table = util.createTable(tableName, new byte[][] { A, B, C }); 194 try { 195 verifyMethodResult(SimpleRegionObserver.class, 196 new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDeleted" }, 197 tableName, new Boolean[] { false, false, false, false, false }); 198 Put put = new Put(ROW); 199 put.addColumn(A, A, A); 200 put.addColumn(B, B, B); 201 put.addColumn(C, C, C); 202 203 Delete delete = new Delete(ROW); 204 delete.addColumn(A, A); 205 delete.addColumn(B, B); 206 delete.addColumn(C, C); 207 208 RowMutations arm = new RowMutations(ROW); 209 arm.add(put); 210 arm.add(delete); 211 table.mutateRow(arm); 212 213 verifyMethodResult(SimpleRegionObserver.class, 214 new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDeleted" }, 215 tableName, new Boolean[] { false, false, true, true, true }); 216 } finally { 217 util.deleteTable(tableName); 218 table.close(); 219 } 220 } 221 222 @Test 223 public void testIncrementHook() throws IOException { 224 final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName()); 225 Table table = util.createTable(tableName, new byte[][] { A, B, C }); 226 try { 227 Increment inc = new Increment(Bytes.toBytes(0)); 228 inc.addColumn(A, A, 1); 229 230 verifyMethodResult(SimpleRegionObserver.class, 231 new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock" }, 232 tableName, new Boolean[] { false, false, false }); 233 234 table.increment(inc); 235 236 verifyMethodResult(SimpleRegionObserver.class, 237 new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock" }, 238 tableName, new Boolean[] { true, true, true }); 239 } finally { 240 util.deleteTable(tableName); 241 table.close(); 242 } 243 } 244 245 @Test 246 public void testCheckAndPutHooks() throws IOException { 247 final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName()); 248 try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) { 249 Put p = new Put(Bytes.toBytes(0)); 250 p.addColumn(A, A, A); 251 table.put(p); 252 p = new Put(Bytes.toBytes(0)); 253 p.addColumn(A, A, A); 254 verifyMethodResult(SimpleRegionObserver.class, 255 new String[] { "hadPreCheckAndPut", "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut" }, 256 tableName, new Boolean[] { false, false, false }); 257 table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p); 258 verifyMethodResult(SimpleRegionObserver.class, 259 new String[] { "hadPreCheckAndPut", "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut" }, 260 tableName, new Boolean[] { true, true, true }); 261 } finally { 262 util.deleteTable(tableName); 263 } 264 } 265 266 @Test 267 public void testCheckAndDeleteHooks() throws IOException { 268 final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName()); 269 Table table = util.createTable(tableName, new byte[][] { A, B, C }); 270 try { 271 Put p = new Put(Bytes.toBytes(0)); 272 p.addColumn(A, A, A); 273 table.put(p); 274 Delete d = new Delete(Bytes.toBytes(0)); 275 table.delete(d); 276 verifyMethodResult( 277 SimpleRegionObserver.class, new String[] { "hadPreCheckAndDelete", 278 "hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete" }, 279 tableName, new Boolean[] { false, false, false }); 280 table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d); 281 verifyMethodResult( 282 SimpleRegionObserver.class, new String[] { "hadPreCheckAndDelete", 283 "hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete" }, 284 tableName, new Boolean[] { true, true, true }); 285 } finally { 286 util.deleteTable(tableName); 287 table.close(); 288 } 289 } 290 291 @Test 292 public void testAppendHook() throws IOException { 293 final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName()); 294 Table table = util.createTable(tableName, new byte[][] { A, B, C }); 295 try { 296 Append app = new Append(Bytes.toBytes(0)); 297 app.addColumn(A, A, A); 298 299 verifyMethodResult(SimpleRegionObserver.class, 300 new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock" }, tableName, 301 new Boolean[] { false, false, false }); 302 303 table.append(app); 304 305 verifyMethodResult(SimpleRegionObserver.class, 306 new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock" }, tableName, 307 new Boolean[] { true, true, true }); 308 } finally { 309 util.deleteTable(tableName); 310 table.close(); 311 } 312 } 313 314 @Test 315 // HBase-3583 316 public void testHBase3583() throws IOException { 317 final TableName tableName = TableName.valueOf(name.getMethodName()); 318 util.createTable(tableName, new byte[][] { A, B, C }); 319 util.waitUntilAllRegionsAssigned(tableName); 320 321 verifyMethodResult(SimpleRegionObserver.class, 322 new String[] { "hadPreGet", "hadPostGet", "wasScannerNextCalled", "wasScannerCloseCalled" }, 323 tableName, new Boolean[] { false, false, false, false }); 324 325 Table table = util.getConnection().getTable(tableName); 326 Put put = new Put(ROW); 327 put.addColumn(A, A, A); 328 table.put(put); 329 330 Get get = new Get(ROW); 331 get.addColumn(A, A); 332 table.get(get); 333 334 // verify that scannerNext and scannerClose upcalls won't be invoked 335 // when we perform get(). 336 verifyMethodResult(SimpleRegionObserver.class, 337 new String[] { "hadPreGet", "hadPostGet", "wasScannerNextCalled", "wasScannerCloseCalled" }, 338 tableName, new Boolean[] { true, true, false, false }); 339 340 Scan s = new Scan(); 341 ResultScanner scanner = table.getScanner(s); 342 try { 343 for (Result rr = scanner.next(); rr != null; rr = scanner.next()) { 344 } 345 } finally { 346 scanner.close(); 347 } 348 349 // now scanner hooks should be invoked. 350 verifyMethodResult(SimpleRegionObserver.class, 351 new String[] { "wasScannerNextCalled", "wasScannerCloseCalled" }, tableName, 352 new Boolean[] { true, true }); 353 util.deleteTable(tableName); 354 table.close(); 355 } 356 357 @Test 358 public void testHBASE14489() throws IOException { 359 final TableName tableName = TableName.valueOf(name.getMethodName()); 360 Table table = util.createTable(tableName, new byte[][] { A }); 361 Put put = new Put(ROW); 362 put.addColumn(A, A, A); 363 table.put(put); 364 365 Scan s = new Scan(); 366 s.setFilter(new FilterAllFilter()); 367 ResultScanner scanner = table.getScanner(s); 368 try { 369 for (Result rr = scanner.next(); rr != null; rr = scanner.next()) { 370 } 371 } finally { 372 scanner.close(); 373 } 374 verifyMethodResult(SimpleRegionObserver.class, new String[] { "wasScannerFilterRowCalled" }, 375 tableName, new Boolean[] { true }); 376 util.deleteTable(tableName); 377 table.close(); 378 379 } 380 381 @Test 382 // HBase-3758 383 public void testHBase3758() throws IOException { 384 final TableName tableName = TableName.valueOf(name.getMethodName()); 385 util.createTable(tableName, new byte[][] { A, B, C }); 386 387 verifyMethodResult(SimpleRegionObserver.class, 388 new String[] { "hadDeleted", "wasScannerOpenCalled" }, tableName, 389 new Boolean[] { false, false }); 390 391 Table table = util.getConnection().getTable(tableName); 392 Put put = new Put(ROW); 393 put.addColumn(A, A, A); 394 table.put(put); 395 396 Delete delete = new Delete(ROW); 397 table.delete(delete); 398 399 verifyMethodResult(SimpleRegionObserver.class, 400 new String[] { "hadDeleted", "wasScannerOpenCalled" }, tableName, 401 new Boolean[] { true, false }); 402 403 Scan s = new Scan(); 404 ResultScanner scanner = table.getScanner(s); 405 try { 406 for (Result rr = scanner.next(); rr != null; rr = scanner.next()) { 407 } 408 } finally { 409 scanner.close(); 410 } 411 412 // now scanner hooks should be invoked. 413 verifyMethodResult(SimpleRegionObserver.class, new String[] { "wasScannerOpenCalled" }, 414 tableName, new Boolean[] { true }); 415 util.deleteTable(tableName); 416 table.close(); 417 } 418 419 /* Overrides compaction to only output rows with keys that are even numbers */ 420 public static class EvenOnlyCompactor implements RegionCoprocessor, RegionObserver { 421 long lastCompaction; 422 long lastFlush; 423 424 @Override 425 public Optional<RegionObserver> getRegionObserver() { 426 return Optional.of(this); 427 } 428 429 @Override 430 public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, 431 InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 432 CompactionRequest request) { 433 return new InternalScanner() { 434 435 @Override 436 public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException { 437 List<Cell> internalResults = new ArrayList<>(); 438 boolean hasMore; 439 do { 440 hasMore = scanner.next(internalResults, scannerContext); 441 if (!internalResults.isEmpty()) { 442 long row = Bytes.toLong(CellUtil.cloneValue(internalResults.get(0))); 443 if (row % 2 == 0) { 444 // return this row 445 break; 446 } 447 // clear and continue 448 internalResults.clear(); 449 } 450 } while (hasMore); 451 452 if (!internalResults.isEmpty()) { 453 results.addAll(internalResults); 454 } 455 return hasMore; 456 } 457 458 @Override 459 public void close() throws IOException { 460 scanner.close(); 461 } 462 }; 463 } 464 465 @Override 466 public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, 467 StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) { 468 lastCompaction = EnvironmentEdgeManager.currentTime(); 469 } 470 471 @Override 472 public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e, 473 FlushLifeCycleTracker tracker) { 474 lastFlush = EnvironmentEdgeManager.currentTime(); 475 } 476 } 477 478 /** 479 * Tests overriding compaction handling via coprocessor hooks 480 * @throws Exception 481 */ 482 @Test 483 public void testCompactionOverride() throws Exception { 484 final TableName compactTable = TableName.valueOf(name.getMethodName()); 485 Admin admin = util.getAdmin(); 486 if (admin.tableExists(compactTable)) { 487 admin.disableTable(compactTable); 488 admin.deleteTable(compactTable); 489 } 490 491 HTableDescriptor htd = new HTableDescriptor(compactTable); 492 htd.addFamily(new HColumnDescriptor(A)); 493 htd.addCoprocessor(EvenOnlyCompactor.class.getName()); 494 admin.createTable(htd); 495 496 Table table = util.getConnection().getTable(compactTable); 497 for (long i = 1; i <= 10; i++) { 498 byte[] iBytes = Bytes.toBytes(i); 499 Put put = new Put(iBytes); 500 put.setDurability(Durability.SKIP_WAL); 501 put.addColumn(A, A, iBytes); 502 table.put(put); 503 } 504 505 HRegion firstRegion = cluster.getRegions(compactTable).get(0); 506 Coprocessor cp = firstRegion.getCoprocessorHost().findCoprocessor(EvenOnlyCompactor.class); 507 assertNotNull("EvenOnlyCompactor coprocessor should be loaded", cp); 508 EvenOnlyCompactor compactor = (EvenOnlyCompactor) cp; 509 510 // force a compaction 511 long ts = System.currentTimeMillis(); 512 admin.flush(compactTable); 513 // wait for flush 514 for (int i = 0; i < 10; i++) { 515 if (compactor.lastFlush >= ts) { 516 break; 517 } 518 Thread.sleep(1000); 519 } 520 assertTrue("Flush didn't complete", compactor.lastFlush >= ts); 521 LOG.debug("Flush complete"); 522 523 ts = compactor.lastFlush; 524 admin.majorCompact(compactTable); 525 // wait for compaction 526 for (int i = 0; i < 30; i++) { 527 if (compactor.lastCompaction >= ts) { 528 break; 529 } 530 Thread.sleep(1000); 531 } 532 LOG.debug("Last compaction was at " + compactor.lastCompaction); 533 assertTrue("Compaction didn't complete", compactor.lastCompaction >= ts); 534 535 // only even rows should remain 536 ResultScanner scanner = table.getScanner(new Scan()); 537 try { 538 for (long i = 2; i <= 10; i += 2) { 539 Result r = scanner.next(); 540 assertNotNull(r); 541 assertFalse(r.isEmpty()); 542 byte[] iBytes = Bytes.toBytes(i); 543 assertArrayEquals("Row should be " + i, r.getRow(), iBytes); 544 assertArrayEquals("Value should be " + i, r.getValue(A, A), iBytes); 545 } 546 } finally { 547 scanner.close(); 548 } 549 table.close(); 550 } 551 552 @Test 553 public void bulkLoadHFileTest() throws Exception { 554 final String testName = TestRegionObserverInterface.class.getName() + "." + name.getMethodName(); 555 final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName()); 556 Configuration conf = util.getConfiguration(); 557 Table table = util.createTable(tableName, new byte[][] { A, B, C }); 558 try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) { 559 verifyMethodResult(SimpleRegionObserver.class, 560 new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" }, tableName, 561 new Boolean[] { false, false }); 562 563 FileSystem fs = util.getTestFileSystem(); 564 final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs); 565 Path familyDir = new Path(dir, Bytes.toString(A)); 566 567 createHFile(util.getConfiguration(), fs, new Path(familyDir, Bytes.toString(A)), A, A); 568 569 // Bulk load 570 new LoadIncrementalHFiles(conf).doBulkLoad(dir, util.getAdmin(), table, locator); 571 572 verifyMethodResult(SimpleRegionObserver.class, 573 new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" }, tableName, 574 new Boolean[] { true, true }); 575 } finally { 576 util.deleteTable(tableName); 577 table.close(); 578 } 579 } 580 581 @Test 582 public void testRecovery() throws Exception { 583 LOG.info(TestRegionObserverInterface.class.getName() + "." + name.getMethodName()); 584 final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName()); 585 Table table = util.createTable(tableName, new byte[][] { A, B, C }); 586 try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) { 587 588 JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer(); 589 ServerName sn2 = rs1.getRegionServer().getServerName(); 590 String regEN = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 591 592 util.getAdmin().move(Bytes.toBytes(regEN), sn2); 593 while (!sn2.equals(locator.getAllRegionLocations().get(0).getServerName())) { 594 Thread.sleep(100); 595 } 596 597 Put put = new Put(ROW); 598 put.addColumn(A, A, A); 599 put.addColumn(B, B, B); 600 put.addColumn(C, C, C); 601 table.put(put); 602 603 // put two times 604 table.put(put); 605 606 verifyMethodResult(SimpleRegionObserver.class, 607 new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate", 608 "hadPostBatchMutate", "hadDelete" }, 609 tableName, new Boolean[] { false, false, true, true, true, true, false }); 610 611 verifyMethodResult(SimpleRegionObserver.class, 612 new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore", 613 "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" }, 614 tableName, new Integer[] { 0, 0, 0, 0, 2, 2 }); 615 616 cluster.killRegionServer(rs1.getRegionServer().getServerName()); 617 Threads.sleep(1000); // Let the kill soak in. 618 util.waitUntilAllRegionsAssigned(tableName); 619 LOG.info("All regions assigned"); 620 621 verifyMethodResult(SimpleRegionObserver.class, 622 new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore", 623 "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" }, 624 tableName, new Integer[] { 1, 1, 2, 2, 0, 0 }); 625 } finally { 626 util.deleteTable(tableName); 627 table.close(); 628 } 629 } 630 631 @Test 632 public void testPreWALRestoreSkip() throws Exception { 633 LOG.info(TestRegionObserverInterface.class.getName() + "." + name.getMethodName()); 634 TableName tableName = TableName.valueOf(SimpleRegionObserver.TABLE_SKIPPED); 635 Table table = util.createTable(tableName, new byte[][] { A, B, C }); 636 637 try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) { 638 JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer(); 639 ServerName sn2 = rs1.getRegionServer().getServerName(); 640 String regEN = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 641 642 util.getAdmin().move(Bytes.toBytes(regEN), sn2); 643 while (!sn2.equals(locator.getAllRegionLocations().get(0).getServerName())) { 644 Thread.sleep(100); 645 } 646 647 Put put = new Put(ROW); 648 put.addColumn(A, A, A); 649 put.addColumn(B, B, B); 650 put.addColumn(C, C, C); 651 table.put(put); 652 653 cluster.killRegionServer(rs1.getRegionServer().getServerName()); 654 Threads.sleep(20000); // just to be sure that the kill has fully started. 655 util.waitUntilAllRegionsAssigned(tableName); 656 } 657 658 verifyMethodResult(SimpleRegionObserver.class, 659 new String[] { "getCtPreWALRestore", "getCtPostWALRestore", }, tableName, 660 new Integer[] { 0, 0 }); 661 662 util.deleteTable(tableName); 663 table.close(); 664 } 665 666 // check each region whether the coprocessor upcalls are called or not. 667 private void verifyMethodResult(Class<?> coprocessor, String methodName[], TableName tableName, 668 Object value[]) throws IOException { 669 try { 670 for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) { 671 if (!t.isAlive() || t.getRegionServer().isAborted() || t.getRegionServer().isStopping()) { 672 continue; 673 } 674 for (RegionInfo r : ProtobufUtil 675 .getOnlineRegions(t.getRegionServer().getRSRpcServices())) { 676 if (!r.getTable().equals(tableName)) { 677 continue; 678 } 679 RegionCoprocessorHost cph = 680 t.getRegionServer().getOnlineRegion(r.getRegionName()).getCoprocessorHost(); 681 682 Coprocessor cp = cph.findCoprocessor(coprocessor.getName()); 683 assertNotNull(cp); 684 for (int i = 0; i < methodName.length; ++i) { 685 Method m = coprocessor.getMethod(methodName[i]); 686 Object o = m.invoke(cp); 687 assertTrue("Result of " + coprocessor.getName() + "." + methodName[i] 688 + " is expected to be " + value[i].toString() + ", while we get " 689 + o.toString(), o.equals(value[i])); 690 } 691 } 692 } 693 } catch (Exception e) { 694 throw new IOException(e.toString()); 695 } 696 } 697 698 private static void createHFile(Configuration conf, FileSystem fs, Path path, byte[] family, 699 byte[] qualifier) throws IOException { 700 HFileContext context = new HFileContextBuilder().build(); 701 HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path) 702 .withFileContext(context).create(); 703 long now = System.currentTimeMillis(); 704 try { 705 for (int i = 1; i <= 9; i++) { 706 KeyValue kv = 707 new KeyValue(Bytes.toBytes(i + ""), family, qualifier, now, Bytes.toBytes(i + "")); 708 writer.append(kv); 709 } 710 } finally { 711 writer.close(); 712 } 713 } 714}