001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; 021import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; 022import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY; 023import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY; 024import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY; 025import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BLOCKCACHE_POLICY_KEY; 026import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY; 027import static org.junit.jupiter.api.Assertions.assertEquals; 028import static org.junit.jupiter.api.Assertions.assertFalse; 029import static org.junit.jupiter.api.Assertions.assertNotNull; 030import static org.junit.jupiter.api.Assertions.assertNull; 031import static org.junit.jupiter.api.Assertions.assertTrue; 032import static org.junit.jupiter.api.Assertions.fail; 033 034import java.io.DataInput; 035import java.io.DataOutput; 036import java.io.IOException; 037import java.nio.ByteBuffer; 038import java.util.ArrayList; 039import java.util.Arrays; 040import java.util.List; 041import java.util.Objects; 042import java.util.Optional; 043import java.util.Random; 044import java.util.concurrent.ThreadLocalRandom; 045import java.util.concurrent.atomic.AtomicInteger; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.FSDataInputStream; 048import org.apache.hadoop.fs.FSDataOutputStream; 049import org.apache.hadoop.fs.FileStatus; 050import org.apache.hadoop.fs.FileSystem; 051import org.apache.hadoop.fs.Path; 052import org.apache.hadoop.hbase.ArrayBackedTag; 053import org.apache.hadoop.hbase.ByteBufferKeyValue; 054import org.apache.hadoop.hbase.Cell; 055import org.apache.hadoop.hbase.CellBuilderType; 056import org.apache.hadoop.hbase.CellComparatorImpl; 057import org.apache.hadoop.hbase.CellUtil; 058import org.apache.hadoop.hbase.ExtendedCell; 059import org.apache.hadoop.hbase.ExtendedCellBuilder; 060import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 061import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 062import org.apache.hadoop.hbase.HBaseConfiguration; 063import org.apache.hadoop.hbase.HBaseTestingUtil; 064import org.apache.hadoop.hbase.HConstants; 065import org.apache.hadoop.hbase.KeyValue; 066import org.apache.hadoop.hbase.KeyValue.Type; 067import org.apache.hadoop.hbase.KeyValueUtil; 068import org.apache.hadoop.hbase.MetaCellComparator; 069import org.apache.hadoop.hbase.PrivateCellUtil; 070import org.apache.hadoop.hbase.Tag; 071import org.apache.hadoop.hbase.io.ByteBuffAllocator; 072import org.apache.hadoop.hbase.io.compress.Compression; 073import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; 074import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 075import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding; 076import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 077import org.apache.hadoop.hbase.io.hfile.HFile.Writer; 078import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType; 079import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics; 080import org.apache.hadoop.hbase.nio.ByteBuff; 081import org.apache.hadoop.hbase.nio.RefCnt; 082import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 083import org.apache.hadoop.hbase.testclassification.IOTests; 084import org.apache.hadoop.hbase.testclassification.SmallTests; 085import org.apache.hadoop.hbase.util.ByteBufferUtils; 086import org.apache.hadoop.hbase.util.Bytes; 087import org.apache.hadoop.io.Writable; 088import org.junit.jupiter.api.BeforeAll; 089import org.junit.jupiter.api.Test; 090import org.junit.jupiter.api.TestInfo; 091import org.mockito.Mockito; 092import org.slf4j.Logger; 093import org.slf4j.LoggerFactory; 094 095import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector; 096 097/** 098 * test hfile features. 099 */ 100@org.junit.jupiter.api.Tag(IOTests.TAG) 101@org.junit.jupiter.api.Tag(SmallTests.TAG) 102public class TestHFile { 103 104 private static final Logger LOG = LoggerFactory.getLogger(TestHFile.class); 105 private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; 106 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 107 private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFile").toString(); 108 private final int minBlockSize = 512; 109 private static String localFormatter = "%010d"; 110 private static CacheConfig cacheConf; 111 private static Configuration conf; 112 private static FileSystem fs; 113 114 @BeforeAll 115 public static void setUp() throws Exception { 116 conf = TEST_UTIL.getConfiguration(); 117 cacheConf = new CacheConfig(conf); 118 fs = TEST_UTIL.getTestFileSystem(); 119 } 120 121 public static Reader createReaderFromStream(ReaderContext context, CacheConfig cacheConf, 122 Configuration conf) throws IOException { 123 HFileInfo fileInfo = new HFileInfo(context, conf); 124 Reader preadReader = HFile.createReader(context, fileInfo, cacheConf, conf); 125 fileInfo.initMetaAndIndex(preadReader); 126 preadReader.close(); 127 context = new ReaderContextBuilder() 128 .withFileSystemAndPath(context.getFileSystem(), context.getFilePath()) 129 .withReaderType(ReaderType.STREAM).build(); 130 Reader streamReader = HFile.createReader(context, fileInfo, cacheConf, conf); 131 return streamReader; 132 } 133 134 private ByteBuffAllocator initAllocator(boolean reservoirEnabled, int bufSize, int bufCount, 135 int minAllocSize) { 136 Configuration that = HBaseConfiguration.create(conf); 137 that.setInt(BUFFER_SIZE_KEY, bufSize); 138 that.setInt(MAX_BUFFER_COUNT_KEY, bufCount); 139 // All ByteBuffers will be allocated from the buffers. 140 that.setInt(MIN_ALLOCATE_SIZE_KEY, minAllocSize); 141 return ByteBuffAllocator.create(that, reservoirEnabled); 142 } 143 144 private void fillByteBuffAllocator(ByteBuffAllocator alloc, int bufCount) { 145 // Fill the allocator with bufCount ByteBuffer 146 List<ByteBuff> buffs = new ArrayList<>(); 147 for (int i = 0; i < bufCount; i++) { 148 buffs.add(alloc.allocateOneBuffer()); 149 assertEquals(alloc.getFreeBufferCount(), 0); 150 } 151 buffs.forEach(ByteBuff::release); 152 assertEquals(alloc.getFreeBufferCount(), bufCount); 153 } 154 155 @Test 156 public void testReaderWithoutBlockCache() throws Exception { 157 int bufCount = 32; 158 // AllByteBuffers will be allocated from the buffers. 159 ByteBuffAllocator alloc = initAllocator(true, 64 * 1024, bufCount, 0); 160 fillByteBuffAllocator(alloc, bufCount); 161 // start write to store file. 162 Path path = writeStoreFile(); 163 readStoreFile(path, conf, alloc); 164 assertEquals(bufCount, alloc.getFreeBufferCount()); 165 alloc.clean(); 166 } 167 168 /** 169 * Test case for HBASE-22127 in LruBlockCache. 170 */ 171 @Test 172 public void testReaderWithLRUBlockCache() throws Exception { 173 int bufCount = 1024, blockSize = 64 * 1024; 174 ByteBuffAllocator alloc = initAllocator(true, bufCount, blockSize, 0); 175 fillByteBuffAllocator(alloc, bufCount); 176 Path storeFilePath = writeStoreFile(); 177 // Open the file reader with LRUBlockCache 178 BlockCache lru = new LruBlockCache(1024 * 1024 * 32, blockSize, true, conf); 179 CacheConfig cacheConfig = new CacheConfig(conf, null, lru, alloc); 180 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf); 181 long offset = 0; 182 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 183 BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset); 184 HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null); 185 offset += block.getOnDiskSizeWithHeader(); 186 // Ensure the block is an heap one. 187 Cacheable cachedBlock = lru.getBlock(key, false, false, true); 188 assertNotNull(cachedBlock); 189 assertTrue(cachedBlock instanceof HFileBlock); 190 assertFalse(((HFileBlock) cachedBlock).isSharedMem()); 191 // Should never allocate off-heap block from allocator because ensure that it's LRU. 192 assertEquals(bufCount, alloc.getFreeBufferCount()); 193 block.release(); // return back the ByteBuffer back to allocator. 194 } 195 reader.close(); 196 assertEquals(bufCount, alloc.getFreeBufferCount()); 197 alloc.clean(); 198 lru.shutdown(); 199 } 200 201 @Test 202 public void testWriterCacheOnWriteSkipDoesNotLeak() throws Exception { 203 int bufCount = 32; 204 int blockSize = 4 * 1024; 205 ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0); 206 fillByteBuffAllocator(alloc, bufCount); 207 ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); 208 Configuration myConf = HBaseConfiguration.create(conf); 209 myConf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true); 210 myConf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, false); 211 myConf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, false); 212 final AtomicInteger counter = new AtomicInteger(); 213 RefCnt.detector.setLeakListener(new ResourceLeakDetector.LeakListener() { 214 @Override 215 public void onLeak(String s, String s1) { 216 counter.incrementAndGet(); 217 } 218 }); 219 BlockCache cache = Mockito.mock(BlockCache.class); 220 Mockito.when(cache.shouldCacheBlock(Mockito.any(), Mockito.anyLong(), Mockito.any())) 221 .thenReturn(Optional.of(false)); 222 Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testWriterCacheOnWriteSkipDoesNotLeak"); 223 HFileContext context = new HFileContextBuilder().withBlockSize(blockSize).build(); 224 225 try { 226 Writer writer = new HFile.WriterFactory(myConf, new CacheConfig(myConf, null, cache, alloc)) 227 .withPath(fs, hfilePath).withFileContext(context).create(); 228 try { 229 writer.append(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("q"), 230 HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"))); 231 } finally { 232 writer.close(); 233 } 234 235 Mockito.verify(cache).shouldCacheBlock(Mockito.any(), Mockito.anyLong(), Mockito.any()); 236 Mockito.verify(cache, Mockito.never()).cacheBlock(Mockito.any(), Mockito.any(), 237 Mockito.anyBoolean(), Mockito.anyBoolean()); 238 for (int i = 0; i < 30 && counter.get() == 0; i++) { 239 System.gc(); 240 try { 241 Thread.sleep(1000); 242 } catch (InterruptedException e) { 243 Thread.currentThread().interrupt(); 244 break; 245 } 246 alloc.allocate(128 * 1024).release(); 247 } 248 assertEquals(0, counter.get()); 249 } finally { 250 fs.delete(hfilePath, false); 251 alloc.clean(); 252 } 253 } 254 255 private void assertBytesReadFromCache(boolean isScanMetricsEnabled) throws Exception { 256 assertBytesReadFromCache(isScanMetricsEnabled, DataBlockEncoding.NONE); 257 } 258 259 private void assertBytesReadFromCache(boolean isScanMetricsEnabled, DataBlockEncoding encoding) 260 throws Exception { 261 // Write a store file 262 Path storeFilePath = writeStoreFile(); 263 264 // Initialize the block cache and HFile reader 265 BlockCache lru = BlockCacheFactory.createBlockCache(conf); 266 assertTrue(lru instanceof LruBlockCache); 267 CacheConfig cacheConfig = new CacheConfig(conf, null, lru, ByteBuffAllocator.HEAP); 268 HFileReaderImpl reader = 269 (HFileReaderImpl) HFile.createReader(fs, storeFilePath, cacheConfig, true, conf); 270 271 // Read the first block in HFile from the block cache. 272 final int offset = 0; 273 BlockCacheKey cacheKey = new BlockCacheKey(storeFilePath.getName(), offset); 274 HFileBlock block = (HFileBlock) lru.getBlock(cacheKey, false, false, true); 275 assertNull(block); 276 277 // Assert that first block has not been cached in the block cache and no disk I/O happened to 278 // check that. 279 ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset(); 280 ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset(); 281 block = reader.getCachedBlock(cacheKey, false, false, true, BlockType.DATA, null); 282 assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset()); 283 assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset()); 284 285 // Read the first block from the HFile. 286 block = reader.readBlock(offset, -1, true, true, false, true, BlockType.DATA, null); 287 assertNotNull(block); 288 int bytesReadFromFs = block.getOnDiskSizeWithHeader(); 289 if (block.getNextBlockOnDiskSize() > 0) { 290 bytesReadFromFs += block.headerSize(); 291 } 292 block.release(); 293 // Assert that disk I/O happened to read the first block. 294 assertEquals(isScanMetricsEnabled ? bytesReadFromFs : 0, 295 ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset()); 296 assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset()); 297 298 // Read the first block again and assert that it has been cached in the block cache. 299 block = reader.getCachedBlock(cacheKey, false, false, true, BlockType.DATA, encoding); 300 long bytesReadFromCache = 0; 301 if (encoding == DataBlockEncoding.NONE) { 302 assertNotNull(block); 303 bytesReadFromCache = block.getOnDiskSizeWithHeader(); 304 if (block.getNextBlockOnDiskSize() > 0) { 305 bytesReadFromCache += block.headerSize(); 306 } 307 block.release(); 308 // Assert that bytes read from block cache account for same number of bytes that would have 309 // been read from FS if block cache wasn't there. 310 assertEquals(bytesReadFromFs, bytesReadFromCache); 311 } else { 312 assertNull(block); 313 } 314 assertEquals(isScanMetricsEnabled ? bytesReadFromCache : 0, 315 ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset()); 316 assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset()); 317 318 reader.close(); 319 } 320 321 @Test 322 public void testBytesReadFromCache() throws Exception { 323 ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true); 324 assertBytesReadFromCache(true); 325 } 326 327 @Test 328 public void testBytesReadFromCacheWithScanMetricsDisabled() throws Exception { 329 ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(false); 330 assertBytesReadFromCache(false); 331 } 332 333 @Test 334 public void testBytesReadFromCacheWithInvalidDataEncoding() throws Exception { 335 ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true); 336 assertBytesReadFromCache(true, DataBlockEncoding.FAST_DIFF); 337 } 338 339 private BlockCache initCombinedBlockCache(final String l1CachePolicy) { 340 Configuration that = HBaseConfiguration.create(conf); 341 that.setFloat(BUCKET_CACHE_SIZE_KEY, 32); // 32MB for bucket cache. 342 that.set(BUCKET_CACHE_IOENGINE_KEY, "offheap"); 343 that.set(BLOCKCACHE_POLICY_KEY, l1CachePolicy); 344 BlockCache bc = BlockCacheFactory.createBlockCache(that); 345 assertNotNull(bc); 346 assertTrue(bc instanceof CombinedBlockCache); 347 return bc; 348 } 349 350 /** 351 * Test case for HBASE-22127 in CombinedBlockCache 352 */ 353 @Test 354 public void testReaderWithCombinedBlockCache() throws Exception { 355 int bufCount = 1024, blockSize = 64 * 1024; 356 ByteBuffAllocator alloc = initAllocator(true, bufCount, blockSize, 0); 357 fillByteBuffAllocator(alloc, bufCount); 358 Path storeFilePath = writeStoreFile(); 359 // Open the file reader with CombinedBlockCache 360 BlockCache combined = initCombinedBlockCache("LRU"); 361 conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true); 362 CacheConfig cacheConfig = new CacheConfig(conf, null, combined, alloc); 363 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf); 364 long offset = 0; 365 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 366 BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset); 367 HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null); 368 offset += block.getOnDiskSizeWithHeader(); 369 // Read the cached block. 370 Cacheable cachedBlock = combined.getBlock(key, false, false, true); 371 try { 372 assertNotNull(cachedBlock); 373 assertTrue(cachedBlock instanceof HFileBlock); 374 HFileBlock hfb = (HFileBlock) cachedBlock; 375 // Data block will be cached in BucketCache, so it should be an off-heap block. 376 if (hfb.getBlockType().isData()) { 377 assertTrue(hfb.isSharedMem()); 378 } else { 379 // Non-data block will be cached in LRUBlockCache, so it must be an on-heap block. 380 assertFalse(hfb.isSharedMem()); 381 } 382 } finally { 383 cachedBlock.release(); 384 } 385 block.release(); // return back the ByteBuffer back to allocator. 386 } 387 reader.close(); 388 combined.shutdown(); 389 assertEquals(bufCount, alloc.getFreeBufferCount()); 390 alloc.clean(); 391 } 392 393 /** 394 * Tests that we properly allocate from the off-heap or on-heap when LRUCache is configured. In 395 * this case, the determining factor is whether we end up caching the block or not. So the below 396 * test cases try different permutations of enabling/disabling via CacheConfig and via user 397 * request (cacheblocks), along with different expected block types. 398 */ 399 @Test 400 public void testReaderBlockAllocationWithLRUCache() throws IOException { 401 // false because caching is fully enabled 402 testReaderBlockAllocationWithLRUCache(true, true, null, false); 403 // false because we only look at cache config when expectedBlockType is non-null 404 testReaderBlockAllocationWithLRUCache(false, true, null, false); 405 // false because cacheBlock is true and even with cache config is disabled, we still cache 406 // important blocks like indexes 407 testReaderBlockAllocationWithLRUCache(false, true, BlockType.INTERMEDIATE_INDEX, false); 408 // true because since it's a DATA block, we honor the cache config 409 testReaderBlockAllocationWithLRUCache(false, true, BlockType.DATA, true); 410 // true for the following 2 because cacheBlock takes precedence over cache config 411 testReaderBlockAllocationWithLRUCache(true, false, null, true); 412 testReaderBlockAllocationWithLRUCache(true, false, BlockType.INTERMEDIATE_INDEX, false); 413 // false for the following 3 because both cache config and cacheBlock are false. 414 // per above, INDEX would supersede cache config, but not cacheBlock 415 testReaderBlockAllocationWithLRUCache(false, false, null, true); 416 testReaderBlockAllocationWithLRUCache(false, false, BlockType.INTERMEDIATE_INDEX, true); 417 testReaderBlockAllocationWithLRUCache(false, false, BlockType.DATA, true); 418 } 419 420 private void testReaderBlockAllocationWithLRUCache(boolean cacheConfigCacheBlockOnRead, 421 boolean cacheBlock, BlockType blockType, boolean expectSharedMem) throws IOException { 422 int bufCount = 1024, blockSize = 64 * 1024; 423 ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0); 424 fillByteBuffAllocator(alloc, bufCount); 425 Path storeFilePath = writeStoreFile(); 426 Configuration myConf = new Configuration(conf); 427 428 myConf.setBoolean(CacheConfig.CACHE_DATA_ON_READ_KEY, cacheConfigCacheBlockOnRead); 429 // Open the file reader with LRUBlockCache 430 BlockCache lru = new LruBlockCache(1024 * 1024 * 32, blockSize, true, myConf); 431 CacheConfig cacheConfig = new CacheConfig(myConf, null, lru, alloc); 432 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, myConf); 433 long offset = 0; 434 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 435 long read = readAtOffsetWithAllocationAsserts(alloc, reader, offset, cacheBlock, blockType, 436 expectSharedMem); 437 if (read < 0) { 438 break; 439 } 440 441 offset += read; 442 } 443 444 reader.close(); 445 assertEquals(bufCount, alloc.getFreeBufferCount()); 446 alloc.clean(); 447 lru.shutdown(); 448 } 449 450 /** 451 * Tests that we properly allocate from the off-heap or on-heap when CombinedCache is configured. 452 * In this case, we should always use off-heap unless the block is an INDEX (which always goes to 453 * L1 cache which is on-heap) 454 */ 455 @Test 456 public void testReaderBlockAllocationWithCombinedCache() throws IOException { 457 // true because caching is fully enabled and block type null 458 testReaderBlockAllocationWithCombinedCache(true, true, null, true); 459 // false because caching is fully enabled, index block type always goes to on-heap L1 460 testReaderBlockAllocationWithCombinedCache(true, true, BlockType.INTERMEDIATE_INDEX, false); 461 // true because cacheBlocks takes precedence over cache config which block type is null 462 testReaderBlockAllocationWithCombinedCache(false, true, null, true); 463 // false because caching is enabled and block type is index, which always goes to L1 464 testReaderBlockAllocationWithCombinedCache(false, true, BlockType.INTERMEDIATE_INDEX, false); 465 // true because since it's a DATA block, we honor the cache config 466 testReaderBlockAllocationWithCombinedCache(false, true, BlockType.DATA, true); 467 // true for the following 2 because cacheBlock takes precedence over cache config 468 // with caching disabled, we always go to off-heap 469 testReaderBlockAllocationWithCombinedCache(true, false, null, true); 470 testReaderBlockAllocationWithCombinedCache(true, false, BlockType.INTERMEDIATE_INDEX, false); 471 // true for the following 3, because with caching disabled we always go to off-heap 472 testReaderBlockAllocationWithCombinedCache(false, false, null, true); 473 testReaderBlockAllocationWithCombinedCache(false, false, BlockType.INTERMEDIATE_INDEX, true); 474 testReaderBlockAllocationWithCombinedCache(false, false, BlockType.DATA, true); 475 } 476 477 private void testReaderBlockAllocationWithCombinedCache(boolean cacheConfigCacheBlockOnRead, 478 boolean cacheBlock, BlockType blockType, boolean expectSharedMem) throws IOException { 479 int bufCount = 1024, blockSize = 64 * 1024; 480 ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0); 481 fillByteBuffAllocator(alloc, bufCount); 482 Path storeFilePath = writeStoreFile(); 483 // Open the file reader with CombinedBlockCache 484 BlockCache combined = initCombinedBlockCache("LRU"); 485 Configuration myConf = new Configuration(conf); 486 487 myConf.setBoolean(CacheConfig.CACHE_DATA_ON_READ_KEY, cacheConfigCacheBlockOnRead); 488 myConf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true); 489 490 CacheConfig cacheConfig = new CacheConfig(myConf, null, combined, alloc); 491 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, myConf); 492 long offset = 0; 493 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 494 long read = readAtOffsetWithAllocationAsserts(alloc, reader, offset, cacheBlock, blockType, 495 expectSharedMem); 496 if (read < 0) { 497 break; 498 } 499 500 offset += read; 501 } 502 503 reader.close(); 504 combined.shutdown(); 505 assertEquals(bufCount, alloc.getFreeBufferCount()); 506 alloc.clean(); 507 } 508 509 private long readAtOffsetWithAllocationAsserts(ByteBuffAllocator alloc, HFile.Reader reader, 510 long offset, boolean cacheBlock, BlockType blockType, boolean expectSharedMem) 511 throws IOException { 512 HFileBlock block; 513 try { 514 block = reader.readBlock(offset, -1, cacheBlock, true, false, true, blockType, null); 515 } catch (IOException e) { 516 if (e.getMessage().contains("Expected block type")) { 517 return -1; 518 } 519 throw e; 520 } 521 522 assertEquals(expectSharedMem, block.isSharedMem()); 523 524 if (expectSharedMem) { 525 assertTrue(alloc.getFreeBufferCount() < alloc.getTotalBufferCount()); 526 } else { 527 // Should never allocate off-heap block from allocator because ensure that it's LRU. 528 assertEquals(alloc.getTotalBufferCount(), alloc.getFreeBufferCount()); 529 } 530 531 try { 532 return block.getOnDiskSizeWithHeader(); 533 } finally { 534 block.release(); // return back the ByteBuffer back to allocator. 535 } 536 } 537 538 private void readStoreFile(Path storeFilePath, Configuration conf, ByteBuffAllocator alloc) 539 throws Exception { 540 // Open the file reader with block cache disabled. 541 CacheConfig cache = new CacheConfig(conf, null, null, alloc); 542 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cache, true, conf); 543 long offset = 0; 544 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 545 HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null); 546 offset += block.getOnDiskSizeWithHeader(); 547 block.release(); // return back the ByteBuffer back to allocator. 548 } 549 reader.close(); 550 } 551 552 private Path writeStoreFile() throws IOException { 553 Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile"); 554 HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build(); 555 StoreFileWriter sfw = new StoreFileWriter.Builder(conf, fs).withOutputDir(storeFileParentDir) 556 .withFileContext(meta).build(); 557 final int rowLen = 32; 558 Random rand = ThreadLocalRandom.current(); 559 for (int i = 0; i < 1000; ++i) { 560 byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); 561 byte[] v = RandomKeyValueUtil.randomValue(rand); 562 int cfLen = rand.nextInt(k.length - rowLen + 1); 563 KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, 564 k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); 565 sfw.append(kv); 566 } 567 568 sfw.close(); 569 return sfw.getPath(); 570 } 571 572 public static KeyValue.Type generateKeyType(Random rand) { 573 if (rand.nextBoolean()) { 574 // Let's make half of KVs puts. 575 return KeyValue.Type.Put; 576 } else { 577 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 578 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 579 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 580 + "Probably the layout of KeyValue.Type has changed."); 581 } 582 return keyType; 583 } 584 } 585 586 /** 587 * Test empty HFile. Test all features work reasonably when hfile is empty of entries. 588 */ 589 @Test 590 public void testEmptyHFile(TestInfo testInfo) throws IOException { 591 Path f = new Path(ROOT_DIR, testInfo.getTestMethod().get().getName()); 592 HFileContext context = new HFileContextBuilder().withIncludesTags(false).build(); 593 Writer w = 594 HFile.getWriterFactory(conf, cacheConf).withPath(fs, f).withFileContext(context).create(); 595 w.close(); 596 Reader r = HFile.createReader(fs, f, cacheConf, true, conf); 597 assertFalse(r.getFirstKey().isPresent()); 598 assertFalse(r.getLastKey().isPresent()); 599 } 600 601 /** 602 * Create 0-length hfile and show that it fails 603 */ 604 @Test 605 public void testCorrupt0LengthHFile(TestInfo testInfo) throws IOException { 606 Path f = new Path(ROOT_DIR, testInfo.getTestMethod().get().getName()); 607 FSDataOutputStream fsos = fs.create(f); 608 fsos.close(); 609 610 try { 611 Reader r = HFile.createReader(fs, f, cacheConf, true, conf); 612 } catch (CorruptHFileException | IllegalArgumentException che) { 613 // Expected failure 614 return; 615 } 616 fail("Should have thrown exception"); 617 } 618 619 @Test 620 public void testCorruptOutOfOrderHFileWrite(TestInfo testInfo) throws IOException { 621 Path path = new Path(ROOT_DIR, testInfo.getTestMethod().get().getName()); 622 FSDataOutputStream mockedOutputStream = Mockito.mock(FSDataOutputStream.class); 623 String columnFamily = "MyColumnFamily"; 624 String tableName = "MyTableName"; 625 HFileContext fileContext = 626 new HFileContextBuilder().withHFileName(testInfo.getTestMethod().get().getName() + "HFile") 627 .withBlockSize(minBlockSize).withColumnFamily(Bytes.toBytes(columnFamily)) 628 .withTableName(Bytes.toBytes(tableName)).withHBaseCheckSum(false) 629 .withCompression(Compression.Algorithm.NONE).withCompressTags(false).build(); 630 HFileWriterImpl writer = 631 new HFileWriterImpl(conf, cacheConf, path, mockedOutputStream, fileContext); 632 ExtendedCellBuilder cellBuilder = 633 ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); 634 byte[] row = Bytes.toBytes("foo"); 635 byte[] qualifier = Bytes.toBytes("qualifier"); 636 byte[] cf = Bytes.toBytes(columnFamily); 637 byte[] val = Bytes.toBytes("fooVal"); 638 long firstTS = 100L; 639 long secondTS = 101L; 640 ExtendedCell firstCell = cellBuilder.setRow(row).setValue(val).setTimestamp(firstTS) 641 .setQualifier(qualifier).setFamily(cf).setType(Cell.Type.Put).build(); 642 ExtendedCell secondCell = cellBuilder.setRow(row).setValue(val).setTimestamp(secondTS) 643 .setQualifier(qualifier).setFamily(cf).setType(Cell.Type.Put).build(); 644 // second Cell will sort "higher" than the first because later timestamps should come first 645 writer.append(firstCell); 646 try { 647 writer.append(secondCell); 648 } catch (IOException ie) { 649 String message = ie.getMessage(); 650 assertTrue(message.contains("not lexically larger")); 651 assertTrue(message.contains(tableName)); 652 assertTrue(message.contains(columnFamily)); 653 return; 654 } 655 fail("Exception wasn't thrown even though Cells were appended in the wrong order!"); 656 } 657 658 public static void truncateFile(FileSystem fs, Path src, Path dst) throws IOException { 659 FileStatus fst = fs.getFileStatus(src); 660 long len = fst.getLen(); 661 len = len / 2; 662 663 // create a truncated hfile 664 FSDataOutputStream fdos = fs.create(dst); 665 byte[] buf = new byte[(int) len]; 666 FSDataInputStream fdis = fs.open(src); 667 fdis.read(buf); 668 fdos.write(buf); 669 fdis.close(); 670 fdos.close(); 671 } 672 673 /** 674 * Create a truncated hfile and verify that exception thrown. 675 */ 676 @Test 677 public void testCorruptTruncatedHFile(TestInfo testInfo) throws IOException { 678 Path f = new Path(ROOT_DIR, testInfo.getTestMethod().get().getName()); 679 HFileContext context = new HFileContextBuilder().build(); 680 Writer w = HFile.getWriterFactory(conf, cacheConf).withPath(this.fs, f).withFileContext(context) 681 .create(); 682 writeSomeRecords(w, 0, 100, false); 683 w.close(); 684 685 Path trunc = new Path(f.getParent(), "trucated"); 686 truncateFile(fs, w.getPath(), trunc); 687 688 try { 689 HFile.createReader(fs, trunc, cacheConf, true, conf); 690 } catch (CorruptHFileException | IllegalArgumentException che) { 691 // Expected failure 692 return; 693 } 694 fail("Should have thrown exception"); 695 } 696 697 // write some records into the hfile 698 // write them twice 699 private int writeSomeRecords(Writer writer, int start, int n, boolean useTags) 700 throws IOException { 701 String value = "value"; 702 KeyValue kv; 703 for (int i = start; i < (start + n); i++) { 704 String key = String.format(localFormatter, Integer.valueOf(i)); 705 if (useTags) { 706 Tag t = new ArrayBackedTag((byte) 1, "myTag1"); 707 Tag[] tags = new Tag[1]; 708 tags[0] = t; 709 kv = new KeyValue(Bytes.toBytes(key), Bytes.toBytes("family"), Bytes.toBytes("qual"), 710 HConstants.LATEST_TIMESTAMP, Bytes.toBytes(value + key), tags); 711 writer.append(kv); 712 } else { 713 kv = new KeyValue(Bytes.toBytes(key), Bytes.toBytes("family"), Bytes.toBytes("qual"), 714 Bytes.toBytes(value + key)); 715 writer.append(kv); 716 } 717 } 718 return (start + n); 719 } 720 721 private void readAllRecords(HFileScanner scanner) throws IOException { 722 readAndCheckbytes(scanner, 0, 100); 723 } 724 725 // read the records and check 726 private int readAndCheckbytes(HFileScanner scanner, int start, int n) throws IOException { 727 String value = "value"; 728 int i = start; 729 for (; i < (start + n); i++) { 730 ByteBuffer key = ByteBuffer.wrap(((KeyValue) scanner.getKey()).getKey()); 731 ByteBuffer val = scanner.getValue(); 732 String keyStr = String.format(localFormatter, Integer.valueOf(i)); 733 String valStr = value + keyStr; 734 KeyValue kv = new KeyValue(Bytes.toBytes(keyStr), Bytes.toBytes("family"), 735 Bytes.toBytes("qual"), Bytes.toBytes(valStr)); 736 byte[] keyBytes = 737 new KeyValue.KeyOnlyKeyValue(Bytes.toBytes(key), 0, Bytes.toBytes(key).length).getKey(); 738 assertTrue(Arrays.equals(kv.getKey(), keyBytes), 739 "bytes for keys do not match " + keyStr + " " + Bytes.toString(Bytes.toBytes(key))); 740 byte[] valBytes = Bytes.toBytes(val); 741 assertTrue(Arrays.equals(Bytes.toBytes(valStr), valBytes), 742 "bytes for vals do not match " + valStr + " " + Bytes.toString(valBytes)); 743 if (!scanner.next()) { 744 break; 745 } 746 } 747 assertEquals(i, start + n - 1); 748 return (start + n); 749 } 750 751 private byte[] getSomeKey(int rowId) { 752 KeyValue kv = new KeyValue(Bytes.toBytes(String.format(localFormatter, Integer.valueOf(rowId))), 753 Bytes.toBytes("family"), Bytes.toBytes("qual"), HConstants.LATEST_TIMESTAMP, Type.Put); 754 return kv.getKey(); 755 } 756 757 private void writeRecords(Writer writer, boolean useTags) throws IOException { 758 writeSomeRecords(writer, 0, 100, useTags); 759 writer.close(); 760 } 761 762 private FSDataOutputStream createFSOutput(Path name) throws IOException { 763 // if (fs.exists(name)) fs.delete(name, true); 764 FSDataOutputStream fout = fs.create(name); 765 return fout; 766 } 767 768 /** 769 * test none codecs 770 */ 771 void basicWithSomeCodec(String codec, boolean useTags) throws IOException { 772 if (useTags) { 773 conf.setInt("hfile.format.version", 3); 774 } 775 Path ncHFile = new Path(ROOT_DIR, "basic.hfile." + codec.toString() + useTags); 776 FSDataOutputStream fout = createFSOutput(ncHFile); 777 HFileContext meta = new HFileContextBuilder().withBlockSize(minBlockSize) 778 .withCompression(HFileWriterImpl.compressionByName(codec)).build(); 779 Writer writer = 780 HFile.getWriterFactory(conf, cacheConf).withOutputStream(fout).withFileContext(meta).create(); 781 LOG.info(Objects.toString(writer)); 782 writeRecords(writer, useTags); 783 fout.close(); 784 FSDataInputStream fin = fs.open(ncHFile); 785 ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, ncHFile).build(); 786 Reader reader = createReaderFromStream(context, cacheConf, conf); 787 LOG.info(cacheConf.toString()); 788 // Load up the index. 789 // Get a scanner that caches and that does not use pread. 790 HFileScanner scanner = reader.getScanner(conf, true, false); 791 // Align scanner at start of the file. 792 scanner.seekTo(); 793 readAllRecords(scanner); 794 int seekTo = scanner.seekTo(KeyValueUtil.createKeyValueFromKey(getSomeKey(50))); 795 LOG.info("" + seekTo); 796 assertTrue(scanner.seekTo(KeyValueUtil.createKeyValueFromKey(getSomeKey(50))) == 0, 797 "location lookup failed"); 798 // read the key and see if it matches 799 ByteBuffer readKey = ByteBuffer.wrap(((KeyValue) scanner.getKey()).getKey()); 800 assertTrue(Arrays.equals(getSomeKey(50), Bytes.toBytes(readKey)), "seeked key does not match"); 801 802 scanner.seekTo(KeyValueUtil.createKeyValueFromKey(getSomeKey(0))); 803 ByteBuffer val1 = scanner.getValue(); 804 scanner.seekTo(KeyValueUtil.createKeyValueFromKey(getSomeKey(0))); 805 ByteBuffer val2 = scanner.getValue(); 806 assertTrue(Arrays.equals(Bytes.toBytes(val1), Bytes.toBytes(val2))); 807 808 reader.close(); 809 fin.close(); 810 fs.delete(ncHFile, true); 811 } 812 813 @Test 814 public void testTFileFeatures() throws IOException { 815 testHFilefeaturesInternals(false); 816 testHFilefeaturesInternals(true); 817 } 818 819 protected void testHFilefeaturesInternals(boolean useTags) throws IOException { 820 basicWithSomeCodec("none", useTags); 821 basicWithSomeCodec("gz", useTags); 822 } 823 824 private void writeNumMetablocks(Writer writer, int n) { 825 for (int i = 0; i < n; i++) { 826 writer.appendMetaBlock("HFileMeta" + i, new Writable() { 827 private int val; 828 829 public Writable setVal(int val) { 830 this.val = val; 831 return this; 832 } 833 834 @Override 835 public void write(DataOutput out) throws IOException { 836 out.write(Bytes.toBytes("something to test" + val)); 837 } 838 839 @Override 840 public void readFields(DataInput in) throws IOException { 841 } 842 }.setVal(i)); 843 } 844 } 845 846 private void someTestingWithMetaBlock(Writer writer) { 847 writeNumMetablocks(writer, 10); 848 } 849 850 private void readNumMetablocks(Reader reader, int n) throws IOException { 851 for (int i = 0; i < n; i++) { 852 ByteBuff actual = reader.getMetaBlock("HFileMeta" + i, false).getBufferWithoutHeader(); 853 ByteBuffer expected = ByteBuffer.wrap(Bytes.toBytes("something to test" + i)); 854 assertEquals(Bytes.toStringBinary(expected), Bytes.toStringBinary(actual.array(), 855 actual.arrayOffset() + actual.position(), actual.capacity()), "failed to match metadata"); 856 } 857 } 858 859 private void someReadingWithMetaBlock(Reader reader) throws IOException { 860 readNumMetablocks(reader, 10); 861 } 862 863 private void metablocks(final String compress) throws Exception { 864 Path mFile = new Path(ROOT_DIR, "meta.hfile"); 865 FSDataOutputStream fout = createFSOutput(mFile); 866 HFileContext meta = 867 new HFileContextBuilder().withCompression(HFileWriterImpl.compressionByName(compress)) 868 .withBlockSize(minBlockSize).build(); 869 Writer writer = 870 HFile.getWriterFactory(conf, cacheConf).withOutputStream(fout).withFileContext(meta).create(); 871 someTestingWithMetaBlock(writer); 872 writer.close(); 873 fout.close(); 874 ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, mFile).build(); 875 Reader reader = createReaderFromStream(context, cacheConf, conf); 876 // No data -- this should return false. 877 assertFalse(reader.getScanner(conf, false, false).seekTo()); 878 someReadingWithMetaBlock(reader); 879 fs.delete(mFile, true); 880 reader.close(); 881 } 882 883 // test meta blocks for hfiles 884 @Test 885 public void testMetaBlocks() throws Exception { 886 metablocks("none"); 887 metablocks("gz"); 888 } 889 890 @Test 891 public void testNullMetaBlocks() throws Exception { 892 for (Compression.Algorithm compressAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) { 893 Path mFile = new Path(ROOT_DIR, "nometa_" + compressAlgo + ".hfile"); 894 FSDataOutputStream fout = createFSOutput(mFile); 895 HFileContext meta = 896 new HFileContextBuilder().withCompression(compressAlgo).withBlockSize(minBlockSize).build(); 897 Writer writer = HFile.getWriterFactory(conf, cacheConf).withOutputStream(fout) 898 .withFileContext(meta).create(); 899 KeyValue kv = 900 new KeyValue(Bytes.toBytes("foo"), Bytes.toBytes("f1"), null, Bytes.toBytes("value")); 901 writer.append(kv); 902 writer.close(); 903 fout.close(); 904 Reader reader = HFile.createReader(fs, mFile, cacheConf, true, conf); 905 assertNull(reader.getMetaBlock("non-existant", false)); 906 } 907 } 908 909 /** 910 * Make sure the ordinals for our compression algorithms do not change on us. 911 */ 912 @Test 913 public void testCompressionOrdinance() { 914 assertTrue(Compression.Algorithm.LZO.ordinal() == 0); 915 assertTrue(Compression.Algorithm.GZ.ordinal() == 1); 916 assertTrue(Compression.Algorithm.NONE.ordinal() == 2); 917 assertTrue(Compression.Algorithm.SNAPPY.ordinal() == 3); 918 assertTrue(Compression.Algorithm.LZ4.ordinal() == 4); 919 } 920 921 @Test 922 public void testShortMidpointSameQual() { 923 ExtendedCell left = 924 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("a")) 925 .setFamily(Bytes.toBytes("a")).setQualifier(Bytes.toBytes("a")).setTimestamp(11) 926 .setType(Type.Maximum.getCode()).setValue(HConstants.EMPTY_BYTE_ARRAY).build(); 927 ExtendedCell right = 928 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("a")) 929 .setFamily(Bytes.toBytes("a")).setQualifier(Bytes.toBytes("a")).setTimestamp(9) 930 .setType(Type.Maximum.getCode()).setValue(HConstants.EMPTY_BYTE_ARRAY).build(); 931 ExtendedCell mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right); 932 assertTrue( 933 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) <= 0); 934 assertTrue( 935 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) == 0); 936 } 937 938 private ExtendedCell getCell(byte[] row, byte[] family, byte[] qualifier) { 939 return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row) 940 .setFamily(family).setQualifier(qualifier).setTimestamp(HConstants.LATEST_TIMESTAMP) 941 .setType(KeyValue.Type.Maximum.getCode()).setValue(HConstants.EMPTY_BYTE_ARRAY).build(); 942 } 943 944 @Test 945 public void testGetShortMidpoint() { 946 ExtendedCell left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a")); 947 ExtendedCell right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a")); 948 ExtendedCell mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right); 949 assertTrue( 950 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) <= 0); 951 assertTrue( 952 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0); 953 left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a")); 954 right = getCell(Bytes.toBytes("b"), Bytes.toBytes("a"), Bytes.toBytes("a")); 955 mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right); 956 assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0); 957 assertTrue( 958 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0); 959 left = getCell(Bytes.toBytes("g"), Bytes.toBytes("a"), Bytes.toBytes("a")); 960 right = getCell(Bytes.toBytes("i"), Bytes.toBytes("a"), Bytes.toBytes("a")); 961 mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right); 962 assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0); 963 assertTrue( 964 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0); 965 left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a")); 966 right = getCell(Bytes.toBytes("bbbbbbb"), Bytes.toBytes("a"), Bytes.toBytes("a")); 967 mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right); 968 assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0); 969 assertTrue( 970 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) < 0); 971 assertEquals(1, mid.getRowLength()); 972 left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a")); 973 right = getCell(Bytes.toBytes("b"), Bytes.toBytes("a"), Bytes.toBytes("a")); 974 mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right); 975 assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0); 976 assertTrue( 977 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0); 978 left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a")); 979 right = getCell(Bytes.toBytes("a"), Bytes.toBytes("aaaaaaaa"), Bytes.toBytes("b")); 980 mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right); 981 assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0); 982 assertTrue( 983 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) < 0); 984 assertEquals(2, mid.getFamilyLength()); 985 left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a")); 986 right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("aaaaaaaaa")); 987 mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right); 988 assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0); 989 assertTrue( 990 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) < 0); 991 assertEquals(2, mid.getQualifierLength()); 992 left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a")); 993 right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("b")); 994 mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right); 995 assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0); 996 assertTrue( 997 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0); 998 assertEquals(1, mid.getQualifierLength()); 999 1000 // Verify boundary conditions 1001 left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), new byte[] { 0x00, (byte) 0xFE }); 1002 right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), new byte[] { 0x00, (byte) 0xFF }); 1003 mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right); 1004 assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0); 1005 assertTrue( 1006 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) == 0); 1007 left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), new byte[] { 0x00, 0x12 }); 1008 right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), new byte[] { 0x00, 0x12, 0x00 }); 1009 mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right); 1010 assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0); 1011 assertTrue( 1012 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) == 0); 1013 1014 // Assert that if meta comparator, it returns the right cell -- i.e. no 1015 // optimization done. 1016 left = getCell(Bytes.toBytes("g"), Bytes.toBytes("a"), Bytes.toBytes("a")); 1017 right = getCell(Bytes.toBytes("i"), Bytes.toBytes("a"), Bytes.toBytes("a")); 1018 mid = HFileWriterImpl.getMidpoint(MetaCellComparator.META_COMPARATOR, left, right); 1019 assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0); 1020 assertTrue( 1021 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) == 0); 1022 byte[] family = Bytes.toBytes("family"); 1023 byte[] qualA = Bytes.toBytes("qfA"); 1024 byte[] qualB = Bytes.toBytes("qfB"); 1025 final CellComparatorImpl keyComparator = CellComparatorImpl.COMPARATOR; 1026 // verify that faked shorter rowkey could be generated 1027 long ts = 5; 1028 KeyValue kv1 = new KeyValue(Bytes.toBytes("the quick brown fox"), family, qualA, ts, Type.Put); 1029 KeyValue kv2 = new KeyValue(Bytes.toBytes("the who test text"), family, qualA, ts, Type.Put); 1030 ExtendedCell newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2); 1031 assertTrue(keyComparator.compare(kv1, newKey) < 0); 1032 assertTrue((keyComparator.compare(kv2, newKey)) > 0); 1033 byte[] expectedArray = Bytes.toBytes("the r"); 1034 Bytes.equals(newKey.getRowArray(), newKey.getRowOffset(), newKey.getRowLength(), expectedArray, 1035 0, expectedArray.length); 1036 1037 // verify: same with "row + family + qualifier", return rightKey directly 1038 kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, 5, Type.Put); 1039 kv2 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, 0, Type.Put); 1040 assertTrue(keyComparator.compare(kv1, kv2) < 0); 1041 newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2); 1042 assertTrue(keyComparator.compare(kv1, newKey) < 0); 1043 assertTrue((keyComparator.compare(kv2, newKey)) == 0); 1044 kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, -5, Type.Put); 1045 kv2 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, -10, Type.Put); 1046 assertTrue(keyComparator.compare(kv1, kv2) < 0); 1047 newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2); 1048 assertTrue(keyComparator.compare(kv1, newKey) < 0); 1049 assertTrue((keyComparator.compare(kv2, newKey)) == 0); 1050 1051 // verify: same with row, different with qualifier 1052 kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, 5, Type.Put); 1053 kv2 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualB, 5, Type.Put); 1054 assertTrue(keyComparator.compare(kv1, kv2) < 0); 1055 newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2); 1056 assertTrue(keyComparator.compare(kv1, newKey) < 0); 1057 assertTrue((keyComparator.compare(kv2, newKey)) > 0); 1058 assertTrue(Arrays.equals(CellUtil.cloneFamily(newKey), family)); 1059 assertTrue(Arrays.equals(CellUtil.cloneQualifier(newKey), qualB)); 1060 assertTrue(newKey.getTimestamp() == HConstants.LATEST_TIMESTAMP); 1061 assertTrue(newKey.getTypeByte() == Type.Maximum.getCode()); 1062 1063 // verify metaKeyComparator's getShortMidpointKey output 1064 final CellComparatorImpl metaKeyComparator = MetaCellComparator.META_COMPARATOR; 1065 kv1 = new KeyValue(Bytes.toBytes("ilovehbase123"), family, qualA, 5, Type.Put); 1066 kv2 = new KeyValue(Bytes.toBytes("ilovehbase234"), family, qualA, 0, Type.Put); 1067 newKey = HFileWriterImpl.getMidpoint(metaKeyComparator, kv1, kv2); 1068 assertTrue(metaKeyComparator.compare(kv1, newKey) < 0); 1069 assertTrue((metaKeyComparator.compare(kv2, newKey) == 0)); 1070 1071 // verify common fix scenario 1072 kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, ts, Type.Put); 1073 kv2 = new KeyValue(Bytes.toBytes("ilovehbaseandhdfs"), family, qualA, ts, Type.Put); 1074 assertTrue(keyComparator.compare(kv1, kv2) < 0); 1075 newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2); 1076 assertTrue(keyComparator.compare(kv1, newKey) < 0); 1077 assertTrue((keyComparator.compare(kv2, newKey)) > 0); 1078 expectedArray = Bytes.toBytes("ilovehbasea"); 1079 Bytes.equals(newKey.getRowArray(), newKey.getRowOffset(), newKey.getRowLength(), expectedArray, 1080 0, expectedArray.length); 1081 // verify only 1 offset scenario 1082 kv1 = new KeyValue(Bytes.toBytes("100abcdefg"), family, qualA, ts, Type.Put); 1083 kv2 = new KeyValue(Bytes.toBytes("101abcdefg"), family, qualA, ts, Type.Put); 1084 assertTrue(keyComparator.compare(kv1, kv2) < 0); 1085 newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2); 1086 assertTrue(keyComparator.compare(kv1, newKey) < 0); 1087 assertTrue((keyComparator.compare(kv2, newKey)) > 0); 1088 expectedArray = Bytes.toBytes("101"); 1089 Bytes.equals(newKey.getRowArray(), newKey.getRowOffset(), newKey.getRowLength(), expectedArray, 1090 0, expectedArray.length); 1091 } 1092 1093 @Test 1094 public void testDBEShipped(TestInfo testInfo) throws IOException { 1095 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 1096 DataBlockEncoder encoder = encoding.getEncoder(); 1097 if (encoder == null) { 1098 continue; 1099 } 1100 Path f = new Path(ROOT_DIR, testInfo.getTestMethod().get().getName() + "_" + encoding); 1101 HFileContext context = 1102 new HFileContextBuilder().withIncludesTags(false).withDataBlockEncoding(encoding).build(); 1103 HFileWriterImpl writer = (HFileWriterImpl) HFile.getWriterFactory(conf, cacheConf) 1104 .withPath(fs, f).withFileContext(context).create(); 1105 1106 KeyValue kv = new KeyValue(Bytes.toBytes("testkey1"), Bytes.toBytes("family"), 1107 Bytes.toBytes("qual"), Bytes.toBytes("testvalue")); 1108 KeyValue kv2 = new KeyValue(Bytes.toBytes("testkey2"), Bytes.toBytes("family"), 1109 Bytes.toBytes("qual"), Bytes.toBytes("testvalue")); 1110 KeyValue kv3 = new KeyValue(Bytes.toBytes("testkey3"), Bytes.toBytes("family"), 1111 Bytes.toBytes("qual"), Bytes.toBytes("testvalue")); 1112 1113 ByteBuffer buffer = ByteBuffer.wrap(kv.getBuffer()); 1114 ByteBuffer buffer2 = ByteBuffer.wrap(kv2.getBuffer()); 1115 ByteBuffer buffer3 = ByteBuffer.wrap(kv3.getBuffer()); 1116 1117 writer.append(new ByteBufferKeyValue(buffer, 0, buffer.remaining())); 1118 writer.beforeShipped(); 1119 1120 // pollute first cell's backing ByteBuffer 1121 ByteBufferUtils.copyFromBufferToBuffer(buffer3, buffer); 1122 1123 // write another cell, if DBE not Shipped, test will fail 1124 writer.append(new ByteBufferKeyValue(buffer2, 0, buffer2.remaining())); 1125 writer.close(); 1126 } 1127 } 1128 1129 /** 1130 * Test case for CombinedBlockCache with TinyLfu as L1 cache 1131 */ 1132 @Test 1133 public void testReaderWithTinyLfuCombinedBlockCache() throws Exception { 1134 testReaderCombinedCache("TinyLfu"); 1135 } 1136 1137 /** 1138 * Test case for CombinedBlockCache with AdaptiveLRU as L1 cache 1139 */ 1140 @Test 1141 public void testReaderWithAdaptiveLruCombinedBlockCache() throws Exception { 1142 testReaderCombinedCache("AdaptiveLRU"); 1143 } 1144 1145 /** 1146 * Test case for CombinedBlockCache with AdaptiveLRU as L1 cache 1147 */ 1148 @Test 1149 public void testReaderWithLruCombinedBlockCache() throws Exception { 1150 testReaderCombinedCache("LRU"); 1151 } 1152 1153 private void testReaderCombinedCache(final String l1CachePolicy) throws Exception { 1154 int bufCount = 1024; 1155 int blockSize = 64 * 1024; 1156 ByteBuffAllocator alloc = initAllocator(true, bufCount, blockSize, 0); 1157 fillByteBuffAllocator(alloc, bufCount); 1158 Path storeFilePath = writeStoreFile(); 1159 // Open the file reader with CombinedBlockCache 1160 BlockCache combined = initCombinedBlockCache(l1CachePolicy); 1161 conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true); 1162 CacheConfig cacheConfig = new CacheConfig(conf, null, combined, alloc); 1163 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf); 1164 long offset = 0; 1165 Cacheable cachedBlock = null; 1166 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 1167 BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset); 1168 HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null); 1169 offset += block.getOnDiskSizeWithHeader(); 1170 // Read the cached block. 1171 cachedBlock = combined.getBlock(key, false, false, true); 1172 try { 1173 assertNotNull(cachedBlock); 1174 assertTrue(cachedBlock instanceof HFileBlock); 1175 HFileBlock hfb = (HFileBlock) cachedBlock; 1176 // Data block will be cached in BucketCache, so it should be an off-heap block. 1177 if (hfb.getBlockType().isData()) { 1178 assertTrue(hfb.isSharedMem()); 1179 } else if (!l1CachePolicy.equals("TinyLfu")) { 1180 assertFalse(hfb.isSharedMem()); 1181 } 1182 } finally { 1183 cachedBlock.release(); 1184 } 1185 block.release(); // return back the ByteBuffer back to allocator. 1186 } 1187 reader.close(); 1188 combined.shutdown(); 1189 if (cachedBlock != null) { 1190 assertEquals(0, cachedBlock.refCnt()); 1191 } 1192 assertEquals(bufCount, alloc.getFreeBufferCount()); 1193 alloc.clean(); 1194 } 1195 1196 @Test 1197 public void testHFileContextBuilderWithIndexEncoding() throws IOException { 1198 HFileContext context = 1199 new HFileContextBuilder().withIndexBlockEncoding(IndexBlockEncoding.PREFIX_TREE).build(); 1200 HFileContext newContext = new HFileContextBuilder(context).build(); 1201 assertTrue(newContext.getIndexBlockEncoding() == IndexBlockEncoding.PREFIX_TREE); 1202 } 1203}