/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BLOCKCACHE_POLICY_KEY;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.ExtendedCellBuilder;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;

/**
 * Test HFile features.
 */
@Category({ IOTests.class, SmallTests.class })
public class TestHFile {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHFile.class);

  @Rule
  public TestName testName = new TestName();

  private static final Logger LOG = LoggerFactory.getLogger(TestHFile.class);
  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFile").toString();
  private final int minBlockSize = 512;
  private static String localFormatter = "%010d";
  private static CacheConfig cacheConf;
  private static Configuration conf;
  private static FileSystem fs;

  @BeforeClass
  public static void setUp() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    cacheConf = new CacheConfig(conf);
    fs = TEST_UTIL.getTestFileSystem();
  }

  public static Reader createReaderFromStream(ReaderContext context, CacheConfig cacheConf,
    Configuration conf) throws IOException {
    HFileInfo fileInfo = new HFileInfo(context, conf);
    Reader preadReader = HFile.createReader(context, fileInfo, cacheConf, conf);
    fileInfo.initMetaAndIndex(preadReader);
    preadReader.close();
    context = new ReaderContextBuilder()
      .withFileSystemAndPath(context.getFileSystem(), context.getFilePath())
      .withReaderType(ReaderType.STREAM).build();
    Reader streamReader = HFile.createReader(context, fileInfo, cacheConf, conf);
    return streamReader;
  }
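
  /**
   * Creates a ByteBuffAllocator backed by a pool of bufCount buffers of bufSize bytes each. With
   * a minAllocSize of 0, every allocation is served from the pooled buffers.
   */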
  private ByteBuffAllocator initAllocator(boolean reservoirEnabled, int bufSize, int bufCount,
    int minAllocSize) {
    Configuration that = HBaseConfiguration.create(conf);
    that.setInt(BUFFER_SIZE_KEY, bufSize);
    that.setInt(MAX_BUFFER_COUNT_KEY, bufCount);
    // All ByteBuffers will be allocated from the pooled buffers.
    that.setInt(MIN_ALLOCATE_SIZE_KEY, minAllocSize);
    return ByteBuffAllocator.create(that, reservoirEnabled);
  }

  private void fillByteBuffAllocator(ByteBuffAllocator alloc, int bufCount) {
    // Fill the allocator with bufCount ByteBuffers.
    List<ByteBuff> buffs = new ArrayList<>();
    for (int i = 0; i < bufCount; i++) {
      buffs.add(alloc.allocateOneBuffer());
      Assert.assertEquals(0, alloc.getFreeBufferCount());
    }
    buffs.forEach(ByteBuff::release);
    Assert.assertEquals(bufCount, alloc.getFreeBufferCount());
  }

  @Test
  public void testReaderWithoutBlockCache() throws Exception {
    int bufCount = 32;
    // All ByteBuffers will be allocated from the pooled buffers.
    ByteBuffAllocator alloc = initAllocator(true, 64 * 1024, bufCount, 0);
    fillByteBuffAllocator(alloc, bufCount);
    // Start writing to the store file.
    Path path = writeStoreFile();
    readStoreFile(path, conf, alloc);
    Assert.assertEquals(bufCount, alloc.getFreeBufferCount());
    alloc.clean();
  }

  /**
   * Test case for HBASE-22127 in LruBlockCache.
   */
  @Test
  public void testReaderWithLRUBlockCache() throws Exception {
    int bufCount = 1024, blockSize = 64 * 1024;
    ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
    fillByteBuffAllocator(alloc, bufCount);
    Path storeFilePath = writeStoreFile();
    // Open the file reader with LRUBlockCache
    BlockCache lru = new LruBlockCache(1024 * 1024 * 32, blockSize, true, conf);
    CacheConfig cacheConfig = new CacheConfig(conf, null, lru, alloc);
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
      HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null);
      offset += block.getOnDiskSizeWithHeader();
      // Ensure the cached block is an on-heap one.
      Cacheable cachedBlock = lru.getBlock(key, false, false, true);
      Assert.assertNotNull(cachedBlock);
      Assert.assertTrue(cachedBlock instanceof HFileBlock);
      Assert.assertFalse(((HFileBlock) cachedBlock).isSharedMem());
      // Blocks cached in the LRU cache are on-heap, so nothing should have been allocated from
      // the off-heap allocator.
      Assert.assertEquals(bufCount, alloc.getFreeBufferCount());
      block.release(); // return the ByteBuffer back to the allocator
    }
    reader.close();
    Assert.assertEquals(bufCount, alloc.getFreeBufferCount());
    alloc.clean();
    lru.shutdown();
  }
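
  /**
   * Cache-on-write is enabled, but the mocked BlockCache declines every block via
   * shouldCacheBlock. Verifies that no block is cached and that, with the Netty leak detector in
   * PARANOID mode, no ByteBuff reference is leaked by the skipped cache-on-write path.
   */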
  @Test
  public void testWriterCacheOnWriteSkipDoesNotLeak() throws Exception {
    int bufCount = 32;
    int blockSize = 4 * 1024;
    ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
    fillByteBuffAllocator(alloc, bufCount);
    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
    Configuration myConf = HBaseConfiguration.create(conf);
    myConf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
    myConf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, false);
    myConf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, false);
    final AtomicInteger counter = new AtomicInteger();
    RefCnt.detector.setLeakListener(new ResourceLeakDetector.LeakListener() {
      @Override
      public void onLeak(String s, String s1) {
        counter.incrementAndGet();
      }
    });
    BlockCache cache = Mockito.mock(BlockCache.class);
    Mockito.when(cache.shouldCacheBlock(Mockito.any(), Mockito.anyLong(), Mockito.any()))
      .thenReturn(Optional.of(false));
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testWriterCacheOnWriteSkipDoesNotLeak");
    HFileContext context = new HFileContextBuilder().withBlockSize(blockSize).build();

    try {
      Writer writer = new HFile.WriterFactory(myConf, new CacheConfig(myConf, null, cache, alloc))
        .withPath(fs, hfilePath).withFileContext(context).create();
      try {
        writer.append(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("q"),
          HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value")));
      } finally {
        writer.close();
      }

      Mockito.verify(cache).shouldCacheBlock(Mockito.any(), Mockito.anyLong(), Mockito.any());
      Mockito.verify(cache, Mockito.never()).cacheBlock(Mockito.any(), Mockito.any(),
        Mockito.anyBoolean(), Mockito.anyBoolean());
      // Nudge the leak detector: trigger GC and churn the allocator so that any leaked RefCnt
      // from the skipped cache-on-write block gets reported.
      for (int i = 0; i < 30 && counter.get() == 0; i++) {
        System.gc();
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        }
        alloc.allocate(128 * 1024).release();
      }
      assertEquals(0, counter.get());
    } finally {
      fs.delete(hfilePath, false);
      alloc.clean();
    }
  }
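
  /**
   * Writes a store file, then reads its first block from disk and again from the block cache,
   * asserting that the thread-local scan metrics (bytesReadFromFs and bytesReadFromBlockCache)
   * reflect where the bytes actually came from.
   */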
  private void assertBytesReadFromCache(boolean isScanMetricsEnabled) throws Exception {
    assertBytesReadFromCache(isScanMetricsEnabled, DataBlockEncoding.NONE);
  }

  private void assertBytesReadFromCache(boolean isScanMetricsEnabled, DataBlockEncoding encoding)
    throws Exception {
    // Write a store file
    Path storeFilePath = writeStoreFile();

    // Initialize the block cache and HFile reader
    BlockCache lru = BlockCacheFactory.createBlockCache(conf);
    Assert.assertTrue(lru instanceof LruBlockCache);
    CacheConfig cacheConfig = new CacheConfig(conf, null, lru, ByteBuffAllocator.HEAP);
    HFileReaderImpl reader =
      (HFileReaderImpl) HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);

    // The first block of the HFile should not be in the block cache yet.
    final int offset = 0;
    BlockCacheKey cacheKey = new BlockCacheKey(storeFilePath.getName(), offset);
    HFileBlock block = (HFileBlock) lru.getBlock(cacheKey, false, false, true);
    Assert.assertNull(block);

    // Assert that looking up the uncached block touched neither the block cache metrics nor the
    // disk I/O metrics.
    ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset();
    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    block = reader.getCachedBlock(cacheKey, false, false, true, BlockType.DATA, null);
    Assert.assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
    Assert.assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());

    // Read the first block from the HFile.
    block = reader.readBlock(offset, -1, true, true, false, true, BlockType.DATA, null);
    Assert.assertNotNull(block);
    int bytesReadFromFs = block.getOnDiskSizeWithHeader();
    if (block.getNextBlockOnDiskSize() > 0) {
      bytesReadFromFs += block.headerSize();
    }
    block.release();
    // Assert that disk I/O happened to read the first block.
    Assert.assertEquals(isScanMetricsEnabled ? bytesReadFromFs : 0,
      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    Assert.assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());

    // Read the first block again and assert that it has been cached in the block cache.
    block = reader.getCachedBlock(cacheKey, false, false, true, BlockType.DATA, encoding);
    long bytesReadFromCache = 0;
    if (encoding == DataBlockEncoding.NONE) {
      Assert.assertNotNull(block);
      bytesReadFromCache = block.getOnDiskSizeWithHeader();
      if (block.getNextBlockOnDiskSize() > 0) {
        bytesReadFromCache += block.headerSize();
      }
      block.release();
      // Assert that the bytes read from the block cache account for the same number of bytes
      // that would have been read from the filesystem if the block cache weren't there.
      Assert.assertEquals(bytesReadFromFs, bytesReadFromCache);
    } else {
      Assert.assertNull(block);
    }
    Assert.assertEquals(isScanMetricsEnabled ? bytesReadFromCache : 0,
      ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
    Assert.assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());

    reader.close();
  }

  @Test
  public void testBytesReadFromCache() throws Exception {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    assertBytesReadFromCache(true);
  }

  @Test
  public void testBytesReadFromCacheWithScanMetricsDisabled() throws Exception {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(false);
    assertBytesReadFromCache(false);
  }

  @Test
  public void testBytesReadFromCacheWithInvalidDataEncoding() throws Exception {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    assertBytesReadFromCache(true, DataBlockEncoding.FAST_DIFF);
  }
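
  /**
   * Builds a CombinedBlockCache: an on-heap L1 cache with the given eviction policy plus a 32MB
   * off-heap BucketCache as L2.
   */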
  private BlockCache initCombinedBlockCache(final String l1CachePolicy) {
    Configuration that = HBaseConfiguration.create(conf);
    that.setFloat(BUCKET_CACHE_SIZE_KEY, 32); // 32MB for bucket cache.
    that.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
    that.set(BLOCKCACHE_POLICY_KEY, l1CachePolicy);
    BlockCache bc = BlockCacheFactory.createBlockCache(that);
    Assert.assertNotNull(bc);
    Assert.assertTrue(bc instanceof CombinedBlockCache);
    return bc;
  }

  /**
   * Test case for HBASE-22127 in CombinedBlockCache.
   */
  @Test
  public void testReaderWithCombinedBlockCache() throws Exception {
    int bufCount = 1024, blockSize = 64 * 1024;
    ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
    fillByteBuffAllocator(alloc, bufCount);
    Path storeFilePath = writeStoreFile();
    // Open the file reader with CombinedBlockCache
    BlockCache combined = initCombinedBlockCache("LRU");
    conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
    CacheConfig cacheConfig = new CacheConfig(conf, null, combined, alloc);
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
      HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null);
      offset += block.getOnDiskSizeWithHeader();
      // Read the cached block.
      Cacheable cachedBlock = combined.getBlock(key, false, false, true);
      try {
        Assert.assertNotNull(cachedBlock);
        Assert.assertTrue(cachedBlock instanceof HFileBlock);
        HFileBlock hfb = (HFileBlock) cachedBlock;
        // Data blocks are cached in the BucketCache, so they should be off-heap blocks.
        if (hfb.getBlockType().isData()) {
          Assert.assertTrue(hfb.isSharedMem());
        } else {
          // Non-data blocks are cached in the LRUBlockCache, so they must be on-heap blocks.
          Assert.assertFalse(hfb.isSharedMem());
        }
      } finally {
        cachedBlock.release();
      }
      block.release(); // return the ByteBuffer back to the allocator
    }
    reader.close();
    combined.shutdown();
    Assert.assertEquals(bufCount, alloc.getFreeBufferCount());
    alloc.clean();
  }

  /**
   * Tests that we properly allocate from off-heap or on-heap memory when an LRU cache is
   * configured. The determining factor is whether we end up caching the block or not: a block
   * destined for the on-heap LRU cache is allocated on-heap (isSharedMem() == false), while an
   * uncached block comes from the off-heap allocator (isSharedMem() == true). The cases below try
   * different permutations of enabling/disabling caching via CacheConfig and via user request
   * (cacheBlocks), along with different expected block types.
   */
  @Test
  public void testReaderBlockAllocationWithLRUCache() throws IOException {
    // false because caching is fully enabled
    testReaderBlockAllocationWithLRUCache(true, true, null, false);
    // false because we only look at cache config when expectedBlockType is non-null
    testReaderBlockAllocationWithLRUCache(false, true, null, false);
    // false because cacheBlock is true and, even with cache config disabled, we still cache
    // important blocks like indexes
    testReaderBlockAllocationWithLRUCache(false, true, BlockType.INTERMEDIATE_INDEX, false);
    // true because it's a DATA block, so we honor the cache config
    testReaderBlockAllocationWithLRUCache(false, true, BlockType.DATA, true);
    // true then false: cacheBlock is false so the block is not cached (off-heap), but an INDEX
    // block is still cached on-heap while the cache config is enabled
    testReaderBlockAllocationWithLRUCache(true, false, null, true);
    testReaderBlockAllocationWithLRUCache(true, false, BlockType.INTERMEDIATE_INDEX, false);
    // true for the following 3 because both cache config and cacheBlock are false;
    // per above, INDEX supersedes cache config, but not cacheBlock
    testReaderBlockAllocationWithLRUCache(false, false, null, true);
    testReaderBlockAllocationWithLRUCache(false, false, BlockType.INTERMEDIATE_INDEX, true);
    testReaderBlockAllocationWithLRUCache(false, false, BlockType.DATA, true);
  }

  private void testReaderBlockAllocationWithLRUCache(boolean cacheConfigCacheBlockOnRead,
    boolean cacheBlock, BlockType blockType, boolean expectSharedMem) throws IOException {
    int bufCount = 1024, blockSize = 64 * 1024;
    ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
    fillByteBuffAllocator(alloc, bufCount);
    Path storeFilePath = writeStoreFile();
    Configuration myConf = new Configuration(conf);

    myConf.setBoolean(CacheConfig.CACHE_DATA_ON_READ_KEY, cacheConfigCacheBlockOnRead);
    // Open the file reader with LRUBlockCache
    BlockCache lru = new LruBlockCache(1024 * 1024 * 32, blockSize, true, myConf);
    CacheConfig cacheConfig = new CacheConfig(myConf, null, lru, alloc);
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, myConf);
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      long read = readAtOffsetWithAllocationAsserts(alloc, reader, offset, cacheBlock, blockType,
        expectSharedMem);
      if (read < 0) {
        break;
      }

      offset += read;
    }

    reader.close();
    Assert.assertEquals(bufCount, alloc.getFreeBufferCount());
    alloc.clean();
    lru.shutdown();
  }

  /**
   * Tests that we properly allocate from off-heap or on-heap memory when CombinedCache is
   * configured. In this case, we should always use off-heap unless the block is an INDEX (which
   * always goes to the on-heap L1 cache).
   */
  @Test
  public void testReaderBlockAllocationWithCombinedCache() throws IOException {
    // true because caching is fully enabled and block type is null
    testReaderBlockAllocationWithCombinedCache(true, true, null, true);
    // false because caching is fully enabled; index block types always go to the on-heap L1
    testReaderBlockAllocationWithCombinedCache(true, true, BlockType.INTERMEDIATE_INDEX, false);
    // true because cacheBlocks takes precedence over cache config when block type is null
    testReaderBlockAllocationWithCombinedCache(false, true, null, true);
    // false because caching is enabled and block type is index, which always goes to L1
    testReaderBlockAllocationWithCombinedCache(false, true, BlockType.INTERMEDIATE_INDEX, false);
    // true because it's a DATA block, so we honor the cache config
    testReaderBlockAllocationWithCombinedCache(false, true, BlockType.DATA, true);
    // true then false: cacheBlock takes precedence over cache config, but an INDEX block still
    // goes to the on-heap L1
    testReaderBlockAllocationWithCombinedCache(true, false, null, true);
    testReaderBlockAllocationWithCombinedCache(true, false, BlockType.INTERMEDIATE_INDEX, false);
    // true for the following 3 because with caching disabled we always go off-heap
    testReaderBlockAllocationWithCombinedCache(false, false, null, true);
    testReaderBlockAllocationWithCombinedCache(false, false, BlockType.INTERMEDIATE_INDEX, true);
    testReaderBlockAllocationWithCombinedCache(false, false, BlockType.DATA, true);
  }
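
  /**
   * Opens the store file with a CombinedBlockCache built from the given flags and reads every
   * block, asserting through readAtOffsetWithAllocationAsserts that each block's on-heap versus
   * off-heap placement matches expectSharedMem.
   */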
  private void testReaderBlockAllocationWithCombinedCache(boolean cacheConfigCacheBlockOnRead,
    boolean cacheBlock, BlockType blockType, boolean expectSharedMem) throws IOException {
    int bufCount = 1024, blockSize = 64 * 1024;
    ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
    fillByteBuffAllocator(alloc, bufCount);
    Path storeFilePath = writeStoreFile();
    // Open the file reader with CombinedBlockCache
    BlockCache combined = initCombinedBlockCache("LRU");
    Configuration myConf = new Configuration(conf);

    myConf.setBoolean(CacheConfig.CACHE_DATA_ON_READ_KEY, cacheConfigCacheBlockOnRead);
    myConf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);

    CacheConfig cacheConfig = new CacheConfig(myConf, null, combined, alloc);
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, myConf);
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      long read = readAtOffsetWithAllocationAsserts(alloc, reader, offset, cacheBlock, blockType,
        expectSharedMem);
      if (read < 0) {
        break;
      }

      offset += read;
    }

    reader.close();
    combined.shutdown();
    Assert.assertEquals(bufCount, alloc.getFreeBufferCount());
    alloc.clean();
  }

  private long readAtOffsetWithAllocationAsserts(ByteBuffAllocator alloc, HFile.Reader reader,
    long offset, boolean cacheBlock, BlockType blockType, boolean expectSharedMem)
    throws IOException {
    HFileBlock block;
    try {
      block = reader.readBlock(offset, -1, cacheBlock, true, false, true, blockType, null);
    } catch (IOException e) {
      if (e.getMessage().contains("Expected block type")) {
        return -1;
      }
      throw e;
    }

    Assert.assertEquals(expectSharedMem, block.isSharedMem());

    if (expectSharedMem) {
      Assert.assertTrue(alloc.getFreeBufferCount() < alloc.getTotalBufferCount());
    } else {
      // An on-heap block must not have been allocated from the off-heap allocator.
      Assert.assertEquals(alloc.getTotalBufferCount(), alloc.getFreeBufferCount());
    }

    try {
      return block.getOnDiskSizeWithHeader();
    } finally {
      block.release(); // return the ByteBuffer back to the allocator
    }
  }
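
  /**
   * Sequentially reads every block of the given store file with the block cache disabled,
   * releasing each block back to the allocator.
   */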
  private void readStoreFile(Path storeFilePath, Configuration conf, ByteBuffAllocator alloc)
    throws Exception {
    // Open the file reader with the block cache disabled.
    CacheConfig cache = new CacheConfig(conf, null, null, alloc);
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cache, true, conf);
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
      offset += block.getOnDiskSizeWithHeader();
      block.release(); // return the ByteBuffer back to the allocator
    }
    reader.close();
  }

  private Path writeStoreFile() throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile");
    HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build();
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, fs).withOutputDir(storeFileParentDir)
      .withFileContext(meta).build();
    final int rowLen = 32;
    Random rand = ThreadLocalRandom.current();
    for (int i = 0; i < 1000; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of the KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

  /**
   * Test an empty HFile: all features should work reasonably when the hfile has no entries.
   */
  @Test
  public void testEmptyHFile() throws IOException {
    Path f = new Path(ROOT_DIR, testName.getMethodName());
    HFileContext context = new HFileContextBuilder().withIncludesTags(false).build();
    Writer w =
      HFile.getWriterFactory(conf, cacheConf).withPath(fs, f).withFileContext(context).create();
    w.close();
    Reader r = HFile.createReader(fs, f, cacheConf, true, conf);
    assertFalse(r.getFirstKey().isPresent());
    assertFalse(r.getLastKey().isPresent());
  }

  /**
   * Create a 0-length hfile and show that reading it fails.
   */
  @Test
  public void testCorrupt0LengthHFile() throws IOException {
    Path f = new Path(ROOT_DIR, testName.getMethodName());
    FSDataOutputStream fsos = fs.create(f);
    fsos.close();

    try {
      HFile.createReader(fs, f, cacheConf, true, conf);
    } catch (CorruptHFileException | IllegalArgumentException che) {
      // Expected failure
      return;
    }
    fail("Should have thrown exception");
  }
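
  /**
   * Appending cells out of order must fail: the second cell carries a newer timestamp and
   * therefore sorts before the first, so the writer should reject it with an IOException that
   * names the table and column family.
   */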
  @Test
  public void testCorruptOutOfOrderHFileWrite() throws IOException {
    Path path = new Path(ROOT_DIR, testName.getMethodName());
    FSDataOutputStream mockedOutputStream = Mockito.mock(FSDataOutputStream.class);
    String columnFamily = "MyColumnFamily";
    String tableName = "MyTableName";
    HFileContext fileContext =
      new HFileContextBuilder().withHFileName(testName.getMethodName() + "HFile")
        .withBlockSize(minBlockSize).withColumnFamily(Bytes.toBytes(columnFamily))
        .withTableName(Bytes.toBytes(tableName)).withHBaseCheckSum(false)
        .withCompression(Compression.Algorithm.NONE).withCompressTags(false).build();
    HFileWriterImpl writer =
      new HFileWriterImpl(conf, cacheConf, path, mockedOutputStream, fileContext);
    ExtendedCellBuilder cellBuilder =
      ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
    byte[] row = Bytes.toBytes("foo");
    byte[] qualifier = Bytes.toBytes("qualifier");
    byte[] cf = Bytes.toBytes(columnFamily);
    byte[] val = Bytes.toBytes("fooVal");
    long firstTS = 100L;
    long secondTS = 101L;
    ExtendedCell firstCell = cellBuilder.setRow(row).setValue(val).setTimestamp(firstTS)
      .setQualifier(qualifier).setFamily(cf).setType(Cell.Type.Put).build();
    ExtendedCell secondCell = cellBuilder.setRow(row).setValue(val).setTimestamp(secondTS)
      .setQualifier(qualifier).setFamily(cf).setType(Cell.Type.Put).build();
    // The second cell sorts before the first one because later timestamps come first, so
    // appending it second is out of order.
    writer.append(firstCell);
    try {
      writer.append(secondCell);
    } catch (IOException ie) {
      String message = ie.getMessage();
      Assert.assertTrue(message.contains("not lexically larger"));
      Assert.assertTrue(message.contains(tableName));
      Assert.assertTrue(message.contains(columnFamily));
      return;
    }
    Assert.fail("Exception wasn't thrown even though cells were appended in the wrong order!");
  }

  public static void truncateFile(FileSystem fs, Path src, Path dst) throws IOException {
    FileStatus fst = fs.getFileStatus(src);
    long len = fst.getLen();
    len = len / 2;

    // create a truncated hfile
    FSDataOutputStream fdos = fs.create(dst);
    byte[] buf = new byte[(int) len];
    FSDataInputStream fdis = fs.open(src);
    // readFully, since a plain read() may return fewer bytes than requested
    fdis.readFully(buf);
    fdos.write(buf);
    fdis.close();
    fdos.close();
  }

  /**
   * Create a truncated hfile and verify that an exception is thrown.
   */
  @Test
  public void testCorruptTruncatedHFile() throws IOException {
    Path f = new Path(ROOT_DIR, testName.getMethodName());
    HFileContext context = new HFileContextBuilder().build();
    Writer w = HFile.getWriterFactory(conf, cacheConf).withPath(this.fs, f).withFileContext(context)
      .create();
    writeSomeRecords(w, 0, 100, false);
    w.close();

    Path trunc = new Path(f.getParent(), "truncated");
    truncateFile(fs, w.getPath(), trunc);

    try {
      HFile.createReader(fs, trunc, cacheConf, true, conf);
    } catch (CorruptHFileException | IllegalArgumentException che) {
      // Expected failure
      return;
    }
    fail("Should have thrown exception");
  }

  // Write n records into the hfile, starting at index 'start'.
  private int writeSomeRecords(Writer writer, int start, int n, boolean useTags)
    throws IOException {
    String value = "value";
    KeyValue kv;
    for (int i = start; i < (start + n); i++) {
      String key = String.format(localFormatter, Integer.valueOf(i));
      if (useTags) {
        Tag t = new ArrayBackedTag((byte) 1, "myTag1");
        Tag[] tags = new Tag[1];
        tags[0] = t;
        kv = new KeyValue(Bytes.toBytes(key), Bytes.toBytes("family"), Bytes.toBytes("qual"),
          HConstants.LATEST_TIMESTAMP, Bytes.toBytes(value + key), tags);
        writer.append(kv);
      } else {
        kv = new KeyValue(Bytes.toBytes(key), Bytes.toBytes("family"), Bytes.toBytes("qual"),
          Bytes.toBytes(value + key));
        writer.append(kv);
      }
    }
    return (start + n);
  }

  private void readAllRecords(HFileScanner scanner) throws IOException {
    readAndCheckbytes(scanner, 0, 100);
  }

  // Read the records back and check them against what was written.
  private int readAndCheckbytes(HFileScanner scanner, int start, int n) throws IOException {
    String value = "value";
    int i = start;
    for (; i < (start + n); i++) {
      ByteBuffer key = ByteBuffer.wrap(((KeyValue) scanner.getKey()).getKey());
      ByteBuffer val = scanner.getValue();
      String keyStr = String.format(localFormatter, Integer.valueOf(i));
      String valStr = value + keyStr;
      KeyValue kv = new KeyValue(Bytes.toBytes(keyStr), Bytes.toBytes("family"),
        Bytes.toBytes("qual"), Bytes.toBytes(valStr));
      byte[] keyBytes =
        new KeyValue.KeyOnlyKeyValue(Bytes.toBytes(key), 0, Bytes.toBytes(key).length).getKey();
      assertTrue("bytes for keys do not match " + keyStr + " " + Bytes.toString(Bytes.toBytes(key)),
        Arrays.equals(kv.getKey(), keyBytes));
      byte[] valBytes = Bytes.toBytes(val);
      assertTrue("bytes for vals do not match " + valStr + " " + Bytes.toString(valBytes),
        Arrays.equals(Bytes.toBytes(valStr), valBytes));
      if (!scanner.next()) {
        break;
      }
    }
    assertEquals(i, start + n - 1);
    return (start + n);
  }

  private byte[] getSomeKey(int rowId) {
    KeyValue kv = new KeyValue(Bytes.toBytes(String.format(localFormatter, Integer.valueOf(rowId))),
      Bytes.toBytes("family"), Bytes.toBytes("qual"), HConstants.LATEST_TIMESTAMP, Type.Put);
    return kv.getKey();
  }

  private void writeRecords(Writer writer, boolean useTags) throws IOException {
    writeSomeRecords(writer, 0, 100, useTags);
    writer.close();
  }

  private FSDataOutputStream createFSOutput(Path name) throws IOException {
    return fs.create(name);
  }

  /**
   * Write and read an HFile with the given codec, exercising scanner seeks and reads.
   */
  void basicWithSomeCodec(String codec, boolean useTags) throws IOException {
    if (useTags) {
      conf.setInt("hfile.format.version", 3);
    }
    Path ncHFile = new Path(ROOT_DIR, "basic.hfile." + codec + useTags);
    FSDataOutputStream fout = createFSOutput(ncHFile);
    HFileContext meta = new HFileContextBuilder().withBlockSize(minBlockSize)
      .withCompression(HFileWriterImpl.compressionByName(codec)).build();
    Writer writer =
      HFile.getWriterFactory(conf, cacheConf).withOutputStream(fout).withFileContext(meta).create();
    LOG.info(Objects.toString(writer));
    writeRecords(writer, useTags);
    fout.close();
    FSDataInputStream fin = fs.open(ncHFile);
    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, ncHFile).build();
    Reader reader = createReaderFromStream(context, cacheConf, conf);
    LOG.info(cacheConf.toString());
    // Load up the index.
    // Get a scanner that caches and that does not use pread.
    HFileScanner scanner = reader.getScanner(conf, true, false);
    // Align scanner at start of the file.
    scanner.seekTo();
    readAllRecords(scanner);
    int seekTo = scanner.seekTo(KeyValueUtil.createKeyValueFromKey(getSomeKey(50)));
    LOG.info("seekTo returned {}", seekTo);
    assertTrue("location lookup failed",
      scanner.seekTo(KeyValueUtil.createKeyValueFromKey(getSomeKey(50))) == 0);
    // read the key and see if it matches
    ByteBuffer readKey = ByteBuffer.wrap(((KeyValue) scanner.getKey()).getKey());
    assertTrue("seeked key does not match", Arrays.equals(getSomeKey(50), Bytes.toBytes(readKey)));

    scanner.seekTo(KeyValueUtil.createKeyValueFromKey(getSomeKey(0)));
    ByteBuffer val1 = scanner.getValue();
    scanner.seekTo(KeyValueUtil.createKeyValueFromKey(getSomeKey(0)));
    ByteBuffer val2 = scanner.getValue();
    assertTrue(Arrays.equals(Bytes.toBytes(val1), Bytes.toBytes(val2)));

    reader.close();
    fin.close();
    fs.delete(ncHFile, true);
  }

  @Test
  public void testTFileFeatures() throws IOException {
    testHFilefeaturesInternals(false);
    testHFilefeaturesInternals(true);
  }

  protected void testHFilefeaturesInternals(boolean useTags) throws IOException {
    basicWithSomeCodec("none", useTags);
    basicWithSomeCodec("gz", useTags);
  }

  private void writeNumMetablocks(Writer writer, int n) {
    for (int i = 0; i < n; i++) {
      writer.appendMetaBlock("HFileMeta" + i, new Writable() {
        private int val;

        public Writable setVal(int val) {
          this.val = val;
          return this;
        }

        @Override
        public void write(DataOutput out) throws IOException {
          out.write(Bytes.toBytes("something to test" + val));
        }

        @Override
        public void readFields(DataInput in) throws IOException {
        }
      }.setVal(i));
    }
  }

  private void someTestingWithMetaBlock(Writer writer) {
    writeNumMetablocks(writer, 10);
  }

  private void readNumMetablocks(Reader reader, int n) throws IOException {
    for (int i = 0; i < n; i++) {
      ByteBuff actual = reader.getMetaBlock("HFileMeta" + i, false).getBufferWithoutHeader();
      ByteBuffer expected = ByteBuffer.wrap(Bytes.toBytes("something to test" + i));
      assertEquals("failed to match metadata", Bytes.toStringBinary(expected), Bytes.toStringBinary(
        actual.array(), actual.arrayOffset() + actual.position(), actual.capacity()));
    }
  }

  private void someReadingWithMetaBlock(Reader reader) throws IOException {
    readNumMetablocks(reader, 10);
  }
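
  /**
   * Writes an HFile that contains only meta blocks (no data cells) with the given compression,
   * then reads the meta blocks back and verifies their contents.
   */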
  private void metablocks(final String compress) throws Exception {
    Path mFile = new Path(ROOT_DIR, "meta.hfile");
    FSDataOutputStream fout = createFSOutput(mFile);
    HFileContext meta =
      new HFileContextBuilder().withCompression(HFileWriterImpl.compressionByName(compress))
        .withBlockSize(minBlockSize).build();
    Writer writer =
      HFile.getWriterFactory(conf, cacheConf).withOutputStream(fout).withFileContext(meta).create();
    someTestingWithMetaBlock(writer);
    writer.close();
    fout.close();
    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, mFile).build();
    Reader reader = createReaderFromStream(context, cacheConf, conf);
    // No data -- this should return false.
    assertFalse(reader.getScanner(conf, false, false).seekTo());
    someReadingWithMetaBlock(reader);
    fs.delete(mFile, true);
    reader.close();
  }

  // Test meta blocks for hfiles.
  @Test
  public void testMetaBlocks() throws Exception {
    metablocks("none");
    metablocks("gz");
  }

  @Test
  public void testNullMetaBlocks() throws Exception {
    for (Compression.Algorithm compressAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
      Path mFile = new Path(ROOT_DIR, "nometa_" + compressAlgo + ".hfile");
      FSDataOutputStream fout = createFSOutput(mFile);
      HFileContext meta =
        new HFileContextBuilder().withCompression(compressAlgo).withBlockSize(minBlockSize).build();
      Writer writer = HFile.getWriterFactory(conf, cacheConf).withOutputStream(fout)
        .withFileContext(meta).create();
      KeyValue kv =
        new KeyValue(Bytes.toBytes("foo"), Bytes.toBytes("f1"), null, Bytes.toBytes("value"));
      writer.append(kv);
      writer.close();
      fout.close();
      Reader reader = HFile.createReader(fs, mFile, cacheConf, true, conf);
      assertNull(reader.getMetaBlock("non-existent", false));
    }
  }

  /**
   * Make sure the ordinals for our compression algorithms do not change on us.
   */
  @Test
  public void testCompressionOrdinance() {
    assertEquals(0, Compression.Algorithm.LZO.ordinal());
    assertEquals(1, Compression.Algorithm.GZ.ordinal());
    assertEquals(2, Compression.Algorithm.NONE.ordinal());
    assertEquals(3, Compression.Algorithm.SNAPPY.ordinal());
    assertEquals(4, Compression.Algorithm.LZ4.ordinal());
  }

  @Test
  public void testShortMidpointSameQual() {
    ExtendedCell left =
      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("a"))
        .setFamily(Bytes.toBytes("a")).setQualifier(Bytes.toBytes("a")).setTimestamp(11)
        .setType(Type.Maximum.getCode()).setValue(HConstants.EMPTY_BYTE_ARRAY).build();
    ExtendedCell right =
      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("a"))
        .setFamily(Bytes.toBytes("a")).setQualifier(Bytes.toBytes("a")).setTimestamp(9)
        .setType(Type.Maximum.getCode()).setValue(HConstants.EMPTY_BYTE_ARRAY).build();
    ExtendedCell mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
    assertTrue(
      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) <= 0);
    assertTrue(
      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) == 0);
  }

  private ExtendedCell getCell(byte[] row, byte[] family, byte[] qualifier) {
    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row)
      .setFamily(family).setQualifier(qualifier).setTimestamp(HConstants.LATEST_TIMESTAMP)
      .setType(KeyValue.Type.Maximum.getCode()).setValue(HConstants.EMPTY_BYTE_ARRAY).build();
  }
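
  /**
   * Exercises HFileWriterImpl.getMidpoint: the returned fake key must sort at or after the left
   * key and at or before the right key, while being as short as possible.
   */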
  @Test
  public void testGetShortMidpoint() {
    ExtendedCell left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    ExtendedCell right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    ExtendedCell mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
    assertTrue(
      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) <= 0);
    assertTrue(
      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0);
    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    right = getCell(Bytes.toBytes("b"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
    assertTrue(
      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0);
    left = getCell(Bytes.toBytes("g"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    right = getCell(Bytes.toBytes("i"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
    assertTrue(
      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0);
    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    right = getCell(Bytes.toBytes("bbbbbbb"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
    assertTrue(
      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) < 0);
    assertEquals(1, mid.getRowLength());
    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    right = getCell(Bytes.toBytes("b"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
    assertTrue(
      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0);
    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    right = getCell(Bytes.toBytes("a"), Bytes.toBytes("aaaaaaaa"), Bytes.toBytes("b"));
    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
    assertTrue(
      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) < 0);
    assertEquals(2, mid.getFamilyLength());
    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("aaaaaaaaa"));
    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
    assertTrue(
      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) < 0);
    assertEquals(2, mid.getQualifierLength());
    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("b"));
    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
    assertTrue(
      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0);
    assertEquals(1, mid.getQualifierLength());

    // Verify boundary conditions
    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), new byte[] { 0x00, (byte) 0xFE });
    right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), new byte[] { 0x00, (byte) 0xFF });
    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
    assertTrue(
      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) == 0);
    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), new byte[] { 0x00, 0x12 });
    right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), new byte[] { 0x00, 0x12, 0x00 });
    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
    assertTrue(
      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) == 0);

    // Assert that with the meta comparator the right cell is returned as-is, i.e. no
    // optimization is done.
    left = getCell(Bytes.toBytes("g"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    right = getCell(Bytes.toBytes("i"), Bytes.toBytes("a"), Bytes.toBytes("a"));
    mid = HFileWriterImpl.getMidpoint(MetaCellComparator.META_COMPARATOR, left, right);
    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
    assertTrue(
      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) == 0);
    byte[] family = Bytes.toBytes("family");
    byte[] qualA = Bytes.toBytes("qfA");
    byte[] qualB = Bytes.toBytes("qfB");
    final CellComparatorImpl keyComparator = CellComparatorImpl.COMPARATOR;
    // verify that a faked shorter row key can be generated
    long ts = 5;
    KeyValue kv1 = new KeyValue(Bytes.toBytes("the quick brown fox"), family, qualA, ts, Type.Put);
    KeyValue kv2 = new KeyValue(Bytes.toBytes("the who test text"), family, qualA, ts, Type.Put);
    ExtendedCell newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2);
    assertTrue(keyComparator.compare(kv1, newKey) < 0);
    assertTrue((keyComparator.compare(kv2, newKey)) > 0);
    byte[] expectedArray = Bytes.toBytes("the r");
    assertTrue(Bytes.equals(newKey.getRowArray(), newKey.getRowOffset(), newKey.getRowLength(),
      expectedArray, 0, expectedArray.length));

    // verify: when "row + family + qualifier" are the same, return rightKey directly
    kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, 5, Type.Put);
    kv2 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, 0, Type.Put);
    assertTrue(keyComparator.compare(kv1, kv2) < 0);
    newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2);
    assertTrue(keyComparator.compare(kv1, newKey) < 0);
    assertTrue((keyComparator.compare(kv2, newKey)) == 0);
    kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, -5, Type.Put);
    kv2 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, -10, Type.Put);
    assertTrue(keyComparator.compare(kv1, kv2) < 0);
    newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2);
    assertTrue(keyComparator.compare(kv1, newKey) < 0);
    assertTrue((keyComparator.compare(kv2, newKey)) == 0);

    // verify: same row, different qualifier
    kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, 5, Type.Put);
    kv2 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualB, 5, Type.Put);
    assertTrue(keyComparator.compare(kv1, kv2) < 0);
    newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2);
    assertTrue(keyComparator.compare(kv1, newKey) < 0);
    assertTrue((keyComparator.compare(kv2, newKey)) > 0);
    assertTrue(Arrays.equals(CellUtil.cloneFamily(newKey), family));
    assertTrue(Arrays.equals(CellUtil.cloneQualifier(newKey), qualB));
    assertTrue(newKey.getTimestamp() == HConstants.LATEST_TIMESTAMP);
    assertTrue(newKey.getTypeByte() == Type.Maximum.getCode());

    // verify metaKeyComparator's getShortMidpointKey output
    final CellComparatorImpl metaKeyComparator = MetaCellComparator.META_COMPARATOR;
    kv1 = new KeyValue(Bytes.toBytes("ilovehbase123"), family, qualA, 5, Type.Put);
    kv2 = new KeyValue(Bytes.toBytes("ilovehbase234"), family, qualA, 0, Type.Put);
    newKey = HFileWriterImpl.getMidpoint(metaKeyComparator, kv1, kv2);
    assertTrue(metaKeyComparator.compare(kv1, newKey) < 0);
    assertTrue((metaKeyComparator.compare(kv2, newKey) == 0));

    // verify the common-prefix scenario
    kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, ts, Type.Put);
    kv2 = new KeyValue(Bytes.toBytes("ilovehbaseandhdfs"), family, qualA, ts, Type.Put);
    assertTrue(keyComparator.compare(kv1, kv2) < 0);
    newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2);
    assertTrue(keyComparator.compare(kv1, newKey) < 0);
    assertTrue((keyComparator.compare(kv2, newKey)) > 0);
    expectedArray = Bytes.toBytes("ilovehbasea");
    assertTrue(Bytes.equals(newKey.getRowArray(), newKey.getRowOffset(), newKey.getRowLength(),
      expectedArray, 0, expectedArray.length));
    // verify the scenario where the rows differ at only one offset
    kv1 = new KeyValue(Bytes.toBytes("100abcdefg"), family, qualA, ts, Type.Put);
    kv2 = new KeyValue(Bytes.toBytes("101abcdefg"), family, qualA, ts, Type.Put);
    assertTrue(keyComparator.compare(kv1, kv2) < 0);
    newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2);
    assertTrue(keyComparator.compare(kv1, newKey) < 0);
    assertTrue((keyComparator.compare(kv2, newKey)) > 0);
    expectedArray = Bytes.toBytes("101");
    assertTrue(Bytes.equals(newKey.getRowArray(), newKey.getRowOffset(), newKey.getRowLength(),
      expectedArray, 0, expectedArray.length));
  }
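
  /**
   * Verifies that a data block encoder releases its references to cell buffers in
   * beforeShipped(): after calling it, the first cell's backing ByteBuffer is overwritten and a
   * subsequent append must still succeed.
   */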
  @Test
  public void testDBEShipped() throws IOException {
    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      DataBlockEncoder encoder = encoding.getEncoder();
      if (encoder == null) {
        continue;
      }
      Path f = new Path(ROOT_DIR, testName.getMethodName() + "_" + encoding);
      HFileContext context =
        new HFileContextBuilder().withIncludesTags(false).withDataBlockEncoding(encoding).build();
      HFileWriterImpl writer = (HFileWriterImpl) HFile.getWriterFactory(conf, cacheConf)
        .withPath(fs, f).withFileContext(context).create();

      KeyValue kv = new KeyValue(Bytes.toBytes("testkey1"), Bytes.toBytes("family"),
        Bytes.toBytes("qual"), Bytes.toBytes("testvalue"));
      KeyValue kv2 = new KeyValue(Bytes.toBytes("testkey2"), Bytes.toBytes("family"),
        Bytes.toBytes("qual"), Bytes.toBytes("testvalue"));
      KeyValue kv3 = new KeyValue(Bytes.toBytes("testkey3"), Bytes.toBytes("family"),
        Bytes.toBytes("qual"), Bytes.toBytes("testvalue"));

      ByteBuffer buffer = ByteBuffer.wrap(kv.getBuffer());
      ByteBuffer buffer2 = ByteBuffer.wrap(kv2.getBuffer());
      ByteBuffer buffer3 = ByteBuffer.wrap(kv3.getBuffer());

      writer.append(new ByteBufferKeyValue(buffer, 0, buffer.remaining()));
      writer.beforeShipped();

      // pollute the first cell's backing ByteBuffer
      ByteBufferUtils.copyFromBufferToBuffer(buffer3, buffer);

      // write another cell; if the DBE has not been shipped, the test will fail
      writer.append(new ByteBufferKeyValue(buffer2, 0, buffer2.remaining()));
      writer.close();
    }
  }

  /**
   * Test case for CombinedBlockCache with TinyLfu as L1 cache.
   */
  @Test
  public void testReaderWithTinyLfuCombinedBlockCache() throws Exception {
    testReaderCombinedCache("TinyLfu");
  }

  /**
   * Test case for CombinedBlockCache with AdaptiveLRU as L1 cache.
   */
  @Test
  public void testReaderWithAdaptiveLruCombinedBlockCache() throws Exception {
    testReaderCombinedCache("AdaptiveLRU");
  }

  /**
   * Test case for CombinedBlockCache with LRU as L1 cache.
   */
  @Test
  public void testReaderWithLruCombinedBlockCache() throws Exception {
    testReaderCombinedCache("LRU");
  }

  private void testReaderCombinedCache(final String l1CachePolicy) throws Exception {
    int bufCount = 1024;
    int blockSize = 64 * 1024;
    ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
    fillByteBuffAllocator(alloc, bufCount);
    Path storeFilePath = writeStoreFile();
    // Open the file reader with CombinedBlockCache
    BlockCache combined = initCombinedBlockCache(l1CachePolicy);
    conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
    CacheConfig cacheConfig = new CacheConfig(conf, null, combined, alloc);
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
    long offset = 0;
    Cacheable cachedBlock = null;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
      HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null);
      offset += block.getOnDiskSizeWithHeader();
      // Read the cached block.
      cachedBlock = combined.getBlock(key, false, false, true);
      try {
        Assert.assertNotNull(cachedBlock);
        Assert.assertTrue(cachedBlock instanceof HFileBlock);
        HFileBlock hfb = (HFileBlock) cachedBlock;
        // Data blocks are cached in the BucketCache, so they should be off-heap blocks.
        if (hfb.getBlockType().isData()) {
          Assert.assertTrue(hfb.isSharedMem());
        } else if (!l1CachePolicy.equals("TinyLfu")) {
          // Non-data blocks are cached in the on-heap L1, except with the TinyLfu policy.
          Assert.assertFalse(hfb.isSharedMem());
        }
      } finally {
        cachedBlock.release();
      }
      block.release(); // return the ByteBuffer back to the allocator
    }
    reader.close();
    combined.shutdown();
    if (cachedBlock != null) {
      Assert.assertEquals(0, cachedBlock.refCnt());
    }
    Assert.assertEquals(bufCount, alloc.getFreeBufferCount());
    alloc.clean();
  }

  @Test
  public void testHFileContextBuilderWithIndexEncoding() throws IOException {
    HFileContext context =
      new HFileContextBuilder().withIndexBlockEncoding(IndexBlockEncoding.PREFIX_TREE).build();
    HFileContext newContext = new HFileContextBuilder(context).build();
    assertEquals(IndexBlockEncoding.PREFIX_TREE, newContext.getIndexBlockEncoding());
  }
}