/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

/**
 * Mini-cluster tests that run scans (and, in one case, a major compaction) while another thread
 * evicts blocks from the block cache. The test method names refer to HBASE-16372.
 * <p>
 * Failures raised on the helper threads are captured and rethrown on the JUnit thread, and every
 * latch is released in a {@code finally} block, so a helper-thread failure fails the test instead
 * of leaving it blocked on a latch.
 */
@Category({ LargeTests.class, ClientTests.class })
public class TestAvoidCellReferencesIntoShippedBlocks {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAvoidCellReferencesIntoShippedBlocks.class);

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static final byte[] ROW = Bytes.toBytes("testRow");
  private static final byte[] ROW1 = Bytes.toBytes("testRow1");
  private static final byte[] ROW2 = Bytes.toBytes("testRow2");
  private static final byte[] ROW3 = Bytes.toBytes("testRow3");
  private static final byte[] ROW4 = Bytes.toBytes("testRow4");
  private static final byte[] ROW5 = Bytes.toBytes("testRow5");
  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
  private static final byte[][] FAMILIES_1 = new byte[1][0];
  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static final byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
  // Value written to every cell; 1000 bytes against the 1024-byte block size used by the tables.
  private static final byte[] data = new byte[1000];
  protected static int SLAVES = 1;
  // Upper bound on how long the compaction test waits for its scanner thread after compaction.
  private static final long SCANNER_THREAD_JOIN_TIMEOUT_MS = 60_000L;

  // Released by the evictor thread of testHBASE16372InReadPath once it has finished.
  private final CountDownLatch latch = new CountDownLatch(1);
  // Released by ScannerThread; CompactorInternalScanner blocks on it while compacting ROW2.
  private static CountDownLatch compactReadLatch = new CountDownLatch(1);
  // Set by CompactorInternalScanner to tell ScannerThread that it may start evicting/scanning.
  private static final AtomicBoolean doScan = new AtomicBoolean(false);

  @Rule
  public TestName name = new TestName();

  /**
   * Configures the block cache (offheap bucket cache), disables client retries and starts the
   * mini cluster shared by all tests in this class.
   * @throws Exception if the mini cluster cannot be started
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setInt("hbase.hstore.compactionThreshold", 7);
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
    compactReadLatch = new CountDownLatch(1);
  }

  /**
   * Shuts down the mini cluster started in {@link #setUpBeforeClass()}.
   * @throws Exception if the shutdown fails
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Writes six rows spread over four flushes, then triggers a major compaction. The compaction
   * is held by {@link CompactorInternalScanner} when it reaches ROW2 until {@link ScannerThread}
   * has evicted the cached blocks and scanned ROW4. A full scan must return all six rows both
   * before and after the compaction.
   */
  @Test
  public void testHBase16372InCompactionWritePath() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    // Create a table with block size as 1024
    try (Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
      CompactorRegionObserver.class.getName())) {
      // get the block cache and region
      HRegion region = getFirstRegion(tableName);
      final BlockCache cache = enableCacheOnWriteAndGetBlockCache(region);
      // insert data. 6 rows (ROW, ROW1..ROW5) with 2 columns each, split over 4 flushes
      putCell(table, ROW, QUALIFIER);
      putCell(table, ROW, QUALIFIER1);
      putCell(table, ROW1, QUALIFIER);
      // data was in memstore so don't expect any changes
      region.flush(true);
      putCell(table, ROW1, QUALIFIER1);
      putCell(table, ROW2, QUALIFIER);
      putCell(table, ROW2, QUALIFIER1);
      // data was in memstore so don't expect any changes
      region.flush(true);
      putCell(table, ROW3, QUALIFIER);
      putCell(table, ROW3, QUALIFIER1);
      putCell(table, ROW4, QUALIFIER);
      // data was in memstore so don't expect any changes
      region.flush(true);
      putCell(table, ROW4, QUALIFIER1);
      putCell(table, ROW5, QUALIFIER);
      putCell(table, ROW5, QUALIFIER1);
      // data was in memstore so don't expect any changes
      region.flush(true);
      // Load cache
      assertEquals("Count all the rows ", 6, countResults(table, newSmallResultScan()));
      // all the cache is loaded
      // trigger a major compaction
      ScannerThread scannerThread = new ScannerThread(table, cache);
      scannerThread.start();
      region.compact(true);
      // The compaction only returns after the scanner thread released compactReadLatch, so the
      // thread is expected to be (nearly) done here; surface anything that went wrong in it.
      scannerThread.join(SCANNER_THREAD_JOIN_TIMEOUT_MS);
      assertTrue("Scanner thread did not finish", !scannerThread.isAlive());
      rethrowIfFailed("Scanner thread failed", scannerThread.getFailure());
      assertEquals("Count all the rows ", 6, countResults(table, newSmallResultScan()));
    }
  }

  /**
   * Helper thread for {@link #testHBase16372InCompactionWritePath()}: waits until the compaction
   * scanner sets {@code doScan}, evicts every block currently in the cache, scans ROW4 and then
   * releases {@code compactReadLatch} so that the compaction can continue.
   */
  private static class ScannerThread extends Thread {
    private final Table table;
    private final BlockCache cache;
    // First failure seen by run(), or null. Read by the test thread after join().
    private volatile Throwable failure;

    public ScannerThread(Table table, BlockCache cache) {
      this.table = table;
      this.cache = cache;
    }

    @Override
    public void run() {
      Scan s = new Scan().withStartRow(ROW4).withStopRow(ROW5).setCaching(1);
      try {
        while (!doScan.get()) {
          // Sleep till you start scan
          Thread.sleep(1);
        }
        Iterator<CachedBlock> iterator = cache.iterator();
        // evict all the blocks
        while (iterator.hasNext()) {
          CachedBlock next = iterator.next();
          // evict what ever is available
          cache.evictBlock(new BlockCacheKey(next.getFilename(), next.getOffset()));
        }
        try (ResultScanner scanner = table.getScanner(s)) {
          while (scanner.next() != null) {
            // drain the scanner; only the reads matter
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        failure = e;
      } catch (IOException | RuntimeException e) {
        failure = e;
      } finally {
        // Always release the compaction, even on failure, so it cannot block forever.
        compactReadLatch.countDown();
      }
    }

    /** Returns the failure recorded by {@link #run()}, or {@code null} if it completed cleanly. */
    Throwable getFailure() {
      return failure;
    }
  }

  /**
   * Region coprocessor that wraps the compaction scanner in a {@link CompactorInternalScanner}.
   */
  public static class CompactorRegionObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
      return new CompactorInternalScanner(scanner);
    }
  }

  /**
   * Delegating scanner that, whenever a returned batch contains a cell of ROW2, sets
   * {@code doScan} and blocks until {@code compactReadLatch} has been released.
   */
  private static final class CompactorInternalScanner extends DelegatingInternalScanner {

    public CompactorInternalScanner(InternalScanner scanner) {
      super(scanner);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      boolean next = scanner.next(result, scannerContext);
      for (Cell cell : result) {
        if (CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) {
          // set doscan to true
          doScan.compareAndSet(false, true);
          try {
            // hold the compaction
            compactReadLatch.await();
          } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of dropping it.
            Thread.currentThread().interrupt();
          }
        }
      }
      return next;
    }
  }

  /**
   * Writes six rows in a single flush and starts a partial-result scan from ROW1. After the
   * second result, an evictor thread evicts the cache blocks that are not referenced by an rpc,
   * re-reads ROW3 and ROW4, and waits until six of the previously seen blocks are cached again.
   * The scan then continues and must return 10 (partial) results in total.
   */
  @Test
  public void testHBASE16372InReadPath() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    // Create a table with block size as 1024
    try (Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, null)) {
      // get the block cache and region
      HRegion region = getFirstRegion(tableName);
      final BlockCache cache = enableCacheOnWriteAndGetBlockCache(region);
      // insert data. 6 rows (ROW, ROW1..ROW5) with 2 columns each
      putCell(table, ROW, QUALIFIER);
      putCell(table, ROW, QUALIFIER1);
      putCell(table, ROW1, QUALIFIER);
      putCell(table, ROW1, QUALIFIER1);
      putCell(table, ROW2, QUALIFIER);
      putCell(table, ROW2, QUALIFIER1);
      putCell(table, ROW3, QUALIFIER);
      putCell(table, ROW3, QUALIFIER1);
      putCell(table, ROW4, QUALIFIER);
      putCell(table, ROW4, QUALIFIER1);
      putCell(table, ROW5, QUALIFIER);
      putCell(table, ROW5, QUALIFIER1);
      // data was in memstore so don't expect any changes
      region.flush(true);
      // Load cache
      int count = countResults(table, newSmallResultScan());
      assertEquals("Count all the rows ", 6, count);

      // Scan from cache
      Scan s = new Scan();
      // Start a scan from ROW1
      s.setCaching(1);
      s.withStartRow(ROW1);
      // set partial as true so that the scan can send partial columns also
      s.setAllowPartialResults(true);
      s.setMaxResultSize(1000);
      try (ResultScanner scanner = table.getScanner(s)) {
        // Holds whatever the evictor thread threw (including assertion errors), if anything.
        final AtomicReference<Throwable> evictorFailure = new AtomicReference<>();
        Thread evictorThread = new Thread() {
          @Override
          public void run() {
            try {
              evictUnreferencedBlocksAndRescan(table, cache);
            } catch (Throwable t) {
              // Captured so that the test thread can fail with it after latch.await().
              evictorFailure.set(t);
            } finally {
              // Always release the test thread, even if the work above failed.
              latch.countDown();
            }
          }
        };
        count = 0;
        while (scanner.next() != null) {
          count++;
          if (count == 2) {
            evictorThread.start();
            latch.await();
            rethrowIfFailed("Evictor thread failed", evictorFailure.get());
          }
        }
      }
      assertEquals("Count should give all rows ", 10, count);
    }
  }

  /**
   * Body of the evictor thread used by {@link #testHBASE16372InReadPath()}. Evicts every cached
   * block that has no rpc reference, asserts that exactly one block remains, re-reads ROW3 and
   * ROW4 and then polls the cache until six of the initially seen blocks are cached again.
   * @param table table to rescan
   * @param cache block cache to evict from and to poll
   * @throws IOException if the rescan fails
   * @throws InterruptedException if the thread is interrupted while pausing
   */
  private void evictUnreferencedBlocksAndRescan(Table table, BlockCache cache)
    throws IOException, InterruptedException {
    List<BlockCacheKey> initialKeys = new ArrayList<>();
    Iterator<CachedBlock> iterator = cache.iterator();
    // evict all the blocks
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      initialKeys.add(cacheKey);
      // There is only one Block referenced by rpc, here we evict blocks which have no rpc
      // referenced.
      evictBlock(cache, cacheKey);
    }
    Thread.sleep(1);
    assertEquals("One block should be there ", 1, cachedBlockKeys(cache).size());
    // Rescan to prepopulate the data
    // cache this row.
    Scan s1 = new Scan();
    // This scan starts from ROW3 and stops before ROW5, so it reads ROW3 and ROW4.
    s1.withStartRow(ROW3);
    s1.withStopRow(ROW5);
    s1.setCaching(1);
    // The scanner stays open while polling and is closed before the caller releases the latch.
    try (ResultScanner scanner = table.getScanner(s1)) {
      assertEquals("Count the rows", 2, Iterables.size(scanner));
      while (true) {
        List<BlockCacheKey> currentKeys = cachedBlockKeys(cache);
        int reloadedBlockCount = 0;
        for (BlockCacheKey key : initialKeys) {
          if (currentKeys.contains(key)) {
            reloadedBlockCount++;
          }
        }
        if (reloadedBlockCount == 6) {
          break;
        }
        // Pause between polls instead of spinning on the cache iterator.
        Thread.sleep(1);
      }
    }
  }

  /**
   * For {@link BucketCache}, we only evict the block if there is no rpc referencing it; for any
   * other cache the block is evicted unconditionally.
   */
  private void evictBlock(BlockCache blockCache, BlockCacheKey blockCacheKey) {
    assertTrue(blockCache instanceof CombinedBlockCache);
    BlockCache[] blockCaches = blockCache.getBlockCaches();
    for (BlockCache currentBlockCache : blockCaches) {
      if (currentBlockCache instanceof BucketCache) {
        ((BucketCache) currentBlockCache).evictBlockIfNoRpcReferenced(blockCacheKey);
      } else {
        currentBlockCache.evictBlock(blockCacheKey);
      }
    }
  }

  /** Returns the first region of {@code tableName} as hosted by its region server. */
  private static HRegion getFirstRegion(TableName tableName) throws Exception {
    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      return (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
    }
  }

  /**
   * Turns on cache-data-on-write and evict-on-close for the first store of {@code region} and
   * returns the block cache of that store.
   */
  private static BlockCache enableCacheOnWriteAndGetBlockCache(HRegion region) {
    HStore store = region.getStores().iterator().next();
    CacheConfig cacheConf = store.getCacheConfig();
    cacheConf.setCacheDataOnWrite(true);
    cacheConf.setEvictOnClose(true);
    return cacheConf.getBlockCache().get();
  }

  /** Writes the shared 1000-byte value to {@code FAMILY:qualifier} of {@code row}. */
  private static void putCell(Table table, byte[] row, byte[] qualifier) throws IOException {
    Put put = new Put(row);
    put.addColumn(FAMILY, qualifier, data);
    table.put(put);
  }

  /** Creates a full-table scan whose max result size is 1000 bytes. */
  private static Scan newSmallResultScan() {
    Scan scan = new Scan();
    scan.setMaxResultSize(1000);
    return scan;
  }

  /** Runs {@code scan} against {@code table} and returns the number of results it yields. */
  private static int countResults(Table table, Scan scan) throws IOException {
    try (ResultScanner scanner = table.getScanner(scan)) {
      return Iterables.size(scanner);
    }
  }

  /** Returns the keys (file name and offset) of all blocks currently returned by the cache. */
  private static List<BlockCacheKey> cachedBlockKeys(BlockCache cache) {
    List<BlockCacheKey> keys = new ArrayList<>();
    Iterator<CachedBlock> iterator = cache.iterator();
    while (iterator.hasNext()) {
      CachedBlock block = iterator.next();
      keys.add(new BlockCacheKey(block.getFilename(), block.getOffset()));
    }
    return keys;
  }

  /** Fails the calling test with {@code failure} as the cause if it is not {@code null}. */
  private static void rethrowIfFailed(String message, Throwable failure) {
    if (failure != null) {
      throw new AssertionError(message, failure);
    }
  }
}