001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.atomic.AtomicReference; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.HTableDescriptor; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 043import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 044import org.apache.hadoop.hbase.coprocessor.ObserverContext; 045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 047import org.apache.hadoop.hbase.coprocessor.RegionObserver; 048import org.apache.hadoop.hbase.io.hfile.BlockCache; 049import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 050import org.apache.hadoop.hbase.io.hfile.CacheConfig; 051import org.apache.hadoop.hbase.io.hfile.CachedBlock; 052import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 053import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 054import org.apache.hadoop.hbase.regionserver.BloomType; 055import org.apache.hadoop.hbase.regionserver.HRegion; 056import org.apache.hadoop.hbase.regionserver.HStore; 057import org.apache.hadoop.hbase.regionserver.InternalScanner; 058import org.apache.hadoop.hbase.regionserver.RegionScanner; 059import org.apache.hadoop.hbase.regionserver.ScannerContext; 060import org.apache.hadoop.hbase.testclassification.ClientTests; 061import org.apache.hadoop.hbase.testclassification.LargeTests; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 064import org.junit.After; 065import org.junit.AfterClass; 066import org.junit.Before; 067import org.junit.BeforeClass; 068import org.junit.ClassRule; 069import org.junit.Rule; 070import org.junit.Test; 071import org.junit.experimental.categories.Category; 072import org.junit.rules.TestName; 073import org.slf4j.Logger; 074import org.slf4j.LoggerFactory; 075 076import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 077 078@Category({ LargeTests.class, ClientTests.class }) 079@SuppressWarnings("deprecation") 080public class TestBlockEvictionFromClient { 081 082 @ClassRule 083 public static final HBaseClassTestRule CLASS_RULE = 084 HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class); 085 086 private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class); 087 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 088 static byte[][] ROWS = new byte[2][]; 089 private static int NO_OF_THREADS = 3; 090 private static byte[] ROW = Bytes.toBytes("testRow"); 091 private static byte[] ROW1 = Bytes.toBytes("testRow1"); 092 private static byte[] ROW2 = Bytes.toBytes("testRow2"); 093 private static byte[] ROW3 = Bytes.toBytes("testRow3"); 094 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 095 private static byte[][] FAMILIES_1 = new byte[1][0]; 096 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 097 private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 098 private static byte[] data = new byte[1000]; 099 private static byte[] data2 = Bytes.add(data, data); 100 protected static int SLAVES = 1; 101 private static CountDownLatch latch; 102 private static CountDownLatch getLatch; 103 private static CountDownLatch compactionLatch; 104 private static CountDownLatch exceptionLatch; 105 106 @Rule 107 public TestName name = new TestName(); 108 109 @BeforeClass 110 public static void setUpBeforeClass() throws Exception { 111 ROWS[0] = ROW; 112 ROWS[1] = ROW1; 113 Configuration conf = TEST_UTIL.getConfiguration(); 114 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 115 MultiRowMutationEndpoint.class.getName()); 116 conf.setInt("hbase.regionserver.handler.count", 20); 117 conf.setInt("hbase.bucketcache.size", 400); 118 conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); 119 conf.setFloat("hfile.block.cache.size", 0.2f); 120 conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); 121 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry 122 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000); 123 FAMILIES_1[0] = FAMILY; 124 TEST_UTIL.startMiniCluster(SLAVES); 125 } 126 127 @AfterClass 128 public static void tearDownAfterClass() throws Exception { 129 TEST_UTIL.shutdownMiniCluster(); 130 } 131 132 @Before 133 public void setUp() throws Exception { 134 CustomInnerRegionObserver.waitForGets.set(false); 135 CustomInnerRegionObserver.countOfNext.set(0); 136 CustomInnerRegionObserver.countOfGets.set(0); 137 } 138 139 @After 140 public void tearDown() throws Exception { 141 if (latch != null) { 142 while (latch.getCount() > 0) { 143 latch.countDown(); 144 } 145 } 146 if (getLatch != null) { 147 getLatch.countDown(); 148 } 149 if (compactionLatch != null) { 150 compactionLatch.countDown(); 151 } 152 if (exceptionLatch != null) { 153 exceptionLatch.countDown(); 154 } 155 latch = null; 156 getLatch = null; 157 compactionLatch = null; 158 exceptionLatch = null; 159 CustomInnerRegionObserver.throwException.set(false); 160 // Clean up the tables for every test case 161 TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames(); 162 for (TableName tableName : listTableNames) { 163 if (!tableName.isSystemTable()) { 164 TEST_UTIL.getAdmin().disableTable(tableName); 165 TEST_UTIL.getAdmin().deleteTable(tableName); 166 } 167 } 168 } 169 170 @Test 171 public void testBlockEvictionWithParallelScans() throws Exception { 172 Table table = null; 173 try { 174 latch = new CountDownLatch(1); 175 final TableName tableName = TableName.valueOf(name.getMethodName()); 176 // Create a table with block size as 1024 177 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 178 CustomInnerRegionObserver.class.getName()); 179 // get the block cache and region 180 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 181 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 182 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 183 HStore store = region.getStores().iterator().next(); 184 CacheConfig cacheConf = store.getCacheConfig(); 185 cacheConf.setCacheDataOnWrite(true); 186 cacheConf.setEvictOnClose(true); 187 BlockCache cache = cacheConf.getBlockCache().get(); 188 189 // insert data. 2 Rows are added 190 Put put = new Put(ROW); 191 put.addColumn(FAMILY, QUALIFIER, data); 192 table.put(put); 193 put = new Put(ROW1); 194 put.addColumn(FAMILY, QUALIFIER, data); 195 table.put(put); 196 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 197 // data was in memstore so don't expect any changes 198 // flush the data 199 // Should create one Hfile with 2 blocks 200 region.flush(true); 201 // Load cache 202 // Create three sets of scan 203 ScanThread[] scanThreads = initiateScan(table, false); 204 Thread.sleep(100); 205 checkForBlockEviction(cache, false, false); 206 for (ScanThread thread : scanThreads) { 207 thread.join(); 208 } 209 // CustomInnerRegionObserver.sleepTime.set(0); 210 Iterator<CachedBlock> iterator = cache.iterator(); 211 iterateBlockCache(cache, iterator); 212 // read the data and expect same blocks, one new hit, no misses 213 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 214 iterator = cache.iterator(); 215 iterateBlockCache(cache, iterator); 216 // Check how this miss is happening 217 // insert a second column, read the row, no new blocks, 3 new hits 218 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 219 byte[] data2 = Bytes.add(data, data); 220 put = new Put(ROW); 221 put.addColumn(FAMILY, QUALIFIER2, data2); 222 table.put(put); 223 Result r = table.get(new Get(ROW)); 224 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 225 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 226 iterator = cache.iterator(); 227 iterateBlockCache(cache, iterator); 228 // flush, one new block 229 System.out.println("Flushing cache"); 230 region.flush(true); 231 iterator = cache.iterator(); 232 iterateBlockCache(cache, iterator); 233 // compact, net minus two blocks, two hits, no misses 234 System.out.println("Compacting"); 235 assertEquals(2, store.getStorefilesCount()); 236 store.triggerMajorCompaction(); 237 region.compact(true); 238 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 239 assertEquals(1, store.getStorefilesCount()); 240 iterator = cache.iterator(); 241 iterateBlockCache(cache, iterator); 242 // read the row, this should be a cache miss because we don't cache data 243 // blocks on compaction 244 r = table.get(new Get(ROW)); 245 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 246 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 247 iterator = cache.iterator(); 248 iterateBlockCache(cache, iterator); 249 } finally { 250 if (table != null) { 251 table.close(); 252 } 253 } 254 } 255 256 @Test 257 public void testParallelGetsAndScans() throws IOException, InterruptedException { 258 Table table = null; 259 try { 260 latch = new CountDownLatch(2); 261 // Check if get() returns blocks on its close() itself 262 getLatch = new CountDownLatch(1); 263 final TableName tableName = TableName.valueOf(name.getMethodName()); 264 // Create KV that will give you two blocks 265 // Create a table with block size as 1024 266 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 267 CustomInnerRegionObserver.class.getName()); 268 // get the block cache and region 269 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 270 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 271 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 272 HStore store = region.getStores().iterator().next(); 273 CacheConfig cacheConf = store.getCacheConfig(); 274 cacheConf.setCacheDataOnWrite(true); 275 cacheConf.setEvictOnClose(true); 276 BlockCache cache = cacheConf.getBlockCache().get(); 277 278 insertData(table); 279 // flush the data 280 System.out.println("Flushing cache"); 281 // Should create one Hfile with 2 blocks 282 region.flush(true); 283 // Create three sets of scan 284 CustomInnerRegionObserver.waitForGets.set(true); 285 ScanThread[] scanThreads = initiateScan(table, false); 286 // Create three sets of gets 287 GetThread[] getThreads = initiateGet(table, false, false); 288 checkForBlockEviction(cache, false, false); 289 CustomInnerRegionObserver.waitForGets.set(false); 290 checkForBlockEviction(cache, false, false); 291 for (GetThread thread : getThreads) { 292 thread.join(); 293 } 294 // Verify whether the gets have returned the blocks that it had 295 CustomInnerRegionObserver.waitForGets.set(true); 296 // giving some time for the block to be decremented 297 checkForBlockEviction(cache, true, false); 298 getLatch.countDown(); 299 for (ScanThread thread : scanThreads) { 300 thread.join(); 301 } 302 System.out.println("Scans should have returned the bloks"); 303 // Check with either true or false 304 CustomInnerRegionObserver.waitForGets.set(false); 305 // The scan should also have released the blocks by now 306 checkForBlockEviction(cache, true, true); 307 } finally { 308 if (table != null) { 309 table.close(); 310 } 311 } 312 } 313 314 @Test 315 public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException { 316 Table table = null; 317 try { 318 latch = new CountDownLatch(1); 319 // Check if get() returns blocks on its close() itself 320 getLatch = new CountDownLatch(1); 321 final TableName tableName = TableName.valueOf(name.getMethodName()); 322 // Create KV that will give you two blocks 323 // Create a table with block size as 1024 324 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 325 CustomInnerRegionObserver.class.getName()); 326 // get the block cache and region 327 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 328 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 329 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 330 HStore store = region.getStores().iterator().next(); 331 CacheConfig cacheConf = store.getCacheConfig(); 332 cacheConf.setCacheDataOnWrite(true); 333 cacheConf.setEvictOnClose(true); 334 BlockCache cache = cacheConf.getBlockCache().get(); 335 336 Put put = new Put(ROW); 337 put.addColumn(FAMILY, QUALIFIER, data); 338 table.put(put); 339 region.flush(true); 340 put = new Put(ROW1); 341 put.addColumn(FAMILY, QUALIFIER, data); 342 table.put(put); 343 region.flush(true); 344 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 345 put = new Put(ROW); 346 put.addColumn(FAMILY, QUALIFIER2, data2); 347 table.put(put); 348 region.flush(true); 349 // flush the data 350 System.out.println("Flushing cache"); 351 // Should create one Hfile with 2 blocks 352 CustomInnerRegionObserver.waitForGets.set(true); 353 // Create three sets of gets 354 GetThread[] getThreads = initiateGet(table, false, false); 355 Thread.sleep(200); 356 CustomInnerRegionObserver.getCdl().get().countDown(); 357 for (GetThread thread : getThreads) { 358 thread.join(); 359 } 360 // Verify whether the gets have returned the blocks that it had 361 CustomInnerRegionObserver.waitForGets.set(true); 362 // giving some time for the block to be decremented 363 checkForBlockEviction(cache, true, false); 364 getLatch.countDown(); 365 System.out.println("Gets should have returned the bloks"); 366 } finally { 367 if (table != null) { 368 table.close(); 369 } 370 } 371 } 372 373 @Test 374 // TODO : check how block index works here 375 public void testGetsWithMultiColumnsAndExplicitTracker() 376 throws IOException, InterruptedException { 377 Table table = null; 378 try { 379 latch = new CountDownLatch(1); 380 // Check if get() returns blocks on its close() itself 381 getLatch = new CountDownLatch(1); 382 final TableName tableName = TableName.valueOf(name.getMethodName()); 383 // Create KV that will give you two blocks 384 // Create a table with block size as 1024 385 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 386 CustomInnerRegionObserver.class.getName()); 387 // get the block cache and region 388 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 389 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 390 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 391 BlockCache cache = setCacheProperties(region); 392 Put put = new Put(ROW); 393 put.addColumn(FAMILY, QUALIFIER, data); 394 table.put(put); 395 region.flush(true); 396 put = new Put(ROW1); 397 put.addColumn(FAMILY, QUALIFIER, data); 398 table.put(put); 399 region.flush(true); 400 for (int i = 1; i < 10; i++) { 401 put = new Put(ROW); 402 put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2); 403 table.put(put); 404 if (i % 2 == 0) { 405 region.flush(true); 406 } 407 } 408 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 409 put = new Put(ROW); 410 put.addColumn(FAMILY, QUALIFIER2, data2); 411 table.put(put); 412 region.flush(true); 413 // flush the data 414 System.out.println("Flushing cache"); 415 // Should create one Hfile with 2 blocks 416 CustomInnerRegionObserver.waitForGets.set(true); 417 // Create three sets of gets 418 GetThread[] getThreads = initiateGet(table, true, false); 419 Thread.sleep(200); 420 Iterator<CachedBlock> iterator = cache.iterator(); 421 boolean usedBlocksFound = false; 422 int refCount = 0; 423 int noOfBlocksWithRef = 0; 424 while (iterator.hasNext()) { 425 CachedBlock next = iterator.next(); 426 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 427 if (cache instanceof BucketCache) { 428 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 429 } else if (cache instanceof CombinedBlockCache) { 430 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 431 } else { 432 continue; 433 } 434 if (refCount != 0) { 435 // Blocks will be with count 3 436 System.out.println("The refCount is " + refCount); 437 assertEquals(NO_OF_THREADS, refCount); 438 usedBlocksFound = true; 439 noOfBlocksWithRef++; 440 } 441 } 442 assertTrue(usedBlocksFound); 443 // the number of blocks referred 444 assertEquals(10, noOfBlocksWithRef); 445 CustomInnerRegionObserver.getCdl().get().countDown(); 446 for (GetThread thread : getThreads) { 447 thread.join(); 448 } 449 // Verify whether the gets have returned the blocks that it had 450 CustomInnerRegionObserver.waitForGets.set(true); 451 // giving some time for the block to be decremented 452 checkForBlockEviction(cache, true, false); 453 getLatch.countDown(); 454 System.out.println("Gets should have returned the bloks"); 455 } finally { 456 if (table != null) { 457 table.close(); 458 } 459 } 460 } 461 462 @Test 463 public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException { 464 Table table = null; 465 try { 466 latch = new CountDownLatch(1); 467 // Check if get() returns blocks on its close() itself 468 getLatch = new CountDownLatch(1); 469 final TableName tableName = TableName.valueOf(name.getMethodName()); 470 // Create KV that will give you two blocks 471 // Create a table with block size as 1024 472 byte[][] fams = new byte[10][]; 473 fams[0] = FAMILY; 474 for (int i = 1; i < 10; i++) { 475 fams[i] = (Bytes.toBytes("testFamily" + i)); 476 } 477 table = 478 TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName()); 479 // get the block cache and region 480 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 481 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 482 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 483 BlockCache cache = setCacheProperties(region); 484 485 Put put = new Put(ROW); 486 put.addColumn(FAMILY, QUALIFIER, data); 487 table.put(put); 488 region.flush(true); 489 put = new Put(ROW1); 490 put.addColumn(FAMILY, QUALIFIER, data); 491 table.put(put); 492 region.flush(true); 493 for (int i = 1; i < 10; i++) { 494 put = new Put(ROW); 495 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 496 table.put(put); 497 if (i % 2 == 0) { 498 region.flush(true); 499 } 500 } 501 region.flush(true); 502 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 503 put = new Put(ROW); 504 put.addColumn(FAMILY, QUALIFIER2, data2); 505 table.put(put); 506 region.flush(true); 507 // flush the data 508 System.out.println("Flushing cache"); 509 // Should create one Hfile with 2 blocks 510 CustomInnerRegionObserver.waitForGets.set(true); 511 // Create three sets of gets 512 GetThread[] getThreads = initiateGet(table, true, true); 513 Thread.sleep(200); 514 Iterator<CachedBlock> iterator = cache.iterator(); 515 boolean usedBlocksFound = false; 516 int refCount = 0; 517 int noOfBlocksWithRef = 0; 518 while (iterator.hasNext()) { 519 CachedBlock next = iterator.next(); 520 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 521 if (cache instanceof BucketCache) { 522 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 523 } else if (cache instanceof CombinedBlockCache) { 524 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 525 } else { 526 continue; 527 } 528 if (refCount != 0) { 529 // Blocks will be with count 3 530 System.out.println("The refCount is " + refCount); 531 assertEquals(NO_OF_THREADS, refCount); 532 usedBlocksFound = true; 533 noOfBlocksWithRef++; 534 } 535 } 536 assertTrue(usedBlocksFound); 537 // the number of blocks referred 538 assertEquals(3, noOfBlocksWithRef); 539 CustomInnerRegionObserver.getCdl().get().countDown(); 540 for (GetThread thread : getThreads) { 541 thread.join(); 542 } 543 // Verify whether the gets have returned the blocks that it had 544 CustomInnerRegionObserver.waitForGets.set(true); 545 // giving some time for the block to be decremented 546 checkForBlockEviction(cache, true, false); 547 getLatch.countDown(); 548 System.out.println("Gets should have returned the bloks"); 549 } finally { 550 if (table != null) { 551 table.close(); 552 } 553 } 554 } 555 556 @Test 557 public void testBlockRefCountAfterSplits() throws IOException, InterruptedException { 558 Table table = null; 559 try { 560 final TableName tableName = TableName.valueOf(name.getMethodName()); 561 HTableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName); 562 // This test expects rpc refcount of cached data blocks to be 0 after split. After split, 563 // two daughter regions are opened and a compaction is scheduled to get rid of reference 564 // of the parent region hfiles. Compaction will increase refcount of cached data blocks by 1. 565 // It is flakey since compaction can kick in anytime. To solve this issue, table is created 566 // with compaction disabled. 567 desc.setCompactionEnabled(false); 568 table = TEST_UTIL.createTable(desc, FAMILIES_1, null, BloomType.ROW, 1024, null); 569 // get the block cache and region 570 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 571 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 572 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 573 HStore store = region.getStores().iterator().next(); 574 CacheConfig cacheConf = store.getCacheConfig(); 575 cacheConf.setEvictOnClose(true); 576 BlockCache cache = cacheConf.getBlockCache().get(); 577 Put put = new Put(ROW); 578 put.addColumn(FAMILY, QUALIFIER, data); 579 table.put(put); 580 region.flush(true); 581 put = new Put(ROW1); 582 put.addColumn(FAMILY, QUALIFIER, data); 583 table.put(put); 584 region.flush(true); 585 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 586 put = new Put(ROW2); 587 put.addColumn(FAMILY, QUALIFIER2, data2); 588 table.put(put); 589 put = new Put(ROW3); 590 put.addColumn(FAMILY, QUALIFIER2, data2); 591 table.put(put); 592 region.flush(true); 593 ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers()); 594 int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size(); 595 LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(), 596 regionCount); 597 TEST_UTIL.getAdmin().split(tableName, ROW1); 598 // Wait for splits 599 TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount); 600 List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions(); 601 for (HRegion r : regions) { 602 LOG.info("" + r.getCompactionState()); 603 TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE)); 604 } 605 LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache); 606 Iterator<CachedBlock> iterator = cache.iterator(); 607 // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners 608 // should be closed inorder to return those blocks 609 iterateBlockCache(cache, iterator); 610 } finally { 611 if (table != null) { 612 table.close(); 613 } 614 } 615 } 616 617 @Test 618 public void testMultiGets() throws IOException, InterruptedException { 619 Table table = null; 620 try { 621 latch = new CountDownLatch(2); 622 // Check if get() returns blocks on its close() itself 623 getLatch = new CountDownLatch(1); 624 final TableName tableName = TableName.valueOf(name.getMethodName()); 625 // Create KV that will give you two blocks 626 // Create a table with block size as 1024 627 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 628 CustomInnerRegionObserver.class.getName()); 629 // get the block cache and region 630 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 631 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 632 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 633 HStore store = region.getStores().iterator().next(); 634 CacheConfig cacheConf = store.getCacheConfig(); 635 cacheConf.setCacheDataOnWrite(true); 636 cacheConf.setEvictOnClose(true); 637 BlockCache cache = cacheConf.getBlockCache().get(); 638 639 Put put = new Put(ROW); 640 put.addColumn(FAMILY, QUALIFIER, data); 641 table.put(put); 642 region.flush(true); 643 put = new Put(ROW1); 644 put.addColumn(FAMILY, QUALIFIER, data); 645 table.put(put); 646 region.flush(true); 647 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 648 put = new Put(ROW); 649 put.addColumn(FAMILY, QUALIFIER2, data2); 650 table.put(put); 651 region.flush(true); 652 // flush the data 653 System.out.println("Flushing cache"); 654 // Should create one Hfile with 2 blocks 655 CustomInnerRegionObserver.waitForGets.set(true); 656 // Create three sets of gets 657 MultiGetThread[] getThreads = initiateMultiGet(table); 658 Thread.sleep(200); 659 int refCount; 660 Iterator<CachedBlock> iterator = cache.iterator(); 661 boolean foundNonZeroBlock = false; 662 while (iterator.hasNext()) { 663 CachedBlock next = iterator.next(); 664 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 665 if (cache instanceof BucketCache) { 666 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 667 } else if (cache instanceof CombinedBlockCache) { 668 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 669 } else { 670 continue; 671 } 672 if (refCount != 0) { 673 assertEquals(NO_OF_THREADS, refCount); 674 foundNonZeroBlock = true; 675 } 676 } 677 assertTrue("Should have found nonzero ref count block", foundNonZeroBlock); 678 CustomInnerRegionObserver.getCdl().get().countDown(); 679 CustomInnerRegionObserver.getCdl().get().countDown(); 680 for (MultiGetThread thread : getThreads) { 681 thread.join(); 682 } 683 // Verify whether the gets have returned the blocks that it had 684 CustomInnerRegionObserver.waitForGets.set(true); 685 // giving some time for the block to be decremented 686 iterateBlockCache(cache, iterator); 687 getLatch.countDown(); 688 System.out.println("Gets should have returned the bloks"); 689 } finally { 690 if (table != null) { 691 table.close(); 692 } 693 } 694 } 695 696 @Test 697 public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException { 698 Table table = null; 699 try { 700 latch = new CountDownLatch(1); 701 // Check if get() returns blocks on its close() itself 702 final TableName tableName = TableName.valueOf(name.getMethodName()); 703 // Create KV that will give you two blocks 704 // Create a table with block size as 1024 705 byte[][] fams = new byte[10][]; 706 fams[0] = FAMILY; 707 for (int i = 1; i < 10; i++) { 708 fams[i] = (Bytes.toBytes("testFamily" + i)); 709 } 710 table = 711 TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName()); 712 // get the block cache and region 713 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 714 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 715 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 716 BlockCache cache = setCacheProperties(region); 717 718 Put put = new Put(ROW); 719 put.addColumn(FAMILY, QUALIFIER, data); 720 table.put(put); 721 region.flush(true); 722 put = new Put(ROW1); 723 put.addColumn(FAMILY, QUALIFIER, data); 724 table.put(put); 725 region.flush(true); 726 for (int i = 1; i < 10; i++) { 727 put = new Put(ROW); 728 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 729 table.put(put); 730 if (i % 2 == 0) { 731 region.flush(true); 732 } 733 } 734 region.flush(true); 735 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 736 put = new Put(ROW); 737 put.addColumn(FAMILY, QUALIFIER2, data2); 738 table.put(put); 739 region.flush(true); 740 // flush the data 741 System.out.println("Flushing cache"); 742 // Should create one Hfile with 2 blocks 743 // Create three sets of gets 744 ScanThread[] scanThreads = initiateScan(table, true); 745 Thread.sleep(200); 746 Iterator<CachedBlock> iterator = cache.iterator(); 747 boolean usedBlocksFound = false; 748 int refCount = 0; 749 int noOfBlocksWithRef = 0; 750 while (iterator.hasNext()) { 751 CachedBlock next = iterator.next(); 752 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 753 if (cache instanceof BucketCache) { 754 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 755 } else if (cache instanceof CombinedBlockCache) { 756 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 757 } else { 758 continue; 759 } 760 if (refCount != 0) { 761 // Blocks will be with count 3 762 System.out.println("The refCount is " + refCount); 763 assertEquals(NO_OF_THREADS, refCount); 764 usedBlocksFound = true; 765 noOfBlocksWithRef++; 766 } 767 } 768 assertTrue(usedBlocksFound); 769 // the number of blocks referred 770 assertEquals(12, noOfBlocksWithRef); 771 CustomInnerRegionObserver.getCdl().get().countDown(); 772 for (ScanThread thread : scanThreads) { 773 thread.join(); 774 } 775 // giving some time for the block to be decremented 776 checkForBlockEviction(cache, true, false); 777 } finally { 778 if (table != null) { 779 table.close(); 780 } 781 } 782 } 783 784 private BlockCache setCacheProperties(HRegion region) { 785 Iterator<HStore> strItr = region.getStores().iterator(); 786 BlockCache cache = null; 787 while (strItr.hasNext()) { 788 HStore store = strItr.next(); 789 CacheConfig cacheConf = store.getCacheConfig(); 790 cacheConf.setCacheDataOnWrite(true); 791 cacheConf.setEvictOnClose(true); 792 // Use the last one 793 cache = cacheConf.getBlockCache().get(); 794 } 795 return cache; 796 } 797 798 @Test 799 public void testParallelGetsAndScanWithWrappedRegionScanner() 800 throws IOException, InterruptedException { 801 Table table = null; 802 try { 803 latch = new CountDownLatch(2); 804 // Check if get() returns blocks on its close() itself 805 getLatch = new CountDownLatch(1); 806 final TableName tableName = TableName.valueOf(name.getMethodName()); 807 // Create KV that will give you two blocks 808 // Create a table with block size as 1024 809 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 810 CustomInnerRegionObserverWrapper.class.getName()); 811 // get the block cache and region 812 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 813 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 814 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 815 HStore store = region.getStores().iterator().next(); 816 CacheConfig cacheConf = store.getCacheConfig(); 817 cacheConf.setCacheDataOnWrite(true); 818 cacheConf.setEvictOnClose(true); 819 BlockCache cache = cacheConf.getBlockCache().get(); 820 821 // insert data. 2 Rows are added 822 insertData(table); 823 // flush the data 824 System.out.println("Flushing cache"); 825 // Should create one Hfile with 2 blocks 826 region.flush(true); 827 // CustomInnerRegionObserver.sleepTime.set(5000); 828 // Create three sets of scan 829 CustomInnerRegionObserver.waitForGets.set(true); 830 ScanThread[] scanThreads = initiateScan(table, false); 831 // Create three sets of gets 832 GetThread[] getThreads = initiateGet(table, false, false); 833 // The block would have been decremented for the scan case as it was 834 // wrapped 835 // before even the postNext hook gets executed. 836 // giving some time for the block to be decremented 837 Thread.sleep(100); 838 CustomInnerRegionObserver.waitForGets.set(false); 839 checkForBlockEviction(cache, false, false); 840 // countdown the latch 841 CustomInnerRegionObserver.getCdl().get().countDown(); 842 for (GetThread thread : getThreads) { 843 thread.join(); 844 } 845 getLatch.countDown(); 846 for (ScanThread thread : scanThreads) { 847 thread.join(); 848 } 849 } finally { 850 if (table != null) { 851 table.close(); 852 } 853 } 854 } 855 856 @Test 857 public void testScanWithCompaction() throws IOException, InterruptedException { 858 testScanWithCompactionInternals(name.getMethodName(), false); 859 } 860 861 @Test 862 public void testReverseScanWithCompaction() throws IOException, InterruptedException { 863 testScanWithCompactionInternals(name.getMethodName(), true); 864 } 865 866 private void testScanWithCompactionInternals(String tableNameStr, boolean reversed) 867 throws IOException, InterruptedException { 868 Table table = null; 869 try { 870 latch = new CountDownLatch(1); 871 compactionLatch = new CountDownLatch(1); 872 TableName tableName = TableName.valueOf(tableNameStr); 873 // Create a table with block size as 1024 874 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 875 CustomInnerRegionObserverWrapper.class.getName()); 876 // get the block cache and region 877 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 878 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 879 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 880 HStore store = region.getStores().iterator().next(); 881 CacheConfig cacheConf = store.getCacheConfig(); 882 cacheConf.setCacheDataOnWrite(true); 883 cacheConf.setEvictOnClose(true); 884 BlockCache cache = cacheConf.getBlockCache().get(); 885 886 // insert data. 2 Rows are added 887 Put put = new Put(ROW); 888 put.addColumn(FAMILY, QUALIFIER, data); 889 table.put(put); 890 put = new Put(ROW1); 891 put.addColumn(FAMILY, QUALIFIER, data); 892 table.put(put); 893 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 894 // Should create one Hfile with 2 blocks 895 region.flush(true); 896 // read the data and expect same blocks, one new hit, no misses 897 int refCount = 0; 898 // Check how this miss is happening 899 // insert a second column, read the row, no new blocks, 3 new hits 900 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 901 byte[] data2 = Bytes.add(data, data); 902 put = new Put(ROW); 903 put.addColumn(FAMILY, QUALIFIER2, data2); 904 table.put(put); 905 // flush, one new block 906 System.out.println("Flushing cache"); 907 region.flush(true); 908 Iterator<CachedBlock> iterator = cache.iterator(); 909 iterateBlockCache(cache, iterator); 910 // Create three sets of scan 911 ScanThread[] scanThreads = initiateScan(table, reversed); 912 Thread.sleep(100); 913 iterator = cache.iterator(); 914 boolean usedBlocksFound = false; 915 while (iterator.hasNext()) { 916 CachedBlock next = iterator.next(); 917 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 918 if (cache instanceof BucketCache) { 919 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 920 } else if (cache instanceof CombinedBlockCache) { 921 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 922 } else { 923 continue; 924 } 925 if (refCount != 0) { 926 // Blocks will be with count 3 927 assertEquals(NO_OF_THREADS, refCount); 928 usedBlocksFound = true; 929 } 930 } 931 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 932 usedBlocksFound = false; 933 System.out.println("Compacting"); 934 assertEquals(2, store.getStorefilesCount()); 935 store.triggerMajorCompaction(); 936 region.compact(true); 937 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 938 assertEquals(1, store.getStorefilesCount()); 939 // Even after compaction is done we will have some blocks that cannot 940 // be evicted this is because the scan is still referencing them 941 iterator = cache.iterator(); 942 while (iterator.hasNext()) { 943 CachedBlock next = iterator.next(); 944 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 945 if (cache instanceof BucketCache) { 946 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 947 } else if (cache instanceof CombinedBlockCache) { 948 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 949 } else { 950 continue; 951 } 952 if (refCount != 0) { 953 // Blocks will be with count 3 as they are not yet cleared 954 assertEquals(NO_OF_THREADS, refCount); 955 usedBlocksFound = true; 956 } 957 } 958 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 959 // Should not throw exception 960 compactionLatch.countDown(); 961 latch.countDown(); 962 for (ScanThread thread : scanThreads) { 963 thread.join(); 964 } 965 // by this time all blocks should have been evicted 966 iterator = cache.iterator(); 967 iterateBlockCache(cache, iterator); 968 Result r = table.get(new Get(ROW)); 969 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 970 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 971 // The gets would be working on new blocks 972 iterator = cache.iterator(); 973 iterateBlockCache(cache, iterator); 974 } finally { 975 if (table != null) { 976 table.close(); 977 } 978 } 979 } 980 981 @Test 982 public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush() 983 throws IOException, InterruptedException { 984 // do flush and scan in parallel 985 Table table = null; 986 try { 987 latch = new CountDownLatch(1); 988 compactionLatch = new CountDownLatch(1); 989 final TableName tableName = TableName.valueOf(name.getMethodName()); 990 // Create a table with block size as 1024 991 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 992 CustomInnerRegionObserverWrapper.class.getName()); 993 // get the block cache and region 994 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 995 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 996 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 997 HStore store = region.getStores().iterator().next(); 998 CacheConfig cacheConf = store.getCacheConfig(); 999 cacheConf.setCacheDataOnWrite(true); 1000 cacheConf.setEvictOnClose(true); 1001 BlockCache cache = cacheConf.getBlockCache().get(); 1002 1003 // insert data. 2 Rows are added 1004 Put put = new Put(ROW); 1005 put.addColumn(FAMILY, QUALIFIER, data); 1006 table.put(put); 1007 put = new Put(ROW1); 1008 put.addColumn(FAMILY, QUALIFIER, data); 1009 table.put(put); 1010 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 1011 // Should create one Hfile with 2 blocks 1012 region.flush(true); 1013 // read the data and expect same blocks, one new hit, no misses 1014 int refCount = 0; 1015 // Check how this miss is happening 1016 // insert a second column, read the row, no new blocks, 3 new hits 1017 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1018 byte[] data2 = Bytes.add(data, data); 1019 put = new Put(ROW); 1020 put.addColumn(FAMILY, QUALIFIER2, data2); 1021 table.put(put); 1022 // flush, one new block 1023 System.out.println("Flushing cache"); 1024 region.flush(true); 1025 Iterator<CachedBlock> iterator = cache.iterator(); 1026 iterateBlockCache(cache, iterator); 1027 // Create three sets of scan 1028 ScanThread[] scanThreads = initiateScan(table, false); 1029 Thread.sleep(100); 1030 iterator = cache.iterator(); 1031 boolean usedBlocksFound = false; 1032 while (iterator.hasNext()) { 1033 CachedBlock next = iterator.next(); 1034 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1035 if (cache instanceof BucketCache) { 1036 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1037 } else if (cache instanceof CombinedBlockCache) { 1038 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1039 } else { 1040 continue; 1041 } 1042 if (refCount != 0) { 1043 // Blocks will be with count 3 1044 assertEquals(NO_OF_THREADS, refCount); 1045 usedBlocksFound = true; 1046 } 1047 } 1048 // Make a put and do a flush 1049 QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1050 data2 = Bytes.add(data, data); 1051 put = new Put(ROW1); 1052 put.addColumn(FAMILY, QUALIFIER2, data2); 1053 table.put(put); 1054 // flush, one new block 1055 System.out.println("Flushing cache"); 1056 region.flush(true); 1057 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 1058 usedBlocksFound = false; 1059 System.out.println("Compacting"); 1060 assertEquals(3, store.getStorefilesCount()); 1061 store.triggerMajorCompaction(); 1062 region.compact(true); 1063 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 1064 assertEquals(1, store.getStorefilesCount()); 1065 // Even after compaction is done we will have some blocks that cannot 1066 // be evicted this is because the scan is still referencing them 1067 iterator = cache.iterator(); 1068 while (iterator.hasNext()) { 1069 CachedBlock next = iterator.next(); 1070 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1071 if (cache instanceof BucketCache) { 1072 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1073 } else if (cache instanceof CombinedBlockCache) { 1074 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1075 } else { 1076 continue; 1077 } 1078 if (refCount != 0) { 1079 // Blocks will be with count 3 as they are not yet cleared 1080 assertEquals(NO_OF_THREADS, refCount); 1081 usedBlocksFound = true; 1082 } 1083 } 1084 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 1085 // Should not throw exception 1086 compactionLatch.countDown(); 1087 latch.countDown(); 1088 for (ScanThread thread : scanThreads) { 1089 thread.join(); 1090 } 1091 // by this time all blocks should have been evicted 1092 iterator = cache.iterator(); 1093 // Since a flush and compaction happened after a scan started 1094 // we need to ensure that all the original blocks of the compacted file 1095 // is also removed. 1096 iterateBlockCache(cache, iterator); 1097 Result r = table.get(new Get(ROW)); 1098 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1099 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1100 // The gets would be working on new blocks 1101 iterator = cache.iterator(); 1102 iterateBlockCache(cache, iterator); 1103 } finally { 1104 if (table != null) { 1105 table.close(); 1106 } 1107 } 1108 } 1109 1110 @Test 1111 public void testScanWithException() throws IOException, InterruptedException { 1112 Table table = null; 1113 try { 1114 latch = new CountDownLatch(1); 1115 exceptionLatch = new CountDownLatch(1); 1116 final TableName tableName = TableName.valueOf(name.getMethodName()); 1117 // Create KV that will give you two blocks 1118 // Create a table with block size as 1024 1119 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 1120 CustomInnerRegionObserverWrapper.class.getName()); 1121 // get the block cache and region 1122 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 1123 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 1124 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1125 HStore store = region.getStores().iterator().next(); 1126 CacheConfig cacheConf = store.getCacheConfig(); 1127 cacheConf.setCacheDataOnWrite(true); 1128 cacheConf.setEvictOnClose(true); 1129 BlockCache cache = cacheConf.getBlockCache().get(); 1130 // insert data. 2 Rows are added 1131 insertData(table); 1132 // flush the data 1133 System.out.println("Flushing cache"); 1134 // Should create one Hfile with 2 blocks 1135 region.flush(true); 1136 // CustomInnerRegionObserver.sleepTime.set(5000); 1137 CustomInnerRegionObserver.throwException.set(true); 1138 ScanThread[] scanThreads = initiateScan(table, false); 1139 // The block would have been decremented for the scan case as it was 1140 // wrapped 1141 // before even the postNext hook gets executed. 1142 // giving some time for the block to be decremented 1143 Thread.sleep(100); 1144 Iterator<CachedBlock> iterator = cache.iterator(); 1145 boolean usedBlocksFound = false; 1146 int refCount = 0; 1147 while (iterator.hasNext()) { 1148 CachedBlock next = iterator.next(); 1149 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1150 if (cache instanceof BucketCache) { 1151 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1152 } else if (cache instanceof CombinedBlockCache) { 1153 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1154 } else { 1155 continue; 1156 } 1157 if (refCount != 0) { 1158 // Blocks will be with count 3 1159 assertEquals(NO_OF_THREADS, refCount); 1160 usedBlocksFound = true; 1161 } 1162 } 1163 assertTrue(usedBlocksFound); 1164 exceptionLatch.countDown(); 1165 // countdown the latch 1166 CustomInnerRegionObserver.getCdl().get().countDown(); 1167 for (ScanThread thread : scanThreads) { 1168 thread.join(); 1169 } 1170 iterator = cache.iterator(); 1171 usedBlocksFound = false; 1172 refCount = 0; 1173 while (iterator.hasNext()) { 1174 CachedBlock next = iterator.next(); 1175 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1176 if (cache instanceof BucketCache) { 1177 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1178 } else if (cache instanceof CombinedBlockCache) { 1179 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1180 } else { 1181 continue; 1182 } 1183 if (refCount != 0) { 1184 // Blocks will be with count 3 1185 assertEquals(NO_OF_THREADS, refCount); 1186 usedBlocksFound = true; 1187 } 1188 } 1189 assertFalse(usedBlocksFound); 1190 // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner 1191 assertEquals(0, refCount); 1192 } finally { 1193 if (table != null) { 1194 table.close(); 1195 } 1196 } 1197 } 1198 1199 private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) { 1200 int refCount; 1201 while (iterator.hasNext()) { 1202 CachedBlock next = iterator.next(); 1203 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1204 if (cache instanceof BucketCache) { 1205 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1206 LOG.info("BucketCache {} {}", cacheKey, refCount); 1207 } else if (cache instanceof CombinedBlockCache) { 1208 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1209 LOG.info("CombinedBlockCache {} {}", cacheKey, refCount); 1210 } else { 1211 continue; 1212 } 1213 assertEquals(0, refCount); 1214 } 1215 } 1216 1217 private void insertData(Table table) throws IOException { 1218 Put put = new Put(ROW); 1219 put.addColumn(FAMILY, QUALIFIER, data); 1220 table.put(put); 1221 put = new Put(ROW1); 1222 put.addColumn(FAMILY, QUALIFIER, data); 1223 table.put(put); 1224 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1225 put = new Put(ROW); 1226 put.addColumn(FAMILY, QUALIFIER2, data2); 1227 table.put(put); 1228 } 1229 1230 private ScanThread[] initiateScan(Table table, boolean reverse) 1231 throws IOException, InterruptedException { 1232 ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS]; 1233 for (int i = 0; i < NO_OF_THREADS; i++) { 1234 scanThreads[i] = new ScanThread(table, reverse); 1235 } 1236 for (ScanThread thread : scanThreads) { 1237 thread.start(); 1238 } 1239 return scanThreads; 1240 } 1241 1242 private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs) 1243 throws IOException, InterruptedException { 1244 GetThread[] getThreads = new GetThread[NO_OF_THREADS]; 1245 for (int i = 0; i < NO_OF_THREADS; i++) { 1246 getThreads[i] = new GetThread(table, tracker, multipleCFs); 1247 } 1248 for (GetThread thread : getThreads) { 1249 thread.start(); 1250 } 1251 return getThreads; 1252 } 1253 1254 private MultiGetThread[] initiateMultiGet(Table table) throws IOException, InterruptedException { 1255 MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS]; 1256 for (int i = 0; i < NO_OF_THREADS; i++) { 1257 multiGetThreads[i] = new MultiGetThread(table); 1258 } 1259 for (MultiGetThread thread : multiGetThreads) { 1260 thread.start(); 1261 } 1262 return multiGetThreads; 1263 } 1264 1265 private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero) 1266 throws InterruptedException { 1267 int counter = NO_OF_THREADS; 1268 if (CustomInnerRegionObserver.waitForGets.get()) { 1269 // Because only one row is selected, it has only 2 blocks 1270 counter = counter - 1; 1271 while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) { 1272 Thread.sleep(100); 1273 } 1274 } else { 1275 while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) { 1276 Thread.sleep(100); 1277 } 1278 } 1279 Iterator<CachedBlock> iterator = cache.iterator(); 1280 int refCount = 0; 1281 while (iterator.hasNext()) { 1282 CachedBlock next = iterator.next(); 1283 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1284 if (cache instanceof BucketCache) { 1285 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); 1286 } else if (cache instanceof CombinedBlockCache) { 1287 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); 1288 } else { 1289 continue; 1290 } 1291 System.out.println(" the refcount is " + refCount + " block is " + cacheKey); 1292 if (CustomInnerRegionObserver.waitForGets.get()) { 1293 if (expectOnlyZero) { 1294 assertTrue(refCount == 0); 1295 } 1296 if (refCount != 0) { 1297 // Because the scan would have also touched up on these blocks but 1298 // it 1299 // would have touched 1300 // all 3 1301 if (getClosed) { 1302 // If get has closed only the scan's blocks would be available 1303 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get()); 1304 } else { 1305 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS)); 1306 } 1307 } 1308 } else { 1309 // Because the get would have also touched up on these blocks but it 1310 // would have touched 1311 // upon only 2 additionally 1312 if (expectOnlyZero) { 1313 assertTrue(refCount == 0); 1314 } 1315 if (refCount != 0) { 1316 if (getLatch == null) { 1317 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get()); 1318 } else { 1319 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS)); 1320 } 1321 } 1322 } 1323 } 1324 CustomInnerRegionObserver.getCdl().get().countDown(); 1325 } 1326 1327 private static class MultiGetThread extends Thread { 1328 private final Table table; 1329 private final List<Get> gets = new ArrayList<>(); 1330 1331 public MultiGetThread(Table table) { 1332 this.table = table; 1333 } 1334 1335 @Override 1336 public void run() { 1337 gets.add(new Get(ROW)); 1338 gets.add(new Get(ROW1)); 1339 try { 1340 CustomInnerRegionObserver.getCdl().set(latch); 1341 Result[] r = table.get(gets); 1342 assertTrue(Bytes.equals(r[0].getRow(), ROW)); 1343 assertTrue(Bytes.equals(r[1].getRow(), ROW1)); 1344 } catch (IOException e) { 1345 } 1346 } 1347 } 1348 1349 private static class GetThread extends Thread { 1350 private final Table table; 1351 private final boolean tracker; 1352 private final boolean multipleCFs; 1353 1354 public GetThread(Table table, boolean tracker, boolean multipleCFs) { 1355 this.table = table; 1356 this.tracker = tracker; 1357 this.multipleCFs = multipleCFs; 1358 } 1359 1360 @Override 1361 public void run() { 1362 try { 1363 initiateGet(table); 1364 } catch (IOException e) { 1365 // do nothing 1366 } 1367 } 1368 1369 private void initiateGet(Table table) throws IOException { 1370 Get get = new Get(ROW); 1371 if (tracker) { 1372 // Change this 1373 if (!multipleCFs) { 1374 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3)); 1375 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8)); 1376 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9)); 1377 // Unknown key 1378 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900)); 1379 } else { 1380 get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)); 1381 get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)); 1382 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)); 1383 // Unknown key 1384 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900)); 1385 } 1386 } 1387 CustomInnerRegionObserver.getCdl().set(latch); 1388 Result r = table.get(get); 1389 System.out.println(r); 1390 if (!tracker) { 1391 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1392 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1393 } else { 1394 if (!multipleCFs) { 1395 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2)); 1396 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2)); 1397 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2)); 1398 } else { 1399 assertTrue(Bytes.equals( 1400 r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)), 1401 data2)); 1402 assertTrue(Bytes.equals( 1403 r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)), 1404 data2)); 1405 assertTrue(Bytes.equals( 1406 r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)), 1407 data2)); 1408 } 1409 } 1410 } 1411 } 1412 1413 private static class ScanThread extends Thread { 1414 private final Table table; 1415 private final boolean reverse; 1416 1417 public ScanThread(Table table, boolean reverse) { 1418 this.table = table; 1419 this.reverse = reverse; 1420 } 1421 1422 @Override 1423 public void run() { 1424 try { 1425 initiateScan(table); 1426 } catch (IOException e) { 1427 // do nothing 1428 } 1429 } 1430 1431 private void initiateScan(Table table) throws IOException { 1432 Scan scan = new Scan(); 1433 if (reverse) { 1434 scan.setReversed(true); 1435 } 1436 CustomInnerRegionObserver.getCdl().set(latch); 1437 ResultScanner resScanner = table.getScanner(scan); 1438 int i = (reverse ? ROWS.length - 1 : 0); 1439 boolean resultFound = false; 1440 for (Result result : resScanner) { 1441 resultFound = true; 1442 System.out.println(result); 1443 if (!reverse) { 1444 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1445 i++; 1446 } else { 1447 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1448 i--; 1449 } 1450 } 1451 assertTrue(resultFound); 1452 } 1453 } 1454 1455 private void waitForStoreFileCount(HStore store, int count, int timeout) 1456 throws InterruptedException { 1457 long start = EnvironmentEdgeManager.currentTime(); 1458 while ( 1459 start + timeout > EnvironmentEdgeManager.currentTime() && store.getStorefilesCount() != count 1460 ) { 1461 Thread.sleep(100); 1462 } 1463 System.out.println("start=" + start + ", now=" + EnvironmentEdgeManager.currentTime() + ", cur=" 1464 + store.getStorefilesCount()); 1465 assertEquals(count, store.getStorefilesCount()); 1466 } 1467 1468 private static class CustomScanner implements RegionScanner { 1469 1470 private RegionScanner delegate; 1471 1472 public CustomScanner(RegionScanner delegate) { 1473 this.delegate = delegate; 1474 } 1475 1476 @Override 1477 public boolean next(List<Cell> results) throws IOException { 1478 return delegate.next(results); 1479 } 1480 1481 @Override 1482 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { 1483 return delegate.next(result, scannerContext); 1484 } 1485 1486 @Override 1487 public boolean nextRaw(List<Cell> result) throws IOException { 1488 return delegate.nextRaw(result); 1489 } 1490 1491 @Override 1492 public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException { 1493 boolean nextRaw = delegate.nextRaw(result, context); 1494 if (compactionLatch != null && compactionLatch.getCount() > 0) { 1495 try { 1496 compactionLatch.await(); 1497 } catch (InterruptedException ie) { 1498 } 1499 } 1500 1501 if (CustomInnerRegionObserver.throwException.get()) { 1502 if (exceptionLatch.getCount() > 0) { 1503 try { 1504 exceptionLatch.await(); 1505 } catch (InterruptedException e) { 1506 } 1507 throw new IOException("throw exception"); 1508 } 1509 } 1510 return nextRaw; 1511 } 1512 1513 @Override 1514 public void close() throws IOException { 1515 delegate.close(); 1516 } 1517 1518 @Override 1519 public RegionInfo getRegionInfo() { 1520 return delegate.getRegionInfo(); 1521 } 1522 1523 @Override 1524 public boolean isFilterDone() throws IOException { 1525 return delegate.isFilterDone(); 1526 } 1527 1528 @Override 1529 public boolean reseek(byte[] row) throws IOException { 1530 return false; 1531 } 1532 1533 @Override 1534 public long getMaxResultSize() { 1535 return delegate.getMaxResultSize(); 1536 } 1537 1538 @Override 1539 public long getMvccReadPoint() { 1540 return delegate.getMvccReadPoint(); 1541 } 1542 1543 @Override 1544 public int getBatch() { 1545 return delegate.getBatch(); 1546 } 1547 } 1548 1549 public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver { 1550 @Override 1551 public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, 1552 RegionScanner s) throws IOException { 1553 return new CustomScanner(s); 1554 } 1555 } 1556 1557 public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver { 1558 static final AtomicInteger countOfNext = new AtomicInteger(0); 1559 static final AtomicInteger countOfGets = new AtomicInteger(0); 1560 static final AtomicBoolean waitForGets = new AtomicBoolean(false); 1561 static final AtomicBoolean throwException = new AtomicBoolean(false); 1562 private static final AtomicReference<CountDownLatch> cdl = 1563 new AtomicReference<>(new CountDownLatch(0)); 1564 1565 @Override 1566 public Optional<RegionObserver> getRegionObserver() { 1567 return Optional.of(this); 1568 } 1569 1570 @Override 1571 public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e, 1572 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException { 1573 slowdownCode(e, false); 1574 if (getLatch != null && getLatch.getCount() > 0) { 1575 try { 1576 getLatch.await(); 1577 } catch (InterruptedException e1) { 1578 } 1579 } 1580 return hasMore; 1581 } 1582 1583 @Override 1584 public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, 1585 List<Cell> results) throws IOException { 1586 slowdownCode(e, true); 1587 } 1588 1589 public static AtomicReference<CountDownLatch> getCdl() { 1590 return cdl; 1591 } 1592 1593 private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e, 1594 boolean isGet) { 1595 CountDownLatch latch = getCdl().get(); 1596 try { 1597 System.out.println(latch.getCount() + " is the count " + isGet); 1598 if (latch.getCount() > 0) { 1599 if (isGet) { 1600 countOfGets.incrementAndGet(); 1601 } else { 1602 countOfNext.incrementAndGet(); 1603 } 1604 LOG.info("Waiting for the counterCountDownLatch"); 1605 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 1606 if (latch.getCount() > 0) { 1607 throw new RuntimeException("Can't wait more"); 1608 } 1609 } 1610 } catch (InterruptedException e1) { 1611 LOG.error(e1.toString(), e1); 1612 } 1613 } 1614 } 1615}