/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Exercises region coprocessor upcalls on a mini cluster. {@link SimpleRegionObserver} is
 * configured as a region coprocessor in {@link #setupBeforeClass()}; each test performs client
 * operations and then reads the observer's accessors reflectively through
 * {@link #verifyMethodResult(Class, String[], TableName, Object[])}.
 */
@Category({ CoprocessorTests.class, LargeTests.class })
public class TestRegionObserverInterface {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionObserverInterface.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionObserverInterface.class);

  public static final TableName TEST_TABLE = TableName.valueOf("TestTable");
  public static final byte[] FAMILY = Bytes.toBytes("f");
  public static final byte[] A = Bytes.toBytes("a");
  public static final byte[] B = Bytes.toBytes("b");
  public static final byte[] C = Bytes.toBytes("c");
  public static final byte[] ROW = Bytes.toBytes("testrow");

  private static HBaseTestingUtility util = new HBaseTestingUtility();
  private static MiniHBaseCluster cluster = null;

  @Rule
  public TestName name = new TestName();

  /** Starts the mini cluster with {@link SimpleRegionObserver} configured as a region cp. */
  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    // set configure to indicate which cp should be loaded
    Configuration conf = util.getConfiguration();
    conf.setBoolean("hbase.master.distributed.log.replay", true);
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      SimpleRegionObserver.class.getName());

    util.startMiniCluster();
    cluster = util.getMiniHBaseCluster();
  }

  /** Shuts the mini cluster down once all tests have run. */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  /** Returns the table name {@code TestTable.<current test method>}. */
  private TableName perTestTableName() {
    return TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
  }

  /** Opens a scanner for {@code scan} on {@code table}, reads it to the end and closes it. */
  private static void drainScanner(Table table, Scan scan) throws IOException {
    try (ResultScanner scanner = table.getScanner(scan)) {
      while (scanner.next() != null) {
        // Results are discarded; the tests only care about the upcalls scanning triggers.
      }
    }
  }

  /**
   * Starts an additional region server, moves the first region reported by {@code locator} onto
   * it and polls until the locator reports the new location.
   * @return the thread of the newly started region server
   */
  private static JVMClusterUtil.RegionServerThread startServerAndMoveFirstRegion(
    RegionLocator locator) throws Exception {
    JVMClusterUtil.RegionServerThread newServer = cluster.startRegionServer();
    ServerName target = newServer.getRegionServer().getServerName();
    String encodedName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();

    util.getAdmin().move(Bytes.toBytes(encodedName), target);
    while (!target.equals(locator.getAllRegionLocations().get(0).getServerName())) {
      Thread.sleep(100);
    }
    return newServer;
  }

  /** Checks the get/put/delete and open/close upcall flags around basic client operations. */
  @Test
  public void testRegionObserver() throws IOException {
    final TableName tableName = perTestTableName();
    // recreate table every time in order to reset the status of the
    // coprocessor.
    try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) {
      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDelete",
          "hadPostStartRegionOperation", "hadPostCloseRegionOperation",
          "hadPostBatchMutateIndispensably" },
        tableName, new Boolean[] { false, false, false, false, false, false, false, false });

      Put put = new Put(ROW);
      put.addColumn(A, A, A);
      put.addColumn(B, B, B);
      put.addColumn(C, C, C);
      table.put(put);

      // This check previously targeted TEST_TABLE, a table this test never creates, so no
      // region matched and nothing was asserted. It must inspect the table written to above.
      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
          "hadPostBatchMutate", "hadDelete", "hadPostStartRegionOperation",
          "hadPostCloseRegionOperation", "hadPostBatchMutateIndispensably" },
        tableName,
        new Boolean[] { false, false, true, true, true, true, false, true, true, true });

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose" },
        tableName, new Integer[] { 1, 1, 0, 0 });

      Get get = new Get(ROW);
      get.addColumn(A, A);
      get.addColumn(B, B);
      get.addColumn(C, C);
      table.get(get);

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDelete",
          "hadPrePreparedDeleteTS" },
        tableName, new Boolean[] { true, true, true, true, false, false });

      Delete delete = new Delete(ROW);
      delete.addColumn(A, A);
      delete.addColumn(B, B);
      delete.addColumn(C, C);
      table.delete(delete);

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
          "hadPostBatchMutate", "hadDelete", "hadPrePreparedDeleteTS" },
        tableName, new Boolean[] { true, true, true, true, true, true, true, true });
    } finally {
      util.deleteTable(tableName);
    }
    verifyMethodResult(SimpleRegionObserver.class,
      new String[] { "getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose" },
      tableName, new Integer[] { 1, 1, 1, 1 });
  }

  /** Checks that a {@link RowMutations} carrying a put and a delete fires both hook families. */
  @Test
  public void testRowMutation() throws IOException {
    final TableName tableName = perTestTableName();
    try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) {
      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDeleted" },
        tableName, new Boolean[] { false, false, false, false, false });
      Put put = new Put(ROW);
      put.addColumn(A, A, A);
      put.addColumn(B, B, B);
      put.addColumn(C, C, C);

      Delete delete = new Delete(ROW);
      delete.addColumn(A, A);
      delete.addColumn(B, B);
      delete.addColumn(C, C);

      RowMutations arm = new RowMutations(ROW);
      arm.add(put);
      arm.add(delete);
      table.mutateRow(arm);

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDeleted" },
        tableName, new Boolean[] { false, false, true, true, true });
    } finally {
      util.deleteTable(tableName);
    }
  }

  /** Checks the increment hooks before and after a single {@link Increment}. */
  @Test
  public void testIncrementHook() throws IOException {
    final TableName tableName = perTestTableName();
    try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) {
      Increment inc = new Increment(Bytes.toBytes(0));
      inc.addColumn(A, A, 1);

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock" },
        tableName, new Boolean[] { false, false, false });

      table.increment(inc);

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock" },
        tableName, new Boolean[] { true, true, true });
    } finally {
      util.deleteTable(tableName);
    }
  }

  /** Checks the checkAndPut hook counters for the value-based and the filter-based variants. */
  @Test
  public void testCheckAndPutHooks() throws IOException {
    final TableName tableName = perTestTableName();
    final String[] counters = new String[] { "getPreCheckAndPut",
      "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut", "getPreCheckAndPutWithFilter",
      "getPreCheckAndPutWithFilterAfterRowLock", "getPostCheckAndPutWithFilter" };
    try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) {
      Put p = new Put(Bytes.toBytes(0));
      p.addColumn(A, A, A);
      table.put(p);
      p = new Put(Bytes.toBytes(0));
      p.addColumn(A, A, A);
      verifyMethodResult(SimpleRegionObserver.class, counters, tableName,
        new Integer[] { 0, 0, 0, 0, 0, 0 });

      table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p);
      verifyMethodResult(SimpleRegionObserver.class, counters, tableName,
        new Integer[] { 1, 1, 1, 0, 0, 0 });

      table.checkAndMutate(Bytes.toBytes(0),
        new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A)).thenPut(p);
      verifyMethodResult(SimpleRegionObserver.class, counters, tableName,
        new Integer[] { 1, 1, 1, 1, 1, 1 });
    } finally {
      util.deleteTable(tableName);
    }
  }

  /** Checks the checkAndDelete hook counters for the value-based and filter-based variants. */
  @Test
  public void testCheckAndDeleteHooks() throws IOException {
    final TableName tableName = perTestTableName();
    final String[] counters = new String[] { "getPreCheckAndDelete",
      "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
      "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
      "getPostCheckAndDeleteWithFilter" };
    try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) {
      Put p = new Put(Bytes.toBytes(0));
      p.addColumn(A, A, A);
      table.put(p);
      Delete d = new Delete(Bytes.toBytes(0));
      table.delete(d);
      verifyMethodResult(SimpleRegionObserver.class, counters, tableName,
        new Integer[] { 0, 0, 0, 0, 0, 0 });

      table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d);
      verifyMethodResult(SimpleRegionObserver.class, counters, tableName,
        new Integer[] { 1, 1, 1, 0, 0, 0 });

      table.checkAndMutate(Bytes.toBytes(0),
        new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A)).thenDelete(d);
      verifyMethodResult(SimpleRegionObserver.class, counters, tableName,
        new Integer[] { 1, 1, 1, 1, 1, 1 });
    } finally {
      util.deleteTable(tableName);
    }
  }

  /** Checks the append hooks before and after a single {@link Append}. */
  @Test
  public void testAppendHook() throws IOException {
    final TableName tableName = perTestTableName();
    try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) {
      Append app = new Append(Bytes.toBytes(0));
      app.addColumn(A, A, A);

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock" }, tableName,
        new Boolean[] { false, false, false });

      table.append(app);

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock" }, tableName,
        new Boolean[] { true, true, true });
    } finally {
      util.deleteTable(tableName);
    }
  }

  /** HBase-3583: a get() must not trigger the scanner next/close upcalls, a scan must. */
  @Test
  public void testHBase3583() throws IOException {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    // Use the handle returned by createTable instead of discarding it and opening a second one.
    try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) {
      util.waitUntilAllRegionsAssigned(tableName);

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreGet", "hadPostGet", "wasScannerNextCalled",
          "wasScannerCloseCalled" },
        tableName, new Boolean[] { false, false, false, false });

      Put put = new Put(ROW);
      put.addColumn(A, A, A);
      table.put(put);

      Get get = new Get(ROW);
      get.addColumn(A, A);
      table.get(get);

      // verify that scannerNext and scannerClose upcalls won't be invoked
      // when we perform get().
      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreGet", "hadPostGet", "wasScannerNextCalled",
          "wasScannerCloseCalled" },
        tableName, new Boolean[] { true, true, false, false });

      drainScanner(table, new Scan());

      // now scanner hooks should be invoked.
      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "wasScannerNextCalled", "wasScannerCloseCalled" }, tableName,
        new Boolean[] { true, true });
    } finally {
      util.deleteTable(tableName);
    }
  }

  /** HBASE-14489: scanning with a {@link FilterAllFilter} must reach the filter-row upcall. */
  @Test
  public void testHBASE14489() throws IOException {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try (Table table = util.createTable(tableName, new byte[][] { A })) {
      Put put = new Put(ROW);
      put.addColumn(A, A, A);
      table.put(put);

      Scan s = new Scan();
      s.setFilter(new FilterAllFilter());
      drainScanner(table, s);

      verifyMethodResult(SimpleRegionObserver.class, new String[] { "wasScannerFilterRowCalled" },
        tableName, new Boolean[] { true });
    } finally {
      util.deleteTable(tableName);
    }
  }

  /** HBase-3758: a delete must not trigger the scanner-open upcall, a scan must. */
  @Test
  public void testHBase3758() throws IOException {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    // Use the handle returned by createTable instead of discarding it and opening a second one.
    try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) {
      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadDeleted", "wasScannerOpenCalled" }, tableName,
        new Boolean[] { false, false });

      Put put = new Put(ROW);
      put.addColumn(A, A, A);
      table.put(put);

      Delete delete = new Delete(ROW);
      table.delete(delete);

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadDeleted", "wasScannerOpenCalled" }, tableName,
        new Boolean[] { true, false });

      drainScanner(table, new Scan());

      // now scanner hooks should be invoked.
      verifyMethodResult(SimpleRegionObserver.class, new String[] { "wasScannerOpenCalled" },
        tableName, new Boolean[] { true });
    } finally {
      util.deleteTable(tableName);
    }
  }

  /* Overrides compaction to only output rows with keys that are even numbers */
  public static class EvenOnlyCompactor implements RegionCoprocessor, RegionObserver {
    // Written from the coprocessor upcalls and polled by the test thread, hence volatile.
    volatile long lastCompaction;
    volatile long lastFlush;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /**
     * Wraps the compaction scanner so that each {@code next} call skips batches whose first
     * cell value decodes to an odd long.
     */
    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) {
      return new InternalScanner() {

        @Override
        public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
          List<Cell> internalResults = new ArrayList<>();
          boolean hasMore;
          do {
            hasMore = scanner.next(internalResults, scannerContext);
            if (!internalResults.isEmpty()) {
              long row = Bytes.toLong(CellUtil.cloneValue(internalResults.get(0)));
              if (row % 2 == 0) {
                // return this row
                break;
              }
              // clear and continue
              internalResults.clear();
            }
          } while (hasMore);

          if (!internalResults.isEmpty()) {
            results.addAll(internalResults);
          }
          return hasMore;
        }

        @Override
        public void close() throws IOException {
          scanner.close();
        }
      };
    }

    /** Records the time of the most recent completed compaction. */
    @Override
    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
      StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) {
      lastCompaction = EnvironmentEdgeManager.currentTime();
    }

    /** Records the time of the most recent completed flush. */
    @Override
    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
      FlushLifeCycleTracker tracker) {
      lastFlush = EnvironmentEdgeManager.currentTime();
    }
  }

  /**
   * Tests overriding compaction handling via coprocessor hooks: after a major compaction through
   * {@link EvenOnlyCompactor} only the even rows of 1..10 may remain.
   */
  @Test
  public void testCompactionOverride() throws Exception {
    final TableName compactTable = TableName.valueOf(name.getMethodName());
    Admin admin = util.getAdmin();
    if (admin.tableExists(compactTable)) {
      admin.disableTable(compactTable);
      admin.deleteTable(compactTable);
    }

    HTableDescriptor htd = new HTableDescriptor(compactTable);
    htd.addFamily(new HColumnDescriptor(A));
    htd.addCoprocessor(EvenOnlyCompactor.class.getName());
    admin.createTable(htd);

    try (Table table = util.getConnection().getTable(compactTable)) {
      for (long i = 1; i <= 10; i++) {
        byte[] iBytes = Bytes.toBytes(i);
        Put put = new Put(iBytes);
        put.setDurability(Durability.SKIP_WAL);
        put.addColumn(A, A, iBytes);
        table.put(put);
      }

      HRegion firstRegion = cluster.getRegions(compactTable).get(0);
      Coprocessor cp = firstRegion.getCoprocessorHost().findCoprocessor(EvenOnlyCompactor.class);
      assertNotNull("EvenOnlyCompactor coprocessor should be loaded", cp);
      EvenOnlyCompactor compactor = (EvenOnlyCompactor) cp;

      // force a compaction
      // Same clock as the one EvenOnlyCompactor stamps lastFlush/lastCompaction with.
      long ts = EnvironmentEdgeManager.currentTime();
      admin.flush(compactTable);
      // wait for flush
      for (int i = 0; i < 10; i++) {
        if (compactor.lastFlush >= ts) {
          break;
        }
        Thread.sleep(1000);
      }
      assertTrue("Flush didn't complete", compactor.lastFlush >= ts);
      LOG.debug("Flush complete");

      ts = compactor.lastFlush;
      admin.majorCompact(compactTable);
      // wait for compaction
      for (int i = 0; i < 30; i++) {
        if (compactor.lastCompaction >= ts) {
          break;
        }
        Thread.sleep(1000);
      }
      LOG.debug("Last compaction was at {}", compactor.lastCompaction);
      assertTrue("Compaction didn't complete", compactor.lastCompaction >= ts);

      // only even rows should remain
      try (ResultScanner scanner = table.getScanner(new Scan())) {
        for (long i = 2; i <= 10; i += 2) {
          Result r = scanner.next();
          assertNotNull(r);
          assertFalse(r.isEmpty());
          byte[] iBytes = Bytes.toBytes(i);
          // JUnit order is (message, expected, actual).
          assertArrayEquals("Row should be " + i, iBytes, r.getRow());
          assertArrayEquals("Value should be " + i, iBytes, r.getValue(A, A));
        }
      }
    }
  }

  /** Checks that bulk loading an HFile fires the pre/post bulk-load hooks. */
  @Test
  public void bulkLoadHFileTest() throws Exception {
    final String testName =
      TestRegionObserverInterface.class.getName() + "." + name.getMethodName();
    final TableName tableName = perTestTableName();
    Configuration conf = util.getConfiguration();
    try (Table table = util.createTable(tableName, new byte[][] { A, B, C });
      RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" }, tableName,
        new Boolean[] { false, false });

      FileSystem fs = util.getTestFileSystem();
      final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs);
      Path familyDir = new Path(dir, Bytes.toString(A));

      createHFile(util.getConfiguration(), fs, new Path(familyDir, Bytes.toString(A)), A, A);

      // Bulk load
      new LoadIncrementalHFiles(conf).doBulkLoad(dir, util.getAdmin(), table, locator);

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" }, tableName,
        new Boolean[] { true, true });
    } finally {
      util.deleteTable(tableName);
    }
  }

  /**
   * Writes to a region hosted on a freshly started region server, kills that server and checks
   * the WAL replay/restore counters once the regions are assigned again.
   */
  @Test
  public void testRecovery() throws Exception {
    LOG.info("{}.{}", TestRegionObserverInterface.class.getName(), name.getMethodName());
    final TableName tableName = perTestTableName();
    try (Table table = util.createTable(tableName, new byte[][] { A, B, C });
      RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {

      JVMClusterUtil.RegionServerThread rs1 = startServerAndMoveFirstRegion(locator);

      Put put = new Put(ROW);
      put.addColumn(A, A, A);
      put.addColumn(B, B, B);
      put.addColumn(C, C, C);
      table.put(put);

      // put two times
      table.put(put);

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate",
          "hadPostBatchMutate", "hadDelete" },
        tableName, new Boolean[] { false, false, true, true, true, true, false });

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore",
          "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
        tableName, new Integer[] { 0, 0, 0, 0, 2, 2 });

      cluster.killRegionServer(rs1.getRegionServer().getServerName());
      Threads.sleep(1000); // Let the kill soak in.
      util.waitUntilAllRegionsAssigned(tableName);
      LOG.info("All regions assigned");

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore",
          "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
        tableName, new Integer[] { 1, 1, 2, 2, 0, 0 });
    } finally {
      util.deleteTable(tableName);
    }
  }

  /**
   * Same kill-and-recover scenario as {@link #testRecovery()} but on the table named
   * {@code SimpleRegionObserver.TABLE_SKIPPED}, for which both WAL-restore counters are expected
   * to stay at zero.
   */
  @Test
  public void testPreWALRestoreSkip() throws Exception {
    LOG.info("{}.{}", TestRegionObserverInterface.class.getName(), name.getMethodName());
    TableName tableName = TableName.valueOf(SimpleRegionObserver.TABLE_SKIPPED);
    try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) {
      try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
        JVMClusterUtil.RegionServerThread rs1 = startServerAndMoveFirstRegion(locator);

        Put put = new Put(ROW);
        put.addColumn(A, A, A);
        put.addColumn(B, B, B);
        put.addColumn(C, C, C);
        table.put(put);

        cluster.killRegionServer(rs1.getRegionServer().getServerName());
        Threads.sleep(20000); // just to be sure that the kill has fully started.
        util.waitUntilAllRegionsAssigned(tableName);
      }

      verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "getCtPreWALRestore", "getCtPostWALRestore", }, tableName,
        new Integer[] { 0, 0 });
    } finally {
      util.deleteTable(tableName);
    }
  }

  /**
   * Issues a put, an append, an increment and a delete against {@code table}, checking after
   * each one that {@code getCtPreWALAppend} has grown by exactly one. Called from
   * {@link #testPreWALAppendIsWrittenToWAL()}.
   */
  private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
    int expectedCalls = 0;
    String[] methodArray = new String[] { "getCtPreWALAppend" };
    Object[] resultArray = new Object[1];

    Put p = new Put(ROW);
    p.addColumn(A, A, A);
    table.put(p);
    resultArray[0] = ++expectedCalls;
    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);

    Append a = new Append(ROW);
    a.addColumn(B, B, B);
    table.append(a);
    resultArray[0] = ++expectedCalls;
    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);

    Increment i = new Increment(ROW);
    i.addColumn(C, C, 1);
    table.increment(i);
    resultArray[0] = ++expectedCalls;
    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);

    Delete d = new Delete(ROW);
    table.delete(d);
    resultArray[0] = ++expectedCalls;
    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
  }

  /**
   * Calls {@code preWALAppend} directly on a standalone {@link SimpleRegionObserver} and checks
   * that exactly one extended attribute, keyed by the call counter, was added to the WAL key.
   */
  @Test
  public void testPreWALAppend() throws Exception {
    SimpleRegionObserver sro = new SimpleRegionObserver();
    // Mockito.mock(Class) cannot carry the type argument; the mock is never dereferenced here.
    @SuppressWarnings("unchecked")
    ObserverContext<RegionCoprocessorEnvironment> ctx = Mockito.mock(ObserverContext.class);
    WALKey key =
      new WALKeyImpl(Bytes.toBytes("region"), TEST_TABLE, EnvironmentEdgeManager.currentTime());
    WALEdit edit = new WALEdit();
    sro.preWALAppend(ctx, key, edit);
    Assert.assertEquals(1, key.getExtendedAttributes().size());
    Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
      key.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend())));
  }

  /**
   * Registers a WAL listener on the table's region and checks that the extended attribute set by
   * {@code preWALAppend} shows up on the WAL keys of all four mutations.
   */
  @Test
  public void testPreWALAppendIsWrittenToWAL() throws Exception {
    final TableName tableName = perTestTableName();
    // Previously the table handle was never closed and the table never deleted.
    try (Table table = util.createTable(tableName, new byte[][] { A, B, C })) {
      PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
      List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
      // should be only one region
      HRegion region = regions.get(0);
      region.getWAL().registerWALActionsListener(listener);
      testPreWALAppendHook(table, tableName);
      boolean[] expectedResults = { true, true, true, true };
      Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray());
    } finally {
      util.deleteTable(tableName);
    }
  }

  /**
   * Flushes, compacts and closes a region that received no user writes and checks that
   * {@code getCtPreWALAppend} is still zero.
   */
  @Test
  public void testPreWALAppendNotCalledOnMetaEdit() throws Exception {
    final TableName tableName = perTestTableName();
    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
    tdBuilder.setColumnFamily(cfBuilder.build());
    tdBuilder.setCoprocessor(SimpleRegionObserver.class.getName());
    TableDescriptor td = tdBuilder.build();
    // Only the client handle is closed here; the table is left in place because the region is
    // closed directly below.
    try (Table table = util.createTable(td, new byte[][] { A, B, C })) {
      PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
      List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
      // should be only one region
      HRegion region = regions.get(0);

      region.getWAL().registerWALActionsListener(listener);
      // flushing should write to the WAL
      region.flush(true);
      // so should compaction
      region.compact(false);
      // and so should closing the region
      region.close();

      // but we still shouldn't have triggered preWALAppend because no user data was written
      // NOTE(review): verifyMethodResult only inspects regions reported as online; confirm the
      // region closed above is still listed, otherwise this check asserts nothing for it.
      String[] methods = new String[] { "getCtPreWALAppend" };
      Object[] expectedResult = new Integer[] { 0 };
      verifyMethodResult(SimpleRegionObserver.class, methods, tableName, expectedResult);
    }
  }

  /**
   * Checks, for every online region of {@code tableName} on every live region server, that
   * invoking each no-arg accessor in {@code methodName} on the loaded {@code coprocessor}
   * instance returns the value at the same index of {@code value}.
   * <p>
   * If no online region belongs to {@code tableName}, nothing is asserted and the call returns
   * normally.
   * @param coprocessor coprocessor class to look up on each region's coprocessor host
   * @param methodName names of public no-arg methods on {@code coprocessor}
   * @param tableName table whose regions are inspected
   * @param value expected results, parallel to {@code methodName}
   * @throws IOException wrapping any exception raised while listing regions or reflecting
   */
  private void verifyMethodResult(Class<?> coprocessor, String[] methodName, TableName tableName,
    Object[] value) throws IOException {
    try {
      for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
        if (!t.isAlive() || t.getRegionServer().isAborted() || t.getRegionServer().isStopping()) {
          continue;
        }
        for (RegionInfo r : ProtobufUtil
          .getOnlineRegions(t.getRegionServer().getRSRpcServices())) {
          if (!r.getTable().equals(tableName)) {
            continue;
          }
          RegionCoprocessorHost cph =
            t.getRegionServer().getOnlineRegion(r.getRegionName()).getCoprocessorHost();

          Coprocessor cp = cph.findCoprocessor(coprocessor.getName());
          assertNotNull(cp);
          for (int i = 0; i < methodName.length; ++i) {
            Method m = coprocessor.getMethod(methodName[i]);
            Object o = m.invoke(cp);
            assertTrue("Result of " + coprocessor.getName() + "." + methodName[i]
              + " is expected to be " + value[i].toString() + ", while we get " + o.toString(),
              o.equals(value[i]));
          }
        }
      }
    } catch (Exception e) {
      // Same message as before, but keep the cause so the original stack trace is not lost.
      throw new IOException(e.toString(), e);
    }
  }

  /**
   * Writes an HFile at {@code path} holding nine cells whose row and value are the strings
   * "1".."9" under the given family and qualifier.
   */
  private static void createHFile(Configuration conf, FileSystem fs, Path path, byte[] family,
    byte[] qualifier) throws IOException {
    HFileContext context = new HFileContextBuilder().build();
    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
      .withFileContext(context).create();
    long now = System.currentTimeMillis();
    try {
      for (int i = 1; i <= 9; i++) {
        KeyValue kv =
          new KeyValue(Bytes.toBytes(i + ""), family, qualifier, now, Bytes.toBytes(i + ""));
        writer.append(kv);
      }
    } finally {
      writer.close();
    }
  }

  /**
   * WAL listener that records, for k = 1..4, whether some appended WAL key carried the extended
   * attribute named {@code k} with the value
   * {@code SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES}.
   */
  private static class PreWALAppendWALActionsListener implements WALActionsListener {
    private final boolean[] walKeysCorrect = { false, false, false, false };

    @Override
    public void postAppend(long entryLen, long elapsedTimeMillis, WALKey logKey, WALEdit logEdit)
      throws IOException {
      for (int k = 0; k < walKeysCorrect.length; k++) {
        // Once a slot has been seen it stays true.
        if (!walKeysCorrect[k]) {
          walKeysCorrect[k] = Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
            logKey.getExtendedAttribute(Integer.toString(k + 1)));
        }
      }
    }

    boolean[] getWalKeysCorrectArray() {
      return walKeysCorrect;
    }
  }
}