001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Optional; 028import java.util.concurrent.CountDownLatch; 029import java.util.concurrent.atomic.AtomicBoolean; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.CellComparatorImpl; 033import org.apache.hadoop.hbase.ExtendedCell; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 038import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 039import org.apache.hadoop.hbase.coprocessor.ObserverContext; 040import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 042import org.apache.hadoop.hbase.coprocessor.RegionObserver; 043import org.apache.hadoop.hbase.io.hfile.BlockCache; 044import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 045import org.apache.hadoop.hbase.io.hfile.CacheConfig; 046import org.apache.hadoop.hbase.io.hfile.CachedBlock; 047import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 048import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 049import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; 050import org.apache.hadoop.hbase.regionserver.HRegion; 051import org.apache.hadoop.hbase.regionserver.HStore; 052import org.apache.hadoop.hbase.regionserver.InternalScanner; 053import org.apache.hadoop.hbase.regionserver.ScanType; 054import org.apache.hadoop.hbase.regionserver.ScannerContext; 055import org.apache.hadoop.hbase.regionserver.Store; 056import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 057import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 058import org.apache.hadoop.hbase.testclassification.ClientTests; 059import org.apache.hadoop.hbase.testclassification.LargeTests; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.junit.jupiter.api.AfterAll; 062import org.junit.jupiter.api.BeforeAll; 063import org.junit.jupiter.api.BeforeEach; 064import org.junit.jupiter.api.Tag; 065import org.junit.jupiter.api.Test; 066import org.junit.jupiter.api.TestInfo; 067 068import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 069 070@Tag(LargeTests.TAG) 071@Tag(ClientTests.TAG) 072public class TestAvoidCellReferencesIntoShippedBlocks { 073 074 protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 075 static byte[][] ROWS = new byte[2][]; 076 private static byte[] ROW = Bytes.toBytes("testRow"); 077 private static byte[] ROW1 = Bytes.toBytes("testRow1"); 078 private static byte[] ROW2 = Bytes.toBytes("testRow2"); 079 private static byte[] ROW3 = Bytes.toBytes("testRow3"); 080 private static byte[] ROW4 = Bytes.toBytes("testRow4"); 081 private static byte[] ROW5 = Bytes.toBytes("testRow5"); 082 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 083 private static byte[][] FAMILIES_1 = new byte[1][0]; 084 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 085 private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1"); 086 private static byte[] data = new byte[1000]; 087 protected static int SLAVES = 1; 088 private CountDownLatch latch = new CountDownLatch(1); 089 private static CountDownLatch compactReadLatch = new CountDownLatch(1); 090 private static AtomicBoolean doScan = new AtomicBoolean(false); 091 private String methodName; 092 093 @BeforeEach 094 public void setUp(TestInfo testInfo) throws Exception { 095 this.methodName = testInfo.getTestMethod().get().getName(); 096 } 097 098 @BeforeAll 099 public static void setUpBeforeClass() throws Exception { 100 ROWS[0] = ROW; 101 ROWS[1] = ROW1; 102 Configuration conf = TEST_UTIL.getConfiguration(); 103 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 104 MultiRowMutationEndpoint.class.getName()); 105 conf.setInt("hbase.regionserver.handler.count", 20); 106 conf.setInt("hbase.bucketcache.size", 400); 107 conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); 108 conf.setInt("hbase.hstore.compactionThreshold", 7); 109 conf.setFloat("hfile.block.cache.size", 0.2f); 110 conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); 111 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry 112 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500000); 113 FAMILIES_1[0] = FAMILY; 114 TEST_UTIL.startMiniCluster(SLAVES); 115 compactReadLatch = new CountDownLatch(1); 116 } 117 118 /** 119 * @throws java.lang.Exception 120 */ 121 @AfterAll 122 public static void tearDownAfterClass() throws Exception { 123 TEST_UTIL.shutdownMiniCluster(); 124 } 125 126 @Test 127 public void testHBase16372InCompactionWritePath() throws Exception { 128 final TableName tableName = TableName.valueOf(methodName); 129 // Create a table with block size as 1024 130 final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 131 CompactorRegionObserver.class.getName()); 132 try { 133 // get the block cache and region 134 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 135 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 136 HRegion region = 137 (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 138 HStore store = region.getStores().iterator().next(); 139 CacheConfig cacheConf = store.getCacheConfig(); 140 cacheConf.setCacheDataOnWrite(true); 141 cacheConf.setEvictOnClose(true); 142 final BlockCache cache = cacheConf.getBlockCache().get(); 143 // insert data. 5 Rows are added 144 Put put = new Put(ROW); 145 put.addColumn(FAMILY, QUALIFIER, data); 146 table.put(put); 147 put = new Put(ROW); 148 put.addColumn(FAMILY, QUALIFIER1, data); 149 table.put(put); 150 put = new Put(ROW1); 151 put.addColumn(FAMILY, QUALIFIER, data); 152 table.put(put); 153 // data was in memstore so don't expect any changes 154 region.flush(true); 155 put = new Put(ROW1); 156 put.addColumn(FAMILY, QUALIFIER1, data); 157 table.put(put); 158 put = new Put(ROW2); 159 put.addColumn(FAMILY, QUALIFIER, data); 160 table.put(put); 161 put = new Put(ROW2); 162 put.addColumn(FAMILY, QUALIFIER1, data); 163 table.put(put); 164 // data was in memstore so don't expect any changes 165 region.flush(true); 166 put = new Put(ROW3); 167 put.addColumn(FAMILY, QUALIFIER, data); 168 table.put(put); 169 put = new Put(ROW3); 170 put.addColumn(FAMILY, QUALIFIER1, data); 171 table.put(put); 172 put = new Put(ROW4); 173 put.addColumn(FAMILY, QUALIFIER, data); 174 table.put(put); 175 // data was in memstore so don't expect any changes 176 region.flush(true); 177 put = new Put(ROW4); 178 put.addColumn(FAMILY, QUALIFIER1, data); 179 table.put(put); 180 put = new Put(ROW5); 181 put.addColumn(FAMILY, QUALIFIER, data); 182 table.put(put); 183 put = new Put(ROW5); 184 put.addColumn(FAMILY, QUALIFIER1, data); 185 table.put(put); 186 // data was in memstore so don't expect any changes 187 region.flush(true); 188 // Load cache 189 Scan s = new Scan(); 190 s.setMaxResultSize(1000); 191 int count; 192 try (ResultScanner scanner = table.getScanner(s)) { 193 count = Iterables.size(scanner); 194 } 195 assertEquals(6, count, "Count all the rows "); 196 // all the cache is loaded 197 // trigger a major compaction 198 ScannerThread scannerThread = new ScannerThread(table, cache); 199 scannerThread.start(); 200 region.compact(true); 201 s = new Scan(); 202 s.setMaxResultSize(1000); 203 try (ResultScanner scanner = table.getScanner(s)) { 204 count = Iterables.size(scanner); 205 } 206 assertEquals(6, count, "Count all the rows "); 207 } finally { 208 table.close(); 209 } 210 } 211 212 private static class ScannerThread extends Thread { 213 private final Table table; 214 private final BlockCache cache; 215 216 public ScannerThread(Table table, BlockCache cache) { 217 this.table = table; 218 this.cache = cache; 219 } 220 221 @Override 222 public void run() { 223 Scan s = new Scan().withStartRow(ROW4).withStopRow(ROW5).setCaching(1); 224 try { 225 while (!doScan.get()) { 226 try { 227 // Sleep till you start scan 228 Thread.sleep(1); 229 } catch (InterruptedException e) { 230 } 231 } 232 List<BlockCacheKey> cacheList = new ArrayList<>(); 233 Iterator<CachedBlock> iterator = cache.iterator(); 234 // evict all the blocks 235 while (iterator.hasNext()) { 236 CachedBlock next = iterator.next(); 237 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 238 cacheList.add(cacheKey); 239 // evict what ever is available 240 cache.evictBlock(cacheKey); 241 } 242 try (ResultScanner scanner = table.getScanner(s)) { 243 while (scanner.next() != null) { 244 } 245 } 246 compactReadLatch.countDown(); 247 } catch (IOException e) { 248 } 249 } 250 } 251 252 public static class CompactorRegionObserver implements RegionCoprocessor, RegionObserver { 253 254 @Override 255 public Optional<RegionObserver> getRegionObserver() { 256 return Optional.of(this); 257 } 258 259 @Override 260 public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c, 261 Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 262 CompactionRequest request) throws IOException { 263 return new CompactorInternalScanner(scanner); 264 } 265 } 266 267 private static final class CompactorInternalScanner extends DelegatingInternalScanner { 268 269 public CompactorInternalScanner(InternalScanner scanner) { 270 super(scanner); 271 } 272 273 @Override 274 public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext) 275 throws IOException { 276 boolean next = scanner.next(result, scannerContext); 277 for (Iterator<? super ExtendedCell> iter = result.iterator(); iter.hasNext();) { 278 Cell cell = (Cell) iter.next(); 279 if (CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) { 280 try { 281 // hold the compaction 282 // set doscan to true 283 doScan.compareAndSet(false, true); 284 compactReadLatch.await(); 285 } catch (InterruptedException e) { 286 } 287 } 288 } 289 return next; 290 } 291 } 292 293 @Test 294 public void testHBASE16372InReadPath() throws Exception { 295 final TableName tableName = TableName.valueOf(methodName); 296 // Create a table with block size as 1024 297 try (Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, null)) { 298 // get the block cache and region 299 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 300 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 301 HRegion region = 302 (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 303 HStore store = region.getStores().iterator().next(); 304 CacheConfig cacheConf = store.getCacheConfig(); 305 cacheConf.setCacheDataOnWrite(true); 306 cacheConf.setEvictOnClose(true); 307 final BlockCache cache = cacheConf.getBlockCache().get(); 308 // insert data. 5 Rows are added 309 Put put = new Put(ROW); 310 put.addColumn(FAMILY, QUALIFIER, data); 311 table.put(put); 312 put = new Put(ROW); 313 put.addColumn(FAMILY, QUALIFIER1, data); 314 table.put(put); 315 put = new Put(ROW1); 316 put.addColumn(FAMILY, QUALIFIER, data); 317 table.put(put); 318 put = new Put(ROW1); 319 put.addColumn(FAMILY, QUALIFIER1, data); 320 table.put(put); 321 put = new Put(ROW2); 322 put.addColumn(FAMILY, QUALIFIER, data); 323 table.put(put); 324 put = new Put(ROW2); 325 put.addColumn(FAMILY, QUALIFIER1, data); 326 table.put(put); 327 put = new Put(ROW3); 328 put.addColumn(FAMILY, QUALIFIER, data); 329 table.put(put); 330 put = new Put(ROW3); 331 put.addColumn(FAMILY, QUALIFIER1, data); 332 table.put(put); 333 put = new Put(ROW4); 334 put.addColumn(FAMILY, QUALIFIER, data); 335 table.put(put); 336 put = new Put(ROW4); 337 put.addColumn(FAMILY, QUALIFIER1, data); 338 table.put(put); 339 put = new Put(ROW5); 340 put.addColumn(FAMILY, QUALIFIER, data); 341 table.put(put); 342 put = new Put(ROW5); 343 put.addColumn(FAMILY, QUALIFIER1, data); 344 table.put(put); 345 // data was in memstore so don't expect any changes 346 region.flush(true); 347 // Load cache 348 Scan s = new Scan(); 349 s.setMaxResultSize(1000); 350 int count; 351 try (ResultScanner scanner = table.getScanner(s)) { 352 count = Iterables.size(scanner); 353 } 354 assertEquals(6, count, "Count all the rows "); 355 356 // Scan from cache 357 s = new Scan(); 358 // Start a scan from row3 359 s.setCaching(1); 360 s.withStartRow(ROW1); 361 // set partial as true so that the scan can send partial columns also 362 s.setAllowPartialResults(true); 363 s.setMaxResultSize(1000); 364 try (ScanPerNextResultScanner scanner = 365 new ScanPerNextResultScanner(TEST_UTIL.getAsyncConnection().getTable(tableName), s)) { 366 Thread evictorThread = new Thread() { 367 @Override 368 public void run() { 369 List<BlockCacheKey> cacheList = new ArrayList<>(); 370 Iterator<CachedBlock> iterator = cache.iterator(); 371 // evict all the blocks 372 while (iterator.hasNext()) { 373 CachedBlock next = iterator.next(); 374 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 375 cacheList.add(cacheKey); 376 /** 377 * There is only one Block referenced by rpc,here we evict blocks which have no rpc 378 * referenced. 379 */ 380 evictBlock(cache, cacheKey); 381 } 382 try { 383 Thread.sleep(1); 384 } catch (InterruptedException e1) { 385 } 386 iterator = cache.iterator(); 387 int refBlockCount = 0; 388 while (iterator.hasNext()) { 389 iterator.next(); 390 refBlockCount++; 391 } 392 assertEquals(1, refBlockCount, "One block should be there "); 393 // Rescan to prepopulate the data 394 // cache this row. 395 Scan s1 = new Scan(); 396 // This scan will start from ROW1 and it will populate the cache with a 397 // row that is lower than ROW3. 398 s1.withStartRow(ROW3); 399 s1.withStopRow(ROW5); 400 s1.setCaching(1); 401 402 try (ResultScanner scanner = table.getScanner(s1)) { 403 int count = Iterables.size(scanner); 404 assertEquals(2, count, "Count the rows"); 405 int newBlockRefCount = 0; 406 List<BlockCacheKey> newCacheList = new ArrayList<>(); 407 while (true) { 408 newBlockRefCount = 0; 409 newCacheList.clear(); 410 iterator = cache.iterator(); 411 while (iterator.hasNext()) { 412 CachedBlock next = iterator.next(); 413 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 414 newCacheList.add(cacheKey); 415 } 416 for (BlockCacheKey key : cacheList) { 417 if (newCacheList.contains(key)) { 418 newBlockRefCount++; 419 } 420 } 421 if (newBlockRefCount == 6) { 422 break; 423 } 424 } 425 latch.countDown(); 426 } catch (IOException e) { 427 } 428 } 429 }; 430 count = 0; 431 while (scanner.next() != null) { 432 count++; 433 if (count == 2) { 434 evictorThread.start(); 435 latch.await(); 436 } 437 } 438 } 439 assertEquals(10, count, "Count should give all rows "); 440 } 441 } 442 443 /** 444 * For {@link BucketCache},we only evict Block if there is no rpc referenced. 445 */ 446 private void evictBlock(BlockCache blockCache, BlockCacheKey blockCacheKey) { 447 assertTrue(blockCache instanceof CombinedBlockCache); 448 BlockCache[] blockCaches = blockCache.getBlockCaches(); 449 for (BlockCache currentBlockCache : blockCaches) { 450 if (currentBlockCache instanceof BucketCache) { 451 ((BucketCache) currentBlockCache).evictBlockIfNoRpcReferenced(blockCacheKey); 452 } else { 453 currentBlockCache.evictBlock(blockCacheKey); 454 } 455 } 456 457 } 458}