001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Optional; 029import java.util.Set; 030import java.util.concurrent.CountDownLatch; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicBoolean; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.atomic.AtomicReference; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.Cell; 038import org.apache.hadoop.hbase.ExtendedCell; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 044import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 045import org.apache.hadoop.hbase.coprocessor.ObserverContext; 046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 048import org.apache.hadoop.hbase.coprocessor.RegionObserver; 049import org.apache.hadoop.hbase.io.hfile.BlockCache; 050import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 051import org.apache.hadoop.hbase.io.hfile.CacheConfig; 052import org.apache.hadoop.hbase.io.hfile.CachedBlock; 053import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 054import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 055import org.apache.hadoop.hbase.regionserver.BloomType; 056import org.apache.hadoop.hbase.regionserver.HRegion; 057import org.apache.hadoop.hbase.regionserver.HStore; 058import org.apache.hadoop.hbase.regionserver.InternalScanner; 059import org.apache.hadoop.hbase.regionserver.RegionScanner; 060import org.apache.hadoop.hbase.regionserver.ScannerContext; 061import org.apache.hadoop.hbase.testclassification.ClientTests; 062import org.apache.hadoop.hbase.testclassification.LargeTests; 063import org.apache.hadoop.hbase.util.Bytes; 064import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 065import org.junit.jupiter.api.AfterAll; 066import org.junit.jupiter.api.AfterEach; 067import org.junit.jupiter.api.BeforeAll; 068import org.junit.jupiter.api.BeforeEach; 069import org.junit.jupiter.api.Tag; 070import org.junit.jupiter.api.Test; 071import org.junit.jupiter.api.TestInfo; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 076 077@Tag(LargeTests.TAG) 078@Tag(ClientTests.TAG) 079public class TestBlockEvictionFromClient { 080 081 private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class); 082 protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 083 static byte[][] ROWS = new byte[2][]; 084 private static int NO_OF_THREADS = 3; 085 private static byte[] ROW = Bytes.toBytes("testRow"); 086 private static byte[] ROW1 = Bytes.toBytes("testRow1"); 087 private static byte[] ROW2 = Bytes.toBytes("testRow2"); 088 private static byte[] ROW3 = Bytes.toBytes("testRow3"); 089 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 090 private static byte[][] FAMILIES_1 = new byte[1][0]; 091 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 092 private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 093 private static byte[] data = new byte[1000]; 094 private static byte[] data2 = Bytes.add(data, data); 095 protected static int SLAVES = 1; 096 private static CountDownLatch latch; 097 private static CountDownLatch getLatch; 098 private static CountDownLatch compactionLatch; 099 private static CountDownLatch exceptionLatch; 100 private String methodName; 101 102 @BeforeEach 103 public void setUp(TestInfo testInfo) throws Exception { 104 this.methodName = testInfo.getTestMethod().get().getName(); 105 CustomInnerRegionObserver.waitForGets.set(false); 106 CustomInnerRegionObserver.countOfNext.set(0); 107 CustomInnerRegionObserver.countOfGets.set(0); 108 } 109 110 @BeforeAll 111 public static void setUpBeforeClass() throws Exception { 112 ROWS[0] = ROW; 113 ROWS[1] = ROW1; 114 Configuration conf = TEST_UTIL.getConfiguration(); 115 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 116 MultiRowMutationEndpoint.class.getName()); 117 conf.setInt("hbase.regionserver.handler.count", 20); 118 conf.setInt("hbase.bucketcache.size", 400); 119 conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); 120 conf.setFloat("hfile.block.cache.size", 0.2f); 121 conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); 122 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry 123 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000); 124 FAMILIES_1[0] = FAMILY; 125 TEST_UTIL.startMiniCluster(SLAVES); 126 } 127 128 /** 129 * @throws java.lang.Exception 130 */ 131 @AfterAll 132 public static void tearDownAfterClass() throws Exception { 133 TEST_UTIL.shutdownMiniCluster(); 134 } 135 136 /** 137 * @throws java.lang.Exception 138 */ 139 @AfterEach 140 public void tearDown() throws Exception { 141 if (latch != null) { 142 while (latch.getCount() > 0) { 143 latch.countDown(); 144 } 145 } 146 if (getLatch != null) { 147 getLatch.countDown(); 148 } 149 if (compactionLatch != null) { 150 compactionLatch.countDown(); 151 } 152 if (exceptionLatch != null) { 153 exceptionLatch.countDown(); 154 } 155 latch = null; 156 getLatch = null; 157 compactionLatch = null; 158 exceptionLatch = null; 159 CustomInnerRegionObserver.throwException.set(false); 160 // Clean up the tables for every test case 161 TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames(); 162 for (TableName tableName : listTableNames) { 163 if (!tableName.isSystemTable()) { 164 TEST_UTIL.getAdmin().disableTable(tableName); 165 TEST_UTIL.getAdmin().deleteTable(tableName); 166 } 167 } 168 } 169 170 @Test 171 public void testBlockEvictionWithParallelScans() throws Exception { 172 Table table = null; 173 try { 174 latch = new CountDownLatch(1); 175 final TableName tableName = TableName.valueOf(methodName); 176 // Create a table with block size as 1024 177 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 178 CustomInnerRegionObserver.class.getName()); 179 // get the block cache and region 180 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 181 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 182 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 183 HStore store = region.getStores().iterator().next(); 184 CacheConfig cacheConf = store.getCacheConfig(); 185 cacheConf.setCacheDataOnWrite(true); 186 cacheConf.setEvictOnClose(true); 187 BlockCache cache = cacheConf.getBlockCache().get(); 188 189 // insert data. 2 Rows are added 190 Put put = new Put(ROW); 191 put.addColumn(FAMILY, QUALIFIER, data); 192 table.put(put); 193 put = new Put(ROW1); 194 put.addColumn(FAMILY, QUALIFIER, data); 195 table.put(put); 196 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 197 // data was in memstore so don't expect any changes 198 // flush the data 199 // Should create one Hfile with 2 blocks 200 region.flush(true); 201 // Load cache 202 // Create three sets of scan 203 ScanThread[] scanThreads = initiateScan(table, false); 204 Thread.sleep(100); 205 checkForBlockEviction(cache, false, false); 206 for (ScanThread thread : scanThreads) { 207 thread.join(); 208 } 209 // CustomInnerRegionObserver.sleepTime.set(0); 210 Iterator<CachedBlock> iterator = cache.iterator(); 211 iterateBlockCache(cache, iterator); 212 // read the data and expect same blocks, one new hit, no misses 213 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 214 iterator = cache.iterator(); 215 iterateBlockCache(cache, iterator); 216 // Check how this miss is happening 217 // insert a second column, read the row, no new blocks, 3 new hits 218 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 219 byte[] data2 = Bytes.add(data, data); 220 put = new Put(ROW); 221 put.addColumn(FAMILY, QUALIFIER2, data2); 222 table.put(put); 223 Result r = table.get(new Get(ROW)); 224 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 225 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 226 iterator = cache.iterator(); 227 iterateBlockCache(cache, iterator); 228 // flush, one new block 229 LOG.info("Flushing cache"); 230 region.flush(true); 231 iterator = cache.iterator(); 232 iterateBlockCache(cache, iterator); 233 // compact, net minus two blocks, two hits, no misses 234 LOG.info("Compacting"); 235 assertEquals(2, store.getStorefilesCount()); 236 store.triggerMajorCompaction(); 237 region.compact(true); 238 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 239 assertEquals(1, store.getStorefilesCount()); 240 iterator = cache.iterator(); 241 iterateBlockCache(cache, iterator); 242 // read the row, this should be a cache miss because we don't cache data 243 // blocks on compaction 244 r = table.get(new Get(ROW)); 245 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 246 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 247 iterator = cache.iterator(); 248 iterateBlockCache(cache, iterator); 249 } finally { 250 if (table != null) { 251 table.close(); 252 } 253 } 254 } 255 256 @Test 257 public void testParallelGetsAndScans() throws IOException, InterruptedException { 258 Table table = null; 259 try { 260 latch = new CountDownLatch(2); 261 // Check if get() returns blocks on its close() itself 262 getLatch = new CountDownLatch(1); 263 final TableName tableName = TableName.valueOf(methodName); 264 // Create KV that will give you two blocks 265 // Create a table with block size as 1024 266 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 267 CustomInnerRegionObserver.class.getName()); 268 // get the block cache and region 269 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 270 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 271 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 272 HStore store = region.getStores().iterator().next(); 273 CacheConfig cacheConf = store.getCacheConfig(); 274 cacheConf.setCacheDataOnWrite(true); 275 cacheConf.setEvictOnClose(true); 276 BlockCache cache = cacheConf.getBlockCache().get(); 277 278 insertData(table); 279 // flush the data 280 LOG.info("Flushing cache"); 281 // Should create one Hfile with 2 blocks 282 region.flush(true); 283 // Create three sets of scan 284 CustomInnerRegionObserver.waitForGets.set(true); 285 ScanThread[] scanThreads = initiateScan(table, false); 286 // Create three sets of gets 287 GetThread[] getThreads = initiateGet(table, false, false); 288 checkForBlockEviction(cache, false, false); 289 CustomInnerRegionObserver.waitForGets.set(false); 290 checkForBlockEviction(cache, false, false); 291 for (GetThread thread : getThreads) { 292 thread.join(); 293 } 294 // Verify whether the gets have returned the blocks that it had 295 CustomInnerRegionObserver.waitForGets.set(true); 296 // giving some time for the block to be decremented 297 checkForBlockEviction(cache, true, false); 298 getLatch.countDown(); 299 for (ScanThread thread : scanThreads) { 300 thread.join(); 301 } 302 LOG.info("Scans should have returned the bloks"); 303 // Check with either true or false 304 CustomInnerRegionObserver.waitForGets.set(false); 305 // The scan should also have released the blocks by now 306 checkForBlockEviction(cache, true, true); 307 } finally { 308 if (table != null) { 309 table.close(); 310 } 311 } 312 } 313 314 @Test 315 public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException { 316 Table table = null; 317 try { 318 latch = new CountDownLatch(1); 319 // Check if get() returns blocks on its close() itself 320 getLatch = new CountDownLatch(1); 321 final TableName tableName = TableName.valueOf(methodName); 322 // Create KV that will give you two blocks 323 // Create a table with block size as 1024 324 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 325 CustomInnerRegionObserver.class.getName()); 326 // get the block cache and region 327 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 328 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 329 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 330 HStore store = region.getStores().iterator().next(); 331 CacheConfig cacheConf = store.getCacheConfig(); 332 cacheConf.setCacheDataOnWrite(true); 333 cacheConf.setEvictOnClose(true); 334 BlockCache cache = cacheConf.getBlockCache().get(); 335 336 Put put = new Put(ROW); 337 put.addColumn(FAMILY, QUALIFIER, data); 338 table.put(put); 339 region.flush(true); 340 put = new Put(ROW1); 341 put.addColumn(FAMILY, QUALIFIER, data); 342 table.put(put); 343 region.flush(true); 344 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 345 put = new Put(ROW); 346 put.addColumn(FAMILY, QUALIFIER2, data2); 347 table.put(put); 348 region.flush(true); 349 // flush the data 350 LOG.info("Flushing cache"); 351 // Should create one Hfile with 2 blocks 352 CustomInnerRegionObserver.waitForGets.set(true); 353 // Create three sets of gets 354 GetThread[] getThreads = initiateGet(table, false, false); 355 Thread.sleep(200); 356 CustomInnerRegionObserver.getCdl().get().countDown(); 357 for (GetThread thread : getThreads) { 358 thread.join(); 359 } 360 // Verify whether the gets have returned the blocks that it had 361 CustomInnerRegionObserver.waitForGets.set(true); 362 // giving some time for the block to be decremented 363 checkForBlockEviction(cache, true, false); 364 getLatch.countDown(); 365 LOG.info("Gets should have returned the bloks"); 366 } finally { 367 if (table != null) { 368 table.close(); 369 } 370 } 371 } 372 373 @Test 374 // TODO : check how block index works here 375 public void testGetsWithMultiColumnsAndExplicitTracker() 376 throws IOException, InterruptedException { 377 Table table = null; 378 try { 379 latch = new CountDownLatch(1); 380 // Check if get() returns blocks on its close() itself 381 getLatch = new CountDownLatch(1); 382 final TableName tableName = TableName.valueOf(methodName); 383 // Create KV that will give you two blocks 384 // Create a table with block size as 1024 385 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 386 CustomInnerRegionObserver.class.getName()); 387 // get the block cache and region 388 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 389 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 390 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 391 BlockCache cache = setCacheProperties(region); 392 Put put = new Put(ROW); 393 put.addColumn(FAMILY, QUALIFIER, data); 394 table.put(put); 395 region.flush(true); 396 put = new Put(ROW1); 397 put.addColumn(FAMILY, QUALIFIER, data); 398 table.put(put); 399 region.flush(true); 400 for (int i = 1; i < 10; i++) { 401 put = new Put(ROW); 402 put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2); 403 table.put(put); 404 if (i % 2 == 0) { 405 region.flush(true); 406 } 407 } 408 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 409 put = new Put(ROW); 410 put.addColumn(FAMILY, QUALIFIER2, data2); 411 table.put(put); 412 region.flush(true); 413 // flush the data 414 LOG.info("Flushing cache"); 415 // Should create one Hfile with 2 blocks 416 CustomInnerRegionObserver.waitForGets.set(true); 417 // Create three sets of gets 418 GetThread[] getThreads = initiateGet(table, true, false); 419 Thread.sleep(200); 420 Iterator<CachedBlock> iterator = cache.iterator(); 421 boolean usedBlocksFound = false; 422 int refCount = 0; 423 int noOfBlocksWithRef = 0; 424 while (iterator.hasNext()) { 425 CachedBlock next = iterator.next(); 426 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 427 if (cache instanceof BucketCache) { 428 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 429 } else if (cache instanceof CombinedBlockCache) { 430 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 431 } else { 432 continue; 433 } 434 if (refCount != 0) { 435 // Blocks will be with count 3 436 LOG.info("The refCount is " + refCount); 437 assertEquals(NO_OF_THREADS, refCount); 438 usedBlocksFound = true; 439 noOfBlocksWithRef++; 440 } 441 } 442 assertTrue(usedBlocksFound); 443 // the number of blocks referred 444 assertEquals(10, noOfBlocksWithRef); 445 CustomInnerRegionObserver.getCdl().get().countDown(); 446 for (GetThread thread : getThreads) { 447 thread.join(); 448 } 449 // Verify whether the gets have returned the blocks that it had 450 CustomInnerRegionObserver.waitForGets.set(true); 451 // giving some time for the block to be decremented 452 checkForBlockEviction(cache, true, false); 453 getLatch.countDown(); 454 LOG.info("Gets should have returned the bloks"); 455 } finally { 456 if (table != null) { 457 table.close(); 458 } 459 } 460 } 461 462 @Test 463 public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException { 464 Table table = null; 465 try { 466 latch = new CountDownLatch(1); 467 // Check if get() returns blocks on its close() itself 468 getLatch = new CountDownLatch(1); 469 final TableName tableName = TableName.valueOf(methodName); 470 // Create KV that will give you two blocks 471 // Create a table with block size as 1024 472 byte[][] fams = new byte[10][]; 473 fams[0] = FAMILY; 474 for (int i = 1; i < 10; i++) { 475 fams[i] = (Bytes.toBytes("testFamily" + i)); 476 } 477 table = 478 TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName()); 479 // get the block cache and region 480 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 481 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 482 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 483 BlockCache cache = setCacheProperties(region); 484 485 Put put = new Put(ROW); 486 put.addColumn(FAMILY, QUALIFIER, data); 487 table.put(put); 488 region.flush(true); 489 put = new Put(ROW1); 490 put.addColumn(FAMILY, QUALIFIER, data); 491 table.put(put); 492 region.flush(true); 493 for (int i = 1; i < 10; i++) { 494 put = new Put(ROW); 495 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 496 table.put(put); 497 if (i % 2 == 0) { 498 region.flush(true); 499 } 500 } 501 region.flush(true); 502 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 503 put = new Put(ROW); 504 put.addColumn(FAMILY, QUALIFIER2, data2); 505 table.put(put); 506 region.flush(true); 507 // flush the data 508 LOG.info("Flushing cache"); 509 // Should create one Hfile with 2 blocks 510 CustomInnerRegionObserver.waitForGets.set(true); 511 // Create three sets of gets 512 GetThread[] getThreads = initiateGet(table, true, true); 513 Thread.sleep(200); 514 Iterator<CachedBlock> iterator = cache.iterator(); 515 boolean usedBlocksFound = false; 516 int refCount = 0; 517 int noOfBlocksWithRef = 0; 518 while (iterator.hasNext()) { 519 CachedBlock next = iterator.next(); 520 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 521 if (cache instanceof BucketCache) { 522 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 523 } else if (cache instanceof CombinedBlockCache) { 524 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 525 } else { 526 continue; 527 } 528 if (refCount != 0) { 529 // Blocks will be with count 3 530 LOG.info("The refCount is " + refCount); 531 assertEquals(NO_OF_THREADS, refCount); 532 usedBlocksFound = true; 533 noOfBlocksWithRef++; 534 } 535 } 536 assertTrue(usedBlocksFound); 537 // the number of blocks referred 538 assertEquals(3, noOfBlocksWithRef); 539 CustomInnerRegionObserver.getCdl().get().countDown(); 540 for (GetThread thread : getThreads) { 541 thread.join(); 542 } 543 // Verify whether the gets have returned the blocks that it had 544 CustomInnerRegionObserver.waitForGets.set(true); 545 // giving some time for the block to be decremented 546 checkForBlockEviction(cache, true, false); 547 getLatch.countDown(); 548 LOG.info("Gets should have returned the bloks"); 549 } finally { 550 if (table != null) { 551 table.close(); 552 } 553 } 554 } 555 556 @Test 557 public void testBlockRefCountAfterSplits() throws IOException, InterruptedException { 558 Table table = null; 559 try { 560 final TableName tableName = TableName.valueOf(methodName); 561 TableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName); 562 // This test expects rpc refcount of cached data blocks to be 0 after split. After split, 563 // two daughter regions are opened and a compaction is scheduled to get rid of reference 564 // of the parent region hfiles. Compaction will increase refcount of cached data blocks by 1. 565 // It is flakey since compaction can kick in anytime. To solve this issue, table is created 566 // with compaction disabled. 567 table = TEST_UTIL.createTable( 568 TableDescriptorBuilder.newBuilder(desc).setCompactionEnabled(false).build(), FAMILIES_1, 569 null, BloomType.ROW, 1024, null); 570 // get the block cache and region 571 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 572 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 573 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 574 HStore store = region.getStores().iterator().next(); 575 CacheConfig cacheConf = store.getCacheConfig(); 576 cacheConf.setEvictOnClose(true); 577 BlockCache cache = cacheConf.getBlockCache().get(); 578 579 Put put = new Put(ROW); 580 put.addColumn(FAMILY, QUALIFIER, data); 581 table.put(put); 582 region.flush(true); 583 put = new Put(ROW1); 584 put.addColumn(FAMILY, QUALIFIER, data); 585 table.put(put); 586 region.flush(true); 587 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 588 put = new Put(ROW2); 589 put.addColumn(FAMILY, QUALIFIER2, data2); 590 table.put(put); 591 put = new Put(ROW3); 592 put.addColumn(FAMILY, QUALIFIER2, data2); 593 table.put(put); 594 region.flush(true); 595 ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers()); 596 int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size(); 597 LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(), 598 regionCount); 599 TEST_UTIL.getAdmin().split(tableName, ROW1); 600 // Wait for splits 601 TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount); 602 region.compact(true); 603 List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions(); 604 for (HRegion r : regions) { 605 LOG.info("" + r.getCompactionState()); 606 TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE)); 607 } 608 LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache); 609 Iterator<CachedBlock> iterator = cache.iterator(); 610 // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners 611 // should be closed inorder to return those blocks 612 iterateBlockCache(cache, iterator); 613 } finally { 614 if (table != null) { 615 table.close(); 616 } 617 } 618 } 619 620 @Test 621 public void testMultiGets() throws IOException, InterruptedException { 622 Table table = null; 623 try { 624 latch = new CountDownLatch(2); 625 // Check if get() returns blocks on its close() itself 626 getLatch = new CountDownLatch(1); 627 final TableName tableName = TableName.valueOf(methodName); 628 // Create KV that will give you two blocks 629 // Create a table with block size as 1024 630 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 631 CustomInnerRegionObserver.class.getName()); 632 // get the block cache and region 633 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 634 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 635 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 636 HStore store = region.getStores().iterator().next(); 637 CacheConfig cacheConf = store.getCacheConfig(); 638 cacheConf.setCacheDataOnWrite(true); 639 cacheConf.setEvictOnClose(true); 640 BlockCache cache = cacheConf.getBlockCache().get(); 641 642 Put put = new Put(ROW); 643 put.addColumn(FAMILY, QUALIFIER, data); 644 table.put(put); 645 region.flush(true); 646 put = new Put(ROW1); 647 put.addColumn(FAMILY, QUALIFIER, data); 648 table.put(put); 649 region.flush(true); 650 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 651 put = new Put(ROW); 652 put.addColumn(FAMILY, QUALIFIER2, data2); 653 table.put(put); 654 region.flush(true); 655 // flush the data 656 LOG.info("Flushing cache"); 657 // Should create one Hfile with 2 blocks 658 CustomInnerRegionObserver.waitForGets.set(true); 659 // Create three sets of gets 660 MultiGetThread[] getThreads = initiateMultiGet(table); 661 Thread.sleep(200); 662 int refCount; 663 Iterator<CachedBlock> iterator = cache.iterator(); 664 boolean foundNonZeroBlock = false; 665 while (iterator.hasNext()) { 666 CachedBlock next = iterator.next(); 667 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 668 if (cache instanceof BucketCache) { 669 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 670 } else if (cache instanceof CombinedBlockCache) { 671 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 672 } else { 673 continue; 674 } 675 if (refCount != 0) { 676 assertEquals(NO_OF_THREADS, refCount); 677 foundNonZeroBlock = true; 678 } 679 } 680 assertTrue(foundNonZeroBlock, "Should have found nonzero ref count block"); 681 CustomInnerRegionObserver.getCdl().get().countDown(); 682 CustomInnerRegionObserver.getCdl().get().countDown(); 683 for (MultiGetThread thread : getThreads) { 684 thread.join(); 685 } 686 // Verify whether the gets have returned the blocks that it had 687 CustomInnerRegionObserver.waitForGets.set(true); 688 // giving some time for the block to be decremented 689 iterateBlockCache(cache, iterator); 690 getLatch.countDown(); 691 LOG.info("Gets should have returned the bloks"); 692 } finally { 693 if (table != null) { 694 table.close(); 695 } 696 } 697 } 698 699 @Test 700 public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException { 701 Table table = null; 702 try { 703 latch = new CountDownLatch(1); 704 // Check if get() returns blocks on its close() itself 705 final TableName tableName = TableName.valueOf(methodName); 706 // Create KV that will give you two blocks 707 // Create a table with block size as 1024 708 byte[][] fams = new byte[10][]; 709 fams[0] = FAMILY; 710 for (int i = 1; i < 10; i++) { 711 fams[i] = (Bytes.toBytes("testFamily" + i)); 712 } 713 table = 714 TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName()); 715 // get the block cache and region 716 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 717 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 718 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 719 BlockCache cache = setCacheProperties(region); 720 721 Put put = new Put(ROW); 722 put.addColumn(FAMILY, QUALIFIER, data); 723 table.put(put); 724 region.flush(true); 725 put = new Put(ROW1); 726 put.addColumn(FAMILY, QUALIFIER, data); 727 table.put(put); 728 region.flush(true); 729 for (int i = 1; i < 10; i++) { 730 put = new Put(ROW); 731 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 732 table.put(put); 733 if (i % 2 == 0) { 734 region.flush(true); 735 } 736 } 737 region.flush(true); 738 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 739 put = new Put(ROW); 740 put.addColumn(FAMILY, QUALIFIER2, data2); 741 table.put(put); 742 region.flush(true); 743 // flush the data 744 LOG.info("Flushing cache"); 745 // Should create one Hfile with 2 blocks 746 // Create three sets of gets 747 ScanThread[] scanThreads = initiateScan(table, true); 748 Thread.sleep(200); 749 Iterator<CachedBlock> iterator = cache.iterator(); 750 boolean usedBlocksFound = false; 751 int refCount = 0; 752 int noOfBlocksWithRef = 0; 753 while (iterator.hasNext()) { 754 CachedBlock next = iterator.next(); 755 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 756 if (cache instanceof BucketCache) { 757 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 758 } else if (cache instanceof CombinedBlockCache) { 759 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 760 } else { 761 continue; 762 } 763 if (refCount != 0) { 764 // Blocks will be with count 3 765 LOG.info("The refCount is " + refCount); 766 assertEquals(NO_OF_THREADS, refCount); 767 usedBlocksFound = true; 768 noOfBlocksWithRef++; 769 } 770 } 771 assertTrue(usedBlocksFound); 772 // the number of blocks referred 773 assertEquals(12, noOfBlocksWithRef); 774 CustomInnerRegionObserver.getCdl().get().countDown(); 775 for (ScanThread thread : scanThreads) { 776 thread.join(); 777 } 778 // giving some time for the block to be decremented 779 checkForBlockEviction(cache, true, false); 780 } finally { 781 if (table != null) { 782 table.close(); 783 } 784 } 785 } 786 787 private BlockCache setCacheProperties(HRegion region) { 788 Iterator<HStore> strItr = region.getStores().iterator(); 789 BlockCache cache = null; 790 while (strItr.hasNext()) { 791 HStore store = strItr.next(); 792 CacheConfig cacheConf = store.getCacheConfig(); 793 cacheConf.setCacheDataOnWrite(true); 794 cacheConf.setEvictOnClose(true); 795 // Use the last one 796 cache = cacheConf.getBlockCache().get(); 797 } 798 return cache; 799 } 800 801 @Test 802 public void testParallelGetsAndScanWithWrappedRegionScanner() 803 throws IOException, InterruptedException { 804 Table table = null; 805 try { 806 latch = new CountDownLatch(2); 807 // Check if get() returns blocks on its close() itself 808 getLatch = new CountDownLatch(1); 809 final TableName tableName = TableName.valueOf(methodName); 810 // Create KV that will give you two blocks 811 // Create a table with block size as 1024 812 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 813 CustomInnerRegionObserverWrapper.class.getName()); 814 // get the block cache and region 815 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 816 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 817 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 818 HStore store = region.getStores().iterator().next(); 819 CacheConfig cacheConf = store.getCacheConfig(); 820 cacheConf.setCacheDataOnWrite(true); 821 cacheConf.setEvictOnClose(true); 822 BlockCache cache = cacheConf.getBlockCache().get(); 823 824 // insert data. 2 Rows are added 825 insertData(table); 826 // flush the data 827 LOG.info("Flushing cache"); 828 // Should create one Hfile with 2 blocks 829 region.flush(true); 830 // CustomInnerRegionObserver.sleepTime.set(5000); 831 // Create three sets of scan 832 CustomInnerRegionObserver.waitForGets.set(true); 833 ScanThread[] scanThreads = initiateScan(table, false); 834 // Create three sets of gets 835 GetThread[] getThreads = initiateGet(table, false, false); 836 // The block would have been decremented for the scan case as it was 837 // wrapped 838 // before even the postNext hook gets executed. 839 // giving some time for the block to be decremented 840 Thread.sleep(100); 841 CustomInnerRegionObserver.waitForGets.set(false); 842 checkForBlockEviction(cache, false, false); 843 // countdown the latch 844 CustomInnerRegionObserver.getCdl().get().countDown(); 845 for (GetThread thread : getThreads) { 846 thread.join(); 847 } 848 getLatch.countDown(); 849 for (ScanThread thread : scanThreads) { 850 thread.join(); 851 } 852 } finally { 853 if (table != null) { 854 table.close(); 855 } 856 } 857 } 858 859 @Test 860 public void testScanWithCompaction() throws IOException, InterruptedException { 861 testScanWithCompactionInternals(methodName, false); 862 } 863 864 @Test 865 public void testReverseScanWithCompaction() throws IOException, InterruptedException { 866 testScanWithCompactionInternals(methodName, true); 867 } 868 869 private void testScanWithCompactionInternals(String tableNameStr, boolean reversed) 870 throws IOException, InterruptedException { 871 Table table = null; 872 try { 873 latch = new CountDownLatch(1); 874 compactionLatch = new CountDownLatch(1); 875 TableName tableName = TableName.valueOf(tableNameStr); 876 // Create a table with block size as 1024 877 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 878 CustomInnerRegionObserverWrapper.class.getName()); 879 // get the block cache and region 880 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 881 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 882 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 883 HStore store = region.getStores().iterator().next(); 884 CacheConfig cacheConf = store.getCacheConfig(); 885 cacheConf.setCacheDataOnWrite(true); 886 cacheConf.setEvictOnClose(true); 887 BlockCache cache = cacheConf.getBlockCache().get(); 888 889 // insert data. 2 Rows are added 890 Put put = new Put(ROW); 891 put.addColumn(FAMILY, QUALIFIER, data); 892 table.put(put); 893 put = new Put(ROW1); 894 put.addColumn(FAMILY, QUALIFIER, data); 895 table.put(put); 896 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 897 // Should create one Hfile with 2 blocks 898 region.flush(true); 899 // read the data and expect same blocks, one new hit, no misses 900 int refCount = 0; 901 // Check how this miss is happening 902 // insert a second column, read the row, no new blocks, 3 new hits 903 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 904 byte[] data2 = Bytes.add(data, data); 905 put = new Put(ROW); 906 put.addColumn(FAMILY, QUALIFIER2, data2); 907 table.put(put); 908 // flush, one new block 909 LOG.info("Flushing cache"); 910 region.flush(true); 911 Iterator<CachedBlock> iterator = cache.iterator(); 912 iterateBlockCache(cache, iterator); 913 // Create three sets of scan 914 ScanThread[] scanThreads = initiateScan(table, reversed); 915 Thread.sleep(100); 916 iterator = cache.iterator(); 917 boolean usedBlocksFound = false; 918 while (iterator.hasNext()) { 919 CachedBlock next = iterator.next(); 920 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 921 if (cache instanceof BucketCache) { 922 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 923 } else if (cache instanceof CombinedBlockCache) { 924 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 925 } else { 926 continue; 927 } 928 if (refCount != 0) { 929 // Blocks will be with count 3 930 assertEquals(NO_OF_THREADS, refCount); 931 usedBlocksFound = true; 932 } 933 } 934 assertTrue(usedBlocksFound, "Blocks with non zero ref count should be found "); 935 usedBlocksFound = false; 936 LOG.info("Compacting"); 937 assertEquals(2, store.getStorefilesCount()); 938 store.triggerMajorCompaction(); 939 region.compact(true); 940 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 941 assertEquals(1, store.getStorefilesCount()); 942 // Even after compaction is done we will have some blocks that cannot 943 // be evicted this is because the scan is still referencing them 944 iterator = cache.iterator(); 945 while (iterator.hasNext()) { 946 CachedBlock next = iterator.next(); 947 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 948 if (cache instanceof BucketCache) { 949 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 950 } else if (cache instanceof CombinedBlockCache) { 951 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 952 } else { 953 continue; 954 } 955 if (refCount != 0) { 956 // Blocks will be with count 3 as they are not yet cleared 957 assertEquals(NO_OF_THREADS, refCount); 958 usedBlocksFound = true; 959 } 960 } 961 assertTrue(usedBlocksFound, "Blocks with non zero ref count should be found "); 962 // Should not throw exception 963 compactionLatch.countDown(); 964 latch.countDown(); 965 for (ScanThread thread : scanThreads) { 966 thread.join(); 967 } 968 // by this time all blocks should have been evicted 969 iterator = cache.iterator(); 970 iterateBlockCache(cache, iterator); 971 Result r = table.get(new Get(ROW)); 972 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 973 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 974 // The gets would be working on new blocks 975 iterator = cache.iterator(); 976 iterateBlockCache(cache, iterator); 977 } finally { 978 if (table != null) { 979 table.close(); 980 } 981 } 982 } 983 984 @Test 985 public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush() 986 throws IOException, InterruptedException { 987 // do flush and scan in parallel 988 Table table = null; 989 try { 990 latch = new CountDownLatch(1); 991 compactionLatch = new CountDownLatch(1); 992 final TableName tableName = TableName.valueOf(methodName); 993 // Create a table with block size as 1024 994 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 995 CustomInnerRegionObserverWrapper.class.getName()); 996 // get the block cache and region 997 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 998 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 999 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1000 HStore store = region.getStores().iterator().next(); 1001 CacheConfig cacheConf = store.getCacheConfig(); 1002 cacheConf.setCacheDataOnWrite(true); 1003 cacheConf.setEvictOnClose(true); 1004 BlockCache cache = cacheConf.getBlockCache().get(); 1005 1006 // insert data. 2 Rows are added 1007 Put put = new Put(ROW); 1008 put.addColumn(FAMILY, QUALIFIER, data); 1009 table.put(put); 1010 put = new Put(ROW1); 1011 put.addColumn(FAMILY, QUALIFIER, data); 1012 table.put(put); 1013 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 1014 // Should create one Hfile with 2 blocks 1015 region.flush(true); 1016 // read the data and expect same blocks, one new hit, no misses 1017 int refCount = 0; 1018 // Check how this miss is happening 1019 // insert a second column, read the row, no new blocks, 3 new hits 1020 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1021 byte[] data2 = Bytes.add(data, data); 1022 put = new Put(ROW); 1023 put.addColumn(FAMILY, QUALIFIER2, data2); 1024 table.put(put); 1025 // flush, one new block 1026 LOG.info("Flushing cache"); 1027 region.flush(true); 1028 Iterator<CachedBlock> iterator = cache.iterator(); 1029 iterateBlockCache(cache, iterator); 1030 // Create three sets of scan 1031 ScanThread[] scanThreads = initiateScan(table, false); 1032 Thread.sleep(100); 1033 iterator = cache.iterator(); 1034 boolean usedBlocksFound = false; 1035 while (iterator.hasNext()) { 1036 CachedBlock next = iterator.next(); 1037 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1038 if (cache instanceof BucketCache) { 1039 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1040 } else if (cache instanceof CombinedBlockCache) { 1041 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1042 } else { 1043 continue; 1044 } 1045 if (refCount != 0) { 1046 // Blocks will be with count 3 1047 assertEquals(NO_OF_THREADS, refCount); 1048 usedBlocksFound = true; 1049 } 1050 } 1051 // Make a put and do a flush 1052 QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1053 data2 = Bytes.add(data, data); 1054 put = new Put(ROW1); 1055 put.addColumn(FAMILY, QUALIFIER2, data2); 1056 table.put(put); 1057 // flush, one new block 1058 LOG.info("Flushing cache"); 1059 region.flush(true); 1060 assertTrue(usedBlocksFound, "Blocks with non zero ref count should be found "); 1061 usedBlocksFound = false; 1062 LOG.info("Compacting"); 1063 assertEquals(3, store.getStorefilesCount()); 1064 store.triggerMajorCompaction(); 1065 region.compact(true); 1066 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 1067 assertEquals(1, store.getStorefilesCount()); 1068 // Even after compaction is done we will have some blocks that cannot 1069 // be evicted this is because the scan is still referencing them 1070 iterator = cache.iterator(); 1071 while (iterator.hasNext()) { 1072 CachedBlock next = iterator.next(); 1073 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1074 if (cache instanceof BucketCache) { 1075 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1076 } else if (cache instanceof CombinedBlockCache) { 1077 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1078 } else { 1079 continue; 1080 } 1081 if (refCount != 0) { 1082 // Blocks will be with count 3 as they are not yet cleared 1083 assertEquals(NO_OF_THREADS, refCount); 1084 usedBlocksFound = true; 1085 } 1086 } 1087 assertTrue(usedBlocksFound, "Blocks with non zero ref count should be found "); 1088 // Should not throw exception 1089 compactionLatch.countDown(); 1090 latch.countDown(); 1091 for (ScanThread thread : scanThreads) { 1092 thread.join(); 1093 } 1094 // by this time all blocks should have been evicted 1095 iterator = cache.iterator(); 1096 // Since a flush and compaction happened after a scan started 1097 // we need to ensure that all the original blocks of the compacted file 1098 // is also removed. 1099 iterateBlockCache(cache, iterator); 1100 Result r = table.get(new Get(ROW)); 1101 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1102 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1103 // The gets would be working on new blocks 1104 iterator = cache.iterator(); 1105 iterateBlockCache(cache, iterator); 1106 } finally { 1107 if (table != null) { 1108 table.close(); 1109 } 1110 } 1111 } 1112 1113 @Test 1114 public void testScanWithException() throws IOException, InterruptedException { 1115 Table table = null; 1116 try { 1117 latch = new CountDownLatch(1); 1118 exceptionLatch = new CountDownLatch(1); 1119 final TableName tableName = TableName.valueOf(methodName); 1120 // Create KV that will give you two blocks 1121 // Create a table with block size as 1024 1122 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 1123 CustomInnerRegionObserverWrapper.class.getName()); 1124 // get the block cache and region 1125 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 1126 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 1127 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1128 HStore store = region.getStores().iterator().next(); 1129 CacheConfig cacheConf = store.getCacheConfig(); 1130 cacheConf.setCacheDataOnWrite(true); 1131 cacheConf.setEvictOnClose(true); 1132 BlockCache cache = cacheConf.getBlockCache().get(); 1133 // insert data. 2 Rows are added 1134 insertData(table); 1135 // flush the data 1136 LOG.info("Flushing cache"); 1137 // Should create one Hfile with 2 blocks 1138 region.flush(true); 1139 // CustomInnerRegionObserver.sleepTime.set(5000); 1140 CustomInnerRegionObserver.throwException.set(true); 1141 ScanThread[] scanThreads = initiateScan(table, false); 1142 // The block would have been decremented for the scan case as it was 1143 // wrapped 1144 // before even the postNext hook gets executed. 1145 // giving some time for the block to be decremented 1146 Thread.sleep(100); 1147 Iterator<CachedBlock> iterator = cache.iterator(); 1148 boolean usedBlocksFound = false; 1149 int refCount = 0; 1150 while (iterator.hasNext()) { 1151 CachedBlock next = iterator.next(); 1152 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1153 if (cache instanceof BucketCache) { 1154 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1155 } else if (cache instanceof CombinedBlockCache) { 1156 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1157 } else { 1158 continue; 1159 } 1160 if (refCount != 0) { 1161 // Blocks will be with count 3 1162 assertEquals(NO_OF_THREADS, refCount); 1163 usedBlocksFound = true; 1164 } 1165 } 1166 assertTrue(usedBlocksFound); 1167 exceptionLatch.countDown(); 1168 // countdown the latch 1169 CustomInnerRegionObserver.getCdl().get().countDown(); 1170 for (ScanThread thread : scanThreads) { 1171 thread.join(); 1172 } 1173 iterator = cache.iterator(); 1174 usedBlocksFound = false; 1175 refCount = 0; 1176 while (iterator.hasNext()) { 1177 CachedBlock next = iterator.next(); 1178 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1179 if (cache instanceof BucketCache) { 1180 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1181 } else if (cache instanceof CombinedBlockCache) { 1182 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1183 } else { 1184 continue; 1185 } 1186 if (refCount != 0) { 1187 // Blocks will be with count 3 1188 assertEquals(NO_OF_THREADS, refCount); 1189 usedBlocksFound = true; 1190 } 1191 } 1192 assertFalse(usedBlocksFound); 1193 // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner 1194 assertEquals(0, refCount); 1195 } finally { 1196 if (table != null) { 1197 table.close(); 1198 } 1199 } 1200 } 1201 1202 private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) { 1203 int refCount; 1204 while (iterator.hasNext()) { 1205 CachedBlock next = iterator.next(); 1206 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1207 if (cache instanceof BucketCache) { 1208 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1209 LOG.info("BucketCache {} {}", cacheKey, refCount); 1210 } else if (cache instanceof CombinedBlockCache) { 1211 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1212 LOG.info("CombinedBlockCache {} {}", cacheKey, refCount); 1213 } else { 1214 continue; 1215 } 1216 assertEquals(0, refCount); 1217 } 1218 } 1219 1220 private void insertData(Table table) throws IOException { 1221 Put put = new Put(ROW); 1222 put.addColumn(FAMILY, QUALIFIER, data); 1223 table.put(put); 1224 put = new Put(ROW1); 1225 put.addColumn(FAMILY, QUALIFIER, data); 1226 table.put(put); 1227 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1228 put = new Put(ROW); 1229 put.addColumn(FAMILY, QUALIFIER2, data2); 1230 table.put(put); 1231 } 1232 1233 private ScanThread[] initiateScan(Table table, boolean reverse) 1234 throws IOException, InterruptedException { 1235 ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS]; 1236 for (int i = 0; i < NO_OF_THREADS; i++) { 1237 scanThreads[i] = new ScanThread(table, reverse); 1238 } 1239 for (ScanThread thread : scanThreads) { 1240 thread.start(); 1241 } 1242 return scanThreads; 1243 } 1244 1245 private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs) 1246 throws IOException, InterruptedException { 1247 GetThread[] getThreads = new GetThread[NO_OF_THREADS]; 1248 for (int i = 0; i < NO_OF_THREADS; i++) { 1249 getThreads[i] = new GetThread(table, tracker, multipleCFs); 1250 } 1251 for (GetThread thread : getThreads) { 1252 thread.start(); 1253 } 1254 return getThreads; 1255 } 1256 1257 private MultiGetThread[] initiateMultiGet(Table table) throws IOException, InterruptedException { 1258 MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS]; 1259 for (int i = 0; i < NO_OF_THREADS; i++) { 1260 multiGetThreads[i] = new MultiGetThread(table); 1261 } 1262 for (MultiGetThread thread : multiGetThreads) { 1263 thread.start(); 1264 } 1265 return multiGetThreads; 1266 } 1267 1268 private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero) 1269 throws InterruptedException { 1270 int counter = NO_OF_THREADS; 1271 if (CustomInnerRegionObserver.waitForGets.get()) { 1272 // Because only one row is selected, it has only 2 blocks 1273 counter = counter - 1; 1274 while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) { 1275 Thread.sleep(100); 1276 } 1277 } else { 1278 while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) { 1279 Thread.sleep(100); 1280 } 1281 } 1282 Iterator<CachedBlock> iterator = cache.iterator(); 1283 int refCount = 0; 1284 while (iterator.hasNext()) { 1285 CachedBlock next = iterator.next(); 1286 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1287 if (cache instanceof BucketCache) { 1288 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1289 } else if (cache instanceof CombinedBlockCache) { 1290 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1291 } else { 1292 continue; 1293 } 1294 LOG.info(" the refcount is " + refCount + " block is " + cacheKey); 1295 if (CustomInnerRegionObserver.waitForGets.get()) { 1296 if (expectOnlyZero) { 1297 assertTrue(refCount == 0); 1298 } 1299 if (refCount != 0) { 1300 // Because the scan would have also touched up on these blocks but 1301 // it 1302 // would have touched 1303 // all 3 1304 if (getClosed) { 1305 // If get has closed only the scan's blocks would be available 1306 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get()); 1307 } else { 1308 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS)); 1309 } 1310 } 1311 } else { 1312 // Because the get would have also touched up on these blocks but it 1313 // would have touched 1314 // upon only 2 additionally 1315 if (expectOnlyZero) { 1316 assertTrue(refCount == 0); 1317 } 1318 if (refCount != 0) { 1319 if (getLatch == null) { 1320 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get()); 1321 } else { 1322 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS)); 1323 } 1324 } 1325 } 1326 } 1327 CustomInnerRegionObserver.getCdl().get().countDown(); 1328 } 1329 1330 private static class MultiGetThread extends Thread { 1331 private final Table table; 1332 private final List<Get> gets = new ArrayList<>(); 1333 1334 public MultiGetThread(Table table) { 1335 this.table = table; 1336 } 1337 1338 @Override 1339 public void run() { 1340 gets.add(new Get(ROW)); 1341 gets.add(new Get(ROW1)); 1342 try { 1343 CustomInnerRegionObserver.getCdl().set(latch); 1344 Result[] r = table.get(gets); 1345 assertTrue(Bytes.equals(r[0].getRow(), ROW)); 1346 assertTrue(Bytes.equals(r[1].getRow(), ROW1)); 1347 } catch (IOException e) { 1348 } 1349 } 1350 } 1351 1352 private static class GetThread extends Thread { 1353 private final Table table; 1354 private final boolean tracker; 1355 private final boolean multipleCFs; 1356 1357 public GetThread(Table table, boolean tracker, boolean multipleCFs) { 1358 this.table = table; 1359 this.tracker = tracker; 1360 this.multipleCFs = multipleCFs; 1361 } 1362 1363 @Override 1364 public void run() { 1365 try { 1366 initiateGet(table); 1367 } catch (IOException e) { 1368 // do nothing 1369 } 1370 } 1371 1372 private void initiateGet(Table table) throws IOException { 1373 Get get = new Get(ROW); 1374 if (tracker) { 1375 // Change this 1376 if (!multipleCFs) { 1377 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3)); 1378 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8)); 1379 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9)); 1380 // Unknown key 1381 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900)); 1382 } else { 1383 get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)); 1384 get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)); 1385 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)); 1386 // Unknown key 1387 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900)); 1388 } 1389 } 1390 CustomInnerRegionObserver.getCdl().set(latch); 1391 Result r = table.get(get); 1392 LOG.info("" + r); 1393 if (!tracker) { 1394 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1395 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1396 } else { 1397 if (!multipleCFs) { 1398 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2)); 1399 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2)); 1400 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2)); 1401 } else { 1402 assertTrue(Bytes.equals( 1403 r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)), 1404 data2)); 1405 assertTrue(Bytes.equals( 1406 r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)), 1407 data2)); 1408 assertTrue(Bytes.equals( 1409 r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)), 1410 data2)); 1411 } 1412 } 1413 } 1414 } 1415 1416 private static class ScanThread extends Thread { 1417 private final Table table; 1418 private final boolean reverse; 1419 1420 public ScanThread(Table table, boolean reverse) { 1421 this.table = table; 1422 this.reverse = reverse; 1423 } 1424 1425 @Override 1426 public void run() { 1427 try { 1428 initiateScan(table); 1429 } catch (IOException e) { 1430 // do nothing 1431 } 1432 } 1433 1434 private void initiateScan(Table table) throws IOException { 1435 Scan scan = new Scan(); 1436 if (reverse) { 1437 scan.setReversed(true); 1438 } 1439 CustomInnerRegionObserver.getCdl().set(latch); 1440 ResultScanner resScanner = table.getScanner(scan); 1441 int i = (reverse ? ROWS.length - 1 : 0); 1442 boolean resultFound = false; 1443 for (Result result : resScanner) { 1444 resultFound = true; 1445 LOG.info("" + result); 1446 if (!reverse) { 1447 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1448 i++; 1449 } else { 1450 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1451 i--; 1452 } 1453 } 1454 assertTrue(resultFound); 1455 } 1456 } 1457 1458 private void waitForStoreFileCount(HStore store, int count, int timeout) 1459 throws InterruptedException { 1460 long start = EnvironmentEdgeManager.currentTime(); 1461 while ( 1462 start + timeout > EnvironmentEdgeManager.currentTime() && store.getStorefilesCount() != count 1463 ) { 1464 Thread.sleep(100); 1465 } 1466 LOG.info("start=" + start + ", now=" + EnvironmentEdgeManager.currentTime() + ", cur=" 1467 + store.getStorefilesCount()); 1468 assertEquals(count, store.getStorefilesCount()); 1469 } 1470 1471 private static class CustomScanner implements RegionScanner { 1472 1473 private RegionScanner delegate; 1474 1475 public CustomScanner(RegionScanner delegate) { 1476 this.delegate = delegate; 1477 } 1478 1479 @Override 1480 public boolean next(List<? super ExtendedCell> results) throws IOException { 1481 return delegate.next(results); 1482 } 1483 1484 @Override 1485 public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext) 1486 throws IOException { 1487 return delegate.next(result, scannerContext); 1488 } 1489 1490 @Override 1491 public boolean nextRaw(List<? super ExtendedCell> result) throws IOException { 1492 return delegate.nextRaw(result); 1493 } 1494 1495 @Override 1496 public boolean nextRaw(List<? super ExtendedCell> result, ScannerContext context) 1497 throws IOException { 1498 boolean nextRaw = delegate.nextRaw(result, context); 1499 if (compactionLatch != null && compactionLatch.getCount() > 0) { 1500 try { 1501 compactionLatch.await(); 1502 } catch (InterruptedException ie) { 1503 } 1504 } 1505 1506 if (CustomInnerRegionObserver.throwException.get()) { 1507 if (exceptionLatch.getCount() > 0) { 1508 try { 1509 exceptionLatch.await(); 1510 } catch (InterruptedException e) { 1511 } 1512 throw new IOException("throw exception"); 1513 } 1514 } 1515 return nextRaw; 1516 } 1517 1518 @Override 1519 public Set<Path> getFilesRead() { 1520 return delegate.getFilesRead(); 1521 } 1522 1523 @Override 1524 public void close() throws IOException { 1525 delegate.close(); 1526 } 1527 1528 @Override 1529 public RegionInfo getRegionInfo() { 1530 return delegate.getRegionInfo(); 1531 } 1532 1533 @Override 1534 public boolean isFilterDone() throws IOException { 1535 return delegate.isFilterDone(); 1536 } 1537 1538 @Override 1539 public boolean reseek(byte[] row) throws IOException { 1540 return false; 1541 } 1542 1543 @Override 1544 public long getMaxResultSize() { 1545 return delegate.getMaxResultSize(); 1546 } 1547 1548 @Override 1549 public long getMvccReadPoint() { 1550 return delegate.getMvccReadPoint(); 1551 } 1552 1553 @Override 1554 public int getBatch() { 1555 return delegate.getBatch(); 1556 } 1557 } 1558 1559 public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver { 1560 @Override 1561 public RegionScanner postScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> e, 1562 Scan scan, RegionScanner s) throws IOException { 1563 return new CustomScanner(s); 1564 } 1565 } 1566 1567 public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver { 1568 static final AtomicInteger countOfNext = new AtomicInteger(0); 1569 static final AtomicInteger countOfGets = new AtomicInteger(0); 1570 static final AtomicBoolean waitForGets = new AtomicBoolean(false); 1571 static final AtomicBoolean throwException = new AtomicBoolean(false); 1572 private static final AtomicReference<CountDownLatch> cdl = 1573 new AtomicReference<>(new CountDownLatch(0)); 1574 1575 @Override 1576 public Optional<RegionObserver> getRegionObserver() { 1577 return Optional.of(this); 1578 } 1579 1580 @Override 1581 public boolean postScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> e, 1582 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException { 1583 slowdownCode(e, false); 1584 if (getLatch != null && getLatch.getCount() > 0) { 1585 try { 1586 getLatch.await(); 1587 } catch (InterruptedException e1) { 1588 } 1589 } 1590 return hasMore; 1591 } 1592 1593 @Override 1594 public void postGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> e, Get get, 1595 List<Cell> results) throws IOException { 1596 slowdownCode(e, true); 1597 } 1598 1599 public static AtomicReference<CountDownLatch> getCdl() { 1600 return cdl; 1601 } 1602 1603 private void slowdownCode(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 1604 boolean isGet) { 1605 CountDownLatch latch = getCdl().get(); 1606 try { 1607 LOG.info(latch.getCount() + " is the count " + isGet); 1608 if (latch.getCount() > 0) { 1609 if (isGet) { 1610 countOfGets.incrementAndGet(); 1611 } else { 1612 countOfNext.incrementAndGet(); 1613 } 1614 LOG.info("Waiting for the counterCountDownLatch"); 1615 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 1616 if (latch.getCount() > 0) { 1617 throw new RuntimeException("Can't wait more"); 1618 } 1619 } 1620 } catch (InterruptedException e1) { 1621 LOG.error(e1.toString(), e1); 1622 } 1623 } 1624 } 1625}