001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.atomic.AtomicLong; 034import java.util.concurrent.atomic.AtomicReference; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 043import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 044import org.apache.hadoop.hbase.coprocessor.ObserverContext; 045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 047import org.apache.hadoop.hbase.coprocessor.RegionObserver; 048import org.apache.hadoop.hbase.io.hfile.BlockCache; 049import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 050import org.apache.hadoop.hbase.io.hfile.CacheConfig; 051import org.apache.hadoop.hbase.io.hfile.CachedBlock; 052import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 053import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 054import org.apache.hadoop.hbase.regionserver.HRegion; 055import org.apache.hadoop.hbase.regionserver.HStore; 056import org.apache.hadoop.hbase.regionserver.InternalScanner; 057import org.apache.hadoop.hbase.regionserver.RegionScanner; 058import org.apache.hadoop.hbase.regionserver.ScannerContext; 059import org.apache.hadoop.hbase.testclassification.ClientTests; 060import org.apache.hadoop.hbase.testclassification.LargeTests; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.junit.After; 063import org.junit.AfterClass; 064import org.junit.Before; 065import org.junit.BeforeClass; 066import org.junit.ClassRule; 067import org.junit.Rule; 068import org.junit.Test; 069import org.junit.experimental.categories.Category; 070import org.junit.rules.TestName; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 075 076@Category({ LargeTests.class, ClientTests.class }) 077@SuppressWarnings("deprecation") 078public class TestBlockEvictionFromClient { 079 080 @ClassRule 081 public static final HBaseClassTestRule CLASS_RULE = 082 HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class); 083 084 private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class); 085 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 086 static byte[][] ROWS = new byte[2][]; 087 private static int NO_OF_THREADS = 3; 088 private static byte[] ROW = Bytes.toBytes("testRow"); 089 private static byte[] ROW1 = Bytes.toBytes("testRow1"); 090 private static byte[] ROW2 = Bytes.toBytes("testRow2"); 091 private static byte[] ROW3 = Bytes.toBytes("testRow3"); 092 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 093 private static byte[][] FAMILIES_1 = new byte[1][0]; 094 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 095 private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 096 private static byte[] data = new byte[1000]; 097 private static byte[] data2 = Bytes.add(data, data); 098 protected static int SLAVES = 1; 099 private static CountDownLatch latch; 100 private static CountDownLatch getLatch; 101 private static CountDownLatch compactionLatch; 102 private static CountDownLatch exceptionLatch; 103 104 @Rule 105 public TestName name = new TestName(); 106 107 /** 108 * @throws java.lang.Exception 109 */ 110 @BeforeClass 111 public static void setUpBeforeClass() throws Exception { 112 ROWS[0] = ROW; 113 ROWS[1] = ROW1; 114 Configuration conf = TEST_UTIL.getConfiguration(); 115 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 116 MultiRowMutationEndpoint.class.getName()); 117 conf.setBoolean("hbase.table.sanity.checks", true); // enable for below 118 // tests 119 conf.setInt("hbase.regionserver.handler.count", 20); 120 conf.setInt("hbase.bucketcache.size", 400); 121 conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); 122 conf.setFloat("hfile.block.cache.size", 0.2f); 123 conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); 124 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry 125 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000); 126 FAMILIES_1[0] = FAMILY; 127 TEST_UTIL.startMiniCluster(SLAVES); 128 } 129 130 /** 131 * @throws java.lang.Exception 132 */ 133 @AfterClass 134 public static void tearDownAfterClass() throws Exception { 135 TEST_UTIL.shutdownMiniCluster(); 136 } 137 138 /** 139 * @throws java.lang.Exception 140 */ 141 @Before 142 public void setUp() throws Exception { 143 CustomInnerRegionObserver.waitForGets.set(false); 144 CustomInnerRegionObserver.countOfNext.set(0); 145 CustomInnerRegionObserver.countOfGets.set(0); 146 } 147 148 /** 149 * @throws java.lang.Exception 150 */ 151 @After 152 public void tearDown() throws Exception { 153 if (latch != null) { 154 while (latch.getCount() > 0) { 155 latch.countDown(); 156 } 157 } 158 if (getLatch != null) { 159 getLatch.countDown(); 160 } 161 if (compactionLatch != null) { 162 compactionLatch.countDown(); 163 } 164 if (exceptionLatch != null) { 165 exceptionLatch.countDown(); 166 } 167 latch = null; 168 getLatch = null; 169 compactionLatch = null; 170 exceptionLatch = null; 171 CustomInnerRegionObserver.throwException.set(false); 172 // Clean up the tables for every test case 173 TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames(); 174 for (TableName tableName : listTableNames) { 175 if (!tableName.isSystemTable()) { 176 TEST_UTIL.getAdmin().disableTable(tableName); 177 TEST_UTIL.getAdmin().deleteTable(tableName); 178 } 179 } 180 } 181 182 @Test 183 public void testBlockEvictionWithParallelScans() throws Exception { 184 Table table = null; 185 try { 186 latch = new CountDownLatch(1); 187 final TableName tableName = TableName.valueOf(name.getMethodName()); 188 // Create a table with block size as 1024 189 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 190 CustomInnerRegionObserver.class.getName()); 191 // get the block cache and region 192 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 193 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 194 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName) 195 .getRegion(regionName); 196 HStore store = region.getStores().iterator().next(); 197 CacheConfig cacheConf = store.getCacheConfig(); 198 cacheConf.setCacheDataOnWrite(true); 199 cacheConf.setEvictOnClose(true); 200 BlockCache cache = cacheConf.getBlockCache(); 201 202 // insert data. 2 Rows are added 203 Put put = new Put(ROW); 204 put.addColumn(FAMILY, QUALIFIER, data); 205 table.put(put); 206 put = new Put(ROW1); 207 put.addColumn(FAMILY, QUALIFIER, data); 208 table.put(put); 209 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 210 // data was in memstore so don't expect any changes 211 // flush the data 212 // Should create one Hfile with 2 blocks 213 region.flush(true); 214 // Load cache 215 // Create three sets of scan 216 ScanThread[] scanThreads = initiateScan(table, false); 217 Thread.sleep(100); 218 checkForBlockEviction(cache, false, false); 219 for (ScanThread thread : scanThreads) { 220 thread.join(); 221 } 222 // CustomInnerRegionObserver.sleepTime.set(0); 223 Iterator<CachedBlock> iterator = cache.iterator(); 224 iterateBlockCache(cache, iterator); 225 // read the data and expect same blocks, one new hit, no misses 226 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 227 iterator = cache.iterator(); 228 iterateBlockCache(cache, iterator); 229 // Check how this miss is happening 230 // insert a second column, read the row, no new blocks, 3 new hits 231 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 232 byte[] data2 = Bytes.add(data, data); 233 put = new Put(ROW); 234 put.addColumn(FAMILY, QUALIFIER2, data2); 235 table.put(put); 236 Result r = table.get(new Get(ROW)); 237 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 238 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 239 iterator = cache.iterator(); 240 iterateBlockCache(cache, iterator); 241 // flush, one new block 242 System.out.println("Flushing cache"); 243 region.flush(true); 244 iterator = cache.iterator(); 245 iterateBlockCache(cache, iterator); 246 // compact, net minus two blocks, two hits, no misses 247 System.out.println("Compacting"); 248 assertEquals(2, store.getStorefilesCount()); 249 store.triggerMajorCompaction(); 250 region.compact(true); 251 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 252 assertEquals(1, store.getStorefilesCount()); 253 iterator = cache.iterator(); 254 iterateBlockCache(cache, iterator); 255 // read the row, this should be a cache miss because we don't cache data 256 // blocks on compaction 257 r = table.get(new Get(ROW)); 258 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 259 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 260 iterator = cache.iterator(); 261 iterateBlockCache(cache, iterator); 262 } finally { 263 if (table != null) { 264 table.close(); 265 } 266 } 267 } 268 269 @Test 270 public void testParallelGetsAndScans() throws IOException, InterruptedException { 271 Table table = null; 272 try { 273 latch = new CountDownLatch(2); 274 // Check if get() returns blocks on its close() itself 275 getLatch = new CountDownLatch(1); 276 final TableName tableName = TableName.valueOf(name.getMethodName()); 277 // Create KV that will give you two blocks 278 // Create a table with block size as 1024 279 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 280 CustomInnerRegionObserver.class.getName()); 281 // get the block cache and region 282 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 283 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 284 HRegion region = 285 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 286 HStore store = region.getStores().iterator().next(); 287 CacheConfig cacheConf = store.getCacheConfig(); 288 cacheConf.setCacheDataOnWrite(true); 289 cacheConf.setEvictOnClose(true); 290 BlockCache cache = cacheConf.getBlockCache(); 291 292 insertData(table); 293 // flush the data 294 System.out.println("Flushing cache"); 295 // Should create one Hfile with 2 blocks 296 region.flush(true); 297 // Create three sets of scan 298 CustomInnerRegionObserver.waitForGets.set(true); 299 ScanThread[] scanThreads = initiateScan(table, false); 300 // Create three sets of gets 301 GetThread[] getThreads = initiateGet(table, false, false); 302 checkForBlockEviction(cache, false, false); 303 CustomInnerRegionObserver.waitForGets.set(false); 304 checkForBlockEviction(cache, false, false); 305 for (GetThread thread : getThreads) { 306 thread.join(); 307 } 308 // Verify whether the gets have returned the blocks that it had 309 CustomInnerRegionObserver.waitForGets.set(true); 310 // giving some time for the block to be decremented 311 checkForBlockEviction(cache, true, false); 312 getLatch.countDown(); 313 for (ScanThread thread : scanThreads) { 314 thread.join(); 315 } 316 System.out.println("Scans should have returned the bloks"); 317 // Check with either true or false 318 CustomInnerRegionObserver.waitForGets.set(false); 319 // The scan should also have released the blocks by now 320 checkForBlockEviction(cache, true, true); 321 } finally { 322 if (table != null) { 323 table.close(); 324 } 325 } 326 } 327 328 @Test 329 public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException { 330 Table table = null; 331 try { 332 latch = new CountDownLatch(1); 333 // Check if get() returns blocks on its close() itself 334 getLatch = new CountDownLatch(1); 335 final TableName tableName = TableName.valueOf(name.getMethodName()); 336 // Create KV that will give you two blocks 337 // Create a table with block size as 1024 338 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 339 CustomInnerRegionObserver.class.getName()); 340 // get the block cache and region 341 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 342 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 343 HRegion region = 344 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 345 HStore store = region.getStores().iterator().next(); 346 CacheConfig cacheConf = store.getCacheConfig(); 347 cacheConf.setCacheDataOnWrite(true); 348 cacheConf.setEvictOnClose(true); 349 BlockCache cache = cacheConf.getBlockCache(); 350 351 Put put = new Put(ROW); 352 put.addColumn(FAMILY, QUALIFIER, data); 353 table.put(put); 354 region.flush(true); 355 put = new Put(ROW1); 356 put.addColumn(FAMILY, QUALIFIER, data); 357 table.put(put); 358 region.flush(true); 359 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 360 put = new Put(ROW); 361 put.addColumn(FAMILY, QUALIFIER2, data2); 362 table.put(put); 363 region.flush(true); 364 // flush the data 365 System.out.println("Flushing cache"); 366 // Should create one Hfile with 2 blocks 367 CustomInnerRegionObserver.waitForGets.set(true); 368 // Create three sets of gets 369 GetThread[] getThreads = initiateGet(table, false, false); 370 Thread.sleep(200); 371 CustomInnerRegionObserver.getCdl().get().countDown(); 372 for (GetThread thread : getThreads) { 373 thread.join(); 374 } 375 // Verify whether the gets have returned the blocks that it had 376 CustomInnerRegionObserver.waitForGets.set(true); 377 // giving some time for the block to be decremented 378 checkForBlockEviction(cache, true, false); 379 getLatch.countDown(); 380 System.out.println("Gets should have returned the bloks"); 381 } finally { 382 if (table != null) { 383 table.close(); 384 } 385 } 386 } 387 388 @Test 389 // TODO : check how block index works here 390 public void testGetsWithMultiColumnsAndExplicitTracker() 391 throws IOException, InterruptedException { 392 Table table = null; 393 try { 394 latch = new CountDownLatch(1); 395 // Check if get() returns blocks on its close() itself 396 getLatch = new CountDownLatch(1); 397 final TableName tableName = TableName.valueOf(name.getMethodName()); 398 // Create KV that will give you two blocks 399 // Create a table with block size as 1024 400 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 401 CustomInnerRegionObserver.class.getName()); 402 // get the block cache and region 403 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 404 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 405 HRegion region = 406 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 407 BlockCache cache = setCacheProperties(region); 408 Put put = new Put(ROW); 409 put.addColumn(FAMILY, QUALIFIER, data); 410 table.put(put); 411 region.flush(true); 412 put = new Put(ROW1); 413 put.addColumn(FAMILY, QUALIFIER, data); 414 table.put(put); 415 region.flush(true); 416 for (int i = 1; i < 10; i++) { 417 put = new Put(ROW); 418 put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2); 419 table.put(put); 420 if (i % 2 == 0) { 421 region.flush(true); 422 } 423 } 424 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 425 put = new Put(ROW); 426 put.addColumn(FAMILY, QUALIFIER2, data2); 427 table.put(put); 428 region.flush(true); 429 // flush the data 430 System.out.println("Flushing cache"); 431 // Should create one Hfile with 2 blocks 432 CustomInnerRegionObserver.waitForGets.set(true); 433 // Create three sets of gets 434 GetThread[] getThreads = initiateGet(table, true, false); 435 Thread.sleep(200); 436 Iterator<CachedBlock> iterator = cache.iterator(); 437 boolean usedBlocksFound = false; 438 int refCount = 0; 439 int noOfBlocksWithRef = 0; 440 while (iterator.hasNext()) { 441 CachedBlock next = iterator.next(); 442 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 443 if (cache instanceof BucketCache) { 444 refCount = ((BucketCache) cache).getRefCount(cacheKey); 445 } else if (cache instanceof CombinedBlockCache) { 446 refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); 447 } else { 448 continue; 449 } 450 if (refCount != 0) { 451 // Blocks will be with count 3 452 System.out.println("The refCount is " + refCount); 453 assertEquals(NO_OF_THREADS, refCount); 454 usedBlocksFound = true; 455 noOfBlocksWithRef++; 456 } 457 } 458 assertTrue(usedBlocksFound); 459 // the number of blocks referred 460 assertEquals(10, noOfBlocksWithRef); 461 CustomInnerRegionObserver.getCdl().get().countDown(); 462 for (GetThread thread : getThreads) { 463 thread.join(); 464 } 465 // Verify whether the gets have returned the blocks that it had 466 CustomInnerRegionObserver.waitForGets.set(true); 467 // giving some time for the block to be decremented 468 checkForBlockEviction(cache, true, false); 469 getLatch.countDown(); 470 System.out.println("Gets should have returned the bloks"); 471 } finally { 472 if (table != null) { 473 table.close(); 474 } 475 } 476 } 477 478 @Test 479 public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException { 480 Table table = null; 481 try { 482 latch = new CountDownLatch(1); 483 // Check if get() returns blocks on its close() itself 484 getLatch = new CountDownLatch(1); 485 final TableName tableName = TableName.valueOf(name.getMethodName()); 486 // Create KV that will give you two blocks 487 // Create a table with block size as 1024 488 byte[][] fams = new byte[10][]; 489 fams[0] = FAMILY; 490 for (int i = 1; i < 10; i++) { 491 fams[i] = (Bytes.toBytes("testFamily" + i)); 492 } 493 table = TEST_UTIL.createTable(tableName, fams, 1, 1024, 494 CustomInnerRegionObserver.class.getName()); 495 // get the block cache and region 496 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 497 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 498 HRegion region = 499 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 500 BlockCache cache = setCacheProperties(region); 501 502 Put put = new Put(ROW); 503 put.addColumn(FAMILY, QUALIFIER, data); 504 table.put(put); 505 region.flush(true); 506 put = new Put(ROW1); 507 put.addColumn(FAMILY, QUALIFIER, data); 508 table.put(put); 509 region.flush(true); 510 for (int i = 1; i < 10; i++) { 511 put = new Put(ROW); 512 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 513 table.put(put); 514 if (i % 2 == 0) { 515 region.flush(true); 516 } 517 } 518 region.flush(true); 519 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 520 put = new Put(ROW); 521 put.addColumn(FAMILY, QUALIFIER2, data2); 522 table.put(put); 523 region.flush(true); 524 // flush the data 525 System.out.println("Flushing cache"); 526 // Should create one Hfile with 2 blocks 527 CustomInnerRegionObserver.waitForGets.set(true); 528 // Create three sets of gets 529 GetThread[] getThreads = initiateGet(table, true, true); 530 Thread.sleep(200); 531 Iterator<CachedBlock> iterator = cache.iterator(); 532 boolean usedBlocksFound = false; 533 int refCount = 0; 534 int noOfBlocksWithRef = 0; 535 while (iterator.hasNext()) { 536 CachedBlock next = iterator.next(); 537 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 538 if (cache instanceof BucketCache) { 539 refCount = ((BucketCache) cache).getRefCount(cacheKey); 540 } else if (cache instanceof CombinedBlockCache) { 541 refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); 542 } else { 543 continue; 544 } 545 if (refCount != 0) { 546 // Blocks will be with count 3 547 System.out.println("The refCount is " + refCount); 548 assertEquals(NO_OF_THREADS, refCount); 549 usedBlocksFound = true; 550 noOfBlocksWithRef++; 551 } 552 } 553 assertTrue(usedBlocksFound); 554 // the number of blocks referred 555 assertEquals(3, noOfBlocksWithRef); 556 CustomInnerRegionObserver.getCdl().get().countDown(); 557 for (GetThread thread : getThreads) { 558 thread.join(); 559 } 560 // Verify whether the gets have returned the blocks that it had 561 CustomInnerRegionObserver.waitForGets.set(true); 562 // giving some time for the block to be decremented 563 checkForBlockEviction(cache, true, false); 564 getLatch.countDown(); 565 System.out.println("Gets should have returned the bloks"); 566 } finally { 567 if (table != null) { 568 table.close(); 569 } 570 } 571 } 572 573 @Test 574 public void testBlockRefCountAfterSplits() throws IOException, InterruptedException { 575 Table table = null; 576 try { 577 final TableName tableName = TableName.valueOf(name.getMethodName()); 578 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024); 579 // get the block cache and region 580 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 581 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 582 HRegion region = 583 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 584 HStore store = region.getStores().iterator().next(); 585 CacheConfig cacheConf = store.getCacheConfig(); 586 cacheConf.setEvictOnClose(true); 587 BlockCache cache = cacheConf.getBlockCache(); 588 589 Put put = new Put(ROW); 590 put.addColumn(FAMILY, QUALIFIER, data); 591 table.put(put); 592 region.flush(true); 593 put = new Put(ROW1); 594 put.addColumn(FAMILY, QUALIFIER, data); 595 table.put(put); 596 region.flush(true); 597 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 598 put = new Put(ROW2); 599 put.addColumn(FAMILY, QUALIFIER2, data2); 600 table.put(put); 601 put = new Put(ROW3); 602 put.addColumn(FAMILY, QUALIFIER2, data2); 603 table.put(put); 604 region.flush(true); 605 ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers()); 606 int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size(); 607 LOG.info("About to SPLIT on " + Bytes.toString(ROW1)); 608 TEST_UTIL.getAdmin().split(tableName, ROW1); 609 // Wait for splits 610 TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount); 611 region.compact(true); 612 Iterator<CachedBlock> iterator = cache.iterator(); 613 // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners 614 // should be closed inorder to return those blocks 615 iterateBlockCache(cache, iterator); 616 } finally { 617 if (table != null) { 618 table.close(); 619 } 620 } 621 } 622 623 @Test 624 public void testMultiGets() throws IOException, InterruptedException { 625 Table table = null; 626 try { 627 latch = new CountDownLatch(2); 628 // Check if get() returns blocks on its close() itself 629 getLatch = new CountDownLatch(1); 630 final TableName tableName = TableName.valueOf(name.getMethodName()); 631 // Create KV that will give you two blocks 632 // Create a table with block size as 1024 633 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 634 CustomInnerRegionObserver.class.getName()); 635 // get the block cache and region 636 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 637 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 638 HRegion region = 639 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 640 HStore store = region.getStores().iterator().next(); 641 CacheConfig cacheConf = store.getCacheConfig(); 642 cacheConf.setCacheDataOnWrite(true); 643 cacheConf.setEvictOnClose(true); 644 BlockCache cache = cacheConf.getBlockCache(); 645 646 Put put = new Put(ROW); 647 put.addColumn(FAMILY, QUALIFIER, data); 648 table.put(put); 649 region.flush(true); 650 put = new Put(ROW1); 651 put.addColumn(FAMILY, QUALIFIER, data); 652 table.put(put); 653 region.flush(true); 654 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 655 put = new Put(ROW); 656 put.addColumn(FAMILY, QUALIFIER2, data2); 657 table.put(put); 658 region.flush(true); 659 // flush the data 660 System.out.println("Flushing cache"); 661 // Should create one Hfile with 2 blocks 662 CustomInnerRegionObserver.waitForGets.set(true); 663 // Create three sets of gets 664 MultiGetThread[] getThreads = initiateMultiGet(table); 665 Thread.sleep(200); 666 int refCount; 667 Iterator<CachedBlock> iterator = cache.iterator(); 668 boolean foundNonZeroBlock = false; 669 while (iterator.hasNext()) { 670 CachedBlock next = iterator.next(); 671 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 672 if (cache instanceof BucketCache) { 673 refCount = ((BucketCache) cache).getRefCount(cacheKey); 674 } else if (cache instanceof CombinedBlockCache) { 675 refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); 676 } else { 677 continue; 678 } 679 if (refCount != 0) { 680 assertEquals(NO_OF_THREADS, refCount); 681 foundNonZeroBlock = true; 682 } 683 } 684 assertTrue("Should have found nonzero ref count block",foundNonZeroBlock); 685 CustomInnerRegionObserver.getCdl().get().countDown(); 686 CustomInnerRegionObserver.getCdl().get().countDown(); 687 for (MultiGetThread thread : getThreads) { 688 thread.join(); 689 } 690 // Verify whether the gets have returned the blocks that it had 691 CustomInnerRegionObserver.waitForGets.set(true); 692 // giving some time for the block to be decremented 693 iterateBlockCache(cache, iterator); 694 getLatch.countDown(); 695 System.out.println("Gets should have returned the bloks"); 696 } finally { 697 if (table != null) { 698 table.close(); 699 } 700 } 701 } 702 @Test 703 public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException { 704 Table table = null; 705 try { 706 latch = new CountDownLatch(1); 707 // Check if get() returns blocks on its close() itself 708 final TableName tableName = TableName.valueOf(name.getMethodName()); 709 // Create KV that will give you two blocks 710 // Create a table with block size as 1024 711 byte[][] fams = new byte[10][]; 712 fams[0] = FAMILY; 713 for (int i = 1; i < 10; i++) { 714 fams[i] = (Bytes.toBytes("testFamily" + i)); 715 } 716 table = TEST_UTIL.createTable(tableName, fams, 1, 1024, 717 CustomInnerRegionObserver.class.getName()); 718 // get the block cache and region 719 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 720 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 721 HRegion region = 722 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 723 BlockCache cache = setCacheProperties(region); 724 725 Put put = new Put(ROW); 726 put.addColumn(FAMILY, QUALIFIER, data); 727 table.put(put); 728 region.flush(true); 729 put = new Put(ROW1); 730 put.addColumn(FAMILY, QUALIFIER, data); 731 table.put(put); 732 region.flush(true); 733 for (int i = 1; i < 10; i++) { 734 put = new Put(ROW); 735 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); 736 table.put(put); 737 if (i % 2 == 0) { 738 region.flush(true); 739 } 740 } 741 region.flush(true); 742 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 743 put = new Put(ROW); 744 put.addColumn(FAMILY, QUALIFIER2, data2); 745 table.put(put); 746 region.flush(true); 747 // flush the data 748 System.out.println("Flushing cache"); 749 // Should create one Hfile with 2 blocks 750 // Create three sets of gets 751 ScanThread[] scanThreads = initiateScan(table, true); 752 Thread.sleep(200); 753 Iterator<CachedBlock> iterator = cache.iterator(); 754 boolean usedBlocksFound = false; 755 int refCount = 0; 756 int noOfBlocksWithRef = 0; 757 while (iterator.hasNext()) { 758 CachedBlock next = iterator.next(); 759 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 760 if (cache instanceof BucketCache) { 761 refCount = ((BucketCache) cache).getRefCount(cacheKey); 762 } else if (cache instanceof CombinedBlockCache) { 763 refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); 764 } else { 765 continue; 766 } 767 if (refCount != 0) { 768 // Blocks will be with count 3 769 System.out.println("The refCount is " + refCount); 770 assertEquals(NO_OF_THREADS, refCount); 771 usedBlocksFound = true; 772 noOfBlocksWithRef++; 773 } 774 } 775 assertTrue(usedBlocksFound); 776 // the number of blocks referred 777 assertEquals(12, noOfBlocksWithRef); 778 CustomInnerRegionObserver.getCdl().get().countDown(); 779 for (ScanThread thread : scanThreads) { 780 thread.join(); 781 } 782 // giving some time for the block to be decremented 783 checkForBlockEviction(cache, true, false); 784 } finally { 785 if (table != null) { 786 table.close(); 787 } 788 } 789 } 790 791 private BlockCache setCacheProperties(HRegion region) { 792 Iterator<HStore> strItr = region.getStores().iterator(); 793 BlockCache cache = null; 794 while (strItr.hasNext()) { 795 HStore store = strItr.next(); 796 CacheConfig cacheConf = store.getCacheConfig(); 797 cacheConf.setCacheDataOnWrite(true); 798 cacheConf.setEvictOnClose(true); 799 // Use the last one 800 cache = cacheConf.getBlockCache(); 801 } 802 return cache; 803 } 804 805 @Test 806 public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException, 807 InterruptedException { 808 Table table = null; 809 try { 810 latch = new CountDownLatch(2); 811 // Check if get() returns blocks on its close() itself 812 getLatch = new CountDownLatch(1); 813 final TableName tableName = TableName.valueOf(name.getMethodName()); 814 // Create KV that will give you two blocks 815 // Create a table with block size as 1024 816 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 817 CustomInnerRegionObserverWrapper.class.getName()); 818 // get the block cache and region 819 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 820 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 821 HRegion region = 822 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 823 HStore store = region.getStores().iterator().next(); 824 CacheConfig cacheConf = store.getCacheConfig(); 825 cacheConf.setCacheDataOnWrite(true); 826 cacheConf.setEvictOnClose(true); 827 BlockCache cache = cacheConf.getBlockCache(); 828 829 // insert data. 2 Rows are added 830 insertData(table); 831 // flush the data 832 System.out.println("Flushing cache"); 833 // Should create one Hfile with 2 blocks 834 region.flush(true); 835 // CustomInnerRegionObserver.sleepTime.set(5000); 836 // Create three sets of scan 837 CustomInnerRegionObserver.waitForGets.set(true); 838 ScanThread[] scanThreads = initiateScan(table, false); 839 // Create three sets of gets 840 GetThread[] getThreads = initiateGet(table, false, false); 841 // The block would have been decremented for the scan case as it was 842 // wrapped 843 // before even the postNext hook gets executed. 844 // giving some time for the block to be decremented 845 Thread.sleep(100); 846 CustomInnerRegionObserver.waitForGets.set(false); 847 checkForBlockEviction(cache, false, false); 848 // countdown the latch 849 CustomInnerRegionObserver.getCdl().get().countDown(); 850 for (GetThread thread : getThreads) { 851 thread.join(); 852 } 853 getLatch.countDown(); 854 for (ScanThread thread : scanThreads) { 855 thread.join(); 856 } 857 } finally { 858 if (table != null) { 859 table.close(); 860 } 861 } 862 } 863 864 @Test 865 public void testScanWithCompaction() throws IOException, InterruptedException { 866 testScanWithCompactionInternals(name.getMethodName(), false); 867 } 868 869 @Test 870 public void testReverseScanWithCompaction() throws IOException, InterruptedException { 871 testScanWithCompactionInternals(name.getMethodName(), true); 872 } 873 874 private void testScanWithCompactionInternals(String tableNameStr, boolean reversed) 875 throws IOException, InterruptedException { 876 Table table = null; 877 try { 878 latch = new CountDownLatch(1); 879 compactionLatch = new CountDownLatch(1); 880 TableName tableName = TableName.valueOf(tableNameStr); 881 // Create a table with block size as 1024 882 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 883 CustomInnerRegionObserverWrapper.class.getName()); 884 // get the block cache and region 885 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 886 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 887 HRegion region = 888 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 889 HStore store = region.getStores().iterator().next(); 890 CacheConfig cacheConf = store.getCacheConfig(); 891 cacheConf.setCacheDataOnWrite(true); 892 cacheConf.setEvictOnClose(true); 893 BlockCache cache = cacheConf.getBlockCache(); 894 895 // insert data. 2 Rows are added 896 Put put = new Put(ROW); 897 put.addColumn(FAMILY, QUALIFIER, data); 898 table.put(put); 899 put = new Put(ROW1); 900 put.addColumn(FAMILY, QUALIFIER, data); 901 table.put(put); 902 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 903 // Should create one Hfile with 2 blocks 904 region.flush(true); 905 // read the data and expect same blocks, one new hit, no misses 906 int refCount = 0; 907 // Check how this miss is happening 908 // insert a second column, read the row, no new blocks, 3 new hits 909 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 910 byte[] data2 = Bytes.add(data, data); 911 put = new Put(ROW); 912 put.addColumn(FAMILY, QUALIFIER2, data2); 913 table.put(put); 914 // flush, one new block 915 System.out.println("Flushing cache"); 916 region.flush(true); 917 Iterator<CachedBlock> iterator = cache.iterator(); 918 iterateBlockCache(cache, iterator); 919 // Create three sets of scan 920 ScanThread[] scanThreads = initiateScan(table, reversed); 921 Thread.sleep(100); 922 iterator = cache.iterator(); 923 boolean usedBlocksFound = false; 924 while (iterator.hasNext()) { 925 CachedBlock next = iterator.next(); 926 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 927 if (cache instanceof BucketCache) { 928 refCount = ((BucketCache) cache).getRefCount(cacheKey); 929 } else if (cache instanceof CombinedBlockCache) { 930 refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); 931 } else { 932 continue; 933 } 934 if (refCount != 0) { 935 // Blocks will be with count 3 936 assertEquals(NO_OF_THREADS, refCount); 937 usedBlocksFound = true; 938 } 939 } 940 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 941 usedBlocksFound = false; 942 System.out.println("Compacting"); 943 assertEquals(2, store.getStorefilesCount()); 944 store.triggerMajorCompaction(); 945 region.compact(true); 946 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 947 assertEquals(1, store.getStorefilesCount()); 948 // Even after compaction is done we will have some blocks that cannot 949 // be evicted this is because the scan is still referencing them 950 iterator = cache.iterator(); 951 while (iterator.hasNext()) { 952 CachedBlock next = iterator.next(); 953 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 954 if (cache instanceof BucketCache) { 955 refCount = ((BucketCache) cache).getRefCount(cacheKey); 956 } else if (cache instanceof CombinedBlockCache) { 957 refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); 958 } else { 959 continue; 960 } 961 if (refCount != 0) { 962 // Blocks will be with count 3 as they are not yet cleared 963 assertEquals(NO_OF_THREADS, refCount); 964 usedBlocksFound = true; 965 } 966 } 967 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 968 // Should not throw exception 969 compactionLatch.countDown(); 970 latch.countDown(); 971 for (ScanThread thread : scanThreads) { 972 thread.join(); 973 } 974 // by this time all blocks should have been evicted 975 iterator = cache.iterator(); 976 iterateBlockCache(cache, iterator); 977 Result r = table.get(new Get(ROW)); 978 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 979 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 980 // The gets would be working on new blocks 981 iterator = cache.iterator(); 982 iterateBlockCache(cache, iterator); 983 } finally { 984 if (table != null) { 985 table.close(); 986 } 987 } 988 } 989 990 @Test 991 public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush() 992 throws IOException, InterruptedException { 993 // do flush and scan in parallel 994 Table table = null; 995 try { 996 latch = new CountDownLatch(1); 997 compactionLatch = new CountDownLatch(1); 998 final TableName tableName = TableName.valueOf(name.getMethodName()); 999 // Create a table with block size as 1024 1000 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 1001 CustomInnerRegionObserverWrapper.class.getName()); 1002 // get the block cache and region 1003 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 1004 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 1005 HRegion region = 1006 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1007 HStore store = region.getStores().iterator().next(); 1008 CacheConfig cacheConf = store.getCacheConfig(); 1009 cacheConf.setCacheDataOnWrite(true); 1010 cacheConf.setEvictOnClose(true); 1011 BlockCache cache = cacheConf.getBlockCache(); 1012 1013 // insert data. 2 Rows are added 1014 Put put = new Put(ROW); 1015 put.addColumn(FAMILY, QUALIFIER, data); 1016 table.put(put); 1017 put = new Put(ROW1); 1018 put.addColumn(FAMILY, QUALIFIER, data); 1019 table.put(put); 1020 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); 1021 // Should create one Hfile with 2 blocks 1022 region.flush(true); 1023 // read the data and expect same blocks, one new hit, no misses 1024 int refCount = 0; 1025 // Check how this miss is happening 1026 // insert a second column, read the row, no new blocks, 3 new hits 1027 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1028 byte[] data2 = Bytes.add(data, data); 1029 put = new Put(ROW); 1030 put.addColumn(FAMILY, QUALIFIER2, data2); 1031 table.put(put); 1032 // flush, one new block 1033 System.out.println("Flushing cache"); 1034 region.flush(true); 1035 Iterator<CachedBlock> iterator = cache.iterator(); 1036 iterateBlockCache(cache, iterator); 1037 // Create three sets of scan 1038 ScanThread[] scanThreads = initiateScan(table, false); 1039 Thread.sleep(100); 1040 iterator = cache.iterator(); 1041 boolean usedBlocksFound = false; 1042 while (iterator.hasNext()) { 1043 CachedBlock next = iterator.next(); 1044 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1045 if (cache instanceof BucketCache) { 1046 refCount = ((BucketCache) cache).getRefCount(cacheKey); 1047 } else if (cache instanceof CombinedBlockCache) { 1048 refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); 1049 } else { 1050 continue; 1051 } 1052 if (refCount != 0) { 1053 // Blocks will be with count 3 1054 assertEquals(NO_OF_THREADS, refCount); 1055 usedBlocksFound = true; 1056 } 1057 } 1058 // Make a put and do a flush 1059 QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1060 data2 = Bytes.add(data, data); 1061 put = new Put(ROW1); 1062 put.addColumn(FAMILY, QUALIFIER2, data2); 1063 table.put(put); 1064 // flush, one new block 1065 System.out.println("Flushing cache"); 1066 region.flush(true); 1067 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 1068 usedBlocksFound = false; 1069 System.out.println("Compacting"); 1070 assertEquals(3, store.getStorefilesCount()); 1071 store.triggerMajorCompaction(); 1072 region.compact(true); 1073 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max 1074 assertEquals(1, store.getStorefilesCount()); 1075 // Even after compaction is done we will have some blocks that cannot 1076 // be evicted this is because the scan is still referencing them 1077 iterator = cache.iterator(); 1078 while (iterator.hasNext()) { 1079 CachedBlock next = iterator.next(); 1080 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1081 if (cache instanceof BucketCache) { 1082 refCount = ((BucketCache) cache).getRefCount(cacheKey); 1083 } else if (cache instanceof CombinedBlockCache) { 1084 refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); 1085 } else { 1086 continue; 1087 } 1088 if (refCount != 0) { 1089 // Blocks will be with count 3 as they are not yet cleared 1090 assertEquals(NO_OF_THREADS, refCount); 1091 usedBlocksFound = true; 1092 } 1093 } 1094 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); 1095 // Should not throw exception 1096 compactionLatch.countDown(); 1097 latch.countDown(); 1098 for (ScanThread thread : scanThreads) { 1099 thread.join(); 1100 } 1101 // by this time all blocks should have been evicted 1102 iterator = cache.iterator(); 1103 // Since a flush and compaction happened after a scan started 1104 // we need to ensure that all the original blocks of the compacted file 1105 // is also removed. 1106 iterateBlockCache(cache, iterator); 1107 Result r = table.get(new Get(ROW)); 1108 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1109 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1110 // The gets would be working on new blocks 1111 iterator = cache.iterator(); 1112 iterateBlockCache(cache, iterator); 1113 } finally { 1114 if (table != null) { 1115 table.close(); 1116 } 1117 } 1118 } 1119 1120 1121 @Test 1122 public void testScanWithException() throws IOException, InterruptedException { 1123 Table table = null; 1124 try { 1125 latch = new CountDownLatch(1); 1126 exceptionLatch = new CountDownLatch(1); 1127 final TableName tableName = TableName.valueOf(name.getMethodName()); 1128 // Create KV that will give you two blocks 1129 // Create a table with block size as 1024 1130 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 1131 CustomInnerRegionObserverWrapper.class.getName()); 1132 // get the block cache and region 1133 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 1134 String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); 1135 HRegion region = 1136 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 1137 HStore store = region.getStores().iterator().next(); 1138 CacheConfig cacheConf = store.getCacheConfig(); 1139 cacheConf.setCacheDataOnWrite(true); 1140 cacheConf.setEvictOnClose(true); 1141 BlockCache cache = cacheConf.getBlockCache(); 1142 // insert data. 2 Rows are added 1143 insertData(table); 1144 // flush the data 1145 System.out.println("Flushing cache"); 1146 // Should create one Hfile with 2 blocks 1147 region.flush(true); 1148 // CustomInnerRegionObserver.sleepTime.set(5000); 1149 CustomInnerRegionObserver.throwException.set(true); 1150 ScanThread[] scanThreads = initiateScan(table, false); 1151 // The block would have been decremented for the scan case as it was 1152 // wrapped 1153 // before even the postNext hook gets executed. 1154 // giving some time for the block to be decremented 1155 Thread.sleep(100); 1156 Iterator<CachedBlock> iterator = cache.iterator(); 1157 boolean usedBlocksFound = false; 1158 int refCount = 0; 1159 while (iterator.hasNext()) { 1160 CachedBlock next = iterator.next(); 1161 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1162 if (cache instanceof BucketCache) { 1163 refCount = ((BucketCache) cache).getRefCount(cacheKey); 1164 } else if (cache instanceof CombinedBlockCache) { 1165 refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); 1166 } else { 1167 continue; 1168 } 1169 if (refCount != 0) { 1170 // Blocks will be with count 3 1171 assertEquals(NO_OF_THREADS, refCount); 1172 usedBlocksFound = true; 1173 } 1174 } 1175 assertTrue(usedBlocksFound); 1176 exceptionLatch.countDown(); 1177 // countdown the latch 1178 CustomInnerRegionObserver.getCdl().get().countDown(); 1179 for (ScanThread thread : scanThreads) { 1180 thread.join(); 1181 } 1182 iterator = cache.iterator(); 1183 usedBlocksFound = false; 1184 refCount = 0; 1185 while (iterator.hasNext()) { 1186 CachedBlock next = iterator.next(); 1187 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1188 if (cache instanceof BucketCache) { 1189 refCount = ((BucketCache) cache).getRefCount(cacheKey); 1190 } else if (cache instanceof CombinedBlockCache) { 1191 refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); 1192 } else { 1193 continue; 1194 } 1195 if (refCount != 0) { 1196 // Blocks will be with count 3 1197 assertEquals(NO_OF_THREADS, refCount); 1198 usedBlocksFound = true; 1199 } 1200 } 1201 assertFalse(usedBlocksFound); 1202 // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner 1203 assertEquals(0, refCount); 1204 } finally { 1205 if (table != null) { 1206 table.close(); 1207 } 1208 } 1209 } 1210 1211 private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) { 1212 int refCount; 1213 while (iterator.hasNext()) { 1214 CachedBlock next = iterator.next(); 1215 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1216 if (cache instanceof BucketCache) { 1217 refCount = ((BucketCache) cache).getRefCount(cacheKey); 1218 } else if (cache instanceof CombinedBlockCache) { 1219 refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); 1220 } else { 1221 continue; 1222 } 1223 assertEquals(0, refCount); 1224 } 1225 } 1226 1227 private void insertData(Table table) throws IOException { 1228 Put put = new Put(ROW); 1229 put.addColumn(FAMILY, QUALIFIER, data); 1230 table.put(put); 1231 put = new Put(ROW1); 1232 put.addColumn(FAMILY, QUALIFIER, data); 1233 table.put(put); 1234 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); 1235 put = new Put(ROW); 1236 put.addColumn(FAMILY, QUALIFIER2, data2); 1237 table.put(put); 1238 } 1239 1240 private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException, 1241 InterruptedException { 1242 ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS]; 1243 for (int i = 0; i < NO_OF_THREADS; i++) { 1244 scanThreads[i] = new ScanThread(table, reverse); 1245 } 1246 for (ScanThread thread : scanThreads) { 1247 thread.start(); 1248 } 1249 return scanThreads; 1250 } 1251 1252 private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs) 1253 throws IOException, InterruptedException { 1254 GetThread[] getThreads = new GetThread[NO_OF_THREADS]; 1255 for (int i = 0; i < NO_OF_THREADS; i++) { 1256 getThreads[i] = new GetThread(table, tracker, multipleCFs); 1257 } 1258 for (GetThread thread : getThreads) { 1259 thread.start(); 1260 } 1261 return getThreads; 1262 } 1263 1264 private MultiGetThread[] initiateMultiGet(Table table) 1265 throws IOException, InterruptedException { 1266 MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS]; 1267 for (int i = 0; i < NO_OF_THREADS; i++) { 1268 multiGetThreads[i] = new MultiGetThread(table); 1269 } 1270 for (MultiGetThread thread : multiGetThreads) { 1271 thread.start(); 1272 } 1273 return multiGetThreads; 1274 } 1275 1276 private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero) 1277 throws InterruptedException { 1278 int counter = NO_OF_THREADS; 1279 if (CustomInnerRegionObserver.waitForGets.get()) { 1280 // Because only one row is selected, it has only 2 blocks 1281 counter = counter - 1; 1282 while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) { 1283 Thread.sleep(100); 1284 } 1285 } else { 1286 while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) { 1287 Thread.sleep(100); 1288 } 1289 } 1290 Iterator<CachedBlock> iterator = cache.iterator(); 1291 int refCount = 0; 1292 while (iterator.hasNext()) { 1293 CachedBlock next = iterator.next(); 1294 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 1295 if (cache instanceof BucketCache) { 1296 refCount = ((BucketCache) cache).getRefCount(cacheKey); 1297 } else if (cache instanceof CombinedBlockCache) { 1298 refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); 1299 } else { 1300 continue; 1301 } 1302 System.out.println(" the refcount is " + refCount + " block is " + cacheKey); 1303 if (CustomInnerRegionObserver.waitForGets.get()) { 1304 if (expectOnlyZero) { 1305 assertTrue(refCount == 0); 1306 } 1307 if (refCount != 0) { 1308 // Because the scan would have also touched up on these blocks but 1309 // it 1310 // would have touched 1311 // all 3 1312 if (getClosed) { 1313 // If get has closed only the scan's blocks would be available 1314 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get()); 1315 } else { 1316 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS)); 1317 } 1318 } 1319 } else { 1320 // Because the get would have also touched up on these blocks but it 1321 // would have touched 1322 // upon only 2 additionally 1323 if (expectOnlyZero) { 1324 assertTrue(refCount == 0); 1325 } 1326 if (refCount != 0) { 1327 if (getLatch == null) { 1328 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get()); 1329 } else { 1330 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS)); 1331 } 1332 } 1333 } 1334 } 1335 CustomInnerRegionObserver.getCdl().get().countDown(); 1336 } 1337 1338 private static class MultiGetThread extends Thread { 1339 private final Table table; 1340 private final List<Get> gets = new ArrayList<>(); 1341 public MultiGetThread(Table table) { 1342 this.table = table; 1343 } 1344 @Override 1345 public void run() { 1346 gets.add(new Get(ROW)); 1347 gets.add(new Get(ROW1)); 1348 try { 1349 CustomInnerRegionObserver.getCdl().set(latch); 1350 Result[] r = table.get(gets); 1351 assertTrue(Bytes.equals(r[0].getRow(), ROW)); 1352 assertTrue(Bytes.equals(r[1].getRow(), ROW1)); 1353 } catch (IOException e) { 1354 } 1355 } 1356 } 1357 1358 private static class GetThread extends Thread { 1359 private final Table table; 1360 private final boolean tracker; 1361 private final boolean multipleCFs; 1362 1363 public GetThread(Table table, boolean tracker, boolean multipleCFs) { 1364 this.table = table; 1365 this.tracker = tracker; 1366 this.multipleCFs = multipleCFs; 1367 } 1368 1369 @Override 1370 public void run() { 1371 try { 1372 initiateGet(table); 1373 } catch (IOException e) { 1374 // do nothing 1375 } 1376 } 1377 1378 private void initiateGet(Table table) throws IOException { 1379 Get get = new Get(ROW); 1380 if (tracker) { 1381 // Change this 1382 if (!multipleCFs) { 1383 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3)); 1384 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8)); 1385 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9)); 1386 // Unknown key 1387 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900)); 1388 } else { 1389 get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)); 1390 get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)); 1391 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)); 1392 // Unknown key 1393 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900)); 1394 } 1395 } 1396 CustomInnerRegionObserver.getCdl().set(latch); 1397 Result r = table.get(get); 1398 System.out.println(r); 1399 if (!tracker) { 1400 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); 1401 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); 1402 } else { 1403 if (!multipleCFs) { 1404 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2)); 1405 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2)); 1406 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2)); 1407 } else { 1408 assertTrue(Bytes.equals( 1409 r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)), 1410 data2)); 1411 assertTrue(Bytes.equals( 1412 r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)), 1413 data2)); 1414 assertTrue(Bytes.equals( 1415 r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)), 1416 data2)); 1417 } 1418 } 1419 } 1420 } 1421 1422 private static class ScanThread extends Thread { 1423 private final Table table; 1424 private final boolean reverse; 1425 1426 public ScanThread(Table table, boolean reverse) { 1427 this.table = table; 1428 this.reverse = reverse; 1429 } 1430 1431 @Override 1432 public void run() { 1433 try { 1434 initiateScan(table); 1435 } catch (IOException e) { 1436 // do nothing 1437 } 1438 } 1439 1440 private void initiateScan(Table table) throws IOException { 1441 Scan scan = new Scan(); 1442 if (reverse) { 1443 scan.setReversed(true); 1444 } 1445 CustomInnerRegionObserver.getCdl().set(latch); 1446 ResultScanner resScanner = table.getScanner(scan); 1447 int i = (reverse ? ROWS.length - 1 : 0); 1448 boolean resultFound = false; 1449 for (Result result : resScanner) { 1450 resultFound = true; 1451 System.out.println(result); 1452 if (!reverse) { 1453 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1454 i++; 1455 } else { 1456 assertTrue(Bytes.equals(result.getRow(), ROWS[i])); 1457 i--; 1458 } 1459 } 1460 assertTrue(resultFound); 1461 } 1462 } 1463 1464 private void waitForStoreFileCount(HStore store, int count, int timeout) 1465 throws InterruptedException { 1466 long start = System.currentTimeMillis(); 1467 while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) { 1468 Thread.sleep(100); 1469 } 1470 System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" + 1471 store.getStorefilesCount()); 1472 assertEquals(count, store.getStorefilesCount()); 1473 } 1474 1475 private static class CustomScanner implements RegionScanner { 1476 1477 private RegionScanner delegate; 1478 1479 public CustomScanner(RegionScanner delegate) { 1480 this.delegate = delegate; 1481 } 1482 1483 @Override 1484 public boolean next(List<Cell> results) throws IOException { 1485 return delegate.next(results); 1486 } 1487 1488 @Override 1489 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { 1490 return delegate.next(result, scannerContext); 1491 } 1492 1493 @Override 1494 public boolean nextRaw(List<Cell> result) throws IOException { 1495 return delegate.nextRaw(result); 1496 } 1497 1498 @Override 1499 public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException { 1500 boolean nextRaw = delegate.nextRaw(result, context); 1501 if (compactionLatch != null && compactionLatch.getCount() > 0) { 1502 try { 1503 compactionLatch.await(); 1504 } catch (InterruptedException ie) { 1505 } 1506 } 1507 1508 if (CustomInnerRegionObserver.throwException.get()) { 1509 if (exceptionLatch.getCount() > 0) { 1510 try { 1511 exceptionLatch.await(); 1512 } catch (InterruptedException e) { 1513 } 1514 throw new IOException("throw exception"); 1515 } 1516 } 1517 return nextRaw; 1518 } 1519 1520 @Override 1521 public void close() throws IOException { 1522 delegate.close(); 1523 } 1524 1525 @Override 1526 public RegionInfo getRegionInfo() { 1527 return delegate.getRegionInfo(); 1528 } 1529 1530 @Override 1531 public boolean isFilterDone() throws IOException { 1532 return delegate.isFilterDone(); 1533 } 1534 1535 @Override 1536 public boolean reseek(byte[] row) throws IOException { 1537 return false; 1538 } 1539 1540 @Override 1541 public long getMaxResultSize() { 1542 return delegate.getMaxResultSize(); 1543 } 1544 1545 @Override 1546 public long getMvccReadPoint() { 1547 return delegate.getMvccReadPoint(); 1548 } 1549 1550 @Override 1551 public int getBatch() { 1552 return delegate.getBatch(); 1553 } 1554 } 1555 1556 public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver { 1557 @Override 1558 public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, 1559 Scan scan, RegionScanner s) throws IOException { 1560 return new CustomScanner(s); 1561 } 1562 } 1563 1564 public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver { 1565 static final AtomicLong sleepTime = new AtomicLong(0); 1566 static final AtomicBoolean slowDownNext = new AtomicBoolean(false); 1567 static final AtomicInteger countOfNext = new AtomicInteger(0); 1568 static final AtomicInteger countOfGets = new AtomicInteger(0); 1569 static final AtomicBoolean waitForGets = new AtomicBoolean(false); 1570 static final AtomicBoolean throwException = new AtomicBoolean(false); 1571 private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>( 1572 new CountDownLatch(0)); 1573 1574 @Override 1575 public Optional<RegionObserver> getRegionObserver() { 1576 return Optional.of(this); 1577 } 1578 1579 @Override 1580 public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e, 1581 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException { 1582 slowdownCode(e, false); 1583 if (getLatch != null && getLatch.getCount() > 0) { 1584 try { 1585 getLatch.await(); 1586 } catch (InterruptedException e1) { 1587 } 1588 } 1589 return hasMore; 1590 } 1591 1592 @Override 1593 public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, 1594 List<Cell> results) throws IOException { 1595 slowdownCode(e, true); 1596 } 1597 1598 public static AtomicReference<CountDownLatch> getCdl() { 1599 return cdl; 1600 } 1601 1602 private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e, 1603 boolean isGet) { 1604 CountDownLatch latch = getCdl().get(); 1605 try { 1606 System.out.println(latch.getCount() + " is the count " + isGet); 1607 if (latch.getCount() > 0) { 1608 if (isGet) { 1609 countOfGets.incrementAndGet(); 1610 } else { 1611 countOfNext.incrementAndGet(); 1612 } 1613 LOG.info("Waiting for the counterCountDownLatch"); 1614 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 1615 if (latch.getCount() > 0) { 1616 throw new RuntimeException("Can't wait more"); 1617 } 1618 } 1619 } catch (InterruptedException e1) { 1620 LOG.error(e1.toString(), e1); 1621 } 1622 } 1623 } 1624}