001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Optional; 029import java.util.Set; 030import java.util.concurrent.CountDownLatch; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicBoolean; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.atomic.AtomicReference; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.Cell; 038import org.apache.hadoop.hbase.ExtendedCell; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtil; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 045import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 046import org.apache.hadoop.hbase.coprocessor.ObserverContext; 047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 049import org.apache.hadoop.hbase.coprocessor.RegionObserver; 050import org.apache.hadoop.hbase.io.hfile.BlockCache; 051import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 052import org.apache.hadoop.hbase.io.hfile.CacheConfig; 053import org.apache.hadoop.hbase.io.hfile.CachedBlock; 054import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 055import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 056import org.apache.hadoop.hbase.regionserver.BloomType; 057import org.apache.hadoop.hbase.regionserver.HRegion; 058import org.apache.hadoop.hbase.regionserver.HStore; 059import org.apache.hadoop.hbase.regionserver.InternalScanner; 060import org.apache.hadoop.hbase.regionserver.RegionScanner; 061import org.apache.hadoop.hbase.regionserver.ScannerContext; 062import org.apache.hadoop.hbase.testclassification.ClientTests; 063import org.apache.hadoop.hbase.testclassification.LargeTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.junit.After; 067import org.junit.AfterClass; 068import org.junit.Before; 069import org.junit.BeforeClass; 070import org.junit.ClassRule; 071import org.junit.Rule; 072import org.junit.Test; 073import org.junit.experimental.categories.Category; 074import org.junit.rules.TestName; 075import org.slf4j.Logger; 076import org.slf4j.LoggerFactory; 077 078import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 079 080@Category({ LargeTests.class, ClientTests.class }) 081@SuppressWarnings("deprecation") 082public class TestBlockEvictionFromClient { 083 084 @ClassRule 085 public static final HBaseClassTestRule CLASS_RULE = 086 HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class); 087 088 private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class); 089 protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 090 static byte[][] ROWS = new byte[2][]; 091 private static int NO_OF_THREADS = 3; 092 private static byte[] ROW = Bytes.toBytes("testRow"); 093 private static byte[] ROW1 = Bytes.toBytes("testRow1"); 094 private static byte[] ROW2 = Bytes.toBytes("testRow2"); 095 private static byte[] ROW3 = Bytes.toBytes("testRow3"); 096 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 097 private static byte[][] FAMILIES_1 = new byte[1][0]; 098 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 099 private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 100 private static byte[] data = new byte[1000]; 101 private static byte[] data2 = Bytes.add(data, data); 102 protected static int SLAVES = 1; 103 private static CountDownLatch latch; 104 private static CountDownLatch getLatch; 105 private static CountDownLatch compactionLatch; 106 private static CountDownLatch exceptionLatch; 107 108 @Rule 109 public TestName name = new TestName(); 110 111 /** 112 * @throws java.lang.Exception 113 */ 114 @BeforeClass 115 public static void setUpBeforeClass() throws Exception { 116 ROWS[0] = ROW; 117 ROWS[1] = ROW1; 118 Configuration conf = TEST_UTIL.getConfiguration(); 119 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 120 MultiRowMutationEndpoint.class.getName()); 121 conf.setInt("hbase.regionserver.handler.count", 20); 122 conf.setInt("hbase.bucketcache.size", 400); 123 conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); 124 conf.setFloat("hfile.block.cache.size", 0.2f); 125 conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); 126 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry 127 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000); 128 FAMILIES_1[0] = FAMILY; 129 TEST_UTIL.startMiniCluster(SLAVES); 130 } 131 132 /** 133 * @throws java.lang.Exception 134 */ 135 @AfterClass 136 public static void tearDownAfterClass() throws Exception { 137 TEST_UTIL.shutdownMiniCluster(); 138 } 139 140 /** 141 * @throws java.lang.Exception 142 */ 143 @Before 144 public void setUp() throws Exception { 145 CustomInnerRegionObserver.waitForGets.set(false); 146 CustomInnerRegionObserver.countOfNext.set(0); 147 CustomInnerRegionObserver.countOfGets.set(0); 148 } 149 150 /** 151 * @throws java.lang.Exception 152 */ 153 @After 154 public void tearDown() throws Exception { 155 if (latch != null) { 156 while (latch.getCount() > 0) { 157 latch.countDown(); 158 } 159 } 160 if (getLatch != null) { 161 getLatch.countDown(); 162 } 163 if (compactionLatch != null) { 164 compactionLatch.countDown(); 165 } 166 if (exceptionLatch != null) { 167 exceptionLatch.countDown(); 168 } 169 latch = null; 170 getLatch = null; 171 compactionLatch = null; 172 exceptionLatch = null; 173 CustomInnerRegionObserver.throwException.set(false); 174 // Clean up the tables for every test case 175 TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames(); 176 for (TableName tableName : listTableNames) { 177 if (!tableName.isSystemTable()) { 178 TEST_UTIL.getAdmin().disableTable(tableName); 179 TEST_UTIL.getAdmin().deleteTable(tableName); 180 } 181 } 182 } 183 184 @Test 185 public void testBlockEvictionWithParallelScans() throws Exception { 186 Table table = null; 187 try { 188 latch = new CountDownLatch(1); 189 final TableName tableName = TableName.valueOf(name.getMethodName()); 190 // Create a table with block size as 1024 191 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 192 CustomInnerRegionObserver.class.getName()); 193 // get the block cache and region 194 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 195 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 196 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 197 HStore store = region.getStores().iterator().next(); 198 CacheConfig cacheConf = store.getCacheConfig(); 199 cacheConf.setCacheDataOnWrite(true); 200 cacheConf.setEvictOnClose(true); 201 BlockCache cache = cacheConf.getBlockCache().get(); 202 203 // insert data. 2 Rows are added 204 Put put = new Put(ROW); 205 put.addColumn(FAMILY, QUALIFIER, data); 206 table.put(put); 207 put = new Put(ROW1); 208 put.addColumn(FAMILY, QUALIFIER, data); 209 table.put(put); 210 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 211 // data was in memstore so don't expect any changes 212 // flush the data 213 // Should create one Hfile with 2 blocks 214 region.flush(true); 215 // Load cache 216 // Create three sets of scan 217 ScanThread[] scanThreads = initiateScan(table, false); 218 Thread.sleep(100); 219 checkForBlockEviction(cache, false, false); 220 for (ScanThread thread : scanThreads) { 221 thread.join(); 222 } 223 // CustomInnerRegionObserver.sleepTime.set(0); 224 Iterator<CachedBlock> iterator = cache.iterator(); 225 iterateBlockCache(cache, iterator); 226 // read the data and expect same blocks, one new hit, no misses 227 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 228 iterator = cache.iterator(); 229 iterateBlockCache(cache, iterator); 230 // Check how this miss is happening 231 // insert a second column, read the row, no new blocks, 3 new hits 232 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 233 byte[] data2 = Bytes.add(data, data); 234 put = new Put(ROW); 235 put.addColumn(FAMILY, QUALIFIER2, data2); 236 table.put(put); 237 Result r = table.get(new Get(ROW)); 238 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 239 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 240 iterator = cache.iterator(); 241 iterateBlockCache(cache, iterator); 242 // flush, one new block 243 System.out.println("Flushing cache"); 244 region.flush(true); 245 iterator = cache.iterator(); 246 iterateBlockCache(cache, iterator); 247 // compact, net minus two blocks, two hits, no misses 248 System.out.println("Compacting"); 249 assertEquals(2, store.getStorefilesCount()); 250 store.triggerMajorCompaction(); 251 region.compact(true); 252 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 253 assertEquals(1, store.getStorefilesCount()); 254 iterator = cache.iterator(); 255 iterateBlockCache(cache, iterator); 256 // read the row, this should be a cache miss because we don't cache data 257 // blocks on compaction 258 r = table.get(new Get(ROW)); 259 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 260 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 261 iterator = cache.iterator(); 262 iterateBlockCache(cache, iterator); 263 } finally { 264 if (table != null) { 265 table.close(); 266 } 267 } 268 } 269 270 @Test 271 public void testParallelGetsAndScans() throws IOException, InterruptedException { 272 Table table = null; 273 try { 274 latch = new CountDownLatch(2); 275 // Check if get() returns blocks on its close() itself 276 getLatch = new CountDownLatch(1); 277 final TableName tableName = TableName.valueOf(name.getMethodName()); 278 // Create KV that will give you two blocks 279 // Create a table with block size as 1024 280 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 281 CustomInnerRegionObserver.class.getName()); 282 // get the block cache and region 283 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 284 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 285 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 286 HStore store = region.getStores().iterator().next(); 287 CacheConfig cacheConf = store.getCacheConfig(); 288 cacheConf.setCacheDataOnWrite(true); 289 cacheConf.setEvictOnClose(true); 290 BlockCache cache = cacheConf.getBlockCache().get(); 291 292 insertData(table); 293 // flush the data 294 System.out.println("Flushing cache"); 295 // Should create one Hfile with 2 blocks 296 region.flush(true); 297 // Create three sets of scan 298 CustomInnerRegionObserver.waitForGets.set(true); 299 ScanThread[] scanThreads = initiateScan(table, false); 300 // Create three sets of gets 301 GetThread[] getThreads = initiateGet(table, false, false); 302 checkForBlockEviction(cache, false, false); 303 CustomInnerRegionObserver.waitForGets.set(false); 304 checkForBlockEviction(cache, false, false); 305 for (GetThread thread : getThreads) { 306 thread.join(); 307 } 308 // Verify whether the gets have returned the blocks that it had 309 CustomInnerRegionObserver.waitForGets.set(true); 310 // giving some time for the block to be decremented 311 checkForBlockEviction(cache, true, false); 312 getLatch.countDown(); 313 for (ScanThread thread : scanThreads) { 314 thread.join(); 315 } 316 System.out.println("Scans should have returned the bloks"); 317 // Check with either true or false 318 CustomInnerRegionObserver.waitForGets.set(false); 319 // The scan should also have released the blocks by now 320 checkForBlockEviction(cache, true, true); 321 } finally { 322 if (table != null) { 323 table.close(); 324 } 325 } 326 } 327 328 @Test 329 public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException { 330 Table table = null; 331 try { 332 latch = new CountDownLatch(1); 333 // Check if get() returns blocks on its close() itself 334 getLatch = new CountDownLatch(1); 335 final TableName tableName = TableName.valueOf(name.getMethodName()); 336 // Create KV that will give you two blocks 337 // Create a table with block size as 1024 338 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 339 CustomInnerRegionObserver.class.getName()); 340 // get the block cache and region 341 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 342 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 343 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 344 HStore store = region.getStores().iterator().next(); 345 CacheConfig cacheConf = store.getCacheConfig(); 346 cacheConf.setCacheDataOnWrite(true); 347 cacheConf.setEvictOnClose(true); 348 BlockCache cache = cacheConf.getBlockCache().get(); 349 350 Put put = new Put(ROW); 351 put.addColumn(FAMILY, QUALIFIER, data); 352 table.put(put); 353 region.flush(true); 354 put = new Put(ROW1); 355 put.addColumn(FAMILY, QUALIFIER, data); 356 table.put(put); 357 region.flush(true); 358 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 359 put = new Put(ROW); 360 put.addColumn(FAMILY, QUALIFIER2, data2); 361 table.put(put); 362 region.flush(true); 363 // flush the data 364 System.out.println("Flushing cache"); 365 // Should create one Hfile with 2 blocks 366 CustomInnerRegionObserver.waitForGets.set(true); 367 // Create three sets of gets 368 GetThread[] getThreads = initiateGet(table, false, false); 369 Thread.sleep(200); 370 CustomInnerRegionObserver.getCdl().get().countDown(); 371 for (GetThread thread : getThreads) { 372 thread.join(); 373 } 374 // Verify whether the gets have returned the blocks that it had 375 CustomInnerRegionObserver.waitForGets.set(true); 376 // giving some time for the block to be decremented 377 checkForBlockEviction(cache, true, false); 378 getLatch.countDown(); 379 System.out.println("Gets should have returned the bloks"); 380 } finally { 381 if (table != null) { 382 table.close(); 383 } 384 } 385 } 386 387 @Test 388 // TODO : check how block index works here 389 public void testGetsWithMultiColumnsAndExplicitTracker() 390 throws IOException, InterruptedException { 391 Table table = null; 392 try { 393 latch = new CountDownLatch(1); 394 // Check if get() returns blocks on its close() itself 395 getLatch = new CountDownLatch(1); 396 final TableName tableName = TableName.valueOf(name.getMethodName()); 397 // Create KV that will give you two blocks 398 // Create a table with block size as 1024 399 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 400 CustomInnerRegionObserver.class.getName()); 401 // get the block cache and region 402 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 403 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 404 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 405 BlockCache cache = setCacheProperties(region); 406 Put put = new Put(ROW); 407 put.addColumn(FAMILY, QUALIFIER, data); 408 table.put(put); 409 region.flush(true); 410 put = new Put(ROW1); 411 put.addColumn(FAMILY, QUALIFIER, data); 412 table.put(put); 413 region.flush(true); 414 for (int i = 1; i < 10; i++) { 415 put = new Put(ROW); 416 put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2); 417 table.put(put); 418 if (i % 2 == 0) { 419 region.flush(true); 420 } 421 } 422 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 423 put = new Put(ROW); 424 put.addColumn(FAMILY, QUALIFIER2, data2); 425 table.put(put); 426 region.flush(true); 427 // flush the data 428 System.out.println("Flushing cache"); 429 // Should create one Hfile with 2 blocks 430 CustomInnerRegionObserver.waitForGets.set(true); 431 // Create three sets of gets 432 GetThread[] getThreads = initiateGet(table, true, false); 433 Thread.sleep(200); 434 Iterator<CachedBlock> iterator = cache.iterator(); 435 boolean usedBlocksFound = false; 436 int refCount = 0; 437 int noOfBlocksWithRef = 0; 438 while (iterator.hasNext()) { 439 CachedBlock next = iterator.next(); 440 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 441 if (cache instanceof BucketCache) { 442 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 443 } else if (cache instanceof CombinedBlockCache) { 444 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 445 } else { 446 continue; 447 } 448 if (refCount != 0) { 449 // Blocks will be with count 3 450 System.out.println("The refCount is " + refCount); 451 assertEquals(NO_OF_THREADS, refCount); 452 usedBlocksFound = true; 453 noOfBlocksWithRef++; 454 } 455 } 456 assertTrue(usedBlocksFound); 457 // the number of blocks referred 458 assertEquals(10, noOfBlocksWithRef); 459 CustomInnerRegionObserver.getCdl().get().countDown(); 460 for (GetThread thread : getThreads) { 461 thread.join(); 462 } 463 // Verify whether the gets have returned the blocks that it had 464 CustomInnerRegionObserver.waitForGets.set(true); 465 // giving some time for the block to be decremented 466 checkForBlockEviction(cache, true, false); 467 getLatch.countDown(); 468 System.out.println("Gets should have returned the bloks"); 469 } finally { 470 if (table != null) { 471 table.close(); 472 } 473 } 474 } 475 476 @Test 477 public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException { 478 Table table = null; 479 try { 480 latch = new CountDownLatch(1); 481 // Check if get() returns blocks on its close() itself 482 getLatch = new CountDownLatch(1); 483 final TableName tableName = TableName.valueOf(name.getMethodName()); 484 // Create KV that will give you two blocks 485 // Create a table with block size as 1024 486 byte[][] fams = new byte[10][]; 487 fams[0] = FAMILY; 488 for (int i = 1; i < 10; i++) { 489 fams[i] = (Bytes.toBytes("testFamily" + i)); 490 } 491 table = 492 TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName()); 493 // get the block cache and region 494 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 495 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 496 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 497 BlockCache cache = setCacheProperties(region); 498 499 Put put = new Put(ROW); 500 put.addColumn(FAMILY, QUALIFIER, data); 501 table.put(put); 502 region.flush(true); 503 put = new Put(ROW1); 504 put.addColumn(FAMILY, QUALIFIER, data); 505 table.put(put); 506 region.flush(true); 507 for (int i = 1; i < 10; i++) { 508 put = new Put(ROW); 509 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 510 table.put(put); 511 if (i % 2 == 0) { 512 region.flush(true); 513 } 514 } 515 region.flush(true); 516 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 517 put = new Put(ROW); 518 put.addColumn(FAMILY, QUALIFIER2, data2); 519 table.put(put); 520 region.flush(true); 521 // flush the data 522 System.out.println("Flushing cache"); 523 // Should create one Hfile with 2 blocks 524 CustomInnerRegionObserver.waitForGets.set(true); 525 // Create three sets of gets 526 GetThread[] getThreads = initiateGet(table, true, true); 527 Thread.sleep(200); 528 Iterator<CachedBlock> iterator = cache.iterator(); 529 boolean usedBlocksFound = false; 530 int refCount = 0; 531 int noOfBlocksWithRef = 0; 532 while (iterator.hasNext()) { 533 CachedBlock next = iterator.next(); 534 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 535 if (cache instanceof BucketCache) { 536 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 537 } else if (cache instanceof CombinedBlockCache) { 538 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 539 } else { 540 continue; 541 } 542 if (refCount != 0) { 543 // Blocks will be with count 3 544 System.out.println("The refCount is " + refCount); 545 assertEquals(NO_OF_THREADS, refCount); 546 usedBlocksFound = true; 547 noOfBlocksWithRef++; 548 } 549 } 550 assertTrue(usedBlocksFound); 551 // the number of blocks referred 552 assertEquals(3, noOfBlocksWithRef); 553 CustomInnerRegionObserver.getCdl().get().countDown(); 554 for (GetThread thread : getThreads) { 555 thread.join(); 556 } 557 // Verify whether the gets have returned the blocks that it had 558 CustomInnerRegionObserver.waitForGets.set(true); 559 // giving some time for the block to be decremented 560 checkForBlockEviction(cache, true, false); 561 getLatch.countDown(); 562 System.out.println("Gets should have returned the bloks"); 563 } finally { 564 if (table != null) { 565 table.close(); 566 } 567 } 568 } 569 570 @Test 571 public void testBlockRefCountAfterSplits() throws IOException, InterruptedException { 572 Table table = null; 573 try { 574 final TableName tableName = TableName.valueOf(name.getMethodName()); 575 TableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName); 576 // This test expects rpc refcount of cached data blocks to be 0 after split. After split, 577 // two daughter regions are opened and a compaction is scheduled to get rid of reference 578 // of the parent region hfiles. Compaction will increase refcount of cached data blocks by 1. 579 // It is flakey since compaction can kick in anytime. To solve this issue, table is created 580 // with compaction disabled. 581 table = TEST_UTIL.createTable( 582 TableDescriptorBuilder.newBuilder(desc).setCompactionEnabled(false).build(), FAMILIES_1, 583 null, BloomType.ROW, 1024, null); 584 // get the block cache and region 585 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 586 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 587 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 588 HStore store = region.getStores().iterator().next(); 589 CacheConfig cacheConf = store.getCacheConfig(); 590 cacheConf.setEvictOnClose(true); 591 BlockCache cache = cacheConf.getBlockCache().get(); 592 593 Put put = new Put(ROW); 594 put.addColumn(FAMILY, QUALIFIER, data); 595 table.put(put); 596 region.flush(true); 597 put = new Put(ROW1); 598 put.addColumn(FAMILY, QUALIFIER, data); 599 table.put(put); 600 region.flush(true); 601 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 602 put = new Put(ROW2); 603 put.addColumn(FAMILY, QUALIFIER2, data2); 604 table.put(put); 605 put = new Put(ROW3); 606 put.addColumn(FAMILY, QUALIFIER2, data2); 607 table.put(put); 608 region.flush(true); 609 ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers()); 610 int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size(); 611 LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(), 612 regionCount); 613 TEST_UTIL.getAdmin().split(tableName, ROW1); 614 // Wait for splits 615 TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount); 616 region.compact(true); 617 List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions(); 618 for (HRegion r : regions) { 619 LOG.info("" + r.getCompactionState()); 620 TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE)); 621 } 622 LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache); 623 Iterator<CachedBlock> iterator = cache.iterator(); 624 // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners 625 // should be closed inorder to return those blocks 626 iterateBlockCache(cache, iterator); 627 } finally { 628 if (table != null) { 629 table.close(); 630 } 631 } 632 } 633 634 @Test 635 public void testMultiGets() throws IOException, InterruptedException { 636 Table table = null; 637 try { 638 latch = new CountDownLatch(2); 639 // Check if get() returns blocks on its close() itself 640 getLatch = new CountDownLatch(1); 641 final TableName tableName = TableName.valueOf(name.getMethodName()); 642 // Create KV that will give you two blocks 643 // Create a table with block size as 1024 644 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 645 CustomInnerRegionObserver.class.getName()); 646 // get the block cache and region 647 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 648 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 649 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 650 HStore store = region.getStores().iterator().next(); 651 CacheConfig cacheConf = store.getCacheConfig(); 652 cacheConf.setCacheDataOnWrite(true); 653 cacheConf.setEvictOnClose(true); 654 BlockCache cache = cacheConf.getBlockCache().get(); 655 656 Put put = new Put(ROW); 657 put.addColumn(FAMILY, QUALIFIER, data); 658 table.put(put); 659 region.flush(true); 660 put = new Put(ROW1); 661 put.addColumn(FAMILY, QUALIFIER, data); 662 table.put(put); 663 region.flush(true); 664 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 665 put = new Put(ROW); 666 put.addColumn(FAMILY, QUALIFIER2, data2); 667 table.put(put); 668 region.flush(true); 669 // flush the data 670 System.out.println("Flushing cache"); 671 // Should create one Hfile with 2 blocks 672 CustomInnerRegionObserver.waitForGets.set(true); 673 // Create three sets of gets 674 MultiGetThread[] getThreads = initiateMultiGet(table); 675 Thread.sleep(200); 676 int refCount; 677 Iterator<CachedBlock> iterator = cache.iterator(); 678 boolean foundNonZeroBlock = false; 679 while (iterator.hasNext()) { 680 CachedBlock next = iterator.next(); 681 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 682 if (cache instanceof BucketCache) { 683 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 684 } else if (cache instanceof CombinedBlockCache) { 685 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 686 } else { 687 continue; 688 } 689 if (refCount != 0) { 690 assertEquals(NO_OF_THREADS, refCount); 691 foundNonZeroBlock = true; 692 } 693 } 694 assertTrue("Should have found nonzero ref count block", foundNonZeroBlock); 695 CustomInnerRegionObserver.getCdl().get().countDown(); 696 CustomInnerRegionObserver.getCdl().get().countDown(); 697 for (MultiGetThread thread : getThreads) { 698 thread.join(); 699 } 700 // Verify whether the gets have returned the blocks that it had 701 CustomInnerRegionObserver.waitForGets.set(true); 702 // giving some time for the block to be decremented 703 iterateBlockCache(cache, iterator); 704 getLatch.countDown(); 705 System.out.println("Gets should have returned the bloks"); 706 } finally { 707 if (table != null) { 708 table.close(); 709 } 710 } 711 } 712 713 @Test 714 public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException { 715 Table table = null; 716 try { 717 latch = new CountDownLatch(1); 718 // Check if get() returns blocks on its close() itself 719 final TableName tableName = TableName.valueOf(name.getMethodName()); 720 // Create KV that will give you two blocks 721 // Create a table with block size as 1024 722 byte[][] fams = new byte[10][]; 723 fams[0] = FAMILY; 724 for (int i = 1; i < 10; i++) { 725 fams[i] = (Bytes.toBytes("testFamily" + i)); 726 } 727 table = 728 TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName()); 729 // get the block cache and region 730 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 731 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 732 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 733 BlockCache cache = setCacheProperties(region); 734 735 Put put = new Put(ROW); 736 put.addColumn(FAMILY, QUALIFIER, data); 737 table.put(put); 738 region.flush(true); 739 put = new Put(ROW1); 740 put.addColumn(FAMILY, QUALIFIER, data); 741 table.put(put); 742 region.flush(true); 743 for (int i = 1; i < 10; i++) { 744 put = new Put(ROW); 745 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 746 table.put(put); 747 if (i % 2 == 0) { 748 region.flush(true); 749 } 750 } 751 region.flush(true); 752 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 753 put = new Put(ROW); 754 put.addColumn(FAMILY, QUALIFIER2, data2); 755 table.put(put); 756 region.flush(true); 757 // flush the data 758 System.out.println("Flushing cache"); 759 // Should create one Hfile with 2 blocks 760 // Create three sets of gets 761 ScanThread[] scanThreads = initiateScan(table, true); 762 Thread.sleep(200); 763 Iterator<CachedBlock> iterator = cache.iterator(); 764 boolean usedBlocksFound = false; 765 int refCount = 0; 766 int noOfBlocksWithRef = 0; 767 while (iterator.hasNext()) { 768 CachedBlock next = iterator.next(); 769 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 770 if (cache instanceof BucketCache) { 771 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 772 } else if (cache instanceof CombinedBlockCache) { 773 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 774 } else { 775 continue; 776 } 777 if (refCount != 0) { 778 // Blocks will be with count 3 779 System.out.println("The refCount is " + refCount); 780 assertEquals(NO_OF_THREADS, refCount); 781 usedBlocksFound = true; 782 noOfBlocksWithRef++; 783 } 784 } 785 assertTrue(usedBlocksFound); 786 // the number of blocks referred 787 assertEquals(12, noOfBlocksWithRef); 788 CustomInnerRegionObserver.getCdl().get().countDown(); 789 for (ScanThread thread : scanThreads) { 790 thread.join(); 791 } 792 // giving some time for the block to be decremented 793 checkForBlockEviction(cache, true, false); 794 } finally { 795 if (table != null) { 796 table.close(); 797 } 798 } 799 } 800 801 private BlockCache setCacheProperties(HRegion region) { 802 Iterator<HStore> strItr = region.getStores().iterator(); 803 BlockCache cache = null; 804 while (strItr.hasNext()) { 805 HStore store = strItr.next(); 806 CacheConfig cacheConf = store.getCacheConfig(); 807 cacheConf.setCacheDataOnWrite(true); 808 cacheConf.setEvictOnClose(true); 809 // Use the last one 810 cache = cacheConf.getBlockCache().get(); 811 } 812 return cache; 813 } 814 815 @Test 816 public void testParallelGetsAndScanWithWrappedRegionScanner() 817 throws IOException, InterruptedException { 818 Table table = null; 819 try { 820 latch = new CountDownLatch(2); 821 // Check if get() returns blocks on its close() itself 822 getLatch = new CountDownLatch(1); 823 final TableName tableName = TableName.valueOf(name.getMethodName()); 824 // Create KV that will give you two blocks 825 // Create a table with block size as 1024 826 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 827 CustomInnerRegionObserverWrapper.class.getName()); 828 // get the block cache and region 829 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 830 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 831 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 832 HStore store = region.getStores().iterator().next(); 833 CacheConfig cacheConf = store.getCacheConfig(); 834 cacheConf.setCacheDataOnWrite(true); 835 cacheConf.setEvictOnClose(true); 836 BlockCache cache = cacheConf.getBlockCache().get(); 837 838 // insert data. 2 Rows are added 839 insertData(table); 840 // flush the data 841 System.out.println("Flushing cache"); 842 // Should create one Hfile with 2 blocks 843 region.flush(true); 844 // CustomInnerRegionObserver.sleepTime.set(5000); 845 // Create three sets of scan 846 CustomInnerRegionObserver.waitForGets.set(true); 847 ScanThread[] scanThreads = initiateScan(table, false); 848 // Create three sets of gets 849 GetThread[] getThreads = initiateGet(table, false, false); 850 // The block would have been decremented for the scan case as it was 851 // wrapped 852 // before even the postNext hook gets executed. 853 // giving some time for the block to be decremented 854 Thread.sleep(100); 855 CustomInnerRegionObserver.waitForGets.set(false); 856 checkForBlockEviction(cache, false, false); 857 // countdown the latch 858 CustomInnerRegionObserver.getCdl().get().countDown(); 859 for (GetThread thread : getThreads) { 860 thread.join(); 861 } 862 getLatch.countDown(); 863 for (ScanThread thread : scanThreads) { 864 thread.join(); 865 } 866 } finally { 867 if (table != null) { 868 table.close(); 869 } 870 } 871 } 872 873 @Test 874 public void testScanWithCompaction() throws IOException, InterruptedException { 875 testScanWithCompactionInternals(name.getMethodName(), false); 876 } 877 878 @Test 879 public void testReverseScanWithCompaction() throws IOException, InterruptedException { 880 testScanWithCompactionInternals(name.getMethodName(), true); 881 } 882 883 private void testScanWithCompactionInternals(String tableNameStr, boolean reversed) 884 throws IOException, InterruptedException { 885 Table table = null; 886 try { 887 latch = new CountDownLatch(1); 888 compactionLatch = new CountDownLatch(1); 889 TableName tableName = TableName.valueOf(tableNameStr); 890 // Create a table with block size as 1024 891 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 892 CustomInnerRegionObserverWrapper.class.getName()); 893 // get the block cache and region 894 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 895 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 896 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 897 HStore store = region.getStores().iterator().next(); 898 CacheConfig cacheConf = store.getCacheConfig(); 899 cacheConf.setCacheDataOnWrite(true); 900 cacheConf.setEvictOnClose(true); 901 BlockCache cache = cacheConf.getBlockCache().get(); 902 903 // insert data. 2 Rows are added 904 Put put = new Put(ROW); 905 put.addColumn(FAMILY, QUALIFIER, data); 906 table.put(put); 907 put = new Put(ROW1); 908 put.addColumn(FAMILY, QUALIFIER, data); 909 table.put(put); 910 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 911 // Should create one Hfile with 2 blocks 912 region.flush(true); 913 // read the data and expect same blocks, one new hit, no misses 914 int refCount = 0; 915 // Check how this miss is happening 916 // insert a second column, read the row, no new blocks, 3 new hits 917 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 918 byte[] data2 = Bytes.add(data, data); 919 put = new Put(ROW); 920 put.addColumn(FAMILY, QUALIFIER2, data2); 921 table.put(put); 922 // flush, one new block 923 System.out.println("Flushing cache"); 924 region.flush(true); 925 Iterator<CachedBlock> iterator = cache.iterator(); 926 iterateBlockCache(cache, iterator); 927 // Create three sets of scan 928 ScanThread[] scanThreads = initiateScan(table, reversed); 929 Thread.sleep(100); 930 iterator = cache.iterator(); 931 boolean usedBlocksFound = false; 932 while (iterator.hasNext()) { 933 CachedBlock next = iterator.next(); 934 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 935 if (cache instanceof BucketCache) { 936 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 937 } else if (cache instanceof CombinedBlockCache) { 938 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 939 } else { 940 continue; 941 } 942 if (refCount != 0) { 943 // Blocks will be with count 3 944 assertEquals(NO_OF_THREADS, refCount); 945 usedBlocksFound = true; 946 } 947 } 948 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 949 usedBlocksFound = false; 950 System.out.println("Compacting"); 951 assertEquals(2, store.getStorefilesCount()); 952 store.triggerMajorCompaction(); 953 region.compact(true); 954 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 955 assertEquals(1, store.getStorefilesCount()); 956 // Even after compaction is done we will have some blocks that cannot 957 // be evicted this is because the scan is still referencing them 958 iterator = cache.iterator(); 959 while (iterator.hasNext()) { 960 CachedBlock next = iterator.next(); 961 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 962 if (cache instanceof BucketCache) { 963 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 964 } else if (cache instanceof CombinedBlockCache) { 965 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 966 } else { 967 continue; 968 } 969 if (refCount != 0) { 970 // Blocks will be with count 3 as they are not yet cleared 971 assertEquals(NO_OF_THREADS, refCount); 972 usedBlocksFound = true; 973 } 974 } 975 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 976 // Should not throw exception 977 compactionLatch.countDown(); 978 latch.countDown(); 979 for (ScanThread thread : scanThreads) { 980 thread.join(); 981 } 982 // by this time all blocks should have been evicted 983 iterator = cache.iterator(); 984 iterateBlockCache(cache, iterator); 985 Result r = table.get(new Get(ROW)); 986 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 987 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 988 // The gets would be working on new blocks 989 iterator = cache.iterator(); 990 iterateBlockCache(cache, iterator); 991 } finally { 992 if (table != null) { 993 table.close(); 994 } 995 } 996 } 997 998 @Test 999 public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush() 1000 throws IOException, InterruptedException { 1001 // do flush and scan in parallel 1002 Table table = null; 1003 try { 1004 latch = new CountDownLatch(1); 1005 compactionLatch = new CountDownLatch(1); 1006 final TableName tableName = TableName.valueOf(name.getMethodName()); 1007 // Create a table with block size as 1024 1008 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 1009 CustomInnerRegionObserverWrapper.class.getName()); 1010 // get the block cache and region 1011 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 1012 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 1013 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1014 HStore store = region.getStores().iterator().next(); 1015 CacheConfig cacheConf = store.getCacheConfig(); 1016 cacheConf.setCacheDataOnWrite(true); 1017 cacheConf.setEvictOnClose(true); 1018 BlockCache cache = cacheConf.getBlockCache().get(); 1019 1020 // insert data. 2 Rows are added 1021 Put put = new Put(ROW); 1022 put.addColumn(FAMILY, QUALIFIER, data); 1023 table.put(put); 1024 put = new Put(ROW1); 1025 put.addColumn(FAMILY, QUALIFIER, data); 1026 table.put(put); 1027 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 1028 // Should create one Hfile with 2 blocks 1029 region.flush(true); 1030 // read the data and expect same blocks, one new hit, no misses 1031 int refCount = 0; 1032 // Check how this miss is happening 1033 // insert a second column, read the row, no new blocks, 3 new hits 1034 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1035 byte[] data2 = Bytes.add(data, data); 1036 put = new Put(ROW); 1037 put.addColumn(FAMILY, QUALIFIER2, data2); 1038 table.put(put); 1039 // flush, one new block 1040 System.out.println("Flushing cache"); 1041 region.flush(true); 1042 Iterator<CachedBlock> iterator = cache.iterator(); 1043 iterateBlockCache(cache, iterator); 1044 // Create three sets of scan 1045 ScanThread[] scanThreads = initiateScan(table, false); 1046 Thread.sleep(100); 1047 iterator = cache.iterator(); 1048 boolean usedBlocksFound = false; 1049 while (iterator.hasNext()) { 1050 CachedBlock next = iterator.next(); 1051 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1052 if (cache instanceof BucketCache) { 1053 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1054 } else if (cache instanceof CombinedBlockCache) { 1055 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1056 } else { 1057 continue; 1058 } 1059 if (refCount != 0) { 1060 // Blocks will be with count 3 1061 assertEquals(NO_OF_THREADS, refCount); 1062 usedBlocksFound = true; 1063 } 1064 } 1065 // Make a put and do a flush 1066 QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1067 data2 = Bytes.add(data, data); 1068 put = new Put(ROW1); 1069 put.addColumn(FAMILY, QUALIFIER2, data2); 1070 table.put(put); 1071 // flush, one new block 1072 System.out.println("Flushing cache"); 1073 region.flush(true); 1074 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 1075 usedBlocksFound = false; 1076 System.out.println("Compacting"); 1077 assertEquals(3, store.getStorefilesCount()); 1078 store.triggerMajorCompaction(); 1079 region.compact(true); 1080 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 1081 assertEquals(1, store.getStorefilesCount()); 1082 // Even after compaction is done we will have some blocks that cannot 1083 // be evicted this is because the scan is still referencing them 1084 iterator = cache.iterator(); 1085 while (iterator.hasNext()) { 1086 CachedBlock next = iterator.next(); 1087 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1088 if (cache instanceof BucketCache) { 1089 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1090 } else if (cache instanceof CombinedBlockCache) { 1091 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1092 } else { 1093 continue; 1094 } 1095 if (refCount != 0) { 1096 // Blocks will be with count 3 as they are not yet cleared 1097 assertEquals(NO_OF_THREADS, refCount); 1098 usedBlocksFound = true; 1099 } 1100 } 1101 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 1102 // Should not throw exception 1103 compactionLatch.countDown(); 1104 latch.countDown(); 1105 for (ScanThread thread : scanThreads) { 1106 thread.join(); 1107 } 1108 // by this time all blocks should have been evicted 1109 iterator = cache.iterator(); 1110 // Since a flush and compaction happened after a scan started 1111 // we need to ensure that all the original blocks of the compacted file 1112 // is also removed. 1113 iterateBlockCache(cache, iterator); 1114 Result r = table.get(new Get(ROW)); 1115 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1116 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1117 // The gets would be working on new blocks 1118 iterator = cache.iterator(); 1119 iterateBlockCache(cache, iterator); 1120 } finally { 1121 if (table != null) { 1122 table.close(); 1123 } 1124 } 1125 } 1126 1127 @Test 1128 public void testScanWithException() throws IOException, InterruptedException { 1129 Table table = null; 1130 try { 1131 latch = new CountDownLatch(1); 1132 exceptionLatch = new CountDownLatch(1); 1133 final TableName tableName = TableName.valueOf(name.getMethodName()); 1134 // Create KV that will give you two blocks 1135 // Create a table with block size as 1024 1136 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 1137 CustomInnerRegionObserverWrapper.class.getName()); 1138 // get the block cache and region 1139 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 1140 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 1141 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1142 HStore store = region.getStores().iterator().next(); 1143 CacheConfig cacheConf = store.getCacheConfig(); 1144 cacheConf.setCacheDataOnWrite(true); 1145 cacheConf.setEvictOnClose(true); 1146 BlockCache cache = cacheConf.getBlockCache().get(); 1147 // insert data. 2 Rows are added 1148 insertData(table); 1149 // flush the data 1150 System.out.println("Flushing cache"); 1151 // Should create one Hfile with 2 blocks 1152 region.flush(true); 1153 // CustomInnerRegionObserver.sleepTime.set(5000); 1154 CustomInnerRegionObserver.throwException.set(true); 1155 ScanThread[] scanThreads = initiateScan(table, false); 1156 // The block would have been decremented for the scan case as it was 1157 // wrapped 1158 // before even the postNext hook gets executed. 1159 // giving some time for the block to be decremented 1160 Thread.sleep(100); 1161 Iterator<CachedBlock> iterator = cache.iterator(); 1162 boolean usedBlocksFound = false; 1163 int refCount = 0; 1164 while (iterator.hasNext()) { 1165 CachedBlock next = iterator.next(); 1166 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1167 if (cache instanceof BucketCache) { 1168 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1169 } else if (cache instanceof CombinedBlockCache) { 1170 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1171 } else { 1172 continue; 1173 } 1174 if (refCount != 0) { 1175 // Blocks will be with count 3 1176 assertEquals(NO_OF_THREADS, refCount); 1177 usedBlocksFound = true; 1178 } 1179 } 1180 assertTrue(usedBlocksFound); 1181 exceptionLatch.countDown(); 1182 // countdown the latch 1183 CustomInnerRegionObserver.getCdl().get().countDown(); 1184 for (ScanThread thread : scanThreads) { 1185 thread.join(); 1186 } 1187 iterator = cache.iterator(); 1188 usedBlocksFound = false; 1189 refCount = 0; 1190 while (iterator.hasNext()) { 1191 CachedBlock next = iterator.next(); 1192 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1193 if (cache instanceof BucketCache) { 1194 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1195 } else if (cache instanceof CombinedBlockCache) { 1196 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1197 } else { 1198 continue; 1199 } 1200 if (refCount != 0) { 1201 // Blocks will be with count 3 1202 assertEquals(NO_OF_THREADS, refCount); 1203 usedBlocksFound = true; 1204 } 1205 } 1206 assertFalse(usedBlocksFound); 1207 // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner 1208 assertEquals(0, refCount); 1209 } finally { 1210 if (table != null) { 1211 table.close(); 1212 } 1213 } 1214 } 1215 1216 private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) { 1217 int refCount; 1218 while (iterator.hasNext()) { 1219 CachedBlock next = iterator.next(); 1220 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1221 if (cache instanceof BucketCache) { 1222 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1223 LOG.info("BucketCache {} {}", cacheKey, refCount); 1224 } else if (cache instanceof CombinedBlockCache) { 1225 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1226 LOG.info("CombinedBlockCache {} {}", cacheKey, refCount); 1227 } else { 1228 continue; 1229 } 1230 assertEquals(0, refCount); 1231 } 1232 } 1233 1234 private void insertData(Table table) throws IOException { 1235 Put put = new Put(ROW); 1236 put.addColumn(FAMILY, QUALIFIER, data); 1237 table.put(put); 1238 put = new Put(ROW1); 1239 put.addColumn(FAMILY, QUALIFIER, data); 1240 table.put(put); 1241 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1242 put = new Put(ROW); 1243 put.addColumn(FAMILY, QUALIFIER2, data2); 1244 table.put(put); 1245 } 1246 1247 private ScanThread[] initiateScan(Table table, boolean reverse) 1248 throws IOException, InterruptedException { 1249 ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS]; 1250 for (int i = 0; i < NO_OF_THREADS; i++) { 1251 scanThreads[i] = new ScanThread(table, reverse); 1252 } 1253 for (ScanThread thread : scanThreads) { 1254 thread.start(); 1255 } 1256 return scanThreads; 1257 } 1258 1259 private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs) 1260 throws IOException, InterruptedException { 1261 GetThread[] getThreads = new GetThread[NO_OF_THREADS]; 1262 for (int i = 0; i < NO_OF_THREADS; i++) { 1263 getThreads[i] = new GetThread(table, tracker, multipleCFs); 1264 } 1265 for (GetThread thread : getThreads) { 1266 thread.start(); 1267 } 1268 return getThreads; 1269 } 1270 1271 private MultiGetThread[] initiateMultiGet(Table table) throws IOException, InterruptedException { 1272 MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS]; 1273 for (int i = 0; i < NO_OF_THREADS; i++) { 1274 multiGetThreads[i] = new MultiGetThread(table); 1275 } 1276 for (MultiGetThread thread : multiGetThreads) { 1277 thread.start(); 1278 } 1279 return multiGetThreads; 1280 } 1281 1282 private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero) 1283 throws InterruptedException { 1284 int counter = NO_OF_THREADS; 1285 if (CustomInnerRegionObserver.waitForGets.get()) { 1286 // Because only one row is selected, it has only 2 blocks 1287 counter = counter - 1; 1288 while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) { 1289 Thread.sleep(100); 1290 } 1291 } else { 1292 while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) { 1293 Thread.sleep(100); 1294 } 1295 } 1296 Iterator<CachedBlock> iterator = cache.iterator(); 1297 int refCount = 0; 1298 while (iterator.hasNext()) { 1299 CachedBlock next = iterator.next(); 1300 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1301 if (cache instanceof BucketCache) { 1302 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1303 } else if (cache instanceof CombinedBlockCache) { 1304 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1305 } else { 1306 continue; 1307 } 1308 System.out.println(" the refcount is " + refCount + " block is " + cacheKey); 1309 if (CustomInnerRegionObserver.waitForGets.get()) { 1310 if (expectOnlyZero) { 1311 assertTrue(refCount == 0); 1312 } 1313 if (refCount != 0) { 1314 // Because the scan would have also touched up on these blocks but 1315 // it 1316 // would have touched 1317 // all 3 1318 if (getClosed) { 1319 // If get has closed only the scan's blocks would be available 1320 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get()); 1321 } else { 1322 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS)); 1323 } 1324 } 1325 } else { 1326 // Because the get would have also touched up on these blocks but it 1327 // would have touched 1328 // upon only 2 additionally 1329 if (expectOnlyZero) { 1330 assertTrue(refCount == 0); 1331 } 1332 if (refCount != 0) { 1333 if (getLatch == null) { 1334 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get()); 1335 } else { 1336 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS)); 1337 } 1338 } 1339 } 1340 } 1341 CustomInnerRegionObserver.getCdl().get().countDown(); 1342 } 1343 1344 private static class MultiGetThread extends Thread { 1345 private final Table table; 1346 private final List<Get> gets = new ArrayList<>(); 1347 1348 public MultiGetThread(Table table) { 1349 this.table = table; 1350 } 1351 1352 @Override 1353 public void run() { 1354 gets.add(new Get(ROW)); 1355 gets.add(new Get(ROW1)); 1356 try { 1357 CustomInnerRegionObserver.getCdl().set(latch); 1358 Result[] r = table.get(gets); 1359 assertTrue(Bytes.equals(r[0].getRow(), ROW)); 1360 assertTrue(Bytes.equals(r[1].getRow(), ROW1)); 1361 } catch (IOException e) { 1362 } 1363 } 1364 } 1365 1366 private static class GetThread extends Thread { 1367 private final Table table; 1368 private final boolean tracker; 1369 private final boolean multipleCFs; 1370 1371 public GetThread(Table table, boolean tracker, boolean multipleCFs) { 1372 this.table = table; 1373 this.tracker = tracker; 1374 this.multipleCFs = multipleCFs; 1375 } 1376 1377 @Override 1378 public void run() { 1379 try { 1380 initiateGet(table); 1381 } catch (IOException e) { 1382 // do nothing 1383 } 1384 } 1385 1386 private void initiateGet(Table table) throws IOException { 1387 Get get = new Get(ROW); 1388 if (tracker) { 1389 // Change this 1390 if (!multipleCFs) { 1391 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3)); 1392 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8)); 1393 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9)); 1394 // Unknown key 1395 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900)); 1396 } else { 1397 get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)); 1398 get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)); 1399 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)); 1400 // Unknown key 1401 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900)); 1402 } 1403 } 1404 CustomInnerRegionObserver.getCdl().set(latch); 1405 Result r = table.get(get); 1406 System.out.println(r); 1407 if (!tracker) { 1408 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1409 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1410 } else { 1411 if (!multipleCFs) { 1412 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2)); 1413 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2)); 1414 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2)); 1415 } else { 1416 assertTrue(Bytes.equals( 1417 r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)), 1418 data2)); 1419 assertTrue(Bytes.equals( 1420 r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)), 1421 data2)); 1422 assertTrue(Bytes.equals( 1423 r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)), 1424 data2)); 1425 } 1426 } 1427 } 1428 } 1429 1430 private static class ScanThread extends Thread { 1431 private final Table table; 1432 private final boolean reverse; 1433 1434 public ScanThread(Table table, boolean reverse) { 1435 this.table = table; 1436 this.reverse = reverse; 1437 } 1438 1439 @Override 1440 public void run() { 1441 try { 1442 initiateScan(table); 1443 } catch (IOException e) { 1444 // do nothing 1445 } 1446 } 1447 1448 private void initiateScan(Table table) throws IOException { 1449 Scan scan = new Scan(); 1450 if (reverse) { 1451 scan.setReversed(true); 1452 } 1453 CustomInnerRegionObserver.getCdl().set(latch); 1454 ResultScanner resScanner = table.getScanner(scan); 1455 int i = (reverse ? ROWS.length - 1 : 0); 1456 boolean resultFound = false; 1457 for (Result result : resScanner) { 1458 resultFound = true; 1459 System.out.println(result); 1460 if (!reverse) { 1461 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1462 i++; 1463 } else { 1464 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1465 i--; 1466 } 1467 } 1468 assertTrue(resultFound); 1469 } 1470 } 1471 1472 private void waitForStoreFileCount(HStore store, int count, int timeout) 1473 throws InterruptedException { 1474 long start = EnvironmentEdgeManager.currentTime(); 1475 while ( 1476 start + timeout > EnvironmentEdgeManager.currentTime() && store.getStorefilesCount() != count 1477 ) { 1478 Thread.sleep(100); 1479 } 1480 System.out.println("start=" + start + ", now=" + EnvironmentEdgeManager.currentTime() + ", cur=" 1481 + store.getStorefilesCount()); 1482 assertEquals(count, store.getStorefilesCount()); 1483 } 1484 1485 private static class CustomScanner implements RegionScanner { 1486 1487 private RegionScanner delegate; 1488 1489 public CustomScanner(RegionScanner delegate) { 1490 this.delegate = delegate; 1491 } 1492 1493 @Override 1494 public boolean next(List<? super ExtendedCell> results) throws IOException { 1495 return delegate.next(results); 1496 } 1497 1498 @Override 1499 public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext) 1500 throws IOException { 1501 return delegate.next(result, scannerContext); 1502 } 1503 1504 @Override 1505 public boolean nextRaw(List<? super ExtendedCell> result) throws IOException { 1506 return delegate.nextRaw(result); 1507 } 1508 1509 @Override 1510 public boolean nextRaw(List<? super ExtendedCell> result, ScannerContext context) 1511 throws IOException { 1512 boolean nextRaw = delegate.nextRaw(result, context); 1513 if (compactionLatch != null && compactionLatch.getCount() > 0) { 1514 try { 1515 compactionLatch.await(); 1516 } catch (InterruptedException ie) { 1517 } 1518 } 1519 1520 if (CustomInnerRegionObserver.throwException.get()) { 1521 if (exceptionLatch.getCount() > 0) { 1522 try { 1523 exceptionLatch.await(); 1524 } catch (InterruptedException e) { 1525 } 1526 throw new IOException("throw exception"); 1527 } 1528 } 1529 return nextRaw; 1530 } 1531 1532 @Override 1533 public Set<Path> getFilesRead() { 1534 return delegate.getFilesRead(); 1535 } 1536 1537 @Override 1538 public void close() throws IOException { 1539 delegate.close(); 1540 } 1541 1542 @Override 1543 public RegionInfo getRegionInfo() { 1544 return delegate.getRegionInfo(); 1545 } 1546 1547 @Override 1548 public boolean isFilterDone() throws IOException { 1549 return delegate.isFilterDone(); 1550 } 1551 1552 @Override 1553 public boolean reseek(byte[] row) throws IOException { 1554 return false; 1555 } 1556 1557 @Override 1558 public long getMaxResultSize() { 1559 return delegate.getMaxResultSize(); 1560 } 1561 1562 @Override 1563 public long getMvccReadPoint() { 1564 return delegate.getMvccReadPoint(); 1565 } 1566 1567 @Override 1568 public int getBatch() { 1569 return delegate.getBatch(); 1570 } 1571 } 1572 1573 public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver { 1574 @Override 1575 public RegionScanner postScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> e, 1576 Scan scan, RegionScanner s) throws IOException { 1577 return new CustomScanner(s); 1578 } 1579 } 1580 1581 public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver { 1582 static final AtomicInteger countOfNext = new AtomicInteger(0); 1583 static final AtomicInteger countOfGets = new AtomicInteger(0); 1584 static final AtomicBoolean waitForGets = new AtomicBoolean(false); 1585 static final AtomicBoolean throwException = new AtomicBoolean(false); 1586 private static final AtomicReference<CountDownLatch> cdl = 1587 new AtomicReference<>(new CountDownLatch(0)); 1588 1589 @Override 1590 public Optional<RegionObserver> getRegionObserver() { 1591 return Optional.of(this); 1592 } 1593 1594 @Override 1595 public boolean postScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> e, 1596 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException { 1597 slowdownCode(e, false); 1598 if (getLatch != null && getLatch.getCount() > 0) { 1599 try { 1600 getLatch.await(); 1601 } catch (InterruptedException e1) { 1602 } 1603 } 1604 return hasMore; 1605 } 1606 1607 @Override 1608 public void postGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> e, Get get, 1609 List<Cell> results) throws IOException { 1610 slowdownCode(e, true); 1611 } 1612 1613 public static AtomicReference<CountDownLatch> getCdl() { 1614 return cdl; 1615 } 1616 1617 private void slowdownCode(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 1618 boolean isGet) { 1619 CountDownLatch latch = getCdl().get(); 1620 try { 1621 System.out.println(latch.getCount() + " is the count " + isGet); 1622 if (latch.getCount() > 0) { 1623 if (isGet) { 1624 countOfGets.incrementAndGet(); 1625 } else { 1626 countOfNext.incrementAndGet(); 1627 } 1628 LOG.info("Waiting for the counterCountDownLatch"); 1629 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 1630 if (latch.getCount() > 0) { 1631 throw new RuntimeException("Can't wait more"); 1632 } 1633 } 1634 } catch (InterruptedException e1) { 1635 LOG.error(e1.toString(), e1); 1636 } 1637 } 1638 } 1639}