001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Optional; 027import java.util.concurrent.CountDownLatch; 028import java.util.concurrent.atomic.AtomicBoolean; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellComparatorImpl; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 037import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 038import org.apache.hadoop.hbase.coprocessor.ObserverContext; 039import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 040import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 041import org.apache.hadoop.hbase.coprocessor.RegionObserver; 042import org.apache.hadoop.hbase.io.hfile.BlockCache; 043import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 044import org.apache.hadoop.hbase.io.hfile.CacheConfig; 045import org.apache.hadoop.hbase.io.hfile.CachedBlock; 046import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; 047import org.apache.hadoop.hbase.regionserver.HRegion; 048import org.apache.hadoop.hbase.regionserver.HStore; 049import org.apache.hadoop.hbase.regionserver.InternalScanner; 050import org.apache.hadoop.hbase.regionserver.ScanType; 051import org.apache.hadoop.hbase.regionserver.ScannerContext; 052import org.apache.hadoop.hbase.regionserver.Store; 053import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 054import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 055import org.apache.hadoop.hbase.testclassification.ClientTests; 056import org.apache.hadoop.hbase.testclassification.LargeTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.junit.AfterClass; 059import org.junit.BeforeClass; 060import org.junit.ClassRule; 061import org.junit.Rule; 062import org.junit.Test; 063import org.junit.experimental.categories.Category; 064import org.junit.rules.TestName; 065 066import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 067 068@Category({ LargeTests.class, ClientTests.class }) 069public class TestAvoidCellReferencesIntoShippedBlocks { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestAvoidCellReferencesIntoShippedBlocks.class); 074 075 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 076 static byte[][] ROWS = new byte[2][]; 077 private static byte[] ROW = Bytes.toBytes("testRow"); 078 private static byte[] ROW1 = Bytes.toBytes("testRow1"); 079 private static byte[] ROW2 = Bytes.toBytes("testRow2"); 080 private static byte[] ROW3 = Bytes.toBytes("testRow3"); 081 private static byte[] ROW4 = Bytes.toBytes("testRow4"); 082 private static byte[] ROW5 = Bytes.toBytes("testRow5"); 083 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 084 private static byte[][] FAMILIES_1 = new byte[1][0]; 085 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 086 private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1"); 087 private static byte[] data = new byte[1000]; 088 protected static int SLAVES = 1; 089 private CountDownLatch latch = new CountDownLatch(1); 090 private static CountDownLatch compactReadLatch = new CountDownLatch(1); 091 private static AtomicBoolean doScan = new AtomicBoolean(false); 092 093 @Rule 094 public TestName name = new TestName(); 095 096 /** 097 * @throws java.lang.Exception 098 */ 099 @BeforeClass 100 public static void setUpBeforeClass() throws Exception { 101 ROWS[0] = ROW; 102 ROWS[1] = ROW1; 103 Configuration conf = TEST_UTIL.getConfiguration(); 104 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 105 MultiRowMutationEndpoint.class.getName()); 106 conf.setInt("hbase.regionserver.handler.count", 20); 107 conf.setInt("hbase.bucketcache.size", 400); 108 conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); 109 conf.setInt("hbase.hstore.compactionThreshold", 7); 110 conf.setFloat("hfile.block.cache.size", 0.2f); 111 conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); 112 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry 113 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500000); 114 FAMILIES_1[0] = FAMILY; 115 TEST_UTIL.startMiniCluster(SLAVES); 116 compactReadLatch = new CountDownLatch(1); 117 } 118 119 /** 120 * @throws java.lang.Exception 121 */ 122 @AfterClass 123 public static void tearDownAfterClass() throws Exception { 124 TEST_UTIL.shutdownMiniCluster(); 125 } 126 127 @Test 128 public void testHBase16372InCompactionWritePath() throws Exception { 129 final TableName tableName = TableName.valueOf(name.getMethodName()); 130 // Create a table with block size as 1024 131 final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 132 CompactorRegionObserver.class.getName()); 133 try { 134 // get the block cache and region 135 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 136 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 137 HRegion region = 138 (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 139 HStore store = region.getStores().iterator().next(); 140 CacheConfig cacheConf = store.getCacheConfig(); 141 cacheConf.setCacheDataOnWrite(true); 142 cacheConf.setEvictOnClose(true); 143 final BlockCache cache = cacheConf.getBlockCache().get(); 144 // insert data. 5 Rows are added 145 Put put = new Put(ROW); 146 put.addColumn(FAMILY, QUALIFIER, data); 147 table.put(put); 148 put = new Put(ROW); 149 put.addColumn(FAMILY, QUALIFIER1, data); 150 table.put(put); 151 put = new Put(ROW1); 152 put.addColumn(FAMILY, QUALIFIER, data); 153 table.put(put); 154 // data was in memstore so don't expect any changes 155 region.flush(true); 156 put = new Put(ROW1); 157 put.addColumn(FAMILY, QUALIFIER1, data); 158 table.put(put); 159 put = new Put(ROW2); 160 put.addColumn(FAMILY, QUALIFIER, data); 161 table.put(put); 162 put = new Put(ROW2); 163 put.addColumn(FAMILY, QUALIFIER1, data); 164 table.put(put); 165 // data was in memstore so don't expect any changes 166 region.flush(true); 167 put = new Put(ROW3); 168 put.addColumn(FAMILY, QUALIFIER, data); 169 table.put(put); 170 put = new Put(ROW3); 171 put.addColumn(FAMILY, QUALIFIER1, data); 172 table.put(put); 173 put = new Put(ROW4); 174 put.addColumn(FAMILY, QUALIFIER, data); 175 table.put(put); 176 // data was in memstore so don't expect any changes 177 region.flush(true); 178 put = new Put(ROW4); 179 put.addColumn(FAMILY, QUALIFIER1, data); 180 table.put(put); 181 put = new Put(ROW5); 182 put.addColumn(FAMILY, QUALIFIER, data); 183 table.put(put); 184 put = new Put(ROW5); 185 put.addColumn(FAMILY, QUALIFIER1, data); 186 table.put(put); 187 // data was in memstore so don't expect any changes 188 region.flush(true); 189 // Load cache 190 Scan s = new Scan(); 191 s.setMaxResultSize(1000); 192 int count; 193 try (ResultScanner scanner = table.getScanner(s)) { 194 count = Iterables.size(scanner); 195 } 196 assertEquals("Count all the rows ", 6, count); 197 // all the cache is loaded 198 // trigger a major compaction 199 ScannerThread scannerThread = new ScannerThread(table, cache); 200 scannerThread.start(); 201 region.compact(true); 202 s = new Scan(); 203 s.setMaxResultSize(1000); 204 try (ResultScanner scanner = table.getScanner(s)) { 205 count = Iterables.size(scanner); 206 } 207 assertEquals("Count all the rows ", 6, count); 208 } finally { 209 table.close(); 210 } 211 } 212 213 private static class ScannerThread extends Thread { 214 private final Table table; 215 private final BlockCache cache; 216 217 public ScannerThread(Table table, BlockCache cache) { 218 this.table = table; 219 this.cache = cache; 220 } 221 222 @Override 223 public void run() { 224 Scan s = new Scan().withStartRow(ROW4).withStopRow(ROW5).setCaching(1); 225 try { 226 while(!doScan.get()) { 227 try { 228 // Sleep till you start scan 229 Thread.sleep(1); 230 } catch (InterruptedException e) { 231 } 232 } 233 List<BlockCacheKey> cacheList = new ArrayList<>(); 234 Iterator<CachedBlock> iterator = cache.iterator(); 235 // evict all the blocks 236 while (iterator.hasNext()) { 237 CachedBlock next = iterator.next(); 238 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 239 cacheList.add(cacheKey); 240 // evict what ever is available 241 cache.evictBlock(cacheKey); 242 } 243 try (ResultScanner scanner = table.getScanner(s)) { 244 while (scanner.next() != null) { 245 } 246 } 247 compactReadLatch.countDown(); 248 } catch (IOException e) { 249 } 250 } 251 } 252 253 public static class CompactorRegionObserver implements RegionCoprocessor, RegionObserver { 254 255 @Override 256 public Optional<RegionObserver> getRegionObserver() { 257 return Optional.of(this); 258 } 259 260 @Override 261 public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 262 InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 263 CompactionRequest request) throws IOException { 264 return new CompactorInternalScanner(scanner); 265 } 266 } 267 268 private static final class CompactorInternalScanner extends DelegatingInternalScanner { 269 270 public CompactorInternalScanner(InternalScanner scanner) { 271 super(scanner); 272 } 273 274 @Override 275 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { 276 boolean next = scanner.next(result, scannerContext); 277 for (Cell cell : result) { 278 if (CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) { 279 try { 280 // hold the compaction 281 // set doscan to true 282 doScan.compareAndSet(false, true); 283 compactReadLatch.await(); 284 } catch (InterruptedException e) { 285 } 286 } 287 } 288 return next; 289 } 290 } 291 292 @Test 293 public void testHBASE16372InReadPath() throws Exception { 294 final TableName tableName = TableName.valueOf(name.getMethodName()); 295 // Create a table with block size as 1024 296 final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, null); 297 try { 298 // get the block cache and region 299 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 300 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 301 HRegion region = (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName) 302 .getRegion(regionName); 303 HStore store = region.getStores().iterator().next(); 304 CacheConfig cacheConf = store.getCacheConfig(); 305 cacheConf.setCacheDataOnWrite(true); 306 cacheConf.setEvictOnClose(true); 307 final BlockCache cache = cacheConf.getBlockCache().get(); 308 // insert data. 5 Rows are added 309 Put put = new Put(ROW); 310 put.addColumn(FAMILY, QUALIFIER, data); 311 table.put(put); 312 put = new Put(ROW); 313 put.addColumn(FAMILY, QUALIFIER1, data); 314 table.put(put); 315 put = new Put(ROW1); 316 put.addColumn(FAMILY, QUALIFIER, data); 317 table.put(put); 318 put = new Put(ROW1); 319 put.addColumn(FAMILY, QUALIFIER1, data); 320 table.put(put); 321 put = new Put(ROW2); 322 put.addColumn(FAMILY, QUALIFIER, data); 323 table.put(put); 324 put = new Put(ROW2); 325 put.addColumn(FAMILY, QUALIFIER1, data); 326 table.put(put); 327 put = new Put(ROW3); 328 put.addColumn(FAMILY, QUALIFIER, data); 329 table.put(put); 330 put = new Put(ROW3); 331 put.addColumn(FAMILY, QUALIFIER1, data); 332 table.put(put); 333 put = new Put(ROW4); 334 put.addColumn(FAMILY, QUALIFIER, data); 335 table.put(put); 336 put = new Put(ROW4); 337 put.addColumn(FAMILY, QUALIFIER1, data); 338 table.put(put); 339 put = new Put(ROW5); 340 put.addColumn(FAMILY, QUALIFIER, data); 341 table.put(put); 342 put = new Put(ROW5); 343 put.addColumn(FAMILY, QUALIFIER1, data); 344 table.put(put); 345 // data was in memstore so don't expect any changes 346 region.flush(true); 347 // Load cache 348 Scan s = new Scan(); 349 s.setMaxResultSize(1000); 350 int count; 351 try (ResultScanner scanner = table.getScanner(s)) { 352 count = Iterables.size(scanner); 353 } 354 assertEquals("Count all the rows ", 6, count); 355 356 // Scan from cache 357 s = new Scan(); 358 // Start a scan from row3 359 s.setCaching(1); 360 s.withStartRow(ROW1); 361 // set partial as true so that the scan can send partial columns also 362 s.setAllowPartialResults(true); 363 s.setMaxResultSize(1000); 364 try (ResultScanner scanner = table.getScanner(s)) { 365 Thread evictorThread = new Thread() { 366 @Override 367 public void run() { 368 List<BlockCacheKey> cacheList = new ArrayList<>(); 369 Iterator<CachedBlock> iterator = cache.iterator(); 370 // evict all the blocks 371 while (iterator.hasNext()) { 372 CachedBlock next = iterator.next(); 373 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 374 cacheList.add(cacheKey); 375 cache.evictBlock(cacheKey); 376 } 377 try { 378 Thread.sleep(1); 379 } catch (InterruptedException e1) { 380 } 381 iterator = cache.iterator(); 382 int refBlockCount = 0; 383 while (iterator.hasNext()) { 384 iterator.next(); 385 refBlockCount++; 386 } 387 assertEquals("One block should be there ", 1, refBlockCount); 388 // Rescan to prepopulate the data 389 // cache this row. 390 Scan s1 = new Scan(); 391 // This scan will start from ROW1 and it will populate the cache with a 392 // row that is lower than ROW3. 393 s1.withStartRow(ROW3); 394 s1.withStopRow(ROW5); 395 s1.setCaching(1); 396 ResultScanner scanner; 397 try { 398 scanner = table.getScanner(s1); 399 int count = Iterables.size(scanner); 400 assertEquals("Count the rows", 2, count); 401 int newBlockRefCount = 0; 402 List<BlockCacheKey> newCacheList = new ArrayList<>(); 403 while (true) { 404 newBlockRefCount = 0; 405 newCacheList.clear(); 406 iterator = cache.iterator(); 407 while (iterator.hasNext()) { 408 CachedBlock next = iterator.next(); 409 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 410 newCacheList.add(cacheKey); 411 } 412 for (BlockCacheKey key : cacheList) { 413 if (newCacheList.contains(key)) { 414 newBlockRefCount++; 415 } 416 } 417 if (newBlockRefCount == 6) { 418 break; 419 } 420 } 421 latch.countDown(); 422 } catch (IOException e) { 423 } 424 } 425 }; 426 count = 0; 427 while (scanner.next() != null) { 428 count++; 429 if (count == 2) { 430 evictorThread.start(); 431 latch.await(); 432 } 433 } 434 } 435 assertEquals("Count should give all rows ", 10, count); 436 } finally { 437 table.close(); 438 } 439 } 440}