001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.atomic.AtomicReference; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 042import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 043import org.apache.hadoop.hbase.coprocessor.ObserverContext; 044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 046import org.apache.hadoop.hbase.coprocessor.RegionObserver; 047import org.apache.hadoop.hbase.io.hfile.BlockCache; 048import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 049import org.apache.hadoop.hbase.io.hfile.CacheConfig; 050import org.apache.hadoop.hbase.io.hfile.CachedBlock; 051import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 052import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 053import org.apache.hadoop.hbase.regionserver.BloomType; 054import org.apache.hadoop.hbase.regionserver.HRegion; 055import org.apache.hadoop.hbase.regionserver.HStore; 056import org.apache.hadoop.hbase.regionserver.InternalScanner; 057import org.apache.hadoop.hbase.regionserver.RegionScanner; 058import org.apache.hadoop.hbase.regionserver.ScannerContext; 059import org.apache.hadoop.hbase.testclassification.ClientTests; 060import org.apache.hadoop.hbase.testclassification.LargeTests; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.junit.After; 063import org.junit.AfterClass; 064import org.junit.Before; 065import org.junit.BeforeClass; 066import org.junit.ClassRule; 067import org.junit.Rule; 068import org.junit.Test; 069import org.junit.experimental.categories.Category; 070import org.junit.rules.TestName; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 075 076@Category({ LargeTests.class, ClientTests.class }) 077@SuppressWarnings("deprecation") 078public class TestBlockEvictionFromClient { 079 080 @ClassRule 081 public static final HBaseClassTestRule CLASS_RULE = 082 HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class); 083 084 private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class); 085 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 086 static byte[][] ROWS = new byte[2][]; 087 private static int NO_OF_THREADS = 3; 088 private static byte[] ROW = Bytes.toBytes("testRow"); 089 private static byte[] ROW1 = Bytes.toBytes("testRow1"); 090 private static byte[] ROW2 = Bytes.toBytes("testRow2"); 091 private static byte[] ROW3 = Bytes.toBytes("testRow3"); 092 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 093 private static byte[][] FAMILIES_1 = new byte[1][0]; 094 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 095 private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 096 private static byte[] data = new byte[1000]; 097 private static byte[] data2 = Bytes.add(data, data); 098 protected static int SLAVES = 1; 099 private static CountDownLatch latch; 100 private static CountDownLatch getLatch; 101 private static CountDownLatch compactionLatch; 102 private static CountDownLatch exceptionLatch; 103 104 @Rule 105 public TestName name = new TestName(); 106 107 /** 108 * @throws java.lang.Exception 109 */ 110 @BeforeClass 111 public static void setUpBeforeClass() throws Exception { 112 ROWS[0] = ROW; 113 ROWS[1] = ROW1; 114 Configuration conf = TEST_UTIL.getConfiguration(); 115 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 116 MultiRowMutationEndpoint.class.getName()); 117 conf.setInt("hbase.regionserver.handler.count", 20); 118 conf.setInt("hbase.bucketcache.size", 400); 119 conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); 120 conf.setFloat("hfile.block.cache.size", 0.2f); 121 conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); 122 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry 123 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000); 124 FAMILIES_1[0] = FAMILY; 125 TEST_UTIL.startMiniCluster(SLAVES); 126 } 127 128 /** 129 * @throws java.lang.Exception 130 */ 131 @AfterClass 132 public static void tearDownAfterClass() throws Exception { 133 TEST_UTIL.shutdownMiniCluster(); 134 } 135 136 /** 137 * @throws java.lang.Exception 138 */ 139 @Before 140 public void setUp() throws Exception { 141 CustomInnerRegionObserver.waitForGets.set(false); 142 CustomInnerRegionObserver.countOfNext.set(0); 143 CustomInnerRegionObserver.countOfGets.set(0); 144 } 145 146 /** 147 * @throws java.lang.Exception 148 */ 149 @After 150 public void tearDown() throws Exception { 151 if (latch != null) { 152 while (latch.getCount() > 0) { 153 latch.countDown(); 154 } 155 } 156 if (getLatch != null) { 157 getLatch.countDown(); 158 } 159 if (compactionLatch != null) { 160 compactionLatch.countDown(); 161 } 162 if (exceptionLatch != null) { 163 exceptionLatch.countDown(); 164 } 165 latch = null; 166 getLatch = null; 167 compactionLatch = null; 168 exceptionLatch = null; 169 CustomInnerRegionObserver.throwException.set(false); 170 // Clean up the tables for every test case 171 TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames(); 172 for (TableName tableName : listTableNames) { 173 if (!tableName.isSystemTable()) { 174 TEST_UTIL.getAdmin().disableTable(tableName); 175 TEST_UTIL.getAdmin().deleteTable(tableName); 176 } 177 } 178 } 179 180 @Test 181 public void testBlockEvictionWithParallelScans() throws Exception { 182 Table table = null; 183 try { 184 latch = new CountDownLatch(1); 185 final TableName tableName = TableName.valueOf(name.getMethodName()); 186 // Create a table with block size as 1024 187 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 188 CustomInnerRegionObserver.class.getName()); 189 // get the block cache and region 190 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 191 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 192 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName) 193 .getRegion(regionName); 194 HStore store = region.getStores().iterator().next(); 195 CacheConfig cacheConf = store.getCacheConfig(); 196 cacheConf.setCacheDataOnWrite(true); 197 cacheConf.setEvictOnClose(true); 198 BlockCache cache = cacheConf.getBlockCache().get(); 199 200 // insert data. 2 Rows are added 201 Put put = new Put(ROW); 202 put.addColumn(FAMILY, QUALIFIER, data); 203 table.put(put); 204 put = new Put(ROW1); 205 put.addColumn(FAMILY, QUALIFIER, data); 206 table.put(put); 207 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 208 // data was in memstore so don't expect any changes 209 // flush the data 210 // Should create one Hfile with 2 blocks 211 region.flush(true); 212 // Load cache 213 // Create three sets of scan 214 ScanThread[] scanThreads = initiateScan(table, false); 215 Thread.sleep(100); 216 checkForBlockEviction(cache, false, false); 217 for (ScanThread thread : scanThreads) { 218 thread.join(); 219 } 220 // CustomInnerRegionObserver.sleepTime.set(0); 221 Iterator<CachedBlock> iterator = cache.iterator(); 222 iterateBlockCache(cache, iterator); 223 // read the data and expect same blocks, one new hit, no misses 224 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 225 iterator = cache.iterator(); 226 iterateBlockCache(cache, iterator); 227 // Check how this miss is happening 228 // insert a second column, read the row, no new blocks, 3 new hits 229 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 230 byte[] data2 = Bytes.add(data, data); 231 put = new Put(ROW); 232 put.addColumn(FAMILY, QUALIFIER2, data2); 233 table.put(put); 234 Result r = table.get(new Get(ROW)); 235 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 236 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 237 iterator = cache.iterator(); 238 iterateBlockCache(cache, iterator); 239 // flush, one new block 240 System.out.println("Flushing cache"); 241 region.flush(true); 242 iterator = cache.iterator(); 243 iterateBlockCache(cache, iterator); 244 // compact, net minus two blocks, two hits, no misses 245 System.out.println("Compacting"); 246 assertEquals(2, store.getStorefilesCount()); 247 store.triggerMajorCompaction(); 248 region.compact(true); 249 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 250 assertEquals(1, store.getStorefilesCount()); 251 iterator = cache.iterator(); 252 iterateBlockCache(cache, iterator); 253 // read the row, this should be a cache miss because we don't cache data 254 // blocks on compaction 255 r = table.get(new Get(ROW)); 256 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 257 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 258 iterator = cache.iterator(); 259 iterateBlockCache(cache, iterator); 260 } finally { 261 if (table != null) { 262 table.close(); 263 } 264 } 265 } 266 267 @Test 268 public void testParallelGetsAndScans() throws IOException, InterruptedException { 269 Table table = null; 270 try { 271 latch = new CountDownLatch(2); 272 // Check if get() returns blocks on its close() itself 273 getLatch = new CountDownLatch(1); 274 final TableName tableName = TableName.valueOf(name.getMethodName()); 275 // Create KV that will give you two blocks 276 // Create a table with block size as 1024 277 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 278 CustomInnerRegionObserver.class.getName()); 279 // get the block cache and region 280 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 281 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 282 HRegion region = 283 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 284 HStore store = region.getStores().iterator().next(); 285 CacheConfig cacheConf = store.getCacheConfig(); 286 cacheConf.setCacheDataOnWrite(true); 287 cacheConf.setEvictOnClose(true); 288 BlockCache cache = cacheConf.getBlockCache().get(); 289 290 insertData(table); 291 // flush the data 292 System.out.println("Flushing cache"); 293 // Should create one Hfile with 2 blocks 294 region.flush(true); 295 // Create three sets of scan 296 CustomInnerRegionObserver.waitForGets.set(true); 297 ScanThread[] scanThreads = initiateScan(table, false); 298 // Create three sets of gets 299 GetThread[] getThreads = initiateGet(table, false, false); 300 checkForBlockEviction(cache, false, false); 301 CustomInnerRegionObserver.waitForGets.set(false); 302 checkForBlockEviction(cache, false, false); 303 for (GetThread thread : getThreads) { 304 thread.join(); 305 } 306 // Verify whether the gets have returned the blocks that it had 307 CustomInnerRegionObserver.waitForGets.set(true); 308 // giving some time for the block to be decremented 309 checkForBlockEviction(cache, true, false); 310 getLatch.countDown(); 311 for (ScanThread thread : scanThreads) { 312 thread.join(); 313 } 314 System.out.println("Scans should have returned the bloks"); 315 // Check with either true or false 316 CustomInnerRegionObserver.waitForGets.set(false); 317 // The scan should also have released the blocks by now 318 checkForBlockEviction(cache, true, true); 319 } finally { 320 if (table != null) { 321 table.close(); 322 } 323 } 324 } 325 326 @Test 327 public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException { 328 Table table = null; 329 try { 330 latch = new CountDownLatch(1); 331 // Check if get() returns blocks on its close() itself 332 getLatch = new CountDownLatch(1); 333 final TableName tableName = TableName.valueOf(name.getMethodName()); 334 // Create KV that will give you two blocks 335 // Create a table with block size as 1024 336 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 337 CustomInnerRegionObserver.class.getName()); 338 // get the block cache and region 339 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 340 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 341 HRegion region = 342 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 343 HStore store = region.getStores().iterator().next(); 344 CacheConfig cacheConf = store.getCacheConfig(); 345 cacheConf.setCacheDataOnWrite(true); 346 cacheConf.setEvictOnClose(true); 347 BlockCache cache = cacheConf.getBlockCache().get(); 348 349 Put put = new Put(ROW); 350 put.addColumn(FAMILY, QUALIFIER, data); 351 table.put(put); 352 region.flush(true); 353 put = new Put(ROW1); 354 put.addColumn(FAMILY, QUALIFIER, data); 355 table.put(put); 356 region.flush(true); 357 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 358 put = new Put(ROW); 359 put.addColumn(FAMILY, QUALIFIER2, data2); 360 table.put(put); 361 region.flush(true); 362 // flush the data 363 System.out.println("Flushing cache"); 364 // Should create one Hfile with 2 blocks 365 CustomInnerRegionObserver.waitForGets.set(true); 366 // Create three sets of gets 367 GetThread[] getThreads = initiateGet(table, false, false); 368 Thread.sleep(200); 369 CustomInnerRegionObserver.getCdl().get().countDown(); 370 for (GetThread thread : getThreads) { 371 thread.join(); 372 } 373 // Verify whether the gets have returned the blocks that it had 374 CustomInnerRegionObserver.waitForGets.set(true); 375 // giving some time for the block to be decremented 376 checkForBlockEviction(cache, true, false); 377 getLatch.countDown(); 378 System.out.println("Gets should have returned the bloks"); 379 } finally { 380 if (table != null) { 381 table.close(); 382 } 383 } 384 } 385 386 @Test 387 // TODO : check how block index works here 388 public void testGetsWithMultiColumnsAndExplicitTracker() 389 throws IOException, InterruptedException { 390 Table table = null; 391 try { 392 latch = new CountDownLatch(1); 393 // Check if get() returns blocks on its close() itself 394 getLatch = new CountDownLatch(1); 395 final TableName tableName = TableName.valueOf(name.getMethodName()); 396 // Create KV that will give you two blocks 397 // Create a table with block size as 1024 398 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 399 CustomInnerRegionObserver.class.getName()); 400 // get the block cache and region 401 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 402 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 403 HRegion region = 404 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 405 BlockCache cache = setCacheProperties(region); 406 Put put = new Put(ROW); 407 put.addColumn(FAMILY, QUALIFIER, data); 408 table.put(put); 409 region.flush(true); 410 put = new Put(ROW1); 411 put.addColumn(FAMILY, QUALIFIER, data); 412 table.put(put); 413 region.flush(true); 414 for (int i = 1; i < 10; i++) { 415 put = new Put(ROW); 416 put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2); 417 table.put(put); 418 if (i % 2 == 0) { 419 region.flush(true); 420 } 421 } 422 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 423 put = new Put(ROW); 424 put.addColumn(FAMILY, QUALIFIER2, data2); 425 table.put(put); 426 region.flush(true); 427 // flush the data 428 System.out.println("Flushing cache"); 429 // Should create one Hfile with 2 blocks 430 CustomInnerRegionObserver.waitForGets.set(true); 431 // Create three sets of gets 432 GetThread[] getThreads = initiateGet(table, true, false); 433 Thread.sleep(200); 434 Iterator<CachedBlock> iterator = cache.iterator(); 435 boolean usedBlocksFound = false; 436 int refCount = 0; 437 int noOfBlocksWithRef = 0; 438 while (iterator.hasNext()) { 439 CachedBlock next = iterator.next(); 440 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 441 if (cache instanceof BucketCache) { 442 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 443 } else if (cache instanceof CombinedBlockCache) { 444 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 445 } else { 446 continue; 447 } 448 if (refCount != 0) { 449 // Blocks will be with count 3 450 System.out.println("The refCount is " + refCount); 451 assertEquals(NO_OF_THREADS, refCount); 452 usedBlocksFound = true; 453 noOfBlocksWithRef++; 454 } 455 } 456 assertTrue(usedBlocksFound); 457 // the number of blocks referred 458 assertEquals(10, noOfBlocksWithRef); 459 CustomInnerRegionObserver.getCdl().get().countDown(); 460 for (GetThread thread : getThreads) { 461 thread.join(); 462 } 463 // Verify whether the gets have returned the blocks that it had 464 CustomInnerRegionObserver.waitForGets.set(true); 465 // giving some time for the block to be decremented 466 checkForBlockEviction(cache, true, false); 467 getLatch.countDown(); 468 System.out.println("Gets should have returned the bloks"); 469 } finally { 470 if (table != null) { 471 table.close(); 472 } 473 } 474 } 475 476 @Test 477 public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException { 478 Table table = null; 479 try { 480 latch = new CountDownLatch(1); 481 // Check if get() returns blocks on its close() itself 482 getLatch = new CountDownLatch(1); 483 final TableName tableName = TableName.valueOf(name.getMethodName()); 484 // Create KV that will give you two blocks 485 // Create a table with block size as 1024 486 byte[][] fams = new byte[10][]; 487 fams[0] = FAMILY; 488 for (int i = 1; i < 10; i++) { 489 fams[i] = (Bytes.toBytes("testFamily" + i)); 490 } 491 table = TEST_UTIL.createTable(tableName, fams, 1, 1024, 492 CustomInnerRegionObserver.class.getName()); 493 // get the block cache and region 494 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 495 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 496 HRegion region = 497 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 498 BlockCache cache = setCacheProperties(region); 499 500 Put put = new Put(ROW); 501 put.addColumn(FAMILY, QUALIFIER, data); 502 table.put(put); 503 region.flush(true); 504 put = new Put(ROW1); 505 put.addColumn(FAMILY, QUALIFIER, data); 506 table.put(put); 507 region.flush(true); 508 for (int i = 1; i < 10; i++) { 509 put = new Put(ROW); 510 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 511 table.put(put); 512 if (i % 2 == 0) { 513 region.flush(true); 514 } 515 } 516 region.flush(true); 517 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 518 put = new Put(ROW); 519 put.addColumn(FAMILY, QUALIFIER2, data2); 520 table.put(put); 521 region.flush(true); 522 // flush the data 523 System.out.println("Flushing cache"); 524 // Should create one Hfile with 2 blocks 525 CustomInnerRegionObserver.waitForGets.set(true); 526 // Create three sets of gets 527 GetThread[] getThreads = initiateGet(table, true, true); 528 Thread.sleep(200); 529 Iterator<CachedBlock> iterator = cache.iterator(); 530 boolean usedBlocksFound = false; 531 int refCount = 0; 532 int noOfBlocksWithRef = 0; 533 while (iterator.hasNext()) { 534 CachedBlock next = iterator.next(); 535 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 536 if (cache instanceof BucketCache) { 537 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 538 } else if (cache instanceof CombinedBlockCache) { 539 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 540 } else { 541 continue; 542 } 543 if (refCount != 0) { 544 // Blocks will be with count 3 545 System.out.println("The refCount is " + refCount); 546 assertEquals(NO_OF_THREADS, refCount); 547 usedBlocksFound = true; 548 noOfBlocksWithRef++; 549 } 550 } 551 assertTrue(usedBlocksFound); 552 // the number of blocks referred 553 assertEquals(3, noOfBlocksWithRef); 554 CustomInnerRegionObserver.getCdl().get().countDown(); 555 for (GetThread thread : getThreads) { 556 thread.join(); 557 } 558 // Verify whether the gets have returned the blocks that it had 559 CustomInnerRegionObserver.waitForGets.set(true); 560 // giving some time for the block to be decremented 561 checkForBlockEviction(cache, true, false); 562 getLatch.countDown(); 563 System.out.println("Gets should have returned the bloks"); 564 } finally { 565 if (table != null) { 566 table.close(); 567 } 568 } 569 } 570 571 @Test 572 public void testBlockRefCountAfterSplits() throws IOException, InterruptedException { 573 Table table = null; 574 try { 575 final TableName tableName = TableName.valueOf(name.getMethodName()); 576 TableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName); 577 // This test expects rpc refcount of cached data blocks to be 0 after split. After split, 578 // two daughter regions are opened and a compaction is scheduled to get rid of reference 579 // of the parent region hfiles. Compaction will increase refcount of cached data blocks by 1. 580 // It is flakey since compaction can kick in anytime. To solve this issue, table is created 581 // with compaction disabled. 582 table = TEST_UTIL.createTable( 583 TableDescriptorBuilder.newBuilder(desc).setCompactionEnabled(false).build(), FAMILIES_1, 584 null, BloomType.ROW, 1024, null); 585 // get the block cache and region 586 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 587 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 588 HRegion region = 589 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 590 HStore store = region.getStores().iterator().next(); 591 CacheConfig cacheConf = store.getCacheConfig(); 592 cacheConf.setEvictOnClose(true); 593 BlockCache cache = cacheConf.getBlockCache().get(); 594 595 Put put = new Put(ROW); 596 put.addColumn(FAMILY, QUALIFIER, data); 597 table.put(put); 598 region.flush(true); 599 put = new Put(ROW1); 600 put.addColumn(FAMILY, QUALIFIER, data); 601 table.put(put); 602 region.flush(true); 603 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 604 put = new Put(ROW2); 605 put.addColumn(FAMILY, QUALIFIER2, data2); 606 table.put(put); 607 put = new Put(ROW3); 608 put.addColumn(FAMILY, QUALIFIER2, data2); 609 table.put(put); 610 region.flush(true); 611 ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers()); 612 int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size(); 613 LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(), 614 regionCount); 615 TEST_UTIL.getAdmin().split(tableName, ROW1); 616 // Wait for splits 617 TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount); 618 region.compact(true); 619 List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions(); 620 for (HRegion r: regions) { 621 LOG.info("" + r.getCompactionState()); 622 TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE)); 623 } 624 LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache); 625 Iterator<CachedBlock> iterator = cache.iterator(); 626 // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners 627 // should be closed inorder to return those blocks 628 iterateBlockCache(cache, iterator); 629 } finally { 630 if (table != null) { 631 table.close(); 632 } 633 } 634 } 635 636 @Test 637 public void testMultiGets() throws IOException, InterruptedException { 638 Table table = null; 639 try { 640 latch = new CountDownLatch(2); 641 // Check if get() returns blocks on its close() itself 642 getLatch = new CountDownLatch(1); 643 final TableName tableName = TableName.valueOf(name.getMethodName()); 644 // Create KV that will give you two blocks 645 // Create a table with block size as 1024 646 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 647 CustomInnerRegionObserver.class.getName()); 648 // get the block cache and region 649 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 650 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 651 HRegion region = 652 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 653 HStore store = region.getStores().iterator().next(); 654 CacheConfig cacheConf = store.getCacheConfig(); 655 cacheConf.setCacheDataOnWrite(true); 656 cacheConf.setEvictOnClose(true); 657 BlockCache cache = cacheConf.getBlockCache().get(); 658 659 Put put = new Put(ROW); 660 put.addColumn(FAMILY, QUALIFIER, data); 661 table.put(put); 662 region.flush(true); 663 put = new Put(ROW1); 664 put.addColumn(FAMILY, QUALIFIER, data); 665 table.put(put); 666 region.flush(true); 667 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 668 put = new Put(ROW); 669 put.addColumn(FAMILY, QUALIFIER2, data2); 670 table.put(put); 671 region.flush(true); 672 // flush the data 673 System.out.println("Flushing cache"); 674 // Should create one Hfile with 2 blocks 675 CustomInnerRegionObserver.waitForGets.set(true); 676 // Create three sets of gets 677 MultiGetThread[] getThreads = initiateMultiGet(table); 678 Thread.sleep(200); 679 int refCount; 680 Iterator<CachedBlock> iterator = cache.iterator(); 681 boolean foundNonZeroBlock = false; 682 while (iterator.hasNext()) { 683 CachedBlock next = iterator.next(); 684 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 685 if (cache instanceof BucketCache) { 686 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 687 } else if (cache instanceof CombinedBlockCache) { 688 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 689 } else { 690 continue; 691 } 692 if (refCount != 0) { 693 assertEquals(NO_OF_THREADS, refCount); 694 foundNonZeroBlock = true; 695 } 696 } 697 assertTrue("Should have found nonzero ref count block",foundNonZeroBlock); 698 CustomInnerRegionObserver.getCdl().get().countDown(); 699 CustomInnerRegionObserver.getCdl().get().countDown(); 700 for (MultiGetThread thread : getThreads) { 701 thread.join(); 702 } 703 // Verify whether the gets have returned the blocks that it had 704 CustomInnerRegionObserver.waitForGets.set(true); 705 // giving some time for the block to be decremented 706 iterateBlockCache(cache, iterator); 707 getLatch.countDown(); 708 System.out.println("Gets should have returned the bloks"); 709 } finally { 710 if (table != null) { 711 table.close(); 712 } 713 } 714 } 715 @Test 716 public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException { 717 Table table = null; 718 try { 719 latch = new CountDownLatch(1); 720 // Check if get() returns blocks on its close() itself 721 final TableName tableName = TableName.valueOf(name.getMethodName()); 722 // Create KV that will give you two blocks 723 // Create a table with block size as 1024 724 byte[][] fams = new byte[10][]; 725 fams[0] = FAMILY; 726 for (int i = 1; i < 10; i++) { 727 fams[i] = (Bytes.toBytes("testFamily" + i)); 728 } 729 table = TEST_UTIL.createTable(tableName, fams, 1, 1024, 730 CustomInnerRegionObserver.class.getName()); 731 // get the block cache and region 732 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 733 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 734 HRegion region = 735 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 736 BlockCache cache = setCacheProperties(region); 737 738 Put put = new Put(ROW); 739 put.addColumn(FAMILY, QUALIFIER, data); 740 table.put(put); 741 region.flush(true); 742 put = new Put(ROW1); 743 put.addColumn(FAMILY, QUALIFIER, data); 744 table.put(put); 745 region.flush(true); 746 for (int i = 1; i < 10; i++) { 747 put = new Put(ROW); 748 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 749 table.put(put); 750 if (i % 2 == 0) { 751 region.flush(true); 752 } 753 } 754 region.flush(true); 755 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 756 put = new Put(ROW); 757 put.addColumn(FAMILY, QUALIFIER2, data2); 758 table.put(put); 759 region.flush(true); 760 // flush the data 761 System.out.println("Flushing cache"); 762 // Should create one Hfile with 2 blocks 763 // Create three sets of gets 764 ScanThread[] scanThreads = initiateScan(table, true); 765 Thread.sleep(200); 766 Iterator<CachedBlock> iterator = cache.iterator(); 767 boolean usedBlocksFound = false; 768 int refCount = 0; 769 int noOfBlocksWithRef = 0; 770 while (iterator.hasNext()) { 771 CachedBlock next = iterator.next(); 772 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 773 if (cache instanceof BucketCache) { 774 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 775 } else if (cache instanceof CombinedBlockCache) { 776 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 777 } else { 778 continue; 779 } 780 if (refCount != 0) { 781 // Blocks will be with count 3 782 System.out.println("The refCount is " + refCount); 783 assertEquals(NO_OF_THREADS, refCount); 784 usedBlocksFound = true; 785 noOfBlocksWithRef++; 786 } 787 } 788 assertTrue(usedBlocksFound); 789 // the number of blocks referred 790 assertEquals(12, noOfBlocksWithRef); 791 CustomInnerRegionObserver.getCdl().get().countDown(); 792 for (ScanThread thread : scanThreads) { 793 thread.join(); 794 } 795 // giving some time for the block to be decremented 796 checkForBlockEviction(cache, true, false); 797 } finally { 798 if (table != null) { 799 table.close(); 800 } 801 } 802 } 803 804 private BlockCache setCacheProperties(HRegion region) { 805 Iterator<HStore> strItr = region.getStores().iterator(); 806 BlockCache cache = null; 807 while (strItr.hasNext()) { 808 HStore store = strItr.next(); 809 CacheConfig cacheConf = store.getCacheConfig(); 810 cacheConf.setCacheDataOnWrite(true); 811 cacheConf.setEvictOnClose(true); 812 // Use the last one 813 cache = cacheConf.getBlockCache().get(); 814 } 815 return cache; 816 } 817 818 @Test 819 public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException, 820 InterruptedException { 821 Table table = null; 822 try { 823 latch = new CountDownLatch(2); 824 // Check if get() returns blocks on its close() itself 825 getLatch = new CountDownLatch(1); 826 final TableName tableName = TableName.valueOf(name.getMethodName()); 827 // Create KV that will give you two blocks 828 // Create a table with block size as 1024 829 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 830 CustomInnerRegionObserverWrapper.class.getName()); 831 // get the block cache and region 832 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 833 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 834 HRegion region = 835 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 836 HStore store = region.getStores().iterator().next(); 837 CacheConfig cacheConf = store.getCacheConfig(); 838 cacheConf.setCacheDataOnWrite(true); 839 cacheConf.setEvictOnClose(true); 840 BlockCache cache = cacheConf.getBlockCache().get(); 841 842 // insert data. 2 Rows are added 843 insertData(table); 844 // flush the data 845 System.out.println("Flushing cache"); 846 // Should create one Hfile with 2 blocks 847 region.flush(true); 848 // CustomInnerRegionObserver.sleepTime.set(5000); 849 // Create three sets of scan 850 CustomInnerRegionObserver.waitForGets.set(true); 851 ScanThread[] scanThreads = initiateScan(table, false); 852 // Create three sets of gets 853 GetThread[] getThreads = initiateGet(table, false, false); 854 // The block would have been decremented for the scan case as it was 855 // wrapped 856 // before even the postNext hook gets executed. 857 // giving some time for the block to be decremented 858 Thread.sleep(100); 859 CustomInnerRegionObserver.waitForGets.set(false); 860 checkForBlockEviction(cache, false, false); 861 // countdown the latch 862 CustomInnerRegionObserver.getCdl().get().countDown(); 863 for (GetThread thread : getThreads) { 864 thread.join(); 865 } 866 getLatch.countDown(); 867 for (ScanThread thread : scanThreads) { 868 thread.join(); 869 } 870 } finally { 871 if (table != null) { 872 table.close(); 873 } 874 } 875 } 876 877 @Test 878 public void testScanWithCompaction() throws IOException, InterruptedException { 879 testScanWithCompactionInternals(name.getMethodName(), false); 880 } 881 882 @Test 883 public void testReverseScanWithCompaction() throws IOException, InterruptedException { 884 testScanWithCompactionInternals(name.getMethodName(), true); 885 } 886 887 private void testScanWithCompactionInternals(String tableNameStr, boolean reversed) 888 throws IOException, InterruptedException { 889 Table table = null; 890 try { 891 latch = new CountDownLatch(1); 892 compactionLatch = new CountDownLatch(1); 893 TableName tableName = TableName.valueOf(tableNameStr); 894 // Create a table with block size as 1024 895 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 896 CustomInnerRegionObserverWrapper.class.getName()); 897 // get the block cache and region 898 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 899 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 900 HRegion region = 901 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 902 HStore store = region.getStores().iterator().next(); 903 CacheConfig cacheConf = store.getCacheConfig(); 904 cacheConf.setCacheDataOnWrite(true); 905 cacheConf.setEvictOnClose(true); 906 BlockCache cache = cacheConf.getBlockCache().get(); 907 908 // insert data. 2 Rows are added 909 Put put = new Put(ROW); 910 put.addColumn(FAMILY, QUALIFIER, data); 911 table.put(put); 912 put = new Put(ROW1); 913 put.addColumn(FAMILY, QUALIFIER, data); 914 table.put(put); 915 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 916 // Should create one Hfile with 2 blocks 917 region.flush(true); 918 // read the data and expect same blocks, one new hit, no misses 919 int refCount = 0; 920 // Check how this miss is happening 921 // insert a second column, read the row, no new blocks, 3 new hits 922 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 923 byte[] data2 = Bytes.add(data, data); 924 put = new Put(ROW); 925 put.addColumn(FAMILY, QUALIFIER2, data2); 926 table.put(put); 927 // flush, one new block 928 System.out.println("Flushing cache"); 929 region.flush(true); 930 Iterator<CachedBlock> iterator = cache.iterator(); 931 iterateBlockCache(cache, iterator); 932 // Create three sets of scan 933 ScanThread[] scanThreads = initiateScan(table, reversed); 934 Thread.sleep(100); 935 iterator = cache.iterator(); 936 boolean usedBlocksFound = false; 937 while (iterator.hasNext()) { 938 CachedBlock next = iterator.next(); 939 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 940 if (cache instanceof BucketCache) { 941 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 942 } else if (cache instanceof CombinedBlockCache) { 943 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 944 } else { 945 continue; 946 } 947 if (refCount != 0) { 948 // Blocks will be with count 3 949 assertEquals(NO_OF_THREADS, refCount); 950 usedBlocksFound = true; 951 } 952 } 953 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 954 usedBlocksFound = false; 955 System.out.println("Compacting"); 956 assertEquals(2, store.getStorefilesCount()); 957 store.triggerMajorCompaction(); 958 region.compact(true); 959 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 960 assertEquals(1, store.getStorefilesCount()); 961 // Even after compaction is done we will have some blocks that cannot 962 // be evicted this is because the scan is still referencing them 963 iterator = cache.iterator(); 964 while (iterator.hasNext()) { 965 CachedBlock next = iterator.next(); 966 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 967 if (cache instanceof BucketCache) { 968 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 969 } else if (cache instanceof CombinedBlockCache) { 970 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 971 } else { 972 continue; 973 } 974 if (refCount != 0) { 975 // Blocks will be with count 3 as they are not yet cleared 976 assertEquals(NO_OF_THREADS, refCount); 977 usedBlocksFound = true; 978 } 979 } 980 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 981 // Should not throw exception 982 compactionLatch.countDown(); 983 latch.countDown(); 984 for (ScanThread thread : scanThreads) { 985 thread.join(); 986 } 987 // by this time all blocks should have been evicted 988 iterator = cache.iterator(); 989 iterateBlockCache(cache, iterator); 990 Result r = table.get(new Get(ROW)); 991 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 992 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 993 // The gets would be working on new blocks 994 iterator = cache.iterator(); 995 iterateBlockCache(cache, iterator); 996 } finally { 997 if (table != null) { 998 table.close(); 999 } 1000 } 1001 } 1002 1003 @Test 1004 public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush() 1005 throws IOException, InterruptedException { 1006 // do flush and scan in parallel 1007 Table table = null; 1008 try { 1009 latch = new CountDownLatch(1); 1010 compactionLatch = new CountDownLatch(1); 1011 final TableName tableName = TableName.valueOf(name.getMethodName()); 1012 // Create a table with block size as 1024 1013 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 1014 CustomInnerRegionObserverWrapper.class.getName()); 1015 // get the block cache and region 1016 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 1017 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 1018 HRegion region = 1019 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1020 HStore store = region.getStores().iterator().next(); 1021 CacheConfig cacheConf = store.getCacheConfig(); 1022 cacheConf.setCacheDataOnWrite(true); 1023 cacheConf.setEvictOnClose(true); 1024 BlockCache cache = cacheConf.getBlockCache().get(); 1025 1026 // insert data. 2 Rows are added 1027 Put put = new Put(ROW); 1028 put.addColumn(FAMILY, QUALIFIER, data); 1029 table.put(put); 1030 put = new Put(ROW1); 1031 put.addColumn(FAMILY, QUALIFIER, data); 1032 table.put(put); 1033 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 1034 // Should create one Hfile with 2 blocks 1035 region.flush(true); 1036 // read the data and expect same blocks, one new hit, no misses 1037 int refCount = 0; 1038 // Check how this miss is happening 1039 // insert a second column, read the row, no new blocks, 3 new hits 1040 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1041 byte[] data2 = Bytes.add(data, data); 1042 put = new Put(ROW); 1043 put.addColumn(FAMILY, QUALIFIER2, data2); 1044 table.put(put); 1045 // flush, one new block 1046 System.out.println("Flushing cache"); 1047 region.flush(true); 1048 Iterator<CachedBlock> iterator = cache.iterator(); 1049 iterateBlockCache(cache, iterator); 1050 // Create three sets of scan 1051 ScanThread[] scanThreads = initiateScan(table, false); 1052 Thread.sleep(100); 1053 iterator = cache.iterator(); 1054 boolean usedBlocksFound = false; 1055 while (iterator.hasNext()) { 1056 CachedBlock next = iterator.next(); 1057 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1058 if (cache instanceof BucketCache) { 1059 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1060 } else if (cache instanceof CombinedBlockCache) { 1061 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1062 } else { 1063 continue; 1064 } 1065 if (refCount != 0) { 1066 // Blocks will be with count 3 1067 assertEquals(NO_OF_THREADS, refCount); 1068 usedBlocksFound = true; 1069 } 1070 } 1071 // Make a put and do a flush 1072 QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1073 data2 = Bytes.add(data, data); 1074 put = new Put(ROW1); 1075 put.addColumn(FAMILY, QUALIFIER2, data2); 1076 table.put(put); 1077 // flush, one new block 1078 System.out.println("Flushing cache"); 1079 region.flush(true); 1080 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 1081 usedBlocksFound = false; 1082 System.out.println("Compacting"); 1083 assertEquals(3, store.getStorefilesCount()); 1084 store.triggerMajorCompaction(); 1085 region.compact(true); 1086 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 1087 assertEquals(1, store.getStorefilesCount()); 1088 // Even after compaction is done we will have some blocks that cannot 1089 // be evicted this is because the scan is still referencing them 1090 iterator = cache.iterator(); 1091 while (iterator.hasNext()) { 1092 CachedBlock next = iterator.next(); 1093 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1094 if (cache instanceof BucketCache) { 1095 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1096 } else if (cache instanceof CombinedBlockCache) { 1097 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1098 } else { 1099 continue; 1100 } 1101 if (refCount != 0) { 1102 // Blocks will be with count 3 as they are not yet cleared 1103 assertEquals(NO_OF_THREADS, refCount); 1104 usedBlocksFound = true; 1105 } 1106 } 1107 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 1108 // Should not throw exception 1109 compactionLatch.countDown(); 1110 latch.countDown(); 1111 for (ScanThread thread : scanThreads) { 1112 thread.join(); 1113 } 1114 // by this time all blocks should have been evicted 1115 iterator = cache.iterator(); 1116 // Since a flush and compaction happened after a scan started 1117 // we need to ensure that all the original blocks of the compacted file 1118 // is also removed. 1119 iterateBlockCache(cache, iterator); 1120 Result r = table.get(new Get(ROW)); 1121 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1122 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1123 // The gets would be working on new blocks 1124 iterator = cache.iterator(); 1125 iterateBlockCache(cache, iterator); 1126 } finally { 1127 if (table != null) { 1128 table.close(); 1129 } 1130 } 1131 } 1132 1133 1134 @Test 1135 public void testScanWithException() throws IOException, InterruptedException { 1136 Table table = null; 1137 try { 1138 latch = new CountDownLatch(1); 1139 exceptionLatch = new CountDownLatch(1); 1140 final TableName tableName = TableName.valueOf(name.getMethodName()); 1141 // Create KV that will give you two blocks 1142 // Create a table with block size as 1024 1143 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 1144 CustomInnerRegionObserverWrapper.class.getName()); 1145 // get the block cache and region 1146 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 1147 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 1148 HRegion region = 1149 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1150 HStore store = region.getStores().iterator().next(); 1151 CacheConfig cacheConf = store.getCacheConfig(); 1152 cacheConf.setCacheDataOnWrite(true); 1153 cacheConf.setEvictOnClose(true); 1154 BlockCache cache = cacheConf.getBlockCache().get(); 1155 // insert data. 2 Rows are added 1156 insertData(table); 1157 // flush the data 1158 System.out.println("Flushing cache"); 1159 // Should create one Hfile with 2 blocks 1160 region.flush(true); 1161 // CustomInnerRegionObserver.sleepTime.set(5000); 1162 CustomInnerRegionObserver.throwException.set(true); 1163 ScanThread[] scanThreads = initiateScan(table, false); 1164 // The block would have been decremented for the scan case as it was 1165 // wrapped 1166 // before even the postNext hook gets executed. 1167 // giving some time for the block to be decremented 1168 Thread.sleep(100); 1169 Iterator<CachedBlock> iterator = cache.iterator(); 1170 boolean usedBlocksFound = false; 1171 int refCount = 0; 1172 while (iterator.hasNext()) { 1173 CachedBlock next = iterator.next(); 1174 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1175 if (cache instanceof BucketCache) { 1176 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1177 } else if (cache instanceof CombinedBlockCache) { 1178 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1179 } else { 1180 continue; 1181 } 1182 if (refCount != 0) { 1183 // Blocks will be with count 3 1184 assertEquals(NO_OF_THREADS, refCount); 1185 usedBlocksFound = true; 1186 } 1187 } 1188 assertTrue(usedBlocksFound); 1189 exceptionLatch.countDown(); 1190 // countdown the latch 1191 CustomInnerRegionObserver.getCdl().get().countDown(); 1192 for (ScanThread thread : scanThreads) { 1193 thread.join(); 1194 } 1195 iterator = cache.iterator(); 1196 usedBlocksFound = false; 1197 refCount = 0; 1198 while (iterator.hasNext()) { 1199 CachedBlock next = iterator.next(); 1200 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1201 if (cache instanceof BucketCache) { 1202 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1203 } else if (cache instanceof CombinedBlockCache) { 1204 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1205 } else { 1206 continue; 1207 } 1208 if (refCount != 0) { 1209 // Blocks will be with count 3 1210 assertEquals(NO_OF_THREADS, refCount); 1211 usedBlocksFound = true; 1212 } 1213 } 1214 assertFalse(usedBlocksFound); 1215 // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner 1216 assertEquals(0, refCount); 1217 } finally { 1218 if (table != null) { 1219 table.close(); 1220 } 1221 } 1222 } 1223 1224 private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) { 1225 int refCount; 1226 while (iterator.hasNext()) { 1227 CachedBlock next = iterator.next(); 1228 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1229 if (cache instanceof BucketCache) { 1230 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1231 LOG.info("BucketCache {} {}", cacheKey, refCount); 1232 } else if (cache instanceof CombinedBlockCache) { 1233 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1234 LOG.info("CombinedBlockCache {} {}", cacheKey, refCount); 1235 } else { 1236 continue; 1237 } 1238 assertEquals(0, refCount); 1239 } 1240 } 1241 1242 private void insertData(Table table) throws IOException { 1243 Put put = new Put(ROW); 1244 put.addColumn(FAMILY, QUALIFIER, data); 1245 table.put(put); 1246 put = new Put(ROW1); 1247 put.addColumn(FAMILY, QUALIFIER, data); 1248 table.put(put); 1249 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1250 put = new Put(ROW); 1251 put.addColumn(FAMILY, QUALIFIER2, data2); 1252 table.put(put); 1253 } 1254 1255 private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException, 1256 InterruptedException { 1257 ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS]; 1258 for (int i = 0; i < NO_OF_THREADS; i++) { 1259 scanThreads[i] = new ScanThread(table, reverse); 1260 } 1261 for (ScanThread thread : scanThreads) { 1262 thread.start(); 1263 } 1264 return scanThreads; 1265 } 1266 1267 private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs) 1268 throws IOException, InterruptedException { 1269 GetThread[] getThreads = new GetThread[NO_OF_THREADS]; 1270 for (int i = 0; i < NO_OF_THREADS; i++) { 1271 getThreads[i] = new GetThread(table, tracker, multipleCFs); 1272 } 1273 for (GetThread thread : getThreads) { 1274 thread.start(); 1275 } 1276 return getThreads; 1277 } 1278 1279 private MultiGetThread[] initiateMultiGet(Table table) 1280 throws IOException, InterruptedException { 1281 MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS]; 1282 for (int i = 0; i < NO_OF_THREADS; i++) { 1283 multiGetThreads[i] = new MultiGetThread(table); 1284 } 1285 for (MultiGetThread thread : multiGetThreads) { 1286 thread.start(); 1287 } 1288 return multiGetThreads; 1289 } 1290 1291 private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero) 1292 throws InterruptedException { 1293 int counter = NO_OF_THREADS; 1294 if (CustomInnerRegionObserver.waitForGets.get()) { 1295 // Because only one row is selected, it has only 2 blocks 1296 counter = counter - 1; 1297 while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) { 1298 Thread.sleep(100); 1299 } 1300 } else { 1301 while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) { 1302 Thread.sleep(100); 1303 } 1304 } 1305 Iterator<CachedBlock> iterator = cache.iterator(); 1306 int refCount = 0; 1307 while (iterator.hasNext()) { 1308 CachedBlock next = iterator.next(); 1309 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1310 if (cache instanceof BucketCache) { 1311 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1312 } else if (cache instanceof CombinedBlockCache) { 1313 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1314 } else { 1315 continue; 1316 } 1317 System.out.println(" the refcount is " + refCount + " block is " + cacheKey); 1318 if (CustomInnerRegionObserver.waitForGets.get()) { 1319 if (expectOnlyZero) { 1320 assertTrue(refCount == 0); 1321 } 1322 if (refCount != 0) { 1323 // Because the scan would have also touched up on these blocks but 1324 // it 1325 // would have touched 1326 // all 3 1327 if (getClosed) { 1328 // If get has closed only the scan's blocks would be available 1329 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get()); 1330 } else { 1331 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS)); 1332 } 1333 } 1334 } else { 1335 // Because the get would have also touched up on these blocks but it 1336 // would have touched 1337 // upon only 2 additionally 1338 if (expectOnlyZero) { 1339 assertTrue(refCount == 0); 1340 } 1341 if (refCount != 0) { 1342 if (getLatch == null) { 1343 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get()); 1344 } else { 1345 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS)); 1346 } 1347 } 1348 } 1349 } 1350 CustomInnerRegionObserver.getCdl().get().countDown(); 1351 } 1352 1353 private static class MultiGetThread extends Thread { 1354 private final Table table; 1355 private final List<Get> gets = new ArrayList<>(); 1356 public MultiGetThread(Table table) { 1357 this.table = table; 1358 } 1359 @Override 1360 public void run() { 1361 gets.add(new Get(ROW)); 1362 gets.add(new Get(ROW1)); 1363 try { 1364 CustomInnerRegionObserver.getCdl().set(latch); 1365 Result[] r = table.get(gets); 1366 assertTrue(Bytes.equals(r[0].getRow(), ROW)); 1367 assertTrue(Bytes.equals(r[1].getRow(), ROW1)); 1368 } catch (IOException e) { 1369 } 1370 } 1371 } 1372 1373 private static class GetThread extends Thread { 1374 private final Table table; 1375 private final boolean tracker; 1376 private final boolean multipleCFs; 1377 1378 public GetThread(Table table, boolean tracker, boolean multipleCFs) { 1379 this.table = table; 1380 this.tracker = tracker; 1381 this.multipleCFs = multipleCFs; 1382 } 1383 1384 @Override 1385 public void run() { 1386 try { 1387 initiateGet(table); 1388 } catch (IOException e) { 1389 // do nothing 1390 } 1391 } 1392 1393 private void initiateGet(Table table) throws IOException { 1394 Get get = new Get(ROW); 1395 if (tracker) { 1396 // Change this 1397 if (!multipleCFs) { 1398 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3)); 1399 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8)); 1400 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9)); 1401 // Unknown key 1402 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900)); 1403 } else { 1404 get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)); 1405 get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)); 1406 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)); 1407 // Unknown key 1408 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900)); 1409 } 1410 } 1411 CustomInnerRegionObserver.getCdl().set(latch); 1412 Result r = table.get(get); 1413 System.out.println(r); 1414 if (!tracker) { 1415 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1416 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1417 } else { 1418 if (!multipleCFs) { 1419 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2)); 1420 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2)); 1421 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2)); 1422 } else { 1423 assertTrue(Bytes.equals( 1424 r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)), 1425 data2)); 1426 assertTrue(Bytes.equals( 1427 r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)), 1428 data2)); 1429 assertTrue(Bytes.equals( 1430 r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)), 1431 data2)); 1432 } 1433 } 1434 } 1435 } 1436 1437 private static class ScanThread extends Thread { 1438 private final Table table; 1439 private final boolean reverse; 1440 1441 public ScanThread(Table table, boolean reverse) { 1442 this.table = table; 1443 this.reverse = reverse; 1444 } 1445 1446 @Override 1447 public void run() { 1448 try { 1449 initiateScan(table); 1450 } catch (IOException e) { 1451 // do nothing 1452 } 1453 } 1454 1455 private void initiateScan(Table table) throws IOException { 1456 Scan scan = new Scan(); 1457 if (reverse) { 1458 scan.setReversed(true); 1459 } 1460 CustomInnerRegionObserver.getCdl().set(latch); 1461 ResultScanner resScanner = table.getScanner(scan); 1462 int i = (reverse ? ROWS.length - 1 : 0); 1463 boolean resultFound = false; 1464 for (Result result : resScanner) { 1465 resultFound = true; 1466 System.out.println(result); 1467 if (!reverse) { 1468 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1469 i++; 1470 } else { 1471 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1472 i--; 1473 } 1474 } 1475 assertTrue(resultFound); 1476 } 1477 } 1478 1479 private void waitForStoreFileCount(HStore store, int count, int timeout) 1480 throws InterruptedException { 1481 long start = System.currentTimeMillis(); 1482 while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) { 1483 Thread.sleep(100); 1484 } 1485 System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" + 1486 store.getStorefilesCount()); 1487 assertEquals(count, store.getStorefilesCount()); 1488 } 1489 1490 private static class CustomScanner implements RegionScanner { 1491 1492 private RegionScanner delegate; 1493 1494 public CustomScanner(RegionScanner delegate) { 1495 this.delegate = delegate; 1496 } 1497 1498 @Override 1499 public boolean next(List<Cell> results) throws IOException { 1500 return delegate.next(results); 1501 } 1502 1503 @Override 1504 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { 1505 return delegate.next(result, scannerContext); 1506 } 1507 1508 @Override 1509 public boolean nextRaw(List<Cell> result) throws IOException { 1510 return delegate.nextRaw(result); 1511 } 1512 1513 @Override 1514 public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException { 1515 boolean nextRaw = delegate.nextRaw(result, context); 1516 if (compactionLatch != null && compactionLatch.getCount() > 0) { 1517 try { 1518 compactionLatch.await(); 1519 } catch (InterruptedException ie) { 1520 } 1521 } 1522 1523 if (CustomInnerRegionObserver.throwException.get()) { 1524 if (exceptionLatch.getCount() > 0) { 1525 try { 1526 exceptionLatch.await(); 1527 } catch (InterruptedException e) { 1528 } 1529 throw new IOException("throw exception"); 1530 } 1531 } 1532 return nextRaw; 1533 } 1534 1535 @Override 1536 public void close() throws IOException { 1537 delegate.close(); 1538 } 1539 1540 @Override 1541 public RegionInfo getRegionInfo() { 1542 return delegate.getRegionInfo(); 1543 } 1544 1545 @Override 1546 public boolean isFilterDone() throws IOException { 1547 return delegate.isFilterDone(); 1548 } 1549 1550 @Override 1551 public boolean reseek(byte[] row) throws IOException { 1552 return false; 1553 } 1554 1555 @Override 1556 public long getMaxResultSize() { 1557 return delegate.getMaxResultSize(); 1558 } 1559 1560 @Override 1561 public long getMvccReadPoint() { 1562 return delegate.getMvccReadPoint(); 1563 } 1564 1565 @Override 1566 public int getBatch() { 1567 return delegate.getBatch(); 1568 } 1569 } 1570 1571 public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver { 1572 @Override 1573 public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, 1574 Scan scan, RegionScanner s) throws IOException { 1575 return new CustomScanner(s); 1576 } 1577 } 1578 1579 public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver { 1580 static final AtomicInteger countOfNext = new AtomicInteger(0); 1581 static final AtomicInteger countOfGets = new AtomicInteger(0); 1582 static final AtomicBoolean waitForGets = new AtomicBoolean(false); 1583 static final AtomicBoolean throwException = new AtomicBoolean(false); 1584 private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>( 1585 new CountDownLatch(0)); 1586 1587 @Override 1588 public Optional<RegionObserver> getRegionObserver() { 1589 return Optional.of(this); 1590 } 1591 1592 @Override 1593 public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e, 1594 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException { 1595 slowdownCode(e, false); 1596 if (getLatch != null && getLatch.getCount() > 0) { 1597 try { 1598 getLatch.await(); 1599 } catch (InterruptedException e1) { 1600 } 1601 } 1602 return hasMore; 1603 } 1604 1605 @Override 1606 public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, 1607 List<Cell> results) throws IOException { 1608 slowdownCode(e, true); 1609 } 1610 1611 public static AtomicReference<CountDownLatch> getCdl() { 1612 return cdl; 1613 } 1614 1615 private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e, 1616 boolean isGet) { 1617 CountDownLatch latch = getCdl().get(); 1618 try { 1619 System.out.println(latch.getCount() + " is the count " + isGet); 1620 if (latch.getCount() > 0) { 1621 if (isGet) { 1622 countOfGets.incrementAndGet(); 1623 } else { 1624 countOfNext.incrementAndGet(); 1625 } 1626 LOG.info("Waiting for the counterCountDownLatch"); 1627 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 1628 if (latch.getCount() > 0) { 1629 throw new RuntimeException("Can't wait more"); 1630 } 1631 } 1632 } catch (InterruptedException e1) { 1633 LOG.error(e1.toString(), e1); 1634 } 1635 } 1636 } 1637}