001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.atomic.AtomicReference; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtil; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 042import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 043import org.apache.hadoop.hbase.coprocessor.ObserverContext; 044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 046import org.apache.hadoop.hbase.coprocessor.RegionObserver; 047import org.apache.hadoop.hbase.io.hfile.BlockCache; 048import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 049import org.apache.hadoop.hbase.io.hfile.CacheConfig; 050import org.apache.hadoop.hbase.io.hfile.CachedBlock; 051import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 052import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 053import org.apache.hadoop.hbase.regionserver.BloomType; 054import org.apache.hadoop.hbase.regionserver.HRegion; 055import org.apache.hadoop.hbase.regionserver.HStore; 056import org.apache.hadoop.hbase.regionserver.InternalScanner; 057import org.apache.hadoop.hbase.regionserver.RegionScanner; 058import org.apache.hadoop.hbase.regionserver.ScannerContext; 059import org.apache.hadoop.hbase.testclassification.ClientTests; 060import org.apache.hadoop.hbase.testclassification.LargeTests; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 063import org.junit.After; 064import org.junit.AfterClass; 065import org.junit.Before; 066import org.junit.BeforeClass; 067import org.junit.ClassRule; 068import org.junit.Rule; 069import org.junit.Test; 070import org.junit.experimental.categories.Category; 071import org.junit.rules.TestName; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 076 077@Category({ LargeTests.class, ClientTests.class }) 078@SuppressWarnings("deprecation") 079public class TestBlockEvictionFromClient { 080 081 @ClassRule 082 public static final HBaseClassTestRule CLASS_RULE = 083 HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class); 084 085 private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class); 086 protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 087 static byte[][] ROWS = new byte[2][]; 088 private static int NO_OF_THREADS = 3; 089 private static byte[] ROW = Bytes.toBytes("testRow"); 090 private static byte[] ROW1 = Bytes.toBytes("testRow1"); 091 private static byte[] ROW2 = Bytes.toBytes("testRow2"); 092 private static byte[] ROW3 = Bytes.toBytes("testRow3"); 093 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 094 private static byte[][] FAMILIES_1 = new byte[1][0]; 095 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 096 private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 097 private static byte[] data = new byte[1000]; 098 private static byte[] data2 = Bytes.add(data, data); 099 protected static int SLAVES = 1; 100 private static CountDownLatch latch; 101 private static CountDownLatch getLatch; 102 private static CountDownLatch compactionLatch; 103 private static CountDownLatch exceptionLatch; 104 105 @Rule 106 public TestName name = new TestName(); 107 108 /** 109 * @throws java.lang.Exception 110 */ 111 @BeforeClass 112 public static void setUpBeforeClass() throws Exception { 113 ROWS[0] = ROW; 114 ROWS[1] = ROW1; 115 Configuration conf = TEST_UTIL.getConfiguration(); 116 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 117 MultiRowMutationEndpoint.class.getName()); 118 conf.setInt("hbase.regionserver.handler.count", 20); 119 conf.setInt("hbase.bucketcache.size", 400); 120 conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); 121 conf.setFloat("hfile.block.cache.size", 0.2f); 122 conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); 123 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry 124 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000); 125 FAMILIES_1[0] = FAMILY; 126 TEST_UTIL.startMiniCluster(SLAVES); 127 } 128 129 /** 130 * @throws java.lang.Exception 131 */ 132 @AfterClass 133 public static void tearDownAfterClass() throws Exception { 134 TEST_UTIL.shutdownMiniCluster(); 135 } 136 137 /** 138 * @throws java.lang.Exception 139 */ 140 @Before 141 public void setUp() throws Exception { 142 CustomInnerRegionObserver.waitForGets.set(false); 143 CustomInnerRegionObserver.countOfNext.set(0); 144 CustomInnerRegionObserver.countOfGets.set(0); 145 } 146 147 /** 148 * @throws java.lang.Exception 149 */ 150 @After 151 public void tearDown() throws Exception { 152 if (latch != null) { 153 while (latch.getCount() > 0) { 154 latch.countDown(); 155 } 156 } 157 if (getLatch != null) { 158 getLatch.countDown(); 159 } 160 if (compactionLatch != null) { 161 compactionLatch.countDown(); 162 } 163 if (exceptionLatch != null) { 164 exceptionLatch.countDown(); 165 } 166 latch = null; 167 getLatch = null; 168 compactionLatch = null; 169 exceptionLatch = null; 170 CustomInnerRegionObserver.throwException.set(false); 171 // Clean up the tables for every test case 172 TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames(); 173 for (TableName tableName : listTableNames) { 174 if (!tableName.isSystemTable()) { 175 TEST_UTIL.getAdmin().disableTable(tableName); 176 TEST_UTIL.getAdmin().deleteTable(tableName); 177 } 178 } 179 } 180 181 @Test 182 public void testBlockEvictionWithParallelScans() throws Exception { 183 Table table = null; 184 try { 185 latch = new CountDownLatch(1); 186 final TableName tableName = TableName.valueOf(name.getMethodName()); 187 // Create a table with block size as 1024 188 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 189 CustomInnerRegionObserver.class.getName()); 190 // get the block cache and region 191 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 192 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 193 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 194 HStore store = region.getStores().iterator().next(); 195 CacheConfig cacheConf = store.getCacheConfig(); 196 cacheConf.setCacheDataOnWrite(true); 197 cacheConf.setEvictOnClose(true); 198 BlockCache cache = cacheConf.getBlockCache().get(); 199 200 // insert data. 2 Rows are added 201 Put put = new Put(ROW); 202 put.addColumn(FAMILY, QUALIFIER, data); 203 table.put(put); 204 put = new Put(ROW1); 205 put.addColumn(FAMILY, QUALIFIER, data); 206 table.put(put); 207 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 208 // data was in memstore so don't expect any changes 209 // flush the data 210 // Should create one Hfile with 2 blocks 211 region.flush(true); 212 // Load cache 213 // Create three sets of scan 214 ScanThread[] scanThreads = initiateScan(table, false); 215 Thread.sleep(100); 216 checkForBlockEviction(cache, false, false); 217 for (ScanThread thread : scanThreads) { 218 thread.join(); 219 } 220 // CustomInnerRegionObserver.sleepTime.set(0); 221 Iterator<CachedBlock> iterator = cache.iterator(); 222 iterateBlockCache(cache, iterator); 223 // read the data and expect same blocks, one new hit, no misses 224 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 225 iterator = cache.iterator(); 226 iterateBlockCache(cache, iterator); 227 // Check how this miss is happening 228 // insert a second column, read the row, no new blocks, 3 new hits 229 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 230 byte[] data2 = Bytes.add(data, data); 231 put = new Put(ROW); 232 put.addColumn(FAMILY, QUALIFIER2, data2); 233 table.put(put); 234 Result r = table.get(new Get(ROW)); 235 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 236 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 237 iterator = cache.iterator(); 238 iterateBlockCache(cache, iterator); 239 // flush, one new block 240 System.out.println("Flushing cache"); 241 region.flush(true); 242 iterator = cache.iterator(); 243 iterateBlockCache(cache, iterator); 244 // compact, net minus two blocks, two hits, no misses 245 System.out.println("Compacting"); 246 assertEquals(2, store.getStorefilesCount()); 247 store.triggerMajorCompaction(); 248 region.compact(true); 249 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 250 assertEquals(1, store.getStorefilesCount()); 251 iterator = cache.iterator(); 252 iterateBlockCache(cache, iterator); 253 // read the row, this should be a cache miss because we don't cache data 254 // blocks on compaction 255 r = table.get(new Get(ROW)); 256 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 257 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 258 iterator = cache.iterator(); 259 iterateBlockCache(cache, iterator); 260 } finally { 261 if (table != null) { 262 table.close(); 263 } 264 } 265 } 266 267 @Test 268 public void testParallelGetsAndScans() throws IOException, InterruptedException { 269 Table table = null; 270 try { 271 latch = new CountDownLatch(2); 272 // Check if get() returns blocks on its close() itself 273 getLatch = new CountDownLatch(1); 274 final TableName tableName = TableName.valueOf(name.getMethodName()); 275 // Create KV that will give you two blocks 276 // Create a table with block size as 1024 277 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 278 CustomInnerRegionObserver.class.getName()); 279 // get the block cache and region 280 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 281 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 282 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 283 HStore store = region.getStores().iterator().next(); 284 CacheConfig cacheConf = store.getCacheConfig(); 285 cacheConf.setCacheDataOnWrite(true); 286 cacheConf.setEvictOnClose(true); 287 BlockCache cache = cacheConf.getBlockCache().get(); 288 289 insertData(table); 290 // flush the data 291 System.out.println("Flushing cache"); 292 // Should create one Hfile with 2 blocks 293 region.flush(true); 294 // Create three sets of scan 295 CustomInnerRegionObserver.waitForGets.set(true); 296 ScanThread[] scanThreads = initiateScan(table, false); 297 // Create three sets of gets 298 GetThread[] getThreads = initiateGet(table, false, false); 299 checkForBlockEviction(cache, false, false); 300 CustomInnerRegionObserver.waitForGets.set(false); 301 checkForBlockEviction(cache, false, false); 302 for (GetThread thread : getThreads) { 303 thread.join(); 304 } 305 // Verify whether the gets have returned the blocks that it had 306 CustomInnerRegionObserver.waitForGets.set(true); 307 // giving some time for the block to be decremented 308 checkForBlockEviction(cache, true, false); 309 getLatch.countDown(); 310 for (ScanThread thread : scanThreads) { 311 thread.join(); 312 } 313 System.out.println("Scans should have returned the bloks"); 314 // Check with either true or false 315 CustomInnerRegionObserver.waitForGets.set(false); 316 // The scan should also have released the blocks by now 317 checkForBlockEviction(cache, true, true); 318 } finally { 319 if (table != null) { 320 table.close(); 321 } 322 } 323 } 324 325 @Test 326 public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException { 327 Table table = null; 328 try { 329 latch = new CountDownLatch(1); 330 // Check if get() returns blocks on its close() itself 331 getLatch = new CountDownLatch(1); 332 final TableName tableName = TableName.valueOf(name.getMethodName()); 333 // Create KV that will give you two blocks 334 // Create a table with block size as 1024 335 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 336 CustomInnerRegionObserver.class.getName()); 337 // get the block cache and region 338 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 339 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 340 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 341 HStore store = region.getStores().iterator().next(); 342 CacheConfig cacheConf = store.getCacheConfig(); 343 cacheConf.setCacheDataOnWrite(true); 344 cacheConf.setEvictOnClose(true); 345 BlockCache cache = cacheConf.getBlockCache().get(); 346 347 Put put = new Put(ROW); 348 put.addColumn(FAMILY, QUALIFIER, data); 349 table.put(put); 350 region.flush(true); 351 put = new Put(ROW1); 352 put.addColumn(FAMILY, QUALIFIER, data); 353 table.put(put); 354 region.flush(true); 355 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 356 put = new Put(ROW); 357 put.addColumn(FAMILY, QUALIFIER2, data2); 358 table.put(put); 359 region.flush(true); 360 // flush the data 361 System.out.println("Flushing cache"); 362 // Should create one Hfile with 2 blocks 363 CustomInnerRegionObserver.waitForGets.set(true); 364 // Create three sets of gets 365 GetThread[] getThreads = initiateGet(table, false, false); 366 Thread.sleep(200); 367 CustomInnerRegionObserver.getCdl().get().countDown(); 368 for (GetThread thread : getThreads) { 369 thread.join(); 370 } 371 // Verify whether the gets have returned the blocks that it had 372 CustomInnerRegionObserver.waitForGets.set(true); 373 // giving some time for the block to be decremented 374 checkForBlockEviction(cache, true, false); 375 getLatch.countDown(); 376 System.out.println("Gets should have returned the bloks"); 377 } finally { 378 if (table != null) { 379 table.close(); 380 } 381 } 382 } 383 384 @Test 385 // TODO : check how block index works here 386 public void testGetsWithMultiColumnsAndExplicitTracker() 387 throws IOException, InterruptedException { 388 Table table = null; 389 try { 390 latch = new CountDownLatch(1); 391 // Check if get() returns blocks on its close() itself 392 getLatch = new CountDownLatch(1); 393 final TableName tableName = TableName.valueOf(name.getMethodName()); 394 // Create KV that will give you two blocks 395 // Create a table with block size as 1024 396 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 397 CustomInnerRegionObserver.class.getName()); 398 // get the block cache and region 399 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 400 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 401 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 402 BlockCache cache = setCacheProperties(region); 403 Put put = new Put(ROW); 404 put.addColumn(FAMILY, QUALIFIER, data); 405 table.put(put); 406 region.flush(true); 407 put = new Put(ROW1); 408 put.addColumn(FAMILY, QUALIFIER, data); 409 table.put(put); 410 region.flush(true); 411 for (int i = 1; i < 10; i++) { 412 put = new Put(ROW); 413 put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2); 414 table.put(put); 415 if (i % 2 == 0) { 416 region.flush(true); 417 } 418 } 419 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 420 put = new Put(ROW); 421 put.addColumn(FAMILY, QUALIFIER2, data2); 422 table.put(put); 423 region.flush(true); 424 // flush the data 425 System.out.println("Flushing cache"); 426 // Should create one Hfile with 2 blocks 427 CustomInnerRegionObserver.waitForGets.set(true); 428 // Create three sets of gets 429 GetThread[] getThreads = initiateGet(table, true, false); 430 Thread.sleep(200); 431 Iterator<CachedBlock> iterator = cache.iterator(); 432 boolean usedBlocksFound = false; 433 int refCount = 0; 434 int noOfBlocksWithRef = 0; 435 while (iterator.hasNext()) { 436 CachedBlock next = iterator.next(); 437 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 438 if (cache instanceof BucketCache) { 439 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 440 } else if (cache instanceof CombinedBlockCache) { 441 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 442 } else { 443 continue; 444 } 445 if (refCount != 0) { 446 // Blocks will be with count 3 447 System.out.println("The refCount is " + refCount); 448 assertEquals(NO_OF_THREADS, refCount); 449 usedBlocksFound = true; 450 noOfBlocksWithRef++; 451 } 452 } 453 assertTrue(usedBlocksFound); 454 // the number of blocks referred 455 assertEquals(10, noOfBlocksWithRef); 456 CustomInnerRegionObserver.getCdl().get().countDown(); 457 for (GetThread thread : getThreads) { 458 thread.join(); 459 } 460 // Verify whether the gets have returned the blocks that it had 461 CustomInnerRegionObserver.waitForGets.set(true); 462 // giving some time for the block to be decremented 463 checkForBlockEviction(cache, true, false); 464 getLatch.countDown(); 465 System.out.println("Gets should have returned the bloks"); 466 } finally { 467 if (table != null) { 468 table.close(); 469 } 470 } 471 } 472 473 @Test 474 public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException { 475 Table table = null; 476 try { 477 latch = new CountDownLatch(1); 478 // Check if get() returns blocks on its close() itself 479 getLatch = new CountDownLatch(1); 480 final TableName tableName = TableName.valueOf(name.getMethodName()); 481 // Create KV that will give you two blocks 482 // Create a table with block size as 1024 483 byte[][] fams = new byte[10][]; 484 fams[0] = FAMILY; 485 for (int i = 1; i < 10; i++) { 486 fams[i] = (Bytes.toBytes("testFamily" + i)); 487 } 488 table = 489 TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName()); 490 // get the block cache and region 491 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 492 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 493 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 494 BlockCache cache = setCacheProperties(region); 495 496 Put put = new Put(ROW); 497 put.addColumn(FAMILY, QUALIFIER, data); 498 table.put(put); 499 region.flush(true); 500 put = new Put(ROW1); 501 put.addColumn(FAMILY, QUALIFIER, data); 502 table.put(put); 503 region.flush(true); 504 for (int i = 1; i < 10; i++) { 505 put = new Put(ROW); 506 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 507 table.put(put); 508 if (i % 2 == 0) { 509 region.flush(true); 510 } 511 } 512 region.flush(true); 513 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 514 put = new Put(ROW); 515 put.addColumn(FAMILY, QUALIFIER2, data2); 516 table.put(put); 517 region.flush(true); 518 // flush the data 519 System.out.println("Flushing cache"); 520 // Should create one Hfile with 2 blocks 521 CustomInnerRegionObserver.waitForGets.set(true); 522 // Create three sets of gets 523 GetThread[] getThreads = initiateGet(table, true, true); 524 Thread.sleep(200); 525 Iterator<CachedBlock> iterator = cache.iterator(); 526 boolean usedBlocksFound = false; 527 int refCount = 0; 528 int noOfBlocksWithRef = 0; 529 while (iterator.hasNext()) { 530 CachedBlock next = iterator.next(); 531 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 532 if (cache instanceof BucketCache) { 533 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 534 } else if (cache instanceof CombinedBlockCache) { 535 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 536 } else { 537 continue; 538 } 539 if (refCount != 0) { 540 // Blocks will be with count 3 541 System.out.println("The refCount is " + refCount); 542 assertEquals(NO_OF_THREADS, refCount); 543 usedBlocksFound = true; 544 noOfBlocksWithRef++; 545 } 546 } 547 assertTrue(usedBlocksFound); 548 // the number of blocks referred 549 assertEquals(3, noOfBlocksWithRef); 550 CustomInnerRegionObserver.getCdl().get().countDown(); 551 for (GetThread thread : getThreads) { 552 thread.join(); 553 } 554 // Verify whether the gets have returned the blocks that it had 555 CustomInnerRegionObserver.waitForGets.set(true); 556 // giving some time for the block to be decremented 557 checkForBlockEviction(cache, true, false); 558 getLatch.countDown(); 559 System.out.println("Gets should have returned the bloks"); 560 } finally { 561 if (table != null) { 562 table.close(); 563 } 564 } 565 } 566 567 @Test 568 public void testBlockRefCountAfterSplits() throws IOException, InterruptedException { 569 Table table = null; 570 try { 571 final TableName tableName = TableName.valueOf(name.getMethodName()); 572 TableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName); 573 // This test expects rpc refcount of cached data blocks to be 0 after split. After split, 574 // two daughter regions are opened and a compaction is scheduled to get rid of reference 575 // of the parent region hfiles. Compaction will increase refcount of cached data blocks by 1. 576 // It is flakey since compaction can kick in anytime. To solve this issue, table is created 577 // with compaction disabled. 578 table = TEST_UTIL.createTable( 579 TableDescriptorBuilder.newBuilder(desc).setCompactionEnabled(false).build(), FAMILIES_1, 580 null, BloomType.ROW, 1024, null); 581 // get the block cache and region 582 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 583 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 584 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 585 HStore store = region.getStores().iterator().next(); 586 CacheConfig cacheConf = store.getCacheConfig(); 587 cacheConf.setEvictOnClose(true); 588 BlockCache cache = cacheConf.getBlockCache().get(); 589 590 Put put = new Put(ROW); 591 put.addColumn(FAMILY, QUALIFIER, data); 592 table.put(put); 593 region.flush(true); 594 put = new Put(ROW1); 595 put.addColumn(FAMILY, QUALIFIER, data); 596 table.put(put); 597 region.flush(true); 598 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 599 put = new Put(ROW2); 600 put.addColumn(FAMILY, QUALIFIER2, data2); 601 table.put(put); 602 put = new Put(ROW3); 603 put.addColumn(FAMILY, QUALIFIER2, data2); 604 table.put(put); 605 region.flush(true); 606 ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers()); 607 int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size(); 608 LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(), 609 regionCount); 610 TEST_UTIL.getAdmin().split(tableName, ROW1); 611 // Wait for splits 612 TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount); 613 region.compact(true); 614 List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions(); 615 for (HRegion r : regions) { 616 LOG.info("" + r.getCompactionState()); 617 TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE)); 618 } 619 LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache); 620 Iterator<CachedBlock> iterator = cache.iterator(); 621 // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners 622 // should be closed inorder to return those blocks 623 iterateBlockCache(cache, iterator); 624 } finally { 625 if (table != null) { 626 table.close(); 627 } 628 } 629 } 630 631 @Test 632 public void testMultiGets() throws IOException, InterruptedException { 633 Table table = null; 634 try { 635 latch = new CountDownLatch(2); 636 // Check if get() returns blocks on its close() itself 637 getLatch = new CountDownLatch(1); 638 final TableName tableName = TableName.valueOf(name.getMethodName()); 639 // Create KV that will give you two blocks 640 // Create a table with block size as 1024 641 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 642 CustomInnerRegionObserver.class.getName()); 643 // get the block cache and region 644 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 645 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 646 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 647 HStore store = region.getStores().iterator().next(); 648 CacheConfig cacheConf = store.getCacheConfig(); 649 cacheConf.setCacheDataOnWrite(true); 650 cacheConf.setEvictOnClose(true); 651 BlockCache cache = cacheConf.getBlockCache().get(); 652 653 Put put = new Put(ROW); 654 put.addColumn(FAMILY, QUALIFIER, data); 655 table.put(put); 656 region.flush(true); 657 put = new Put(ROW1); 658 put.addColumn(FAMILY, QUALIFIER, data); 659 table.put(put); 660 region.flush(true); 661 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 662 put = new Put(ROW); 663 put.addColumn(FAMILY, QUALIFIER2, data2); 664 table.put(put); 665 region.flush(true); 666 // flush the data 667 System.out.println("Flushing cache"); 668 // Should create one Hfile with 2 blocks 669 CustomInnerRegionObserver.waitForGets.set(true); 670 // Create three sets of gets 671 MultiGetThread[] getThreads = initiateMultiGet(table); 672 Thread.sleep(200); 673 int refCount; 674 Iterator<CachedBlock> iterator = cache.iterator(); 675 boolean foundNonZeroBlock = false; 676 while (iterator.hasNext()) { 677 CachedBlock next = iterator.next(); 678 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 679 if (cache instanceof BucketCache) { 680 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 681 } else if (cache instanceof CombinedBlockCache) { 682 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 683 } else { 684 continue; 685 } 686 if (refCount != 0) { 687 assertEquals(NO_OF_THREADS, refCount); 688 foundNonZeroBlock = true; 689 } 690 } 691 assertTrue("Should have found nonzero ref count block", foundNonZeroBlock); 692 CustomInnerRegionObserver.getCdl().get().countDown(); 693 CustomInnerRegionObserver.getCdl().get().countDown(); 694 for (MultiGetThread thread : getThreads) { 695 thread.join(); 696 } 697 // Verify whether the gets have returned the blocks that it had 698 CustomInnerRegionObserver.waitForGets.set(true); 699 // giving some time for the block to be decremented 700 iterateBlockCache(cache, iterator); 701 getLatch.countDown(); 702 System.out.println("Gets should have returned the bloks"); 703 } finally { 704 if (table != null) { 705 table.close(); 706 } 707 } 708 } 709 710 @Test 711 public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException { 712 Table table = null; 713 try { 714 latch = new CountDownLatch(1); 715 // Check if get() returns blocks on its close() itself 716 final TableName tableName = TableName.valueOf(name.getMethodName()); 717 // Create KV that will give you two blocks 718 // Create a table with block size as 1024 719 byte[][] fams = new byte[10][]; 720 fams[0] = FAMILY; 721 for (int i = 1; i < 10; i++) { 722 fams[i] = (Bytes.toBytes("testFamily" + i)); 723 } 724 table = 725 TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName()); 726 // get the block cache and region 727 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 728 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 729 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 730 BlockCache cache = setCacheProperties(region); 731 732 Put put = new Put(ROW); 733 put.addColumn(FAMILY, QUALIFIER, data); 734 table.put(put); 735 region.flush(true); 736 put = new Put(ROW1); 737 put.addColumn(FAMILY, QUALIFIER, data); 738 table.put(put); 739 region.flush(true); 740 for (int i = 1; i < 10; i++) { 741 put = new Put(ROW); 742 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 743 table.put(put); 744 if (i % 2 == 0) { 745 region.flush(true); 746 } 747 } 748 region.flush(true); 749 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 750 put = new Put(ROW); 751 put.addColumn(FAMILY, QUALIFIER2, data2); 752 table.put(put); 753 region.flush(true); 754 // flush the data 755 System.out.println("Flushing cache"); 756 // Should create one Hfile with 2 blocks 757 // Create three sets of gets 758 ScanThread[] scanThreads = initiateScan(table, true); 759 Thread.sleep(200); 760 Iterator<CachedBlock> iterator = cache.iterator(); 761 boolean usedBlocksFound = false; 762 int refCount = 0; 763 int noOfBlocksWithRef = 0; 764 while (iterator.hasNext()) { 765 CachedBlock next = iterator.next(); 766 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 767 if (cache instanceof BucketCache) { 768 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 769 } else if (cache instanceof CombinedBlockCache) { 770 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 771 } else { 772 continue; 773 } 774 if (refCount != 0) { 775 // Blocks will be with count 3 776 System.out.println("The refCount is " + refCount); 777 assertEquals(NO_OF_THREADS, refCount); 778 usedBlocksFound = true; 779 noOfBlocksWithRef++; 780 } 781 } 782 assertTrue(usedBlocksFound); 783 // the number of blocks referred 784 assertEquals(12, noOfBlocksWithRef); 785 CustomInnerRegionObserver.getCdl().get().countDown(); 786 for (ScanThread thread : scanThreads) { 787 thread.join(); 788 } 789 // giving some time for the block to be decremented 790 checkForBlockEviction(cache, true, false); 791 } finally { 792 if (table != null) { 793 table.close(); 794 } 795 } 796 } 797 798 private BlockCache setCacheProperties(HRegion region) { 799 Iterator<HStore> strItr = region.getStores().iterator(); 800 BlockCache cache = null; 801 while (strItr.hasNext()) { 802 HStore store = strItr.next(); 803 CacheConfig cacheConf = store.getCacheConfig(); 804 cacheConf.setCacheDataOnWrite(true); 805 cacheConf.setEvictOnClose(true); 806 // Use the last one 807 cache = cacheConf.getBlockCache().get(); 808 } 809 return cache; 810 } 811 812 @Test 813 public void testParallelGetsAndScanWithWrappedRegionScanner() 814 throws IOException, InterruptedException { 815 Table table = null; 816 try { 817 latch = new CountDownLatch(2); 818 // Check if get() returns blocks on its close() itself 819 getLatch = new CountDownLatch(1); 820 final TableName tableName = TableName.valueOf(name.getMethodName()); 821 // Create KV that will give you two blocks 822 // Create a table with block size as 1024 823 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 824 CustomInnerRegionObserverWrapper.class.getName()); 825 // get the block cache and region 826 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 827 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 828 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 829 HStore store = region.getStores().iterator().next(); 830 CacheConfig cacheConf = store.getCacheConfig(); 831 cacheConf.setCacheDataOnWrite(true); 832 cacheConf.setEvictOnClose(true); 833 BlockCache cache = cacheConf.getBlockCache().get(); 834 835 // insert data. 2 Rows are added 836 insertData(table); 837 // flush the data 838 System.out.println("Flushing cache"); 839 // Should create one Hfile with 2 blocks 840 region.flush(true); 841 // CustomInnerRegionObserver.sleepTime.set(5000); 842 // Create three sets of scan 843 CustomInnerRegionObserver.waitForGets.set(true); 844 ScanThread[] scanThreads = initiateScan(table, false); 845 // Create three sets of gets 846 GetThread[] getThreads = initiateGet(table, false, false); 847 // The block would have been decremented for the scan case as it was 848 // wrapped 849 // before even the postNext hook gets executed. 850 // giving some time for the block to be decremented 851 Thread.sleep(100); 852 CustomInnerRegionObserver.waitForGets.set(false); 853 checkForBlockEviction(cache, false, false); 854 // countdown the latch 855 CustomInnerRegionObserver.getCdl().get().countDown(); 856 for (GetThread thread : getThreads) { 857 thread.join(); 858 } 859 getLatch.countDown(); 860 for (ScanThread thread : scanThreads) { 861 thread.join(); 862 } 863 } finally { 864 if (table != null) { 865 table.close(); 866 } 867 } 868 } 869 870 @Test 871 public void testScanWithCompaction() throws IOException, InterruptedException { 872 testScanWithCompactionInternals(name.getMethodName(), false); 873 } 874 875 @Test 876 public void testReverseScanWithCompaction() throws IOException, InterruptedException { 877 testScanWithCompactionInternals(name.getMethodName(), true); 878 } 879 880 private void testScanWithCompactionInternals(String tableNameStr, boolean reversed) 881 throws IOException, InterruptedException { 882 Table table = null; 883 try { 884 latch = new CountDownLatch(1); 885 compactionLatch = new CountDownLatch(1); 886 TableName tableName = TableName.valueOf(tableNameStr); 887 // Create a table with block size as 1024 888 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 889 CustomInnerRegionObserverWrapper.class.getName()); 890 // get the block cache and region 891 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 892 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 893 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 894 HStore store = region.getStores().iterator().next(); 895 CacheConfig cacheConf = store.getCacheConfig(); 896 cacheConf.setCacheDataOnWrite(true); 897 cacheConf.setEvictOnClose(true); 898 BlockCache cache = cacheConf.getBlockCache().get(); 899 900 // insert data. 2 Rows are added 901 Put put = new Put(ROW); 902 put.addColumn(FAMILY, QUALIFIER, data); 903 table.put(put); 904 put = new Put(ROW1); 905 put.addColumn(FAMILY, QUALIFIER, data); 906 table.put(put); 907 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 908 // Should create one Hfile with 2 blocks 909 region.flush(true); 910 // read the data and expect same blocks, one new hit, no misses 911 int refCount = 0; 912 // Check how this miss is happening 913 // insert a second column, read the row, no new blocks, 3 new hits 914 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 915 byte[] data2 = Bytes.add(data, data); 916 put = new Put(ROW); 917 put.addColumn(FAMILY, QUALIFIER2, data2); 918 table.put(put); 919 // flush, one new block 920 System.out.println("Flushing cache"); 921 region.flush(true); 922 Iterator<CachedBlock> iterator = cache.iterator(); 923 iterateBlockCache(cache, iterator); 924 // Create three sets of scan 925 ScanThread[] scanThreads = initiateScan(table, reversed); 926 Thread.sleep(100); 927 iterator = cache.iterator(); 928 boolean usedBlocksFound = false; 929 while (iterator.hasNext()) { 930 CachedBlock next = iterator.next(); 931 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 932 if (cache instanceof BucketCache) { 933 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 934 } else if (cache instanceof CombinedBlockCache) { 935 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 936 } else { 937 continue; 938 } 939 if (refCount != 0) { 940 // Blocks will be with count 3 941 assertEquals(NO_OF_THREADS, refCount); 942 usedBlocksFound = true; 943 } 944 } 945 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 946 usedBlocksFound = false; 947 System.out.println("Compacting"); 948 assertEquals(2, store.getStorefilesCount()); 949 store.triggerMajorCompaction(); 950 region.compact(true); 951 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 952 assertEquals(1, store.getStorefilesCount()); 953 // Even after compaction is done we will have some blocks that cannot 954 // be evicted this is because the scan is still referencing them 955 iterator = cache.iterator(); 956 while (iterator.hasNext()) { 957 CachedBlock next = iterator.next(); 958 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 959 if (cache instanceof BucketCache) { 960 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 961 } else if (cache instanceof CombinedBlockCache) { 962 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 963 } else { 964 continue; 965 } 966 if (refCount != 0) { 967 // Blocks will be with count 3 as they are not yet cleared 968 assertEquals(NO_OF_THREADS, refCount); 969 usedBlocksFound = true; 970 } 971 } 972 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 973 // Should not throw exception 974 compactionLatch.countDown(); 975 latch.countDown(); 976 for (ScanThread thread : scanThreads) { 977 thread.join(); 978 } 979 // by this time all blocks should have been evicted 980 iterator = cache.iterator(); 981 iterateBlockCache(cache, iterator); 982 Result r = table.get(new Get(ROW)); 983 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 984 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 985 // The gets would be working on new blocks 986 iterator = cache.iterator(); 987 iterateBlockCache(cache, iterator); 988 } finally { 989 if (table != null) { 990 table.close(); 991 } 992 } 993 } 994 995 @Test 996 public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush() 997 throws IOException, InterruptedException { 998 // do flush and scan in parallel 999 Table table = null; 1000 try { 1001 latch = new CountDownLatch(1); 1002 compactionLatch = new CountDownLatch(1); 1003 final TableName tableName = TableName.valueOf(name.getMethodName()); 1004 // Create a table with block size as 1024 1005 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 1006 CustomInnerRegionObserverWrapper.class.getName()); 1007 // get the block cache and region 1008 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 1009 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 1010 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1011 HStore store = region.getStores().iterator().next(); 1012 CacheConfig cacheConf = store.getCacheConfig(); 1013 cacheConf.setCacheDataOnWrite(true); 1014 cacheConf.setEvictOnClose(true); 1015 BlockCache cache = cacheConf.getBlockCache().get(); 1016 1017 // insert data. 2 Rows are added 1018 Put put = new Put(ROW); 1019 put.addColumn(FAMILY, QUALIFIER, data); 1020 table.put(put); 1021 put = new Put(ROW1); 1022 put.addColumn(FAMILY, QUALIFIER, data); 1023 table.put(put); 1024 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 1025 // Should create one Hfile with 2 blocks 1026 region.flush(true); 1027 // read the data and expect same blocks, one new hit, no misses 1028 int refCount = 0; 1029 // Check how this miss is happening 1030 // insert a second column, read the row, no new blocks, 3 new hits 1031 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1032 byte[] data2 = Bytes.add(data, data); 1033 put = new Put(ROW); 1034 put.addColumn(FAMILY, QUALIFIER2, data2); 1035 table.put(put); 1036 // flush, one new block 1037 System.out.println("Flushing cache"); 1038 region.flush(true); 1039 Iterator<CachedBlock> iterator = cache.iterator(); 1040 iterateBlockCache(cache, iterator); 1041 // Create three sets of scan 1042 ScanThread[] scanThreads = initiateScan(table, false); 1043 Thread.sleep(100); 1044 iterator = cache.iterator(); 1045 boolean usedBlocksFound = false; 1046 while (iterator.hasNext()) { 1047 CachedBlock next = iterator.next(); 1048 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1049 if (cache instanceof BucketCache) { 1050 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1051 } else if (cache instanceof CombinedBlockCache) { 1052 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1053 } else { 1054 continue; 1055 } 1056 if (refCount != 0) { 1057 // Blocks will be with count 3 1058 assertEquals(NO_OF_THREADS, refCount); 1059 usedBlocksFound = true; 1060 } 1061 } 1062 // Make a put and do a flush 1063 QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1064 data2 = Bytes.add(data, data); 1065 put = new Put(ROW1); 1066 put.addColumn(FAMILY, QUALIFIER2, data2); 1067 table.put(put); 1068 // flush, one new block 1069 System.out.println("Flushing cache"); 1070 region.flush(true); 1071 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 1072 usedBlocksFound = false; 1073 System.out.println("Compacting"); 1074 assertEquals(3, store.getStorefilesCount()); 1075 store.triggerMajorCompaction(); 1076 region.compact(true); 1077 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 1078 assertEquals(1, store.getStorefilesCount()); 1079 // Even after compaction is done we will have some blocks that cannot 1080 // be evicted this is because the scan is still referencing them 1081 iterator = cache.iterator(); 1082 while (iterator.hasNext()) { 1083 CachedBlock next = iterator.next(); 1084 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1085 if (cache instanceof BucketCache) { 1086 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1087 } else if (cache instanceof CombinedBlockCache) { 1088 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1089 } else { 1090 continue; 1091 } 1092 if (refCount != 0) { 1093 // Blocks will be with count 3 as they are not yet cleared 1094 assertEquals(NO_OF_THREADS, refCount); 1095 usedBlocksFound = true; 1096 } 1097 } 1098 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 1099 // Should not throw exception 1100 compactionLatch.countDown(); 1101 latch.countDown(); 1102 for (ScanThread thread : scanThreads) { 1103 thread.join(); 1104 } 1105 // by this time all blocks should have been evicted 1106 iterator = cache.iterator(); 1107 // Since a flush and compaction happened after a scan started 1108 // we need to ensure that all the original blocks of the compacted file 1109 // is also removed. 1110 iterateBlockCache(cache, iterator); 1111 Result r = table.get(new Get(ROW)); 1112 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1113 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1114 // The gets would be working on new blocks 1115 iterator = cache.iterator(); 1116 iterateBlockCache(cache, iterator); 1117 } finally { 1118 if (table != null) { 1119 table.close(); 1120 } 1121 } 1122 } 1123 1124 @Test 1125 public void testScanWithException() throws IOException, InterruptedException { 1126 Table table = null; 1127 try { 1128 latch = new CountDownLatch(1); 1129 exceptionLatch = new CountDownLatch(1); 1130 final TableName tableName = TableName.valueOf(name.getMethodName()); 1131 // Create KV that will give you two blocks 1132 // Create a table with block size as 1024 1133 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 1134 CustomInnerRegionObserverWrapper.class.getName()); 1135 // get the block cache and region 1136 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 1137 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 1138 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1139 HStore store = region.getStores().iterator().next(); 1140 CacheConfig cacheConf = store.getCacheConfig(); 1141 cacheConf.setCacheDataOnWrite(true); 1142 cacheConf.setEvictOnClose(true); 1143 BlockCache cache = cacheConf.getBlockCache().get(); 1144 // insert data. 2 Rows are added 1145 insertData(table); 1146 // flush the data 1147 System.out.println("Flushing cache"); 1148 // Should create one Hfile with 2 blocks 1149 region.flush(true); 1150 // CustomInnerRegionObserver.sleepTime.set(5000); 1151 CustomInnerRegionObserver.throwException.set(true); 1152 ScanThread[] scanThreads = initiateScan(table, false); 1153 // The block would have been decremented for the scan case as it was 1154 // wrapped 1155 // before even the postNext hook gets executed. 1156 // giving some time for the block to be decremented 1157 Thread.sleep(100); 1158 Iterator<CachedBlock> iterator = cache.iterator(); 1159 boolean usedBlocksFound = false; 1160 int refCount = 0; 1161 while (iterator.hasNext()) { 1162 CachedBlock next = iterator.next(); 1163 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1164 if (cache instanceof BucketCache) { 1165 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1166 } else if (cache instanceof CombinedBlockCache) { 1167 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1168 } else { 1169 continue; 1170 } 1171 if (refCount != 0) { 1172 // Blocks will be with count 3 1173 assertEquals(NO_OF_THREADS, refCount); 1174 usedBlocksFound = true; 1175 } 1176 } 1177 assertTrue(usedBlocksFound); 1178 exceptionLatch.countDown(); 1179 // countdown the latch 1180 CustomInnerRegionObserver.getCdl().get().countDown(); 1181 for (ScanThread thread : scanThreads) { 1182 thread.join(); 1183 } 1184 iterator = cache.iterator(); 1185 usedBlocksFound = false; 1186 refCount = 0; 1187 while (iterator.hasNext()) { 1188 CachedBlock next = iterator.next(); 1189 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1190 if (cache instanceof BucketCache) { 1191 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1192 } else if (cache instanceof CombinedBlockCache) { 1193 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1194 } else { 1195 continue; 1196 } 1197 if (refCount != 0) { 1198 // Blocks will be with count 3 1199 assertEquals(NO_OF_THREADS, refCount); 1200 usedBlocksFound = true; 1201 } 1202 } 1203 assertFalse(usedBlocksFound); 1204 // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner 1205 assertEquals(0, refCount); 1206 } finally { 1207 if (table != null) { 1208 table.close(); 1209 } 1210 } 1211 } 1212 1213 private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) { 1214 int refCount; 1215 while (iterator.hasNext()) { 1216 CachedBlock next = iterator.next(); 1217 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1218 if (cache instanceof BucketCache) { 1219 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1220 LOG.info("BucketCache {} {}", cacheKey, refCount); 1221 } else if (cache instanceof CombinedBlockCache) { 1222 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1223 LOG.info("CombinedBlockCache {} {}", cacheKey, refCount); 1224 } else { 1225 continue; 1226 } 1227 assertEquals(0, refCount); 1228 } 1229 } 1230 1231 private void insertData(Table table) throws IOException { 1232 Put put = new Put(ROW); 1233 put.addColumn(FAMILY, QUALIFIER, data); 1234 table.put(put); 1235 put = new Put(ROW1); 1236 put.addColumn(FAMILY, QUALIFIER, data); 1237 table.put(put); 1238 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1239 put = new Put(ROW); 1240 put.addColumn(FAMILY, QUALIFIER2, data2); 1241 table.put(put); 1242 } 1243 1244 private ScanThread[] initiateScan(Table table, boolean reverse) 1245 throws IOException, InterruptedException { 1246 ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS]; 1247 for (int i = 0; i < NO_OF_THREADS; i++) { 1248 scanThreads[i] = new ScanThread(table, reverse); 1249 } 1250 for (ScanThread thread : scanThreads) { 1251 thread.start(); 1252 } 1253 return scanThreads; 1254 } 1255 1256 private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs) 1257 throws IOException, InterruptedException { 1258 GetThread[] getThreads = new GetThread[NO_OF_THREADS]; 1259 for (int i = 0; i < NO_OF_THREADS; i++) { 1260 getThreads[i] = new GetThread(table, tracker, multipleCFs); 1261 } 1262 for (GetThread thread : getThreads) { 1263 thread.start(); 1264 } 1265 return getThreads; 1266 } 1267 1268 private MultiGetThread[] initiateMultiGet(Table table) throws IOException, InterruptedException { 1269 MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS]; 1270 for (int i = 0; i < NO_OF_THREADS; i++) { 1271 multiGetThreads[i] = new MultiGetThread(table); 1272 } 1273 for (MultiGetThread thread : multiGetThreads) { 1274 thread.start(); 1275 } 1276 return multiGetThreads; 1277 } 1278 1279 private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero) 1280 throws InterruptedException { 1281 int counter = NO_OF_THREADS; 1282 if (CustomInnerRegionObserver.waitForGets.get()) { 1283 // Because only one row is selected, it has only 2 blocks 1284 counter = counter - 1; 1285 while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) { 1286 Thread.sleep(100); 1287 } 1288 } else { 1289 while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) { 1290 Thread.sleep(100); 1291 } 1292 } 1293 Iterator<CachedBlock> iterator = cache.iterator(); 1294 int refCount = 0; 1295 while (iterator.hasNext()) { 1296 CachedBlock next = iterator.next(); 1297 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1298 if (cache instanceof BucketCache) { 1299 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1300 } else if (cache instanceof CombinedBlockCache) { 1301 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1302 } else { 1303 continue; 1304 } 1305 System.out.println(" the refcount is " + refCount + " block is " + cacheKey); 1306 if (CustomInnerRegionObserver.waitForGets.get()) { 1307 if (expectOnlyZero) { 1308 assertTrue(refCount == 0); 1309 } 1310 if (refCount != 0) { 1311 // Because the scan would have also touched up on these blocks but 1312 // it 1313 // would have touched 1314 // all 3 1315 if (getClosed) { 1316 // If get has closed only the scan's blocks would be available 1317 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get()); 1318 } else { 1319 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS)); 1320 } 1321 } 1322 } else { 1323 // Because the get would have also touched up on these blocks but it 1324 // would have touched 1325 // upon only 2 additionally 1326 if (expectOnlyZero) { 1327 assertTrue(refCount == 0); 1328 } 1329 if (refCount != 0) { 1330 if (getLatch == null) { 1331 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get()); 1332 } else { 1333 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS)); 1334 } 1335 } 1336 } 1337 } 1338 CustomInnerRegionObserver.getCdl().get().countDown(); 1339 } 1340 1341 private static class MultiGetThread extends Thread { 1342 private final Table table; 1343 private final List<Get> gets = new ArrayList<>(); 1344 1345 public MultiGetThread(Table table) { 1346 this.table = table; 1347 } 1348 1349 @Override 1350 public void run() { 1351 gets.add(new Get(ROW)); 1352 gets.add(new Get(ROW1)); 1353 try { 1354 CustomInnerRegionObserver.getCdl().set(latch); 1355 Result[] r = table.get(gets); 1356 assertTrue(Bytes.equals(r[0].getRow(), ROW)); 1357 assertTrue(Bytes.equals(r[1].getRow(), ROW1)); 1358 } catch (IOException e) { 1359 } 1360 } 1361 } 1362 1363 private static class GetThread extends Thread { 1364 private final Table table; 1365 private final boolean tracker; 1366 private final boolean multipleCFs; 1367 1368 public GetThread(Table table, boolean tracker, boolean multipleCFs) { 1369 this.table = table; 1370 this.tracker = tracker; 1371 this.multipleCFs = multipleCFs; 1372 } 1373 1374 @Override 1375 public void run() { 1376 try { 1377 initiateGet(table); 1378 } catch (IOException e) { 1379 // do nothing 1380 } 1381 } 1382 1383 private void initiateGet(Table table) throws IOException { 1384 Get get = new Get(ROW); 1385 if (tracker) { 1386 // Change this 1387 if (!multipleCFs) { 1388 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3)); 1389 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8)); 1390 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9)); 1391 // Unknown key 1392 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900)); 1393 } else { 1394 get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)); 1395 get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)); 1396 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)); 1397 // Unknown key 1398 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900)); 1399 } 1400 } 1401 CustomInnerRegionObserver.getCdl().set(latch); 1402 Result r = table.get(get); 1403 System.out.println(r); 1404 if (!tracker) { 1405 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1406 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1407 } else { 1408 if (!multipleCFs) { 1409 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2)); 1410 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2)); 1411 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2)); 1412 } else { 1413 assertTrue(Bytes.equals( 1414 r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)), 1415 data2)); 1416 assertTrue(Bytes.equals( 1417 r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)), 1418 data2)); 1419 assertTrue(Bytes.equals( 1420 r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)), 1421 data2)); 1422 } 1423 } 1424 } 1425 } 1426 1427 private static class ScanThread extends Thread { 1428 private final Table table; 1429 private final boolean reverse; 1430 1431 public ScanThread(Table table, boolean reverse) { 1432 this.table = table; 1433 this.reverse = reverse; 1434 } 1435 1436 @Override 1437 public void run() { 1438 try { 1439 initiateScan(table); 1440 } catch (IOException e) { 1441 // do nothing 1442 } 1443 } 1444 1445 private void initiateScan(Table table) throws IOException { 1446 Scan scan = new Scan(); 1447 if (reverse) { 1448 scan.setReversed(true); 1449 } 1450 CustomInnerRegionObserver.getCdl().set(latch); 1451 ResultScanner resScanner = table.getScanner(scan); 1452 int i = (reverse ? ROWS.length - 1 : 0); 1453 boolean resultFound = false; 1454 for (Result result : resScanner) { 1455 resultFound = true; 1456 System.out.println(result); 1457 if (!reverse) { 1458 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1459 i++; 1460 } else { 1461 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1462 i--; 1463 } 1464 } 1465 assertTrue(resultFound); 1466 } 1467 } 1468 1469 private void waitForStoreFileCount(HStore store, int count, int timeout) 1470 throws InterruptedException { 1471 long start = EnvironmentEdgeManager.currentTime(); 1472 while ( 1473 start + timeout > EnvironmentEdgeManager.currentTime() && store.getStorefilesCount() != count 1474 ) { 1475 Thread.sleep(100); 1476 } 1477 System.out.println("start=" + start + ", now=" + EnvironmentEdgeManager.currentTime() + ", cur=" 1478 + store.getStorefilesCount()); 1479 assertEquals(count, store.getStorefilesCount()); 1480 } 1481 1482 private static class CustomScanner implements RegionScanner { 1483 1484 private RegionScanner delegate; 1485 1486 public CustomScanner(RegionScanner delegate) { 1487 this.delegate = delegate; 1488 } 1489 1490 @Override 1491 public boolean next(List<Cell> results) throws IOException { 1492 return delegate.next(results); 1493 } 1494 1495 @Override 1496 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { 1497 return delegate.next(result, scannerContext); 1498 } 1499 1500 @Override 1501 public boolean nextRaw(List<Cell> result) throws IOException { 1502 return delegate.nextRaw(result); 1503 } 1504 1505 @Override 1506 public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException { 1507 boolean nextRaw = delegate.nextRaw(result, context); 1508 if (compactionLatch != null && compactionLatch.getCount() > 0) { 1509 try { 1510 compactionLatch.await(); 1511 } catch (InterruptedException ie) { 1512 } 1513 } 1514 1515 if (CustomInnerRegionObserver.throwException.get()) { 1516 if (exceptionLatch.getCount() > 0) { 1517 try { 1518 exceptionLatch.await(); 1519 } catch (InterruptedException e) { 1520 } 1521 throw new IOException("throw exception"); 1522 } 1523 } 1524 return nextRaw; 1525 } 1526 1527 @Override 1528 public void close() throws IOException { 1529 delegate.close(); 1530 } 1531 1532 @Override 1533 public RegionInfo getRegionInfo() { 1534 return delegate.getRegionInfo(); 1535 } 1536 1537 @Override 1538 public boolean isFilterDone() throws IOException { 1539 return delegate.isFilterDone(); 1540 } 1541 1542 @Override 1543 public boolean reseek(byte[] row) throws IOException { 1544 return false; 1545 } 1546 1547 @Override 1548 public long getMaxResultSize() { 1549 return delegate.getMaxResultSize(); 1550 } 1551 1552 @Override 1553 public long getMvccReadPoint() { 1554 return delegate.getMvccReadPoint(); 1555 } 1556 1557 @Override 1558 public int getBatch() { 1559 return delegate.getBatch(); 1560 } 1561 } 1562 1563 public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver { 1564 @Override 1565 public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, 1566 RegionScanner s) throws IOException { 1567 return new CustomScanner(s); 1568 } 1569 } 1570 1571 public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver { 1572 static final AtomicInteger countOfNext = new AtomicInteger(0); 1573 static final AtomicInteger countOfGets = new AtomicInteger(0); 1574 static final AtomicBoolean waitForGets = new AtomicBoolean(false); 1575 static final AtomicBoolean throwException = new AtomicBoolean(false); 1576 private static final AtomicReference<CountDownLatch> cdl = 1577 new AtomicReference<>(new CountDownLatch(0)); 1578 1579 @Override 1580 public Optional<RegionObserver> getRegionObserver() { 1581 return Optional.of(this); 1582 } 1583 1584 @Override 1585 public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e, 1586 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException { 1587 slowdownCode(e, false); 1588 if (getLatch != null && getLatch.getCount() > 0) { 1589 try { 1590 getLatch.await(); 1591 } catch (InterruptedException e1) { 1592 } 1593 } 1594 return hasMore; 1595 } 1596 1597 @Override 1598 public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, 1599 List<Cell> results) throws IOException { 1600 slowdownCode(e, true); 1601 } 1602 1603 public static AtomicReference<CountDownLatch> getCdl() { 1604 return cdl; 1605 } 1606 1607 private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e, 1608 boolean isGet) { 1609 CountDownLatch latch = getCdl().get(); 1610 try { 1611 System.out.println(latch.getCount() + " is the count " + isGet); 1612 if (latch.getCount() > 0) { 1613 if (isGet) { 1614 countOfGets.incrementAndGet(); 1615 } else { 1616 countOfNext.incrementAndGet(); 1617 } 1618 LOG.info("Waiting for the counterCountDownLatch"); 1619 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 1620 if (latch.getCount() > 0) { 1621 throw new RuntimeException("Can't wait more"); 1622 } 1623 } 1624 } catch (InterruptedException e1) { 1625 LOG.error(e1.toString(), e1); 1626 } 1627 } 1628 } 1629}