001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Optional; 028import java.util.concurrent.CountDownLatch; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.atomic.AtomicInteger; 032import java.util.concurrent.atomic.AtomicReference; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtility; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.HRegionLocation; 039import org.apache.hadoop.hbase.HTableDescriptor; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 043import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 044import org.apache.hadoop.hbase.coprocessor.ObserverContext; 045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 047import org.apache.hadoop.hbase.coprocessor.RegionObserver; 048import org.apache.hadoop.hbase.io.hfile.BlockCache; 049import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 050import org.apache.hadoop.hbase.io.hfile.CacheConfig; 051import org.apache.hadoop.hbase.io.hfile.CachedBlock; 052import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 053import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 054import org.apache.hadoop.hbase.regionserver.BloomType; 055import org.apache.hadoop.hbase.regionserver.HRegion; 056import org.apache.hadoop.hbase.regionserver.HStore; 057import org.apache.hadoop.hbase.regionserver.InternalScanner; 058import org.apache.hadoop.hbase.regionserver.RegionScanner; 059import org.apache.hadoop.hbase.regionserver.ScannerContext; 060import org.apache.hadoop.hbase.testclassification.ClientTests; 061import org.apache.hadoop.hbase.testclassification.LargeTests; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.junit.After; 064import org.junit.AfterClass; 065import org.junit.Before; 066import org.junit.BeforeClass; 067import org.junit.ClassRule; 068import org.junit.Rule; 069import org.junit.Test; 070import org.junit.experimental.categories.Category; 071import org.junit.rules.TestName; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 075 076@Category({ LargeTests.class, ClientTests.class }) 077@SuppressWarnings("deprecation") 078public class TestBlockEvictionFromClient { 079 080 @ClassRule 081 public static final HBaseClassTestRule CLASS_RULE = 082 HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class); 083 084 private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class); 085 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 086 static byte[][] ROWS = new byte[2][]; 087 private static int NO_OF_THREADS = 3; 088 private static byte[] ROW = Bytes.toBytes("testRow"); 089 private static byte[] ROW1 = Bytes.toBytes("testRow1"); 090 private static byte[] ROW2 = Bytes.toBytes("testRow2"); 091 private static byte[] ROW3 = Bytes.toBytes("testRow3"); 092 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 093 private static byte[][] FAMILIES_1 = new byte[1][0]; 094 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 095 private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 096 private static byte[] data = new byte[1000]; 097 private static byte[] data2 = Bytes.add(data, data); 098 protected static int SLAVES = 1; 099 private static CountDownLatch latch; 100 private static CountDownLatch getLatch; 101 private static CountDownLatch compactionLatch; 102 private static CountDownLatch exceptionLatch; 103 104 @Rule 105 public TestName name = new TestName(); 106 107 @BeforeClass 108 public static void setUpBeforeClass() throws Exception { 109 ROWS[0] = ROW; 110 ROWS[1] = ROW1; 111 Configuration conf = TEST_UTIL.getConfiguration(); 112 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 113 MultiRowMutationEndpoint.class.getName()); 114 conf.setInt("hbase.regionserver.handler.count", 20); 115 conf.setInt("hbase.bucketcache.size", 400); 116 conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); 117 conf.setFloat("hfile.block.cache.size", 0.2f); 118 conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); 119 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry 120 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000); 121 FAMILIES_1[0] = FAMILY; 122 TEST_UTIL.startMiniCluster(SLAVES); 123 } 124 125 @AfterClass 126 public static void tearDownAfterClass() throws Exception { 127 TEST_UTIL.shutdownMiniCluster(); 128 } 129 130 @Before 131 public void setUp() throws Exception { 132 CustomInnerRegionObserver.waitForGets.set(false); 133 CustomInnerRegionObserver.countOfNext.set(0); 134 CustomInnerRegionObserver.countOfGets.set(0); 135 } 136 137 @After 138 public void tearDown() throws Exception { 139 if (latch != null) { 140 while (latch.getCount() > 0) { 141 latch.countDown(); 142 } 143 } 144 if (getLatch != null) { 145 getLatch.countDown(); 146 } 147 if (compactionLatch != null) { 148 compactionLatch.countDown(); 149 } 150 if (exceptionLatch != null) { 151 exceptionLatch.countDown(); 152 } 153 latch = null; 154 getLatch = null; 155 compactionLatch = null; 156 exceptionLatch = null; 157 CustomInnerRegionObserver.throwException.set(false); 158 // Clean up the tables for every test case 159 TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames(); 160 for (TableName tableName : listTableNames) { 161 if (!tableName.isSystemTable()) { 162 TEST_UTIL.getAdmin().disableTable(tableName); 163 TEST_UTIL.getAdmin().deleteTable(tableName); 164 } 165 } 166 } 167 168 @Test 169 public void testBlockEvictionWithParallelScans() throws Exception { 170 Table table = null; 171 try { 172 latch = new CountDownLatch(1); 173 final TableName tableName = TableName.valueOf(name.getMethodName()); 174 // Create a table with block size as 1024 175 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 176 CustomInnerRegionObserver.class.getName()); 177 // get the block cache and region 178 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 179 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 180 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName) 181 .getRegion(regionName); 182 HStore store = region.getStores().iterator().next(); 183 CacheConfig cacheConf = store.getCacheConfig(); 184 cacheConf.setCacheDataOnWrite(true); 185 cacheConf.setEvictOnClose(true); 186 BlockCache cache = cacheConf.getBlockCache().get(); 187 188 // insert data. 2 Rows are added 189 Put put = new Put(ROW); 190 put.addColumn(FAMILY, QUALIFIER, data); 191 table.put(put); 192 put = new Put(ROW1); 193 put.addColumn(FAMILY, QUALIFIER, data); 194 table.put(put); 195 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 196 // data was in memstore so don't expect any changes 197 // flush the data 198 // Should create one Hfile with 2 blocks 199 region.flush(true); 200 // Load cache 201 // Create three sets of scan 202 ScanThread[] scanThreads = initiateScan(table, false); 203 Thread.sleep(100); 204 checkForBlockEviction(cache, false, false); 205 for (ScanThread thread : scanThreads) { 206 thread.join(); 207 } 208 // CustomInnerRegionObserver.sleepTime.set(0); 209 Iterator<CachedBlock> iterator = cache.iterator(); 210 iterateBlockCache(cache, iterator); 211 // read the data and expect same blocks, one new hit, no misses 212 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 213 iterator = cache.iterator(); 214 iterateBlockCache(cache, iterator); 215 // Check how this miss is happening 216 // insert a second column, read the row, no new blocks, 3 new hits 217 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 218 byte[] data2 = Bytes.add(data, data); 219 put = new Put(ROW); 220 put.addColumn(FAMILY, QUALIFIER2, data2); 221 table.put(put); 222 Result r = table.get(new Get(ROW)); 223 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 224 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 225 iterator = cache.iterator(); 226 iterateBlockCache(cache, iterator); 227 // flush, one new block 228 System.out.println("Flushing cache"); 229 region.flush(true); 230 iterator = cache.iterator(); 231 iterateBlockCache(cache, iterator); 232 // compact, net minus two blocks, two hits, no misses 233 System.out.println("Compacting"); 234 assertEquals(2, store.getStorefilesCount()); 235 store.triggerMajorCompaction(); 236 region.compact(true); 237 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 238 assertEquals(1, store.getStorefilesCount()); 239 iterator = cache.iterator(); 240 iterateBlockCache(cache, iterator); 241 // read the row, this should be a cache miss because we don't cache data 242 // blocks on compaction 243 r = table.get(new Get(ROW)); 244 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 245 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 246 iterator = cache.iterator(); 247 iterateBlockCache(cache, iterator); 248 } finally { 249 if (table != null) { 250 table.close(); 251 } 252 } 253 } 254 255 @Test 256 public void testParallelGetsAndScans() throws IOException, InterruptedException { 257 Table table = null; 258 try { 259 latch = new CountDownLatch(2); 260 // Check if get() returns blocks on its close() itself 261 getLatch = new CountDownLatch(1); 262 final TableName tableName = TableName.valueOf(name.getMethodName()); 263 // Create KV that will give you two blocks 264 // Create a table with block size as 1024 265 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 266 CustomInnerRegionObserver.class.getName()); 267 // get the block cache and region 268 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 269 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 270 HRegion region = 271 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 272 HStore store = region.getStores().iterator().next(); 273 CacheConfig cacheConf = store.getCacheConfig(); 274 cacheConf.setCacheDataOnWrite(true); 275 cacheConf.setEvictOnClose(true); 276 BlockCache cache = cacheConf.getBlockCache().get(); 277 278 insertData(table); 279 // flush the data 280 System.out.println("Flushing cache"); 281 // Should create one Hfile with 2 blocks 282 region.flush(true); 283 // Create three sets of scan 284 CustomInnerRegionObserver.waitForGets.set(true); 285 ScanThread[] scanThreads = initiateScan(table, false); 286 // Create three sets of gets 287 GetThread[] getThreads = initiateGet(table, false, false); 288 checkForBlockEviction(cache, false, false); 289 CustomInnerRegionObserver.waitForGets.set(false); 290 checkForBlockEviction(cache, false, false); 291 for (GetThread thread : getThreads) { 292 thread.join(); 293 } 294 // Verify whether the gets have returned the blocks that it had 295 CustomInnerRegionObserver.waitForGets.set(true); 296 // giving some time for the block to be decremented 297 checkForBlockEviction(cache, true, false); 298 getLatch.countDown(); 299 for (ScanThread thread : scanThreads) { 300 thread.join(); 301 } 302 System.out.println("Scans should have returned the bloks"); 303 // Check with either true or false 304 CustomInnerRegionObserver.waitForGets.set(false); 305 // The scan should also have released the blocks by now 306 checkForBlockEviction(cache, true, true); 307 } finally { 308 if (table != null) { 309 table.close(); 310 } 311 } 312 } 313 314 @Test 315 public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException { 316 Table table = null; 317 try { 318 latch = new CountDownLatch(1); 319 // Check if get() returns blocks on its close() itself 320 getLatch = new CountDownLatch(1); 321 final TableName tableName = TableName.valueOf(name.getMethodName()); 322 // Create KV that will give you two blocks 323 // Create a table with block size as 1024 324 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 325 CustomInnerRegionObserver.class.getName()); 326 // get the block cache and region 327 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 328 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 329 HRegion region = 330 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 331 HStore store = region.getStores().iterator().next(); 332 CacheConfig cacheConf = store.getCacheConfig(); 333 cacheConf.setCacheDataOnWrite(true); 334 cacheConf.setEvictOnClose(true); 335 BlockCache cache = cacheConf.getBlockCache().get(); 336 337 Put put = new Put(ROW); 338 put.addColumn(FAMILY, QUALIFIER, data); 339 table.put(put); 340 region.flush(true); 341 put = new Put(ROW1); 342 put.addColumn(FAMILY, QUALIFIER, data); 343 table.put(put); 344 region.flush(true); 345 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 346 put = new Put(ROW); 347 put.addColumn(FAMILY, QUALIFIER2, data2); 348 table.put(put); 349 region.flush(true); 350 // flush the data 351 System.out.println("Flushing cache"); 352 // Should create one Hfile with 2 blocks 353 CustomInnerRegionObserver.waitForGets.set(true); 354 // Create three sets of gets 355 GetThread[] getThreads = initiateGet(table, false, false); 356 Thread.sleep(200); 357 CustomInnerRegionObserver.getCdl().get().countDown(); 358 for (GetThread thread : getThreads) { 359 thread.join(); 360 } 361 // Verify whether the gets have returned the blocks that it had 362 CustomInnerRegionObserver.waitForGets.set(true); 363 // giving some time for the block to be decremented 364 checkForBlockEviction(cache, true, false); 365 getLatch.countDown(); 366 System.out.println("Gets should have returned the bloks"); 367 } finally { 368 if (table != null) { 369 table.close(); 370 } 371 } 372 } 373 374 @Test 375 // TODO : check how block index works here 376 public void testGetsWithMultiColumnsAndExplicitTracker() 377 throws IOException, InterruptedException { 378 Table table = null; 379 try { 380 latch = new CountDownLatch(1); 381 // Check if get() returns blocks on its close() itself 382 getLatch = new CountDownLatch(1); 383 final TableName tableName = TableName.valueOf(name.getMethodName()); 384 // Create KV that will give you two blocks 385 // Create a table with block size as 1024 386 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 387 CustomInnerRegionObserver.class.getName()); 388 // get the block cache and region 389 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 390 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 391 HRegion region = 392 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 393 BlockCache cache = setCacheProperties(region); 394 Put put = new Put(ROW); 395 put.addColumn(FAMILY, QUALIFIER, data); 396 table.put(put); 397 region.flush(true); 398 put = new Put(ROW1); 399 put.addColumn(FAMILY, QUALIFIER, data); 400 table.put(put); 401 region.flush(true); 402 for (int i = 1; i < 10; i++) { 403 put = new Put(ROW); 404 put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2); 405 table.put(put); 406 if (i % 2 == 0) { 407 region.flush(true); 408 } 409 } 410 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 411 put = new Put(ROW); 412 put.addColumn(FAMILY, QUALIFIER2, data2); 413 table.put(put); 414 region.flush(true); 415 // flush the data 416 System.out.println("Flushing cache"); 417 // Should create one Hfile with 2 blocks 418 CustomInnerRegionObserver.waitForGets.set(true); 419 // Create three sets of gets 420 GetThread[] getThreads = initiateGet(table, true, false); 421 Thread.sleep(200); 422 Iterator<CachedBlock> iterator = cache.iterator(); 423 boolean usedBlocksFound = false; 424 int refCount = 0; 425 int noOfBlocksWithRef = 0; 426 while (iterator.hasNext()) { 427 CachedBlock next = iterator.next(); 428 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 429 if (cache instanceof BucketCache) { 430 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 431 } else if (cache instanceof CombinedBlockCache) { 432 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 433 } else { 434 continue; 435 } 436 if (refCount != 0) { 437 // Blocks will be with count 3 438 System.out.println("The refCount is " + refCount); 439 assertEquals(NO_OF_THREADS, refCount); 440 usedBlocksFound = true; 441 noOfBlocksWithRef++; 442 } 443 } 444 assertTrue(usedBlocksFound); 445 // the number of blocks referred 446 assertEquals(10, noOfBlocksWithRef); 447 CustomInnerRegionObserver.getCdl().get().countDown(); 448 for (GetThread thread : getThreads) { 449 thread.join(); 450 } 451 // Verify whether the gets have returned the blocks that it had 452 CustomInnerRegionObserver.waitForGets.set(true); 453 // giving some time for the block to be decremented 454 checkForBlockEviction(cache, true, false); 455 getLatch.countDown(); 456 System.out.println("Gets should have returned the bloks"); 457 } finally { 458 if (table != null) { 459 table.close(); 460 } 461 } 462 } 463 464 @Test 465 public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException { 466 Table table = null; 467 try { 468 latch = new CountDownLatch(1); 469 // Check if get() returns blocks on its close() itself 470 getLatch = new CountDownLatch(1); 471 final TableName tableName = TableName.valueOf(name.getMethodName()); 472 // Create KV that will give you two blocks 473 // Create a table with block size as 1024 474 byte[][] fams = new byte[10][]; 475 fams[0] = FAMILY; 476 for (int i = 1; i < 10; i++) { 477 fams[i] = (Bytes.toBytes("testFamily" + i)); 478 } 479 table = TEST_UTIL.createTable(tableName, fams, 1, 1024, 480 CustomInnerRegionObserver.class.getName()); 481 // get the block cache and region 482 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 483 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 484 HRegion region = 485 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 486 BlockCache cache = setCacheProperties(region); 487 488 Put put = new Put(ROW); 489 put.addColumn(FAMILY, QUALIFIER, data); 490 table.put(put); 491 region.flush(true); 492 put = new Put(ROW1); 493 put.addColumn(FAMILY, QUALIFIER, data); 494 table.put(put); 495 region.flush(true); 496 for (int i = 1; i < 10; i++) { 497 put = new Put(ROW); 498 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 499 table.put(put); 500 if (i % 2 == 0) { 501 region.flush(true); 502 } 503 } 504 region.flush(true); 505 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 506 put = new Put(ROW); 507 put.addColumn(FAMILY, QUALIFIER2, data2); 508 table.put(put); 509 region.flush(true); 510 // flush the data 511 System.out.println("Flushing cache"); 512 // Should create one Hfile with 2 blocks 513 CustomInnerRegionObserver.waitForGets.set(true); 514 // Create three sets of gets 515 GetThread[] getThreads = initiateGet(table, true, true); 516 Thread.sleep(200); 517 Iterator<CachedBlock> iterator = cache.iterator(); 518 boolean usedBlocksFound = false; 519 int refCount = 0; 520 int noOfBlocksWithRef = 0; 521 while (iterator.hasNext()) { 522 CachedBlock next = iterator.next(); 523 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 524 if (cache instanceof BucketCache) { 525 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 526 } else if (cache instanceof CombinedBlockCache) { 527 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 528 } else { 529 continue; 530 } 531 if (refCount != 0) { 532 // Blocks will be with count 3 533 System.out.println("The refCount is " + refCount); 534 assertEquals(NO_OF_THREADS, refCount); 535 usedBlocksFound = true; 536 noOfBlocksWithRef++; 537 } 538 } 539 assertTrue(usedBlocksFound); 540 // the number of blocks referred 541 assertEquals(3, noOfBlocksWithRef); 542 CustomInnerRegionObserver.getCdl().get().countDown(); 543 for (GetThread thread : getThreads) { 544 thread.join(); 545 } 546 // Verify whether the gets have returned the blocks that it had 547 CustomInnerRegionObserver.waitForGets.set(true); 548 // giving some time for the block to be decremented 549 checkForBlockEviction(cache, true, false); 550 getLatch.countDown(); 551 System.out.println("Gets should have returned the bloks"); 552 } finally { 553 if (table != null) { 554 table.close(); 555 } 556 } 557 } 558 559 @Test 560 public void testBlockRefCountAfterSplits() throws IOException, InterruptedException { 561 Table table = null; 562 try { 563 final TableName tableName = TableName.valueOf(name.getMethodName()); 564 HTableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName); 565 // This test expects rpc refcount of cached data blocks to be 0 after split. After split, 566 // two daughter regions are opened and a compaction is scheduled to get rid of reference 567 // of the parent region hfiles. Compaction will increase refcount of cached data blocks by 1. 568 // It is flakey since compaction can kick in anytime. To solve this issue, table is created 569 // with compaction disabled. 570 desc.setCompactionEnabled(false); 571 table = TEST_UTIL.createTable(desc, FAMILIES_1, null, BloomType.ROW, 1024, null); 572 // get the block cache and region 573 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 574 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 575 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 576 HStore store = region.getStores().iterator().next(); 577 CacheConfig cacheConf = store.getCacheConfig(); 578 cacheConf.setEvictOnClose(true); 579 BlockCache cache = cacheConf.getBlockCache().get(); 580 Put put = new Put(ROW); 581 put.addColumn(FAMILY, QUALIFIER, data); 582 table.put(put); 583 region.flush(true); 584 put = new Put(ROW1); 585 put.addColumn(FAMILY, QUALIFIER, data); 586 table.put(put); 587 region.flush(true); 588 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 589 put = new Put(ROW2); 590 put.addColumn(FAMILY, QUALIFIER2, data2); 591 table.put(put); 592 put = new Put(ROW3); 593 put.addColumn(FAMILY, QUALIFIER2, data2); 594 table.put(put); 595 region.flush(true); 596 ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers()); 597 int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size(); 598 LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(), 599 regionCount); 600 TEST_UTIL.getAdmin().split(tableName, ROW1); 601 // Wait for splits 602 TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount); 603 List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions(); 604 for (HRegion r: regions) { 605 LOG.info("" + r.getCompactionState()); 606 TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE)); 607 } 608 LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache); 609 Iterator<CachedBlock> iterator = cache.iterator(); 610 // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners 611 // should be closed inorder to return those blocks 612 iterateBlockCache(cache, iterator); 613 } finally { 614 if (table != null) { 615 table.close(); 616 } 617 } 618 } 619 620 @Test 621 public void testMultiGets() throws IOException, InterruptedException { 622 Table table = null; 623 try { 624 latch = new CountDownLatch(2); 625 // Check if get() returns blocks on its close() itself 626 getLatch = new CountDownLatch(1); 627 final TableName tableName = TableName.valueOf(name.getMethodName()); 628 // Create KV that will give you two blocks 629 // Create a table with block size as 1024 630 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 631 CustomInnerRegionObserver.class.getName()); 632 // get the block cache and region 633 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 634 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 635 HRegion region = 636 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 637 HStore store = region.getStores().iterator().next(); 638 CacheConfig cacheConf = store.getCacheConfig(); 639 cacheConf.setCacheDataOnWrite(true); 640 cacheConf.setEvictOnClose(true); 641 BlockCache cache = cacheConf.getBlockCache().get(); 642 643 Put put = new Put(ROW); 644 put.addColumn(FAMILY, QUALIFIER, data); 645 table.put(put); 646 region.flush(true); 647 put = new Put(ROW1); 648 put.addColumn(FAMILY, QUALIFIER, data); 649 table.put(put); 650 region.flush(true); 651 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 652 put = new Put(ROW); 653 put.addColumn(FAMILY, QUALIFIER2, data2); 654 table.put(put); 655 region.flush(true); 656 // flush the data 657 System.out.println("Flushing cache"); 658 // Should create one Hfile with 2 blocks 659 CustomInnerRegionObserver.waitForGets.set(true); 660 // Create three sets of gets 661 MultiGetThread[] getThreads = initiateMultiGet(table); 662 Thread.sleep(200); 663 int refCount; 664 Iterator<CachedBlock> iterator = cache.iterator(); 665 boolean foundNonZeroBlock = false; 666 while (iterator.hasNext()) { 667 CachedBlock next = iterator.next(); 668 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 669 if (cache instanceof BucketCache) { 670 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 671 } else if (cache instanceof CombinedBlockCache) { 672 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 673 } else { 674 continue; 675 } 676 if (refCount != 0) { 677 assertEquals(NO_OF_THREADS, refCount); 678 foundNonZeroBlock = true; 679 } 680 } 681 assertTrue("Should have found nonzero ref count block",foundNonZeroBlock); 682 CustomInnerRegionObserver.getCdl().get().countDown(); 683 CustomInnerRegionObserver.getCdl().get().countDown(); 684 for (MultiGetThread thread : getThreads) { 685 thread.join(); 686 } 687 // Verify whether the gets have returned the blocks that it had 688 CustomInnerRegionObserver.waitForGets.set(true); 689 // giving some time for the block to be decremented 690 iterateBlockCache(cache, iterator); 691 getLatch.countDown(); 692 System.out.println("Gets should have returned the bloks"); 693 } finally { 694 if (table != null) { 695 table.close(); 696 } 697 } 698 } 699 @Test 700 public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException { 701 Table table = null; 702 try { 703 latch = new CountDownLatch(1); 704 // Check if get() returns blocks on its close() itself 705 final TableName tableName = TableName.valueOf(name.getMethodName()); 706 // Create KV that will give you two blocks 707 // Create a table with block size as 1024 708 byte[][] fams = new byte[10][]; 709 fams[0] = FAMILY; 710 for (int i = 1; i < 10; i++) { 711 fams[i] = (Bytes.toBytes("testFamily" + i)); 712 } 713 table = TEST_UTIL.createTable(tableName, fams, 1, 1024, 714 CustomInnerRegionObserver.class.getName()); 715 // get the block cache and region 716 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 717 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 718 HRegion region = 719 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 720 BlockCache cache = setCacheProperties(region); 721 722 Put put = new Put(ROW); 723 put.addColumn(FAMILY, QUALIFIER, data); 724 table.put(put); 725 region.flush(true); 726 put = new Put(ROW1); 727 put.addColumn(FAMILY, QUALIFIER, data); 728 table.put(put); 729 region.flush(true); 730 for (int i = 1; i < 10; i++) { 731 put = new Put(ROW); 732 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 733 table.put(put); 734 if (i % 2 == 0) { 735 region.flush(true); 736 } 737 } 738 region.flush(true); 739 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 740 put = new Put(ROW); 741 put.addColumn(FAMILY, QUALIFIER2, data2); 742 table.put(put); 743 region.flush(true); 744 // flush the data 745 System.out.println("Flushing cache"); 746 // Should create one Hfile with 2 blocks 747 // Create three sets of gets 748 ScanThread[] scanThreads = initiateScan(table, true); 749 Thread.sleep(200); 750 Iterator<CachedBlock> iterator = cache.iterator(); 751 boolean usedBlocksFound = false; 752 int refCount = 0; 753 int noOfBlocksWithRef = 0; 754 while (iterator.hasNext()) { 755 CachedBlock next = iterator.next(); 756 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 757 if (cache instanceof BucketCache) { 758 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 759 } else if (cache instanceof CombinedBlockCache) { 760 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 761 } else { 762 continue; 763 } 764 if (refCount != 0) { 765 // Blocks will be with count 3 766 System.out.println("The refCount is " + refCount); 767 assertEquals(NO_OF_THREADS, refCount); 768 usedBlocksFound = true; 769 noOfBlocksWithRef++; 770 } 771 } 772 assertTrue(usedBlocksFound); 773 // the number of blocks referred 774 assertEquals(12, noOfBlocksWithRef); 775 CustomInnerRegionObserver.getCdl().get().countDown(); 776 for (ScanThread thread : scanThreads) { 777 thread.join(); 778 } 779 // giving some time for the block to be decremented 780 checkForBlockEviction(cache, true, false); 781 } finally { 782 if (table != null) { 783 table.close(); 784 } 785 } 786 } 787 788 private BlockCache setCacheProperties(HRegion region) { 789 Iterator<HStore> strItr = region.getStores().iterator(); 790 BlockCache cache = null; 791 while (strItr.hasNext()) { 792 HStore store = strItr.next(); 793 CacheConfig cacheConf = store.getCacheConfig(); 794 cacheConf.setCacheDataOnWrite(true); 795 cacheConf.setEvictOnClose(true); 796 // Use the last one 797 cache = cacheConf.getBlockCache().get(); 798 } 799 return cache; 800 } 801 802 @Test 803 public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException, 804 InterruptedException { 805 Table table = null; 806 try { 807 latch = new CountDownLatch(2); 808 // Check if get() returns blocks on its close() itself 809 getLatch = new CountDownLatch(1); 810 final TableName tableName = TableName.valueOf(name.getMethodName()); 811 // Create KV that will give you two blocks 812 // Create a table with block size as 1024 813 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 814 CustomInnerRegionObserverWrapper.class.getName()); 815 // get the block cache and region 816 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 817 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 818 HRegion region = 819 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 820 HStore store = region.getStores().iterator().next(); 821 CacheConfig cacheConf = store.getCacheConfig(); 822 cacheConf.setCacheDataOnWrite(true); 823 cacheConf.setEvictOnClose(true); 824 BlockCache cache = cacheConf.getBlockCache().get(); 825 826 // insert data. 2 Rows are added 827 insertData(table); 828 // flush the data 829 System.out.println("Flushing cache"); 830 // Should create one Hfile with 2 blocks 831 region.flush(true); 832 // CustomInnerRegionObserver.sleepTime.set(5000); 833 // Create three sets of scan 834 CustomInnerRegionObserver.waitForGets.set(true); 835 ScanThread[] scanThreads = initiateScan(table, false); 836 // Create three sets of gets 837 GetThread[] getThreads = initiateGet(table, false, false); 838 // The block would have been decremented for the scan case as it was 839 // wrapped 840 // before even the postNext hook gets executed. 841 // giving some time for the block to be decremented 842 Thread.sleep(100); 843 CustomInnerRegionObserver.waitForGets.set(false); 844 checkForBlockEviction(cache, false, false); 845 // countdown the latch 846 CustomInnerRegionObserver.getCdl().get().countDown(); 847 for (GetThread thread : getThreads) { 848 thread.join(); 849 } 850 getLatch.countDown(); 851 for (ScanThread thread : scanThreads) { 852 thread.join(); 853 } 854 } finally { 855 if (table != null) { 856 table.close(); 857 } 858 } 859 } 860 861 @Test 862 public void testScanWithCompaction() throws IOException, InterruptedException { 863 testScanWithCompactionInternals(name.getMethodName(), false); 864 } 865 866 @Test 867 public void testReverseScanWithCompaction() throws IOException, InterruptedException { 868 testScanWithCompactionInternals(name.getMethodName(), true); 869 } 870 871 private void testScanWithCompactionInternals(String tableNameStr, boolean reversed) 872 throws IOException, InterruptedException { 873 Table table = null; 874 try { 875 latch = new CountDownLatch(1); 876 compactionLatch = new CountDownLatch(1); 877 TableName tableName = TableName.valueOf(tableNameStr); 878 // Create a table with block size as 1024 879 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 880 CustomInnerRegionObserverWrapper.class.getName()); 881 // get the block cache and region 882 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 883 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 884 HRegion region = 885 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 886 HStore store = region.getStores().iterator().next(); 887 CacheConfig cacheConf = store.getCacheConfig(); 888 cacheConf.setCacheDataOnWrite(true); 889 cacheConf.setEvictOnClose(true); 890 BlockCache cache = cacheConf.getBlockCache().get(); 891 892 // insert data. 2 Rows are added 893 Put put = new Put(ROW); 894 put.addColumn(FAMILY, QUALIFIER, data); 895 table.put(put); 896 put = new Put(ROW1); 897 put.addColumn(FAMILY, QUALIFIER, data); 898 table.put(put); 899 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 900 // Should create one Hfile with 2 blocks 901 region.flush(true); 902 // read the data and expect same blocks, one new hit, no misses 903 int refCount = 0; 904 // Check how this miss is happening 905 // insert a second column, read the row, no new blocks, 3 new hits 906 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 907 byte[] data2 = Bytes.add(data, data); 908 put = new Put(ROW); 909 put.addColumn(FAMILY, QUALIFIER2, data2); 910 table.put(put); 911 // flush, one new block 912 System.out.println("Flushing cache"); 913 region.flush(true); 914 Iterator<CachedBlock> iterator = cache.iterator(); 915 iterateBlockCache(cache, iterator); 916 // Create three sets of scan 917 ScanThread[] scanThreads = initiateScan(table, reversed); 918 Thread.sleep(100); 919 iterator = cache.iterator(); 920 boolean usedBlocksFound = false; 921 while (iterator.hasNext()) { 922 CachedBlock next = iterator.next(); 923 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 924 if (cache instanceof BucketCache) { 925 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 926 } else if (cache instanceof CombinedBlockCache) { 927 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 928 } else { 929 continue; 930 } 931 if (refCount != 0) { 932 // Blocks will be with count 3 933 assertEquals(NO_OF_THREADS, refCount); 934 usedBlocksFound = true; 935 } 936 } 937 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 938 usedBlocksFound = false; 939 System.out.println("Compacting"); 940 assertEquals(2, store.getStorefilesCount()); 941 store.triggerMajorCompaction(); 942 region.compact(true); 943 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 944 assertEquals(1, store.getStorefilesCount()); 945 // Even after compaction is done we will have some blocks that cannot 946 // be evicted this is because the scan is still referencing them 947 iterator = cache.iterator(); 948 while (iterator.hasNext()) { 949 CachedBlock next = iterator.next(); 950 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 951 if (cache instanceof BucketCache) { 952 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 953 } else if (cache instanceof CombinedBlockCache) { 954 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 955 } else { 956 continue; 957 } 958 if (refCount != 0) { 959 // Blocks will be with count 3 as they are not yet cleared 960 assertEquals(NO_OF_THREADS, refCount); 961 usedBlocksFound = true; 962 } 963 } 964 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 965 // Should not throw exception 966 compactionLatch.countDown(); 967 latch.countDown(); 968 for (ScanThread thread : scanThreads) { 969 thread.join(); 970 } 971 // by this time all blocks should have been evicted 972 iterator = cache.iterator(); 973 iterateBlockCache(cache, iterator); 974 Result r = table.get(new Get(ROW)); 975 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 976 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 977 // The gets would be working on new blocks 978 iterator = cache.iterator(); 979 iterateBlockCache(cache, iterator); 980 } finally { 981 if (table != null) { 982 table.close(); 983 } 984 } 985 } 986 987 @Test 988 public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush() 989 throws IOException, InterruptedException { 990 // do flush and scan in parallel 991 Table table = null; 992 try { 993 latch = new CountDownLatch(1); 994 compactionLatch = new CountDownLatch(1); 995 final TableName tableName = TableName.valueOf(name.getMethodName()); 996 // Create a table with block size as 1024 997 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 998 CustomInnerRegionObserverWrapper.class.getName()); 999 // get the block cache and region 1000 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 1001 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 1002 HRegion region = 1003 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1004 HStore store = region.getStores().iterator().next(); 1005 CacheConfig cacheConf = store.getCacheConfig(); 1006 cacheConf.setCacheDataOnWrite(true); 1007 cacheConf.setEvictOnClose(true); 1008 BlockCache cache = cacheConf.getBlockCache().get(); 1009 1010 // insert data. 2 Rows are added 1011 Put put = new Put(ROW); 1012 put.addColumn(FAMILY, QUALIFIER, data); 1013 table.put(put); 1014 put = new Put(ROW1); 1015 put.addColumn(FAMILY, QUALIFIER, data); 1016 table.put(put); 1017 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 1018 // Should create one Hfile with 2 blocks 1019 region.flush(true); 1020 // read the data and expect same blocks, one new hit, no misses 1021 int refCount = 0; 1022 // Check how this miss is happening 1023 // insert a second column, read the row, no new blocks, 3 new hits 1024 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1025 byte[] data2 = Bytes.add(data, data); 1026 put = new Put(ROW); 1027 put.addColumn(FAMILY, QUALIFIER2, data2); 1028 table.put(put); 1029 // flush, one new block 1030 System.out.println("Flushing cache"); 1031 region.flush(true); 1032 Iterator<CachedBlock> iterator = cache.iterator(); 1033 iterateBlockCache(cache, iterator); 1034 // Create three sets of scan 1035 ScanThread[] scanThreads = initiateScan(table, false); 1036 Thread.sleep(100); 1037 iterator = cache.iterator(); 1038 boolean usedBlocksFound = false; 1039 while (iterator.hasNext()) { 1040 CachedBlock next = iterator.next(); 1041 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1042 if (cache instanceof BucketCache) { 1043 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1044 } else if (cache instanceof CombinedBlockCache) { 1045 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1046 } else { 1047 continue; 1048 } 1049 if (refCount != 0) { 1050 // Blocks will be with count 3 1051 assertEquals(NO_OF_THREADS, refCount); 1052 usedBlocksFound = true; 1053 } 1054 } 1055 // Make a put and do a flush 1056 QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1057 data2 = Bytes.add(data, data); 1058 put = new Put(ROW1); 1059 put.addColumn(FAMILY, QUALIFIER2, data2); 1060 table.put(put); 1061 // flush, one new block 1062 System.out.println("Flushing cache"); 1063 region.flush(true); 1064 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 1065 usedBlocksFound = false; 1066 System.out.println("Compacting"); 1067 assertEquals(3, store.getStorefilesCount()); 1068 store.triggerMajorCompaction(); 1069 region.compact(true); 1070 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 1071 assertEquals(1, store.getStorefilesCount()); 1072 // Even after compaction is done we will have some blocks that cannot 1073 // be evicted this is because the scan is still referencing them 1074 iterator = cache.iterator(); 1075 while (iterator.hasNext()) { 1076 CachedBlock next = iterator.next(); 1077 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1078 if (cache instanceof BucketCache) { 1079 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1080 } else if (cache instanceof CombinedBlockCache) { 1081 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1082 } else { 1083 continue; 1084 } 1085 if (refCount != 0) { 1086 // Blocks will be with count 3 as they are not yet cleared 1087 assertEquals(NO_OF_THREADS, refCount); 1088 usedBlocksFound = true; 1089 } 1090 } 1091 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 1092 // Should not throw exception 1093 compactionLatch.countDown(); 1094 latch.countDown(); 1095 for (ScanThread thread : scanThreads) { 1096 thread.join(); 1097 } 1098 // by this time all blocks should have been evicted 1099 iterator = cache.iterator(); 1100 // Since a flush and compaction happened after a scan started 1101 // we need to ensure that all the original blocks of the compacted file 1102 // is also removed. 1103 iterateBlockCache(cache, iterator); 1104 Result r = table.get(new Get(ROW)); 1105 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1106 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1107 // The gets would be working on new blocks 1108 iterator = cache.iterator(); 1109 iterateBlockCache(cache, iterator); 1110 } finally { 1111 if (table != null) { 1112 table.close(); 1113 } 1114 } 1115 } 1116 1117 1118 @Test 1119 public void testScanWithException() throws IOException, InterruptedException { 1120 Table table = null; 1121 try { 1122 latch = new CountDownLatch(1); 1123 exceptionLatch = new CountDownLatch(1); 1124 final TableName tableName = TableName.valueOf(name.getMethodName()); 1125 // Create KV that will give you two blocks 1126 // Create a table with block size as 1024 1127 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 1128 CustomInnerRegionObserverWrapper.class.getName()); 1129 // get the block cache and region 1130 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 1131 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 1132 HRegion region = 1133 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1134 HStore store = region.getStores().iterator().next(); 1135 CacheConfig cacheConf = store.getCacheConfig(); 1136 cacheConf.setCacheDataOnWrite(true); 1137 cacheConf.setEvictOnClose(true); 1138 BlockCache cache = cacheConf.getBlockCache().get(); 1139 // insert data. 2 Rows are added 1140 insertData(table); 1141 // flush the data 1142 System.out.println("Flushing cache"); 1143 // Should create one Hfile with 2 blocks 1144 region.flush(true); 1145 // CustomInnerRegionObserver.sleepTime.set(5000); 1146 CustomInnerRegionObserver.throwException.set(true); 1147 ScanThread[] scanThreads = initiateScan(table, false); 1148 // The block would have been decremented for the scan case as it was 1149 // wrapped 1150 // before even the postNext hook gets executed. 1151 // giving some time for the block to be decremented 1152 Thread.sleep(100); 1153 Iterator<CachedBlock> iterator = cache.iterator(); 1154 boolean usedBlocksFound = false; 1155 int refCount = 0; 1156 while (iterator.hasNext()) { 1157 CachedBlock next = iterator.next(); 1158 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1159 if (cache instanceof BucketCache) { 1160 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1161 } else if (cache instanceof CombinedBlockCache) { 1162 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1163 } else { 1164 continue; 1165 } 1166 if (refCount != 0) { 1167 // Blocks will be with count 3 1168 assertEquals(NO_OF_THREADS, refCount); 1169 usedBlocksFound = true; 1170 } 1171 } 1172 assertTrue(usedBlocksFound); 1173 exceptionLatch.countDown(); 1174 // countdown the latch 1175 CustomInnerRegionObserver.getCdl().get().countDown(); 1176 for (ScanThread thread : scanThreads) { 1177 thread.join(); 1178 } 1179 iterator = cache.iterator(); 1180 usedBlocksFound = false; 1181 refCount = 0; 1182 while (iterator.hasNext()) { 1183 CachedBlock next = iterator.next(); 1184 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1185 if (cache instanceof BucketCache) { 1186 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1187 } else if (cache instanceof CombinedBlockCache) { 1188 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1189 } else { 1190 continue; 1191 } 1192 if (refCount != 0) { 1193 // Blocks will be with count 3 1194 assertEquals(NO_OF_THREADS, refCount); 1195 usedBlocksFound = true; 1196 } 1197 } 1198 assertFalse(usedBlocksFound); 1199 // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner 1200 assertEquals(0, refCount); 1201 } finally { 1202 if (table != null) { 1203 table.close(); 1204 } 1205 } 1206 } 1207 1208 private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) { 1209 int refCount; 1210 while (iterator.hasNext()) { 1211 CachedBlock next = iterator.next(); 1212 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1213 if (cache instanceof BucketCache) { 1214 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1215 LOG.info("BucketCache {} {}", cacheKey, refCount); 1216 } else if (cache instanceof CombinedBlockCache) { 1217 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1218 LOG.info("CombinedBlockCache {} {}", cacheKey, refCount); 1219 } else { 1220 continue; 1221 } 1222 assertEquals(0, refCount); 1223 } 1224 } 1225 1226 private void insertData(Table table) throws IOException { 1227 Put put = new Put(ROW); 1228 put.addColumn(FAMILY, QUALIFIER, data); 1229 table.put(put); 1230 put = new Put(ROW1); 1231 put.addColumn(FAMILY, QUALIFIER, data); 1232 table.put(put); 1233 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1234 put = new Put(ROW); 1235 put.addColumn(FAMILY, QUALIFIER2, data2); 1236 table.put(put); 1237 } 1238 1239 private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException, 1240 InterruptedException { 1241 ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS]; 1242 for (int i = 0; i < NO_OF_THREADS; i++) { 1243 scanThreads[i] = new ScanThread(table, reverse); 1244 } 1245 for (ScanThread thread : scanThreads) { 1246 thread.start(); 1247 } 1248 return scanThreads; 1249 } 1250 1251 private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs) 1252 throws IOException, InterruptedException { 1253 GetThread[] getThreads = new GetThread[NO_OF_THREADS]; 1254 for (int i = 0; i < NO_OF_THREADS; i++) { 1255 getThreads[i] = new GetThread(table, tracker, multipleCFs); 1256 } 1257 for (GetThread thread : getThreads) { 1258 thread.start(); 1259 } 1260 return getThreads; 1261 } 1262 1263 private MultiGetThread[] initiateMultiGet(Table table) 1264 throws IOException, InterruptedException { 1265 MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS]; 1266 for (int i = 0; i < NO_OF_THREADS; i++) { 1267 multiGetThreads[i] = new MultiGetThread(table); 1268 } 1269 for (MultiGetThread thread : multiGetThreads) { 1270 thread.start(); 1271 } 1272 return multiGetThreads; 1273 } 1274 1275 private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero) 1276 throws InterruptedException { 1277 int counter = NO_OF_THREADS; 1278 if (CustomInnerRegionObserver.waitForGets.get()) { 1279 // Because only one row is selected, it has only 2 blocks 1280 counter = counter - 1; 1281 while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) { 1282 Thread.sleep(100); 1283 } 1284 } else { 1285 while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) { 1286 Thread.sleep(100); 1287 } 1288 } 1289 Iterator<CachedBlock> iterator = cache.iterator(); 1290 int refCount = 0; 1291 while (iterator.hasNext()) { 1292 CachedBlock next = iterator.next(); 1293 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1294 if (cache instanceof BucketCache) { 1295 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1296 } else if (cache instanceof CombinedBlockCache) { 1297 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1298 } else { 1299 continue; 1300 } 1301 System.out.println(" the refcount is " + refCount + " block is " + cacheKey); 1302 if (CustomInnerRegionObserver.waitForGets.get()) { 1303 if (expectOnlyZero) { 1304 assertTrue(refCount == 0); 1305 } 1306 if (refCount != 0) { 1307 // Because the scan would have also touched up on these blocks but 1308 // it 1309 // would have touched 1310 // all 3 1311 if (getClosed) { 1312 // If get has closed only the scan's blocks would be available 1313 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get()); 1314 } else { 1315 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS)); 1316 } 1317 } 1318 } else { 1319 // Because the get would have also touched up on these blocks but it 1320 // would have touched 1321 // upon only 2 additionally 1322 if (expectOnlyZero) { 1323 assertTrue(refCount == 0); 1324 } 1325 if (refCount != 0) { 1326 if (getLatch == null) { 1327 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get()); 1328 } else { 1329 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS)); 1330 } 1331 } 1332 } 1333 } 1334 CustomInnerRegionObserver.getCdl().get().countDown(); 1335 } 1336 1337 private static class MultiGetThread extends Thread { 1338 private final Table table; 1339 private final List<Get> gets = new ArrayList<>(); 1340 public MultiGetThread(Table table) { 1341 this.table = table; 1342 } 1343 @Override 1344 public void run() { 1345 gets.add(new Get(ROW)); 1346 gets.add(new Get(ROW1)); 1347 try { 1348 CustomInnerRegionObserver.getCdl().set(latch); 1349 Result[] r = table.get(gets); 1350 assertTrue(Bytes.equals(r[0].getRow(), ROW)); 1351 assertTrue(Bytes.equals(r[1].getRow(), ROW1)); 1352 } catch (IOException e) { 1353 } 1354 } 1355 } 1356 1357 private static class GetThread extends Thread { 1358 private final Table table; 1359 private final boolean tracker; 1360 private final boolean multipleCFs; 1361 1362 public GetThread(Table table, boolean tracker, boolean multipleCFs) { 1363 this.table = table; 1364 this.tracker = tracker; 1365 this.multipleCFs = multipleCFs; 1366 } 1367 1368 @Override 1369 public void run() { 1370 try { 1371 initiateGet(table); 1372 } catch (IOException e) { 1373 // do nothing 1374 } 1375 } 1376 1377 private void initiateGet(Table table) throws IOException { 1378 Get get = new Get(ROW); 1379 if (tracker) { 1380 // Change this 1381 if (!multipleCFs) { 1382 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3)); 1383 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8)); 1384 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9)); 1385 // Unknown key 1386 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900)); 1387 } else { 1388 get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)); 1389 get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)); 1390 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)); 1391 // Unknown key 1392 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900)); 1393 } 1394 } 1395 CustomInnerRegionObserver.getCdl().set(latch); 1396 Result r = table.get(get); 1397 System.out.println(r); 1398 if (!tracker) { 1399 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1400 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1401 } else { 1402 if (!multipleCFs) { 1403 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2)); 1404 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2)); 1405 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2)); 1406 } else { 1407 assertTrue(Bytes.equals( 1408 r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)), 1409 data2)); 1410 assertTrue(Bytes.equals( 1411 r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)), 1412 data2)); 1413 assertTrue(Bytes.equals( 1414 r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)), 1415 data2)); 1416 } 1417 } 1418 } 1419 } 1420 1421 private static class ScanThread extends Thread { 1422 private final Table table; 1423 private final boolean reverse; 1424 1425 public ScanThread(Table table, boolean reverse) { 1426 this.table = table; 1427 this.reverse = reverse; 1428 } 1429 1430 @Override 1431 public void run() { 1432 try { 1433 initiateScan(table); 1434 } catch (IOException e) { 1435 // do nothing 1436 } 1437 } 1438 1439 private void initiateScan(Table table) throws IOException { 1440 Scan scan = new Scan(); 1441 if (reverse) { 1442 scan.setReversed(true); 1443 } 1444 CustomInnerRegionObserver.getCdl().set(latch); 1445 ResultScanner resScanner = table.getScanner(scan); 1446 int i = (reverse ? ROWS.length - 1 : 0); 1447 boolean resultFound = false; 1448 for (Result result : resScanner) { 1449 resultFound = true; 1450 System.out.println(result); 1451 if (!reverse) { 1452 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1453 i++; 1454 } else { 1455 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1456 i--; 1457 } 1458 } 1459 assertTrue(resultFound); 1460 } 1461 } 1462 1463 private void waitForStoreFileCount(HStore store, int count, int timeout) 1464 throws InterruptedException { 1465 long start = System.currentTimeMillis(); 1466 while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) { 1467 Thread.sleep(100); 1468 } 1469 System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" + 1470 store.getStorefilesCount()); 1471 assertEquals(count, store.getStorefilesCount()); 1472 } 1473 1474 private static class CustomScanner implements RegionScanner { 1475 1476 private RegionScanner delegate; 1477 1478 public CustomScanner(RegionScanner delegate) { 1479 this.delegate = delegate; 1480 } 1481 1482 @Override 1483 public boolean next(List<Cell> results) throws IOException { 1484 return delegate.next(results); 1485 } 1486 1487 @Override 1488 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { 1489 return delegate.next(result, scannerContext); 1490 } 1491 1492 @Override 1493 public boolean nextRaw(List<Cell> result) throws IOException { 1494 return delegate.nextRaw(result); 1495 } 1496 1497 @Override 1498 public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException { 1499 boolean nextRaw = delegate.nextRaw(result, context); 1500 if (compactionLatch != null && compactionLatch.getCount() > 0) { 1501 try { 1502 compactionLatch.await(); 1503 } catch (InterruptedException ie) { 1504 } 1505 } 1506 1507 if (CustomInnerRegionObserver.throwException.get()) { 1508 if (exceptionLatch.getCount() > 0) { 1509 try { 1510 exceptionLatch.await(); 1511 } catch (InterruptedException e) { 1512 } 1513 throw new IOException("throw exception"); 1514 } 1515 } 1516 return nextRaw; 1517 } 1518 1519 @Override 1520 public void close() throws IOException { 1521 delegate.close(); 1522 } 1523 1524 @Override 1525 public RegionInfo getRegionInfo() { 1526 return delegate.getRegionInfo(); 1527 } 1528 1529 @Override 1530 public boolean isFilterDone() throws IOException { 1531 return delegate.isFilterDone(); 1532 } 1533 1534 @Override 1535 public boolean reseek(byte[] row) throws IOException { 1536 return false; 1537 } 1538 1539 @Override 1540 public long getMaxResultSize() { 1541 return delegate.getMaxResultSize(); 1542 } 1543 1544 @Override 1545 public long getMvccReadPoint() { 1546 return delegate.getMvccReadPoint(); 1547 } 1548 1549 @Override 1550 public int getBatch() { 1551 return delegate.getBatch(); 1552 } 1553 } 1554 1555 public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver { 1556 @Override 1557 public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, 1558 Scan scan, RegionScanner s) throws IOException { 1559 return new CustomScanner(s); 1560 } 1561 } 1562 1563 public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver { 1564 static final AtomicInteger countOfNext = new AtomicInteger(0); 1565 static final AtomicInteger countOfGets = new AtomicInteger(0); 1566 static final AtomicBoolean waitForGets = new AtomicBoolean(false); 1567 static final AtomicBoolean throwException = new AtomicBoolean(false); 1568 private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>( 1569 new CountDownLatch(0)); 1570 1571 @Override 1572 public Optional<RegionObserver> getRegionObserver() { 1573 return Optional.of(this); 1574 } 1575 1576 @Override 1577 public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e, 1578 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException { 1579 slowdownCode(e, false); 1580 if (getLatch != null && getLatch.getCount() > 0) { 1581 try { 1582 getLatch.await(); 1583 } catch (InterruptedException e1) { 1584 } 1585 } 1586 return hasMore; 1587 } 1588 1589 @Override 1590 public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, 1591 List<Cell> results) throws IOException { 1592 slowdownCode(e, true); 1593 } 1594 1595 public static AtomicReference<CountDownLatch> getCdl() { 1596 return cdl; 1597 } 1598 1599 private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e, 1600 boolean isGet) { 1601 CountDownLatch latch = getCdl().get(); 1602 try { 1603 System.out.println(latch.getCount() + " is the count " + isGet); 1604 if (latch.getCount() > 0) { 1605 if (isGet) { 1606 countOfGets.incrementAndGet(); 1607 } else { 1608 countOfNext.incrementAndGet(); 1609 } 1610 LOG.info("Waiting for the counterCountDownLatch"); 1611 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 1612 if (latch.getCount() > 0) { 1613 throw new RuntimeException("Can't wait more"); 1614 } 1615 } 1616 } catch (InterruptedException e1) { 1617 LOG.error(e1.toString(), e1); 1618 } 1619 } 1620 } 1621}