/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

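/**
 * Verifies that data blocks referenced by in-flight client operations (gets, multi-gets and
 * scans) keep a non-zero reference count in the block cache and become evictable again once
 * those operations complete, including across flushes, major compactions, region splits and
 * scanner exceptions. A custom region observer parks the server-side operations on latches so
 * the tests can inspect block reference counts while the blocks are still pinned.
 */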
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestBlockEvictionFromClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static int NO_OF_THREADS = 3;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();

  /**
   * @throws java.lang.Exception
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
  }

  /**
   * @throws java.lang.Exception
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * @throws java.lang.Exception
   */
  @Before
  public void setUp() throws Exception {
    CustomInnerRegionObserver.waitForGets.set(false);
    CustomInnerRegionObserver.countOfNext.set(0);
    CustomInnerRegionObserver.countOfGets.set(0);
  }

  /**
   * @throws java.lang.Exception
   */
  @After
  public void tearDown() throws Exception {
    if (latch != null) {
      while (latch.getCount() > 0) {
        latch.countDown();
      }
    }
    if (getLatch != null) {
      getLatch.countDown();
    }
    if (compactionLatch != null) {
      compactionLatch.countDown();
    }
    if (exceptionLatch != null) {
      exceptionLatch.countDown();
    }
    latch = null;
    getLatch = null;
    compactionLatch = null;
    exceptionLatch = null;
    CustomInnerRegionObserver.throwException.set(false);
    // Clean up the tables for every test case
    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
    for (TableName tableName : listTableNames) {
      if (!tableName.isSystemTable()) {
        TEST_UTIL.getAdmin().disableTable(tableName);
        TEST_UTIL.getAdmin().deleteTable(tableName);
      }
    }
  }
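
  /**
   * Runs three parallel scans against a freshly flushed table and verifies that no cached block
   * is left with a dangling reference count once the scanners finish, then repeats the check
   * after a second flush and a major compaction.
   */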
  @Test
  public void testBlockEvictionWithParallelScans() throws Exception {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName)
          .getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // data was in memstore so don't expect any changes
      // flush the data
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // Load cache
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      checkForBlockEviction(cache, false, false);
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // CustomInnerRegionObserver.sleepTime.set(0);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the data and expect same blocks, one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // compact, net minus two blocks, two hits, no misses
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the row, this should be a cache miss because we don't cache data
      // blocks on compaction
      r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
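
  /**
   * Runs gets and scans in parallel against the same table and verifies that the blocks pinned
   * by the gets are released once the gets complete, and that the scans release their block
   * references when they finish as well.
   */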
  @Test
  public void testParallelGetsAndScans() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // Create three sets of scan
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      checkForBlockEviction(cache, false, false);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      System.out.println("Scans should have returned the blocks");
      // Check with either true or false
      CustomInnerRegionObserver.waitForGets.set(false);
      // The scan should also have released the blocks by now
      checkForBlockEviction(cache, true, true);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

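      // Spread the row's cells across separate store files: each put below is followed by its
      // own flush, so the gets have to pull blocks from more than one Hfile.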
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      Thread.sleep(200);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  // TODO : check how block index works here
  public void testGetsWithMultiColumnsAndExplicitTracker()
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, true, false);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(10, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, true, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(3, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
    Table table = null;
    try {
      final TableName tableName = TableName.valueOf(name.getMethodName());
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024);
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
      int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
      LOG.info("About to SPLIT on " + Bytes.toString(ROW1));
      TEST_UTIL.getAdmin().split(tableName, ROW1);
      // Wait for splits
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
      region.compact(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      // Though the split had created the HalfStoreFileReader, the first key and last key
      // scanners should be closed in order to return those blocks
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testMultiGets() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      MultiGetThread[] getThreads = initiateMultiGet(table);
      Thread.sleep(200);
      int refCount;
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean foundNonZeroBlock = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          foundNonZeroBlock = true;
        }
      }
      assertTrue("Should have found nonzero ref count block", foundNonZeroBlock);
      CustomInnerRegionObserver.getCdl().get().countDown();
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (MultiGetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      iterateBlockCache(cache, iterator);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      // Create three sets of gets
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(12, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  private BlockCache setCacheProperties(HRegion region) {
    Iterator<HStore> strItr = region.getStores().iterator();
    BlockCache cache = null;
    while (strItr.hasNext()) {
      HStore store = strItr.next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      // Use the last one
      cache = cacheConf.getBlockCache().get();
    }
    return cache;
  }

  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException,
      InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      // Create three sets of scan
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      // The block would have been decremented for the scan case as it was wrapped
      // before even the postNext hook gets executed.
      // giving some time for the block to be decremented
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }

  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }

  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, reversed);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
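
  /**
   * Flushes and major-compacts the store while scans are parked holding block references, then
   * verifies that the pinned blocks stay referenced until the scanners finish and that all
   * blocks of the compacted-away files are released afterwards.
   */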
  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
      throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make a put and do a flush
      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      data2 = Bytes.add(data, data);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      // Since a flush and compaction happened after a scan started
      // we need to ensure that all the original blocks of the compacted file
      // are also removed.
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();
      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // The block would have been decremented for the scan case as it was wrapped
      // before even the postNext hook gets executed.
      // giving some time for the block to be decremented
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // you should always see 0 ref count, since after HBASE-16604 we always recreate the scanner
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
    int refCount;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
      } else {
        continue;
      }
      assertEquals(0, refCount);
    }
  }

  private void insertData(Table table) throws IOException {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW1);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
    put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER2, data2);
    table.put(put);
  }

  private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException,
      InterruptedException {
    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      scanThreads[i] = new ScanThread(table, reverse);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    return scanThreads;
  }

  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
      throws IOException, InterruptedException {
    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      getThreads[i] = new GetThread(table, tracker, multipleCFs);
    }
    for (GetThread thread : getThreads) {
      thread.start();
    }
    return getThreads;
  }

  private MultiGetThread[] initiateMultiGet(Table table)
      throws IOException, InterruptedException {
    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      multiGetThreads[i] = new MultiGetThread(table);
    }
    for (MultiGetThread thread : multiGetThreads) {
      thread.start();
    }
    return multiGetThreads;
  }

  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
      throws InterruptedException {
    int counter = NO_OF_THREADS;
    if (CustomInnerRegionObserver.waitForGets.get()) {
      // Because only one row is selected, it has only 2 blocks
      counter = counter - 1;
      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    } else {
      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    }
    Iterator<CachedBlock> iterator = cache.iterator();
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
      } else {
        continue;
      }
      System.out.println(" the refcount is " + refCount + " block is " + cacheKey);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          // Because the scan would have also touched up on these blocks but it
          // would have touched all 3
          if (getClosed) {
            // If get has closed only the scan's blocks would be available
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS));
          }
        }
      } else {
        // Because the get would have also touched up on these blocks but it
        // would have touched upon only 2 additionally
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          if (getLatch == null) {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS));
          }
        }
      }
    }
    CustomInnerRegionObserver.getCdl().get().countDown();
  }

  private static class MultiGetThread extends Thread {
    private final Table table;
    private final List<Get> gets = new ArrayList<>();

    public MultiGetThread(Table table) {
      this.table = table;
    }

    @Override
    public void run() {
      gets.add(new Get(ROW));
      gets.add(new Get(ROW1));
      try {
        CustomInnerRegionObserver.getCdl().set(latch);
        Result[] r = table.get(gets);
        assertTrue(Bytes.equals(r[0].getRow(), ROW));
        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
      } catch (IOException e) {
        // ignore
      }
    }
  }
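
  /**
   * Issues a single get, optionally with an explicit set of columns and multiple column
   * families, and verifies the returned values. The coprocessor parks the get in postGetOp until
   * the shared latch is counted down, keeping the touched blocks referenced in the meantime.
   */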
  private static class GetThread extends Thread {
    private final Table table;
    private final boolean tracker;
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        // Change this
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      System.out.println(r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
              data2));
        }
      }
    }
  }

  private static class ScanThread extends Thread {
    private final Table table;
    private final boolean reverse;

    public ScanThread(Table table, boolean reverse) {
      this.table = table;
      this.reverse = reverse;
    }

    @Override
    public void run() {
      try {
        initiateScan(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateScan(Table table) throws IOException {
      Scan scan = new Scan();
      if (reverse) {
        scan.setReversed(true);
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      ResultScanner resScanner = table.getScanner(scan);
      int i = (reverse ? ROWS.length - 1 : 0);
      boolean resultFound = false;
      for (Result result : resScanner) {
        resultFound = true;
        System.out.println(result);
        if (!reverse) {
          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
          i++;
        } else {
          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
          i--;
        }
      }
      assertTrue(resultFound);
    }
  }

  private void waitForStoreFileCount(HStore store, int count, int timeout)
      throws InterruptedException {
    long start = System.currentTimeMillis();
    while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) {
      Thread.sleep(100);
    }
    System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" +
        store.getStorefilesCount());
    assertEquals(count, store.getStorefilesCount());
  }

  private static class CustomScanner implements RegionScanner {

    private RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
      boolean nextRaw = delegate.nextRaw(result, context);
      if (compactionLatch != null && compactionLatch.getCount() > 0) {
        try {
          compactionLatch.await();
        } catch (InterruptedException ie) {
        }
      }

      if (CustomInnerRegionObserver.throwException.get()) {
        if (exceptionLatch.getCount() > 0) {
          try {
            exceptionLatch.await();
          } catch (InterruptedException e) {
          }
          throw new IOException("throw exception");
        }
      }
      return nextRaw;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public RegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }
  }

  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
    @Override
    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
        Scan scan, RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }
  }
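
  /**
   * Region observer that parks gets and scans on a shared CountDownLatch (see getCdl()) so the
   * tests can inspect block reference counts while the operations are still in flight. It also
   * counts the get and next invocations it sees; the throwException flag is consumed by
   * CustomScanner to simulate a scan failure.
   */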
  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    static final AtomicInteger countOfGets = new AtomicInteger(0);
    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
    static final AtomicBoolean throwException = new AtomicBoolean(false);
    private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(
        new CountDownLatch(0));

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
        InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
      slowdownCode(e, false);
      if (getLatch != null && getLatch.getCount() > 0) {
        try {
          getLatch.await();
        } catch (InterruptedException e1) {
        }
      }
      return hasMore;
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
        List<Cell> results) throws IOException {
      slowdownCode(e, true);
    }

    public static AtomicReference<CountDownLatch> getCdl() {
      return cdl;
    }

    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
        boolean isGet) {
      CountDownLatch latch = getCdl().get();
      try {
        System.out.println(latch.getCount() + " is the count " + isGet);
        if (latch.getCount() > 0) {
          if (isGet) {
            countOfGets.incrementAndGet();
          } else {
            countOfNext.incrementAndGet();
          }
          LOG.info("Waiting for the counterCountDownLatch");
          latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Can't wait more");
          }
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }
    }
  }
}