001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.atomic.AtomicReference; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.ExtendedCell; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtil; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 043import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 044import org.apache.hadoop.hbase.coprocessor.ObserverContext; 045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 047import org.apache.hadoop.hbase.coprocessor.RegionObserver; 048import org.apache.hadoop.hbase.io.hfile.BlockCache; 049import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 050import org.apache.hadoop.hbase.io.hfile.CacheConfig; 051import org.apache.hadoop.hbase.io.hfile.CachedBlock; 052import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 053import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 054import org.apache.hadoop.hbase.regionserver.BloomType; 055import org.apache.hadoop.hbase.regionserver.HRegion; 056import org.apache.hadoop.hbase.regionserver.HStore; 057import org.apache.hadoop.hbase.regionserver.InternalScanner; 058import org.apache.hadoop.hbase.regionserver.RegionScanner; 059import org.apache.hadoop.hbase.regionserver.ScannerContext; 060import org.apache.hadoop.hbase.testclassification.ClientTests; 061import org.apache.hadoop.hbase.testclassification.LargeTests; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 064import org.junit.After; 065import org.junit.AfterClass; 066import org.junit.Before; 067import org.junit.BeforeClass; 068import org.junit.ClassRule; 069import org.junit.Rule; 070import org.junit.Test; 071import org.junit.experimental.categories.Category; 072import org.junit.rules.TestName; 073import org.slf4j.Logger; 074import org.slf4j.LoggerFactory; 075 076import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 077 078@Category({ LargeTests.class, ClientTests.class }) 079@SuppressWarnings("deprecation") 080public class TestBlockEvictionFromClient { 081 082 @ClassRule 083 public static final HBaseClassTestRule CLASS_RULE = 084 HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class); 085 086 private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class); 087 protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 088 static byte[][] ROWS = new byte[2][]; 089 private static int NO_OF_THREADS = 3; 090 private static byte[] ROW = Bytes.toBytes("testRow"); 091 private static byte[] ROW1 = Bytes.toBytes("testRow1"); 092 private static byte[] ROW2 = Bytes.toBytes("testRow2"); 093 private static byte[] ROW3 = Bytes.toBytes("testRow3"); 094 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 095 private static byte[][] FAMILIES_1 = new byte[1][0]; 096 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 097 private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 098 private static byte[] data = new byte[1000]; 099 private static byte[] data2 = Bytes.add(data, data); 100 protected static int SLAVES = 1; 101 private static CountDownLatch latch; 102 private static CountDownLatch getLatch; 103 private static CountDownLatch compactionLatch; 104 private static CountDownLatch exceptionLatch; 105 106 @Rule 107 public TestName name = new TestName(); 108 109 /** 110 * @throws java.lang.Exception 111 */ 112 @BeforeClass 113 public static void setUpBeforeClass() throws Exception { 114 ROWS[0] = ROW; 115 ROWS[1] = ROW1; 116 Configuration conf = TEST_UTIL.getConfiguration(); 117 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 118 MultiRowMutationEndpoint.class.getName()); 119 conf.setInt("hbase.regionserver.handler.count", 20); 120 conf.setInt("hbase.bucketcache.size", 400); 121 conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); 122 conf.setFloat("hfile.block.cache.size", 0.2f); 123 conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); 124 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry 125 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000); 126 FAMILIES_1[0] = FAMILY; 127 TEST_UTIL.startMiniCluster(SLAVES); 128 } 129 130 /** 131 * @throws java.lang.Exception 132 */ 133 @AfterClass 134 public static void tearDownAfterClass() throws Exception { 135 TEST_UTIL.shutdownMiniCluster(); 136 } 137 138 /** 139 * @throws java.lang.Exception 140 */ 141 @Before 142 public void setUp() throws Exception { 143 CustomInnerRegionObserver.waitForGets.set(false); 144 CustomInnerRegionObserver.countOfNext.set(0); 145 CustomInnerRegionObserver.countOfGets.set(0); 146 } 147 148 /** 149 * @throws java.lang.Exception 150 */ 151 @After 152 public void tearDown() throws Exception { 153 if (latch != null) { 154 while (latch.getCount() > 0) { 155 latch.countDown(); 156 } 157 } 158 if (getLatch != null) { 159 getLatch.countDown(); 160 } 161 if (compactionLatch != null) { 162 compactionLatch.countDown(); 163 } 164 if (exceptionLatch != null) { 165 exceptionLatch.countDown(); 166 } 167 latch = null; 168 getLatch = null; 169 compactionLatch = null; 170 exceptionLatch = null; 171 CustomInnerRegionObserver.throwException.set(false); 172 // Clean up the tables for every test case 173 TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames(); 174 for (TableName tableName : listTableNames) { 175 if (!tableName.isSystemTable()) { 176 TEST_UTIL.getAdmin().disableTable(tableName); 177 TEST_UTIL.getAdmin().deleteTable(tableName); 178 } 179 } 180 } 181 182 @Test 183 public void testBlockEvictionWithParallelScans() throws Exception { 184 Table table = null; 185 try { 186 latch = new CountDownLatch(1); 187 final TableName tableName = TableName.valueOf(name.getMethodName()); 188 // Create a table with block size as 1024 189 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 190 CustomInnerRegionObserver.class.getName()); 191 // get the block cache and region 192 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 193 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 194 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 195 HStore store = region.getStores().iterator().next(); 196 CacheConfig cacheConf = store.getCacheConfig(); 197 cacheConf.setCacheDataOnWrite(true); 198 cacheConf.setEvictOnClose(true); 199 BlockCache cache = cacheConf.getBlockCache().get(); 200 201 // insert data. 2 Rows are added 202 Put put = new Put(ROW); 203 put.addColumn(FAMILY, QUALIFIER, data); 204 table.put(put); 205 put = new Put(ROW1); 206 put.addColumn(FAMILY, QUALIFIER, data); 207 table.put(put); 208 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 209 // data was in memstore so don't expect any changes 210 // flush the data 211 // Should create one Hfile with 2 blocks 212 region.flush(true); 213 // Load cache 214 // Create three sets of scan 215 ScanThread[] scanThreads = initiateScan(table, false); 216 Thread.sleep(100); 217 checkForBlockEviction(cache, false, false); 218 for (ScanThread thread : scanThreads) { 219 thread.join(); 220 } 221 // CustomInnerRegionObserver.sleepTime.set(0); 222 Iterator<CachedBlock> iterator = cache.iterator(); 223 iterateBlockCache(cache, iterator); 224 // read the data and expect same blocks, one new hit, no misses 225 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 226 iterator = cache.iterator(); 227 iterateBlockCache(cache, iterator); 228 // Check how this miss is happening 229 // insert a second column, read the row, no new blocks, 3 new hits 230 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 231 byte[] data2 = Bytes.add(data, data); 232 put = new Put(ROW); 233 put.addColumn(FAMILY, QUALIFIER2, data2); 234 table.put(put); 235 Result r = table.get(new Get(ROW)); 236 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 237 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 238 iterator = cache.iterator(); 239 iterateBlockCache(cache, iterator); 240 // flush, one new block 241 System.out.println("Flushing cache"); 242 region.flush(true); 243 iterator = cache.iterator(); 244 iterateBlockCache(cache, iterator); 245 // compact, net minus two blocks, two hits, no misses 246 System.out.println("Compacting"); 247 assertEquals(2, store.getStorefilesCount()); 248 store.triggerMajorCompaction(); 249 region.compact(true); 250 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 251 assertEquals(1, store.getStorefilesCount()); 252 iterator = cache.iterator(); 253 iterateBlockCache(cache, iterator); 254 // read the row, this should be a cache miss because we don't cache data 255 // blocks on compaction 256 r = table.get(new Get(ROW)); 257 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 258 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 259 iterator = cache.iterator(); 260 iterateBlockCache(cache, iterator); 261 } finally { 262 if (table != null) { 263 table.close(); 264 } 265 } 266 } 267 268 @Test 269 public void testParallelGetsAndScans() throws IOException, InterruptedException { 270 Table table = null; 271 try { 272 latch = new CountDownLatch(2); 273 // Check if get() returns blocks on its close() itself 274 getLatch = new CountDownLatch(1); 275 final TableName tableName = TableName.valueOf(name.getMethodName()); 276 // Create KV that will give you two blocks 277 // Create a table with block size as 1024 278 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 279 CustomInnerRegionObserver.class.getName()); 280 // get the block cache and region 281 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 282 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 283 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 284 HStore store = region.getStores().iterator().next(); 285 CacheConfig cacheConf = store.getCacheConfig(); 286 cacheConf.setCacheDataOnWrite(true); 287 cacheConf.setEvictOnClose(true); 288 BlockCache cache = cacheConf.getBlockCache().get(); 289 290 insertData(table); 291 // flush the data 292 System.out.println("Flushing cache"); 293 // Should create one Hfile with 2 blocks 294 region.flush(true); 295 // Create three sets of scan 296 CustomInnerRegionObserver.waitForGets.set(true); 297 ScanThread[] scanThreads = initiateScan(table, false); 298 // Create three sets of gets 299 GetThread[] getThreads = initiateGet(table, false, false); 300 checkForBlockEviction(cache, false, false); 301 CustomInnerRegionObserver.waitForGets.set(false); 302 checkForBlockEviction(cache, false, false); 303 for (GetThread thread : getThreads) { 304 thread.join(); 305 } 306 // Verify whether the gets have returned the blocks that it had 307 CustomInnerRegionObserver.waitForGets.set(true); 308 // giving some time for the block to be decremented 309 checkForBlockEviction(cache, true, false); 310 getLatch.countDown(); 311 for (ScanThread thread : scanThreads) { 312 thread.join(); 313 } 314 System.out.println("Scans should have returned the bloks"); 315 // Check with either true or false 316 CustomInnerRegionObserver.waitForGets.set(false); 317 // The scan should also have released the blocks by now 318 checkForBlockEviction(cache, true, true); 319 } finally { 320 if (table != null) { 321 table.close(); 322 } 323 } 324 } 325 326 @Test 327 public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException { 328 Table table = null; 329 try { 330 latch = new CountDownLatch(1); 331 // Check if get() returns blocks on its close() itself 332 getLatch = new CountDownLatch(1); 333 final TableName tableName = TableName.valueOf(name.getMethodName()); 334 // Create KV that will give you two blocks 335 // Create a table with block size as 1024 336 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 337 CustomInnerRegionObserver.class.getName()); 338 // get the block cache and region 339 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 340 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 341 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 342 HStore store = region.getStores().iterator().next(); 343 CacheConfig cacheConf = store.getCacheConfig(); 344 cacheConf.setCacheDataOnWrite(true); 345 cacheConf.setEvictOnClose(true); 346 BlockCache cache = cacheConf.getBlockCache().get(); 347 348 Put put = new Put(ROW); 349 put.addColumn(FAMILY, QUALIFIER, data); 350 table.put(put); 351 region.flush(true); 352 put = new Put(ROW1); 353 put.addColumn(FAMILY, QUALIFIER, data); 354 table.put(put); 355 region.flush(true); 356 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 357 put = new Put(ROW); 358 put.addColumn(FAMILY, QUALIFIER2, data2); 359 table.put(put); 360 region.flush(true); 361 // flush the data 362 System.out.println("Flushing cache"); 363 // Should create one Hfile with 2 blocks 364 CustomInnerRegionObserver.waitForGets.set(true); 365 // Create three sets of gets 366 GetThread[] getThreads = initiateGet(table, false, false); 367 Thread.sleep(200); 368 CustomInnerRegionObserver.getCdl().get().countDown(); 369 for (GetThread thread : getThreads) { 370 thread.join(); 371 } 372 // Verify whether the gets have returned the blocks that it had 373 CustomInnerRegionObserver.waitForGets.set(true); 374 // giving some time for the block to be decremented 375 checkForBlockEviction(cache, true, false); 376 getLatch.countDown(); 377 System.out.println("Gets should have returned the bloks"); 378 } finally { 379 if (table != null) { 380 table.close(); 381 } 382 } 383 } 384 385 @Test 386 // TODO : check how block index works here 387 public void testGetsWithMultiColumnsAndExplicitTracker() 388 throws IOException, InterruptedException { 389 Table table = null; 390 try { 391 latch = new CountDownLatch(1); 392 // Check if get() returns blocks on its close() itself 393 getLatch = new CountDownLatch(1); 394 final TableName tableName = TableName.valueOf(name.getMethodName()); 395 // Create KV that will give you two blocks 396 // Create a table with block size as 1024 397 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 398 CustomInnerRegionObserver.class.getName()); 399 // get the block cache and region 400 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 401 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 402 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 403 BlockCache cache = setCacheProperties(region); 404 Put put = new Put(ROW); 405 put.addColumn(FAMILY, QUALIFIER, data); 406 table.put(put); 407 region.flush(true); 408 put = new Put(ROW1); 409 put.addColumn(FAMILY, QUALIFIER, data); 410 table.put(put); 411 region.flush(true); 412 for (int i = 1; i < 10; i++) { 413 put = new Put(ROW); 414 put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2); 415 table.put(put); 416 if (i % 2 == 0) { 417 region.flush(true); 418 } 419 } 420 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 421 put = new Put(ROW); 422 put.addColumn(FAMILY, QUALIFIER2, data2); 423 table.put(put); 424 region.flush(true); 425 // flush the data 426 System.out.println("Flushing cache"); 427 // Should create one Hfile with 2 blocks 428 CustomInnerRegionObserver.waitForGets.set(true); 429 // Create three sets of gets 430 GetThread[] getThreads = initiateGet(table, true, false); 431 Thread.sleep(200); 432 Iterator<CachedBlock> iterator = cache.iterator(); 433 boolean usedBlocksFound = false; 434 int refCount = 0; 435 int noOfBlocksWithRef = 0; 436 while (iterator.hasNext()) { 437 CachedBlock next = iterator.next(); 438 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 439 if (cache instanceof BucketCache) { 440 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 441 } else if (cache instanceof CombinedBlockCache) { 442 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 443 } else { 444 continue; 445 } 446 if (refCount != 0) { 447 // Blocks will be with count 3 448 System.out.println("The refCount is " + refCount); 449 assertEquals(NO_OF_THREADS, refCount); 450 usedBlocksFound = true; 451 noOfBlocksWithRef++; 452 } 453 } 454 assertTrue(usedBlocksFound); 455 // the number of blocks referred 456 assertEquals(10, noOfBlocksWithRef); 457 CustomInnerRegionObserver.getCdl().get().countDown(); 458 for (GetThread thread : getThreads) { 459 thread.join(); 460 } 461 // Verify whether the gets have returned the blocks that it had 462 CustomInnerRegionObserver.waitForGets.set(true); 463 // giving some time for the block to be decremented 464 checkForBlockEviction(cache, true, false); 465 getLatch.countDown(); 466 System.out.println("Gets should have returned the bloks"); 467 } finally { 468 if (table != null) { 469 table.close(); 470 } 471 } 472 } 473 474 @Test 475 public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException { 476 Table table = null; 477 try { 478 latch = new CountDownLatch(1); 479 // Check if get() returns blocks on its close() itself 480 getLatch = new CountDownLatch(1); 481 final TableName tableName = TableName.valueOf(name.getMethodName()); 482 // Create KV that will give you two blocks 483 // Create a table with block size as 1024 484 byte[][] fams = new byte[10][]; 485 fams[0] = FAMILY; 486 for (int i = 1; i < 10; i++) { 487 fams[i] = (Bytes.toBytes("testFamily" + i)); 488 } 489 table = 490 TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName()); 491 // get the block cache and region 492 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 493 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 494 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 495 BlockCache cache = setCacheProperties(region); 496 497 Put put = new Put(ROW); 498 put.addColumn(FAMILY, QUALIFIER, data); 499 table.put(put); 500 region.flush(true); 501 put = new Put(ROW1); 502 put.addColumn(FAMILY, QUALIFIER, data); 503 table.put(put); 504 region.flush(true); 505 for (int i = 1; i < 10; i++) { 506 put = new Put(ROW); 507 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 508 table.put(put); 509 if (i % 2 == 0) { 510 region.flush(true); 511 } 512 } 513 region.flush(true); 514 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 515 put = new Put(ROW); 516 put.addColumn(FAMILY, QUALIFIER2, data2); 517 table.put(put); 518 region.flush(true); 519 // flush the data 520 System.out.println("Flushing cache"); 521 // Should create one Hfile with 2 blocks 522 CustomInnerRegionObserver.waitForGets.set(true); 523 // Create three sets of gets 524 GetThread[] getThreads = initiateGet(table, true, true); 525 Thread.sleep(200); 526 Iterator<CachedBlock> iterator = cache.iterator(); 527 boolean usedBlocksFound = false; 528 int refCount = 0; 529 int noOfBlocksWithRef = 0; 530 while (iterator.hasNext()) { 531 CachedBlock next = iterator.next(); 532 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 533 if (cache instanceof BucketCache) { 534 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 535 } else if (cache instanceof CombinedBlockCache) { 536 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 537 } else { 538 continue; 539 } 540 if (refCount != 0) { 541 // Blocks will be with count 3 542 System.out.println("The refCount is " + refCount); 543 assertEquals(NO_OF_THREADS, refCount); 544 usedBlocksFound = true; 545 noOfBlocksWithRef++; 546 } 547 } 548 assertTrue(usedBlocksFound); 549 // the number of blocks referred 550 assertEquals(3, noOfBlocksWithRef); 551 CustomInnerRegionObserver.getCdl().get().countDown(); 552 for (GetThread thread : getThreads) { 553 thread.join(); 554 } 555 // Verify whether the gets have returned the blocks that it had 556 CustomInnerRegionObserver.waitForGets.set(true); 557 // giving some time for the block to be decremented 558 checkForBlockEviction(cache, true, false); 559 getLatch.countDown(); 560 System.out.println("Gets should have returned the bloks"); 561 } finally { 562 if (table != null) { 563 table.close(); 564 } 565 } 566 } 567 568 @Test 569 public void testBlockRefCountAfterSplits() throws IOException, InterruptedException { 570 Table table = null; 571 try { 572 final TableName tableName = TableName.valueOf(name.getMethodName()); 573 TableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName); 574 // This test expects rpc refcount of cached data blocks to be 0 after split. After split, 575 // two daughter regions are opened and a compaction is scheduled to get rid of reference 576 // of the parent region hfiles. Compaction will increase refcount of cached data blocks by 1. 577 // It is flakey since compaction can kick in anytime. To solve this issue, table is created 578 // with compaction disabled. 579 table = TEST_UTIL.createTable( 580 TableDescriptorBuilder.newBuilder(desc).setCompactionEnabled(false).build(), FAMILIES_1, 581 null, BloomType.ROW, 1024, null); 582 // get the block cache and region 583 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 584 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 585 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 586 HStore store = region.getStores().iterator().next(); 587 CacheConfig cacheConf = store.getCacheConfig(); 588 cacheConf.setEvictOnClose(true); 589 BlockCache cache = cacheConf.getBlockCache().get(); 590 591 Put put = new Put(ROW); 592 put.addColumn(FAMILY, QUALIFIER, data); 593 table.put(put); 594 region.flush(true); 595 put = new Put(ROW1); 596 put.addColumn(FAMILY, QUALIFIER, data); 597 table.put(put); 598 region.flush(true); 599 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 600 put = new Put(ROW2); 601 put.addColumn(FAMILY, QUALIFIER2, data2); 602 table.put(put); 603 put = new Put(ROW3); 604 put.addColumn(FAMILY, QUALIFIER2, data2); 605 table.put(put); 606 region.flush(true); 607 ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers()); 608 int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size(); 609 LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(), 610 regionCount); 611 TEST_UTIL.getAdmin().split(tableName, ROW1); 612 // Wait for splits 613 TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount); 614 region.compact(true); 615 List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions(); 616 for (HRegion r : regions) { 617 LOG.info("" + r.getCompactionState()); 618 TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE)); 619 } 620 LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache); 621 Iterator<CachedBlock> iterator = cache.iterator(); 622 // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners 623 // should be closed inorder to return those blocks 624 iterateBlockCache(cache, iterator); 625 } finally { 626 if (table != null) { 627 table.close(); 628 } 629 } 630 } 631 632 @Test 633 public void testMultiGets() throws IOException, InterruptedException { 634 Table table = null; 635 try { 636 latch = new CountDownLatch(2); 637 // Check if get() returns blocks on its close() itself 638 getLatch = new CountDownLatch(1); 639 final TableName tableName = TableName.valueOf(name.getMethodName()); 640 // Create KV that will give you two blocks 641 // Create a table with block size as 1024 642 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 643 CustomInnerRegionObserver.class.getName()); 644 // get the block cache and region 645 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 646 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 647 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 648 HStore store = region.getStores().iterator().next(); 649 CacheConfig cacheConf = store.getCacheConfig(); 650 cacheConf.setCacheDataOnWrite(true); 651 cacheConf.setEvictOnClose(true); 652 BlockCache cache = cacheConf.getBlockCache().get(); 653 654 Put put = new Put(ROW); 655 put.addColumn(FAMILY, QUALIFIER, data); 656 table.put(put); 657 region.flush(true); 658 put = new Put(ROW1); 659 put.addColumn(FAMILY, QUALIFIER, data); 660 table.put(put); 661 region.flush(true); 662 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 663 put = new Put(ROW); 664 put.addColumn(FAMILY, QUALIFIER2, data2); 665 table.put(put); 666 region.flush(true); 667 // flush the data 668 System.out.println("Flushing cache"); 669 // Should create one Hfile with 2 blocks 670 CustomInnerRegionObserver.waitForGets.set(true); 671 // Create three sets of gets 672 MultiGetThread[] getThreads = initiateMultiGet(table); 673 Thread.sleep(200); 674 int refCount; 675 Iterator<CachedBlock> iterator = cache.iterator(); 676 boolean foundNonZeroBlock = false; 677 while (iterator.hasNext()) { 678 CachedBlock next = iterator.next(); 679 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 680 if (cache instanceof BucketCache) { 681 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 682 } else if (cache instanceof CombinedBlockCache) { 683 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 684 } else { 685 continue; 686 } 687 if (refCount != 0) { 688 assertEquals(NO_OF_THREADS, refCount); 689 foundNonZeroBlock = true; 690 } 691 } 692 assertTrue("Should have found nonzero ref count block", foundNonZeroBlock); 693 CustomInnerRegionObserver.getCdl().get().countDown(); 694 CustomInnerRegionObserver.getCdl().get().countDown(); 695 for (MultiGetThread thread : getThreads) { 696 thread.join(); 697 } 698 // Verify whether the gets have returned the blocks that it had 699 CustomInnerRegionObserver.waitForGets.set(true); 700 // giving some time for the block to be decremented 701 iterateBlockCache(cache, iterator); 702 getLatch.countDown(); 703 System.out.println("Gets should have returned the bloks"); 704 } finally { 705 if (table != null) { 706 table.close(); 707 } 708 } 709 } 710 711 @Test 712 public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException { 713 Table table = null; 714 try { 715 latch = new CountDownLatch(1); 716 // Check if get() returns blocks on its close() itself 717 final TableName tableName = TableName.valueOf(name.getMethodName()); 718 // Create KV that will give you two blocks 719 // Create a table with block size as 1024 720 byte[][] fams = new byte[10][]; 721 fams[0] = FAMILY; 722 for (int i = 1; i < 10; i++) { 723 fams[i] = (Bytes.toBytes("testFamily" + i)); 724 } 725 table = 726 TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName()); 727 // get the block cache and region 728 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 729 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 730 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 731 BlockCache cache = setCacheProperties(region); 732 733 Put put = new Put(ROW); 734 put.addColumn(FAMILY, QUALIFIER, data); 735 table.put(put); 736 region.flush(true); 737 put = new Put(ROW1); 738 put.addColumn(FAMILY, QUALIFIER, data); 739 table.put(put); 740 region.flush(true); 741 for (int i = 1; i < 10; i++) { 742 put = new Put(ROW); 743 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 744 table.put(put); 745 if (i % 2 == 0) { 746 region.flush(true); 747 } 748 } 749 region.flush(true); 750 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 751 put = new Put(ROW); 752 put.addColumn(FAMILY, QUALIFIER2, data2); 753 table.put(put); 754 region.flush(true); 755 // flush the data 756 System.out.println("Flushing cache"); 757 // Should create one Hfile with 2 blocks 758 // Create three sets of gets 759 ScanThread[] scanThreads = initiateScan(table, true); 760 Thread.sleep(200); 761 Iterator<CachedBlock> iterator = cache.iterator(); 762 boolean usedBlocksFound = false; 763 int refCount = 0; 764 int noOfBlocksWithRef = 0; 765 while (iterator.hasNext()) { 766 CachedBlock next = iterator.next(); 767 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 768 if (cache instanceof BucketCache) { 769 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 770 } else if (cache instanceof CombinedBlockCache) { 771 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 772 } else { 773 continue; 774 } 775 if (refCount != 0) { 776 // Blocks will be with count 3 777 System.out.println("The refCount is " + refCount); 778 assertEquals(NO_OF_THREADS, refCount); 779 usedBlocksFound = true; 780 noOfBlocksWithRef++; 781 } 782 } 783 assertTrue(usedBlocksFound); 784 // the number of blocks referred 785 assertEquals(12, noOfBlocksWithRef); 786 CustomInnerRegionObserver.getCdl().get().countDown(); 787 for (ScanThread thread : scanThreads) { 788 thread.join(); 789 } 790 // giving some time for the block to be decremented 791 checkForBlockEviction(cache, true, false); 792 } finally { 793 if (table != null) { 794 table.close(); 795 } 796 } 797 } 798 799 private BlockCache setCacheProperties(HRegion region) { 800 Iterator<HStore> strItr = region.getStores().iterator(); 801 BlockCache cache = null; 802 while (strItr.hasNext()) { 803 HStore store = strItr.next(); 804 CacheConfig cacheConf = store.getCacheConfig(); 805 cacheConf.setCacheDataOnWrite(true); 806 cacheConf.setEvictOnClose(true); 807 // Use the last one 808 cache = cacheConf.getBlockCache().get(); 809 } 810 return cache; 811 } 812 813 @Test 814 public void testParallelGetsAndScanWithWrappedRegionScanner() 815 throws IOException, InterruptedException { 816 Table table = null; 817 try { 818 latch = new CountDownLatch(2); 819 // Check if get() returns blocks on its close() itself 820 getLatch = new CountDownLatch(1); 821 final TableName tableName = TableName.valueOf(name.getMethodName()); 822 // Create KV that will give you two blocks 823 // Create a table with block size as 1024 824 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 825 CustomInnerRegionObserverWrapper.class.getName()); 826 // get the block cache and region 827 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 828 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 829 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 830 HStore store = region.getStores().iterator().next(); 831 CacheConfig cacheConf = store.getCacheConfig(); 832 cacheConf.setCacheDataOnWrite(true); 833 cacheConf.setEvictOnClose(true); 834 BlockCache cache = cacheConf.getBlockCache().get(); 835 836 // insert data. 2 Rows are added 837 insertData(table); 838 // flush the data 839 System.out.println("Flushing cache"); 840 // Should create one Hfile with 2 blocks 841 region.flush(true); 842 // CustomInnerRegionObserver.sleepTime.set(5000); 843 // Create three sets of scan 844 CustomInnerRegionObserver.waitForGets.set(true); 845 ScanThread[] scanThreads = initiateScan(table, false); 846 // Create three sets of gets 847 GetThread[] getThreads = initiateGet(table, false, false); 848 // The block would have been decremented for the scan case as it was 849 // wrapped 850 // before even the postNext hook gets executed. 851 // giving some time for the block to be decremented 852 Thread.sleep(100); 853 CustomInnerRegionObserver.waitForGets.set(false); 854 checkForBlockEviction(cache, false, false); 855 // countdown the latch 856 CustomInnerRegionObserver.getCdl().get().countDown(); 857 for (GetThread thread : getThreads) { 858 thread.join(); 859 } 860 getLatch.countDown(); 861 for (ScanThread thread : scanThreads) { 862 thread.join(); 863 } 864 } finally { 865 if (table != null) { 866 table.close(); 867 } 868 } 869 } 870 871 @Test 872 public void testScanWithCompaction() throws IOException, InterruptedException { 873 testScanWithCompactionInternals(name.getMethodName(), false); 874 } 875 876 @Test 877 public void testReverseScanWithCompaction() throws IOException, InterruptedException { 878 testScanWithCompactionInternals(name.getMethodName(), true); 879 } 880 881 private void testScanWithCompactionInternals(String tableNameStr, boolean reversed) 882 throws IOException, InterruptedException { 883 Table table = null; 884 try { 885 latch = new CountDownLatch(1); 886 compactionLatch = new CountDownLatch(1); 887 TableName tableName = TableName.valueOf(tableNameStr); 888 // Create a table with block size as 1024 889 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 890 CustomInnerRegionObserverWrapper.class.getName()); 891 // get the block cache and region 892 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 893 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 894 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 895 HStore store = region.getStores().iterator().next(); 896 CacheConfig cacheConf = store.getCacheConfig(); 897 cacheConf.setCacheDataOnWrite(true); 898 cacheConf.setEvictOnClose(true); 899 BlockCache cache = cacheConf.getBlockCache().get(); 900 901 // insert data. 2 Rows are added 902 Put put = new Put(ROW); 903 put.addColumn(FAMILY, QUALIFIER, data); 904 table.put(put); 905 put = new Put(ROW1); 906 put.addColumn(FAMILY, QUALIFIER, data); 907 table.put(put); 908 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 909 // Should create one Hfile with 2 blocks 910 region.flush(true); 911 // read the data and expect same blocks, one new hit, no misses 912 int refCount = 0; 913 // Check how this miss is happening 914 // insert a second column, read the row, no new blocks, 3 new hits 915 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 916 byte[] data2 = Bytes.add(data, data); 917 put = new Put(ROW); 918 put.addColumn(FAMILY, QUALIFIER2, data2); 919 table.put(put); 920 // flush, one new block 921 System.out.println("Flushing cache"); 922 region.flush(true); 923 Iterator<CachedBlock> iterator = cache.iterator(); 924 iterateBlockCache(cache, iterator); 925 // Create three sets of scan 926 ScanThread[] scanThreads = initiateScan(table, reversed); 927 Thread.sleep(100); 928 iterator = cache.iterator(); 929 boolean usedBlocksFound = false; 930 while (iterator.hasNext()) { 931 CachedBlock next = iterator.next(); 932 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 933 if (cache instanceof BucketCache) { 934 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 935 } else if (cache instanceof CombinedBlockCache) { 936 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 937 } else { 938 continue; 939 } 940 if (refCount != 0) { 941 // Blocks will be with count 3 942 assertEquals(NO_OF_THREADS, refCount); 943 usedBlocksFound = true; 944 } 945 } 946 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 947 usedBlocksFound = false; 948 System.out.println("Compacting"); 949 assertEquals(2, store.getStorefilesCount()); 950 store.triggerMajorCompaction(); 951 region.compact(true); 952 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 953 assertEquals(1, store.getStorefilesCount()); 954 // Even after compaction is done we will have some blocks that cannot 955 // be evicted this is because the scan is still referencing them 956 iterator = cache.iterator(); 957 while (iterator.hasNext()) { 958 CachedBlock next = iterator.next(); 959 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 960 if (cache instanceof BucketCache) { 961 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 962 } else if (cache instanceof CombinedBlockCache) { 963 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 964 } else { 965 continue; 966 } 967 if (refCount != 0) { 968 // Blocks will be with count 3 as they are not yet cleared 969 assertEquals(NO_OF_THREADS, refCount); 970 usedBlocksFound = true; 971 } 972 } 973 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 974 // Should not throw exception 975 compactionLatch.countDown(); 976 latch.countDown(); 977 for (ScanThread thread : scanThreads) { 978 thread.join(); 979 } 980 // by this time all blocks should have been evicted 981 iterator = cache.iterator(); 982 iterateBlockCache(cache, iterator); 983 Result r = table.get(new Get(ROW)); 984 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 985 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 986 // The gets would be working on new blocks 987 iterator = cache.iterator(); 988 iterateBlockCache(cache, iterator); 989 } finally { 990 if (table != null) { 991 table.close(); 992 } 993 } 994 } 995 996 @Test 997 public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush() 998 throws IOException, InterruptedException { 999 // do flush and scan in parallel 1000 Table table = null; 1001 try { 1002 latch = new CountDownLatch(1); 1003 compactionLatch = new CountDownLatch(1); 1004 final TableName tableName = TableName.valueOf(name.getMethodName()); 1005 // Create a table with block size as 1024 1006 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 1007 CustomInnerRegionObserverWrapper.class.getName()); 1008 // get the block cache and region 1009 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 1010 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 1011 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1012 HStore store = region.getStores().iterator().next(); 1013 CacheConfig cacheConf = store.getCacheConfig(); 1014 cacheConf.setCacheDataOnWrite(true); 1015 cacheConf.setEvictOnClose(true); 1016 BlockCache cache = cacheConf.getBlockCache().get(); 1017 1018 // insert data. 2 Rows are added 1019 Put put = new Put(ROW); 1020 put.addColumn(FAMILY, QUALIFIER, data); 1021 table.put(put); 1022 put = new Put(ROW1); 1023 put.addColumn(FAMILY, QUALIFIER, data); 1024 table.put(put); 1025 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 1026 // Should create one Hfile with 2 blocks 1027 region.flush(true); 1028 // read the data and expect same blocks, one new hit, no misses 1029 int refCount = 0; 1030 // Check how this miss is happening 1031 // insert a second column, read the row, no new blocks, 3 new hits 1032 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1033 byte[] data2 = Bytes.add(data, data); 1034 put = new Put(ROW); 1035 put.addColumn(FAMILY, QUALIFIER2, data2); 1036 table.put(put); 1037 // flush, one new block 1038 System.out.println("Flushing cache"); 1039 region.flush(true); 1040 Iterator<CachedBlock> iterator = cache.iterator(); 1041 iterateBlockCache(cache, iterator); 1042 // Create three sets of scan 1043 ScanThread[] scanThreads = initiateScan(table, false); 1044 Thread.sleep(100); 1045 iterator = cache.iterator(); 1046 boolean usedBlocksFound = false; 1047 while (iterator.hasNext()) { 1048 CachedBlock next = iterator.next(); 1049 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1050 if (cache instanceof BucketCache) { 1051 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1052 } else if (cache instanceof CombinedBlockCache) { 1053 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1054 } else { 1055 continue; 1056 } 1057 if (refCount != 0) { 1058 // Blocks will be with count 3 1059 assertEquals(NO_OF_THREADS, refCount); 1060 usedBlocksFound = true; 1061 } 1062 } 1063 // Make a put and do a flush 1064 QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1065 data2 = Bytes.add(data, data); 1066 put = new Put(ROW1); 1067 put.addColumn(FAMILY, QUALIFIER2, data2); 1068 table.put(put); 1069 // flush, one new block 1070 System.out.println("Flushing cache"); 1071 region.flush(true); 1072 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 1073 usedBlocksFound = false; 1074 System.out.println("Compacting"); 1075 assertEquals(3, store.getStorefilesCount()); 1076 store.triggerMajorCompaction(); 1077 region.compact(true); 1078 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 1079 assertEquals(1, store.getStorefilesCount()); 1080 // Even after compaction is done we will have some blocks that cannot 1081 // be evicted this is because the scan is still referencing them 1082 iterator = cache.iterator(); 1083 while (iterator.hasNext()) { 1084 CachedBlock next = iterator.next(); 1085 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1086 if (cache instanceof BucketCache) { 1087 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1088 } else if (cache instanceof CombinedBlockCache) { 1089 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1090 } else { 1091 continue; 1092 } 1093 if (refCount != 0) { 1094 // Blocks will be with count 3 as they are not yet cleared 1095 assertEquals(NO_OF_THREADS, refCount); 1096 usedBlocksFound = true; 1097 } 1098 } 1099 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 1100 // Should not throw exception 1101 compactionLatch.countDown(); 1102 latch.countDown(); 1103 for (ScanThread thread : scanThreads) { 1104 thread.join(); 1105 } 1106 // by this time all blocks should have been evicted 1107 iterator = cache.iterator(); 1108 // Since a flush and compaction happened after a scan started 1109 // we need to ensure that all the original blocks of the compacted file 1110 // is also removed. 1111 iterateBlockCache(cache, iterator); 1112 Result r = table.get(new Get(ROW)); 1113 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1114 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1115 // The gets would be working on new blocks 1116 iterator = cache.iterator(); 1117 iterateBlockCache(cache, iterator); 1118 } finally { 1119 if (table != null) { 1120 table.close(); 1121 } 1122 } 1123 } 1124 1125 @Test 1126 public void testScanWithException() throws IOException, InterruptedException { 1127 Table table = null; 1128 try { 1129 latch = new CountDownLatch(1); 1130 exceptionLatch = new CountDownLatch(1); 1131 final TableName tableName = TableName.valueOf(name.getMethodName()); 1132 // Create KV that will give you two blocks 1133 // Create a table with block size as 1024 1134 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 1135 CustomInnerRegionObserverWrapper.class.getName()); 1136 // get the block cache and region 1137 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 1138 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 1139 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1140 HStore store = region.getStores().iterator().next(); 1141 CacheConfig cacheConf = store.getCacheConfig(); 1142 cacheConf.setCacheDataOnWrite(true); 1143 cacheConf.setEvictOnClose(true); 1144 BlockCache cache = cacheConf.getBlockCache().get(); 1145 // insert data. 2 Rows are added 1146 insertData(table); 1147 // flush the data 1148 System.out.println("Flushing cache"); 1149 // Should create one Hfile with 2 blocks 1150 region.flush(true); 1151 // CustomInnerRegionObserver.sleepTime.set(5000); 1152 CustomInnerRegionObserver.throwException.set(true); 1153 ScanThread[] scanThreads = initiateScan(table, false); 1154 // The block would have been decremented for the scan case as it was 1155 // wrapped 1156 // before even the postNext hook gets executed. 1157 // giving some time for the block to be decremented 1158 Thread.sleep(100); 1159 Iterator<CachedBlock> iterator = cache.iterator(); 1160 boolean usedBlocksFound = false; 1161 int refCount = 0; 1162 while (iterator.hasNext()) { 1163 CachedBlock next = iterator.next(); 1164 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1165 if (cache instanceof BucketCache) { 1166 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1167 } else if (cache instanceof CombinedBlockCache) { 1168 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1169 } else { 1170 continue; 1171 } 1172 if (refCount != 0) { 1173 // Blocks will be with count 3 1174 assertEquals(NO_OF_THREADS, refCount); 1175 usedBlocksFound = true; 1176 } 1177 } 1178 assertTrue(usedBlocksFound); 1179 exceptionLatch.countDown(); 1180 // countdown the latch 1181 CustomInnerRegionObserver.getCdl().get().countDown(); 1182 for (ScanThread thread : scanThreads) { 1183 thread.join(); 1184 } 1185 iterator = cache.iterator(); 1186 usedBlocksFound = false; 1187 refCount = 0; 1188 while (iterator.hasNext()) { 1189 CachedBlock next = iterator.next(); 1190 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1191 if (cache instanceof BucketCache) { 1192 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1193 } else if (cache instanceof CombinedBlockCache) { 1194 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1195 } else { 1196 continue; 1197 } 1198 if (refCount != 0) { 1199 // Blocks will be with count 3 1200 assertEquals(NO_OF_THREADS, refCount); 1201 usedBlocksFound = true; 1202 } 1203 } 1204 assertFalse(usedBlocksFound); 1205 // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner 1206 assertEquals(0, refCount); 1207 } finally { 1208 if (table != null) { 1209 table.close(); 1210 } 1211 } 1212 } 1213 1214 private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) { 1215 int refCount; 1216 while (iterator.hasNext()) { 1217 CachedBlock next = iterator.next(); 1218 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1219 if (cache instanceof BucketCache) { 1220 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1221 LOG.info("BucketCache {} {}", cacheKey, refCount); 1222 } else if (cache instanceof CombinedBlockCache) { 1223 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1224 LOG.info("CombinedBlockCache {} {}", cacheKey, refCount); 1225 } else { 1226 continue; 1227 } 1228 assertEquals(0, refCount); 1229 } 1230 } 1231 1232 private void insertData(Table table) throws IOException { 1233 Put put = new Put(ROW); 1234 put.addColumn(FAMILY, QUALIFIER, data); 1235 table.put(put); 1236 put = new Put(ROW1); 1237 put.addColumn(FAMILY, QUALIFIER, data); 1238 table.put(put); 1239 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1240 put = new Put(ROW); 1241 put.addColumn(FAMILY, QUALIFIER2, data2); 1242 table.put(put); 1243 } 1244 1245 private ScanThread[] initiateScan(Table table, boolean reverse) 1246 throws IOException, InterruptedException { 1247 ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS]; 1248 for (int i = 0; i < NO_OF_THREADS; i++) { 1249 scanThreads[i] = new ScanThread(table, reverse); 1250 } 1251 for (ScanThread thread : scanThreads) { 1252 thread.start(); 1253 } 1254 return scanThreads; 1255 } 1256 1257 private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs) 1258 throws IOException, InterruptedException { 1259 GetThread[] getThreads = new GetThread[NO_OF_THREADS]; 1260 for (int i = 0; i < NO_OF_THREADS; i++) { 1261 getThreads[i] = new GetThread(table, tracker, multipleCFs); 1262 } 1263 for (GetThread thread : getThreads) { 1264 thread.start(); 1265 } 1266 return getThreads; 1267 } 1268 1269 private MultiGetThread[] initiateMultiGet(Table table) throws IOException, InterruptedException { 1270 MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS]; 1271 for (int i = 0; i < NO_OF_THREADS; i++) { 1272 multiGetThreads[i] = new MultiGetThread(table); 1273 } 1274 for (MultiGetThread thread : multiGetThreads) { 1275 thread.start(); 1276 } 1277 return multiGetThreads; 1278 } 1279 1280 private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero) 1281 throws InterruptedException { 1282 int counter = NO_OF_THREADS; 1283 if (CustomInnerRegionObserver.waitForGets.get()) { 1284 // Because only one row is selected, it has only 2 blocks 1285 counter = counter - 1; 1286 while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) { 1287 Thread.sleep(100); 1288 } 1289 } else { 1290 while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) { 1291 Thread.sleep(100); 1292 } 1293 } 1294 Iterator<CachedBlock> iterator = cache.iterator(); 1295 int refCount = 0; 1296 while (iterator.hasNext()) { 1297 CachedBlock next = iterator.next(); 1298 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1299 if (cache instanceof BucketCache) { 1300 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1301 } else if (cache instanceof CombinedBlockCache) { 1302 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1303 } else { 1304 continue; 1305 } 1306 System.out.println(" the refcount is " + refCount + " block is " + cacheKey); 1307 if (CustomInnerRegionObserver.waitForGets.get()) { 1308 if (expectOnlyZero) { 1309 assertTrue(refCount == 0); 1310 } 1311 if (refCount != 0) { 1312 // Because the scan would have also touched up on these blocks but 1313 // it 1314 // would have touched 1315 // all 3 1316 if (getClosed) { 1317 // If get has closed only the scan's blocks would be available 1318 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get()); 1319 } else { 1320 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS)); 1321 } 1322 } 1323 } else { 1324 // Because the get would have also touched up on these blocks but it 1325 // would have touched 1326 // upon only 2 additionally 1327 if (expectOnlyZero) { 1328 assertTrue(refCount == 0); 1329 } 1330 if (refCount != 0) { 1331 if (getLatch == null) { 1332 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get()); 1333 } else { 1334 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS)); 1335 } 1336 } 1337 } 1338 } 1339 CustomInnerRegionObserver.getCdl().get().countDown(); 1340 } 1341 1342 private static class MultiGetThread extends Thread { 1343 private final Table table; 1344 private final List<Get> gets = new ArrayList<>(); 1345 1346 public MultiGetThread(Table table) { 1347 this.table = table; 1348 } 1349 1350 @Override 1351 public void run() { 1352 gets.add(new Get(ROW)); 1353 gets.add(new Get(ROW1)); 1354 try { 1355 CustomInnerRegionObserver.getCdl().set(latch); 1356 Result[] r = table.get(gets); 1357 assertTrue(Bytes.equals(r[0].getRow(), ROW)); 1358 assertTrue(Bytes.equals(r[1].getRow(), ROW1)); 1359 } catch (IOException e) { 1360 } 1361 } 1362 } 1363 1364 private static class GetThread extends Thread { 1365 private final Table table; 1366 private final boolean tracker; 1367 private final boolean multipleCFs; 1368 1369 public GetThread(Table table, boolean tracker, boolean multipleCFs) { 1370 this.table = table; 1371 this.tracker = tracker; 1372 this.multipleCFs = multipleCFs; 1373 } 1374 1375 @Override 1376 public void run() { 1377 try { 1378 initiateGet(table); 1379 } catch (IOException e) { 1380 // do nothing 1381 } 1382 } 1383 1384 private void initiateGet(Table table) throws IOException { 1385 Get get = new Get(ROW); 1386 if (tracker) { 1387 // Change this 1388 if (!multipleCFs) { 1389 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3)); 1390 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8)); 1391 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9)); 1392 // Unknown key 1393 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900)); 1394 } else { 1395 get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)); 1396 get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)); 1397 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)); 1398 // Unknown key 1399 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900)); 1400 } 1401 } 1402 CustomInnerRegionObserver.getCdl().set(latch); 1403 Result r = table.get(get); 1404 System.out.println(r); 1405 if (!tracker) { 1406 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1407 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1408 } else { 1409 if (!multipleCFs) { 1410 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2)); 1411 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2)); 1412 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2)); 1413 } else { 1414 assertTrue(Bytes.equals( 1415 r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)), 1416 data2)); 1417 assertTrue(Bytes.equals( 1418 r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)), 1419 data2)); 1420 assertTrue(Bytes.equals( 1421 r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)), 1422 data2)); 1423 } 1424 } 1425 } 1426 } 1427 1428 private static class ScanThread extends Thread { 1429 private final Table table; 1430 private final boolean reverse; 1431 1432 public ScanThread(Table table, boolean reverse) { 1433 this.table = table; 1434 this.reverse = reverse; 1435 } 1436 1437 @Override 1438 public void run() { 1439 try { 1440 initiateScan(table); 1441 } catch (IOException e) { 1442 // do nothing 1443 } 1444 } 1445 1446 private void initiateScan(Table table) throws IOException { 1447 Scan scan = new Scan(); 1448 if (reverse) { 1449 scan.setReversed(true); 1450 } 1451 CustomInnerRegionObserver.getCdl().set(latch); 1452 ResultScanner resScanner = table.getScanner(scan); 1453 int i = (reverse ? ROWS.length - 1 : 0); 1454 boolean resultFound = false; 1455 for (Result result : resScanner) { 1456 resultFound = true; 1457 System.out.println(result); 1458 if (!reverse) { 1459 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1460 i++; 1461 } else { 1462 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1463 i--; 1464 } 1465 } 1466 assertTrue(resultFound); 1467 } 1468 } 1469 1470 private void waitForStoreFileCount(HStore store, int count, int timeout) 1471 throws InterruptedException { 1472 long start = EnvironmentEdgeManager.currentTime(); 1473 while ( 1474 start + timeout > EnvironmentEdgeManager.currentTime() && store.getStorefilesCount() != count 1475 ) { 1476 Thread.sleep(100); 1477 } 1478 System.out.println("start=" + start + ", now=" + EnvironmentEdgeManager.currentTime() + ", cur=" 1479 + store.getStorefilesCount()); 1480 assertEquals(count, store.getStorefilesCount()); 1481 } 1482 1483 private static class CustomScanner implements RegionScanner { 1484 1485 private RegionScanner delegate; 1486 1487 public CustomScanner(RegionScanner delegate) { 1488 this.delegate = delegate; 1489 } 1490 1491 @Override 1492 public boolean next(List<? super ExtendedCell> results) throws IOException { 1493 return delegate.next(results); 1494 } 1495 1496 @Override 1497 public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext) 1498 throws IOException { 1499 return delegate.next(result, scannerContext); 1500 } 1501 1502 @Override 1503 public boolean nextRaw(List<? super ExtendedCell> result) throws IOException { 1504 return delegate.nextRaw(result); 1505 } 1506 1507 @Override 1508 public boolean nextRaw(List<? super ExtendedCell> result, ScannerContext context) 1509 throws IOException { 1510 boolean nextRaw = delegate.nextRaw(result, context); 1511 if (compactionLatch != null && compactionLatch.getCount() > 0) { 1512 try { 1513 compactionLatch.await(); 1514 } catch (InterruptedException ie) { 1515 } 1516 } 1517 1518 if (CustomInnerRegionObserver.throwException.get()) { 1519 if (exceptionLatch.getCount() > 0) { 1520 try { 1521 exceptionLatch.await(); 1522 } catch (InterruptedException e) { 1523 } 1524 throw new IOException("throw exception"); 1525 } 1526 } 1527 return nextRaw; 1528 } 1529 1530 @Override 1531 public void close() throws IOException { 1532 delegate.close(); 1533 } 1534 1535 @Override 1536 public RegionInfo getRegionInfo() { 1537 return delegate.getRegionInfo(); 1538 } 1539 1540 @Override 1541 public boolean isFilterDone() throws IOException { 1542 return delegate.isFilterDone(); 1543 } 1544 1545 @Override 1546 public boolean reseek(byte[] row) throws IOException { 1547 return false; 1548 } 1549 1550 @Override 1551 public long getMaxResultSize() { 1552 return delegate.getMaxResultSize(); 1553 } 1554 1555 @Override 1556 public long getMvccReadPoint() { 1557 return delegate.getMvccReadPoint(); 1558 } 1559 1560 @Override 1561 public int getBatch() { 1562 return delegate.getBatch(); 1563 } 1564 } 1565 1566 public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver { 1567 @Override 1568 public RegionScanner postScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> e, 1569 Scan scan, RegionScanner s) throws IOException { 1570 return new CustomScanner(s); 1571 } 1572 } 1573 1574 public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver { 1575 static final AtomicInteger countOfNext = new AtomicInteger(0); 1576 static final AtomicInteger countOfGets = new AtomicInteger(0); 1577 static final AtomicBoolean waitForGets = new AtomicBoolean(false); 1578 static final AtomicBoolean throwException = new AtomicBoolean(false); 1579 private static final AtomicReference<CountDownLatch> cdl = 1580 new AtomicReference<>(new CountDownLatch(0)); 1581 1582 @Override 1583 public Optional<RegionObserver> getRegionObserver() { 1584 return Optional.of(this); 1585 } 1586 1587 @Override 1588 public boolean postScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> e, 1589 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException { 1590 slowdownCode(e, false); 1591 if (getLatch != null && getLatch.getCount() > 0) { 1592 try { 1593 getLatch.await(); 1594 } catch (InterruptedException e1) { 1595 } 1596 } 1597 return hasMore; 1598 } 1599 1600 @Override 1601 public void postGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> e, Get get, 1602 List<Cell> results) throws IOException { 1603 slowdownCode(e, true); 1604 } 1605 1606 public static AtomicReference<CountDownLatch> getCdl() { 1607 return cdl; 1608 } 1609 1610 private void slowdownCode(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 1611 boolean isGet) { 1612 CountDownLatch latch = getCdl().get(); 1613 try { 1614 System.out.println(latch.getCount() + " is the count " + isGet); 1615 if (latch.getCount() > 0) { 1616 if (isGet) { 1617 countOfGets.incrementAndGet(); 1618 } else { 1619 countOfNext.incrementAndGet(); 1620 } 1621 LOG.info("Waiting for the counterCountDownLatch"); 1622 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 1623 if (latch.getCount() > 0) { 1624 throw new RuntimeException("Can't wait more"); 1625 } 1626 } 1627 } catch (InterruptedException e1) { 1628 LOG.error(e1.toString(), e1); 1629 } 1630 } 1631 } 1632}