001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Optional; 027import java.util.concurrent.CountDownLatch; 028import java.util.concurrent.atomic.AtomicBoolean; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellComparatorImpl; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 037import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 038import org.apache.hadoop.hbase.coprocessor.ObserverContext; 039import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 040import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 041import org.apache.hadoop.hbase.coprocessor.RegionObserver; 042import org.apache.hadoop.hbase.io.hfile.BlockCache; 043import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 044import org.apache.hadoop.hbase.io.hfile.CacheConfig; 045import org.apache.hadoop.hbase.io.hfile.CachedBlock; 046import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; 047import org.apache.hadoop.hbase.regionserver.HRegion; 048import org.apache.hadoop.hbase.regionserver.HStore; 049import org.apache.hadoop.hbase.regionserver.InternalScanner; 050import org.apache.hadoop.hbase.regionserver.ScanType; 051import org.apache.hadoop.hbase.regionserver.ScannerContext; 052import org.apache.hadoop.hbase.regionserver.Store; 053import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 054import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 055import org.apache.hadoop.hbase.testclassification.ClientTests; 056import org.apache.hadoop.hbase.testclassification.LargeTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.junit.AfterClass; 059import org.junit.BeforeClass; 060import org.junit.ClassRule; 061import org.junit.Rule; 062import org.junit.Test; 063import org.junit.experimental.categories.Category; 064import org.junit.rules.TestName; 065 066import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 067 068@Category({ LargeTests.class, ClientTests.class }) 069public class TestAvoidCellReferencesIntoShippedBlocks { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestAvoidCellReferencesIntoShippedBlocks.class); 074 075 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 076 static byte[][] ROWS = new byte[2][]; 077 private static byte[] ROW = Bytes.toBytes("testRow"); 078 private static byte[] ROW1 = Bytes.toBytes("testRow1"); 079 private static byte[] ROW2 = Bytes.toBytes("testRow2"); 080 private static byte[] ROW3 = Bytes.toBytes("testRow3"); 081 private static byte[] ROW4 = Bytes.toBytes("testRow4"); 082 private static byte[] ROW5 = Bytes.toBytes("testRow5"); 083 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 084 private static byte[][] FAMILIES_1 = new byte[1][0]; 085 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 086 private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1"); 087 private static byte[] data = new byte[1000]; 088 protected static int SLAVES = 1; 089 private CountDownLatch latch = new CountDownLatch(1); 090 private static CountDownLatch compactReadLatch = new CountDownLatch(1); 091 private static AtomicBoolean doScan = new AtomicBoolean(false); 092 093 @Rule 094 public TestName name = new TestName(); 095 096 /** 097 * @throws java.lang.Exception 098 */ 099 @BeforeClass 100 public static void setUpBeforeClass() throws Exception { 101 ROWS[0] = ROW; 102 ROWS[1] = ROW1; 103 Configuration conf = TEST_UTIL.getConfiguration(); 104 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 105 MultiRowMutationEndpoint.class.getName()); 106 conf.setBoolean("hbase.table.sanity.checks", true); // enable for below 107 // tests 108 conf.setInt("hbase.regionserver.handler.count", 20); 109 conf.setInt("hbase.bucketcache.size", 400); 110 conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); 111 conf.setInt("hbase.hstore.compactionThreshold", 7); 112 conf.setFloat("hfile.block.cache.size", 0.2f); 113 conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); 114 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry 115 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500000); 116 FAMILIES_1[0] = FAMILY; 117 TEST_UTIL.startMiniCluster(SLAVES); 118 compactReadLatch = new CountDownLatch(1); 119 } 120 121 /** 122 * @throws java.lang.Exception 123 */ 124 @AfterClass 125 public static void tearDownAfterClass() throws Exception { 126 TEST_UTIL.shutdownMiniCluster(); 127 } 128 129 @Test 130 public void testHBase16372InCompactionWritePath() throws Exception { 131 final TableName tableName = TableName.valueOf(name.getMethodName()); 132 // Create a table with block size as 1024 133 final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 134 CompactorRegionObserver.class.getName()); 135 try { 136 // get the block cache and region 137 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 138 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 139 HRegion region = 140 (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 141 HStore store = region.getStores().iterator().next(); 142 CacheConfig cacheConf = store.getCacheConfig(); 143 cacheConf.setCacheDataOnWrite(true); 144 cacheConf.setEvictOnClose(true); 145 final BlockCache cache = cacheConf.getBlockCache(); 146 // insert data. 5 Rows are added 147 Put put = new Put(ROW); 148 put.addColumn(FAMILY, QUALIFIER, data); 149 table.put(put); 150 put = new Put(ROW); 151 put.addColumn(FAMILY, QUALIFIER1, data); 152 table.put(put); 153 put = new Put(ROW1); 154 put.addColumn(FAMILY, QUALIFIER, data); 155 table.put(put); 156 // data was in memstore so don't expect any changes 157 region.flush(true); 158 put = new Put(ROW1); 159 put.addColumn(FAMILY, QUALIFIER1, data); 160 table.put(put); 161 put = new Put(ROW2); 162 put.addColumn(FAMILY, QUALIFIER, data); 163 table.put(put); 164 put = new Put(ROW2); 165 put.addColumn(FAMILY, QUALIFIER1, data); 166 table.put(put); 167 // data was in memstore so don't expect any changes 168 region.flush(true); 169 put = new Put(ROW3); 170 put.addColumn(FAMILY, QUALIFIER, data); 171 table.put(put); 172 put = new Put(ROW3); 173 put.addColumn(FAMILY, QUALIFIER1, data); 174 table.put(put); 175 put = new Put(ROW4); 176 put.addColumn(FAMILY, QUALIFIER, data); 177 table.put(put); 178 // data was in memstore so don't expect any changes 179 region.flush(true); 180 put = new Put(ROW4); 181 put.addColumn(FAMILY, QUALIFIER1, data); 182 table.put(put); 183 put = new Put(ROW5); 184 put.addColumn(FAMILY, QUALIFIER, data); 185 table.put(put); 186 put = new Put(ROW5); 187 put.addColumn(FAMILY, QUALIFIER1, data); 188 table.put(put); 189 // data was in memstore so don't expect any changes 190 region.flush(true); 191 // Load cache 192 Scan s = new Scan(); 193 s.setMaxResultSize(1000); 194 int count; 195 try (ResultScanner scanner = table.getScanner(s)) { 196 count = Iterables.size(scanner); 197 } 198 assertEquals("Count all the rows ", 6, count); 199 // all the cache is loaded 200 // trigger a major compaction 201 ScannerThread scannerThread = new ScannerThread(table, cache); 202 scannerThread.start(); 203 region.compact(true); 204 s = new Scan(); 205 s.setMaxResultSize(1000); 206 try (ResultScanner scanner = table.getScanner(s)) { 207 count = Iterables.size(scanner); 208 } 209 assertEquals("Count all the rows ", 6, count); 210 } finally { 211 table.close(); 212 } 213 } 214 215 private static class ScannerThread extends Thread { 216 private final Table table; 217 private final BlockCache cache; 218 219 public ScannerThread(Table table, BlockCache cache) { 220 this.table = table; 221 this.cache = cache; 222 } 223 224 @Override 225 public void run() { 226 Scan s = new Scan().withStartRow(ROW4).withStopRow(ROW5).setCaching(1); 227 try { 228 while(!doScan.get()) { 229 try { 230 // Sleep till you start scan 231 Thread.sleep(1); 232 } catch (InterruptedException e) { 233 } 234 } 235 List<BlockCacheKey> cacheList = new ArrayList<>(); 236 Iterator<CachedBlock> iterator = cache.iterator(); 237 // evict all the blocks 238 while (iterator.hasNext()) { 239 CachedBlock next = iterator.next(); 240 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 241 cacheList.add(cacheKey); 242 // evict what ever is available 243 cache.evictBlock(cacheKey); 244 } 245 try (ResultScanner scanner = table.getScanner(s)) { 246 while (scanner.next() != null) { 247 } 248 } 249 compactReadLatch.countDown(); 250 } catch (IOException e) { 251 } 252 } 253 } 254 255 public static class CompactorRegionObserver implements RegionCoprocessor, RegionObserver { 256 257 @Override 258 public Optional<RegionObserver> getRegionObserver() { 259 return Optional.of(this); 260 } 261 262 @Override 263 public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 264 InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 265 CompactionRequest request) throws IOException { 266 return new CompactorInternalScanner(scanner); 267 } 268 } 269 270 private static final class CompactorInternalScanner extends DelegatingInternalScanner { 271 272 public CompactorInternalScanner(InternalScanner scanner) { 273 super(scanner); 274 } 275 276 @Override 277 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { 278 boolean next = scanner.next(result, scannerContext); 279 for (Cell cell : result) { 280 if (CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) { 281 try { 282 // hold the compaction 283 // set doscan to true 284 doScan.compareAndSet(false, true); 285 compactReadLatch.await(); 286 } catch (InterruptedException e) { 287 } 288 } 289 } 290 return next; 291 } 292 } 293 294 @Test 295 public void testHBASE16372InReadPath() throws Exception { 296 final TableName tableName = TableName.valueOf(name.getMethodName()); 297 // Create a table with block size as 1024 298 final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, null); 299 try { 300 // get the block cache and region 301 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 302 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 303 HRegion region = (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName) 304 .getRegion(regionName); 305 HStore store = region.getStores().iterator().next(); 306 CacheConfig cacheConf = store.getCacheConfig(); 307 cacheConf.setCacheDataOnWrite(true); 308 cacheConf.setEvictOnClose(true); 309 final BlockCache cache = cacheConf.getBlockCache(); 310 // insert data. 5 Rows are added 311 Put put = new Put(ROW); 312 put.addColumn(FAMILY, QUALIFIER, data); 313 table.put(put); 314 put = new Put(ROW); 315 put.addColumn(FAMILY, QUALIFIER1, data); 316 table.put(put); 317 put = new Put(ROW1); 318 put.addColumn(FAMILY, QUALIFIER, data); 319 table.put(put); 320 put = new Put(ROW1); 321 put.addColumn(FAMILY, QUALIFIER1, data); 322 table.put(put); 323 put = new Put(ROW2); 324 put.addColumn(FAMILY, QUALIFIER, data); 325 table.put(put); 326 put = new Put(ROW2); 327 put.addColumn(FAMILY, QUALIFIER1, data); 328 table.put(put); 329 put = new Put(ROW3); 330 put.addColumn(FAMILY, QUALIFIER, data); 331 table.put(put); 332 put = new Put(ROW3); 333 put.addColumn(FAMILY, QUALIFIER1, data); 334 table.put(put); 335 put = new Put(ROW4); 336 put.addColumn(FAMILY, QUALIFIER, data); 337 table.put(put); 338 put = new Put(ROW4); 339 put.addColumn(FAMILY, QUALIFIER1, data); 340 table.put(put); 341 put = new Put(ROW5); 342 put.addColumn(FAMILY, QUALIFIER, data); 343 table.put(put); 344 put = new Put(ROW5); 345 put.addColumn(FAMILY, QUALIFIER1, data); 346 table.put(put); 347 // data was in memstore so don't expect any changes 348 region.flush(true); 349 // Load cache 350 Scan s = new Scan(); 351 s.setMaxResultSize(1000); 352 int count; 353 try (ResultScanner scanner = table.getScanner(s)) { 354 count = Iterables.size(scanner); 355 } 356 assertEquals("Count all the rows ", 6, count); 357 358 // Scan from cache 359 s = new Scan(); 360 // Start a scan from row3 361 s.setCaching(1); 362 s.withStartRow(ROW1); 363 // set partial as true so that the scan can send partial columns also 364 s.setAllowPartialResults(true); 365 s.setMaxResultSize(1000); 366 try (ResultScanner scanner = table.getScanner(s)) { 367 Thread evictorThread = new Thread() { 368 @Override 369 public void run() { 370 List<BlockCacheKey> cacheList = new ArrayList<>(); 371 Iterator<CachedBlock> iterator = cache.iterator(); 372 // evict all the blocks 373 while (iterator.hasNext()) { 374 CachedBlock next = iterator.next(); 375 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 376 cacheList.add(cacheKey); 377 cache.evictBlock(cacheKey); 378 } 379 try { 380 Thread.sleep(1); 381 } catch (InterruptedException e1) { 382 } 383 iterator = cache.iterator(); 384 int refBlockCount = 0; 385 while (iterator.hasNext()) { 386 iterator.next(); 387 refBlockCount++; 388 } 389 assertEquals("One block should be there ", 1, refBlockCount); 390 // Rescan to prepopulate the data 391 // cache this row. 392 Scan s1 = new Scan(); 393 // This scan will start from ROW1 and it will populate the cache with a 394 // row that is lower than ROW3. 395 s1.withStartRow(ROW3); 396 s1.withStopRow(ROW5); 397 s1.setCaching(1); 398 ResultScanner scanner; 399 try { 400 scanner = table.getScanner(s1); 401 int count = Iterables.size(scanner); 402 assertEquals("Count the rows", 2, count); 403 int newBlockRefCount = 0; 404 List<BlockCacheKey> newCacheList = new ArrayList<>(); 405 while (true) { 406 newBlockRefCount = 0; 407 newCacheList.clear(); 408 iterator = cache.iterator(); 409 while (iterator.hasNext()) { 410 CachedBlock next = iterator.next(); 411 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 412 newCacheList.add(cacheKey); 413 } 414 for (BlockCacheKey key : cacheList) { 415 if (newCacheList.contains(key)) { 416 newBlockRefCount++; 417 } 418 } 419 if (newBlockRefCount == 6) { 420 break; 421 } 422 } 423 latch.countDown(); 424 } catch (IOException e) { 425 } 426 } 427 }; 428 count = 0; 429 while (scanner.next() != null) { 430 count++; 431 if (count == 2) { 432 evictorThread.start(); 433 latch.await(); 434 } 435 } 436 } 437 assertEquals("Count should give all rows ", 10, count); 438 } finally { 439 table.close(); 440 } 441 } 442}