001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
021import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
022import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
023import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
024import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
025import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BLOCKCACHE_POLICY_KEY;
026import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
027import static org.junit.jupiter.api.Assertions.assertEquals;
028import static org.junit.jupiter.api.Assertions.assertFalse;
029import static org.junit.jupiter.api.Assertions.assertNotNull;
030import static org.junit.jupiter.api.Assertions.assertNull;
031import static org.junit.jupiter.api.Assertions.assertTrue;
032import static org.junit.jupiter.api.Assertions.fail;
033
034import java.io.DataInput;
035import java.io.DataOutput;
036import java.io.IOException;
037import java.nio.ByteBuffer;
038import java.util.ArrayList;
039import java.util.Arrays;
040import java.util.List;
041import java.util.Objects;
042import java.util.Optional;
043import java.util.Random;
044import java.util.concurrent.ThreadLocalRandom;
045import java.util.concurrent.atomic.AtomicInteger;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.fs.FSDataInputStream;
048import org.apache.hadoop.fs.FSDataOutputStream;
049import org.apache.hadoop.fs.FileStatus;
050import org.apache.hadoop.fs.FileSystem;
051import org.apache.hadoop.fs.Path;
052import org.apache.hadoop.hbase.ArrayBackedTag;
053import org.apache.hadoop.hbase.ByteBufferKeyValue;
054import org.apache.hadoop.hbase.Cell;
055import org.apache.hadoop.hbase.CellBuilderType;
056import org.apache.hadoop.hbase.CellComparatorImpl;
057import org.apache.hadoop.hbase.CellUtil;
058import org.apache.hadoop.hbase.ExtendedCell;
059import org.apache.hadoop.hbase.ExtendedCellBuilder;
060import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
061import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
062import org.apache.hadoop.hbase.HBaseConfiguration;
063import org.apache.hadoop.hbase.HBaseTestingUtil;
064import org.apache.hadoop.hbase.HConstants;
065import org.apache.hadoop.hbase.KeyValue;
066import org.apache.hadoop.hbase.KeyValue.Type;
067import org.apache.hadoop.hbase.KeyValueUtil;
068import org.apache.hadoop.hbase.MetaCellComparator;
069import org.apache.hadoop.hbase.PrivateCellUtil;
070import org.apache.hadoop.hbase.Tag;
071import org.apache.hadoop.hbase.io.ByteBuffAllocator;
072import org.apache.hadoop.hbase.io.compress.Compression;
073import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
074import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
075import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
076import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
077import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
078import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
079import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
080import org.apache.hadoop.hbase.nio.ByteBuff;
081import org.apache.hadoop.hbase.nio.RefCnt;
082import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
083import org.apache.hadoop.hbase.testclassification.IOTests;
084import org.apache.hadoop.hbase.testclassification.SmallTests;
085import org.apache.hadoop.hbase.util.ByteBufferUtils;
086import org.apache.hadoop.hbase.util.Bytes;
087import org.apache.hadoop.io.Writable;
088import org.junit.jupiter.api.BeforeAll;
089import org.junit.jupiter.api.Test;
090import org.junit.jupiter.api.TestInfo;
091import org.mockito.Mockito;
092import org.slf4j.Logger;
093import org.slf4j.LoggerFactory;
094
095import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
096
097/**
098 * test hfile features.
099 */
100@org.junit.jupiter.api.Tag(IOTests.TAG)
101@org.junit.jupiter.api.Tag(SmallTests.TAG)
102public class TestHFile {
103
104  private static final Logger LOG = LoggerFactory.getLogger(TestHFile.class);
105  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
106  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
107  private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFile").toString();
108  private final int minBlockSize = 512;
109  private static String localFormatter = "%010d";
110  private static CacheConfig cacheConf;
111  private static Configuration conf;
112  private static FileSystem fs;
113
114  @BeforeAll
115  public static void setUp() throws Exception {
116    conf = TEST_UTIL.getConfiguration();
117    cacheConf = new CacheConfig(conf);
118    fs = TEST_UTIL.getTestFileSystem();
119  }
120
121  public static Reader createReaderFromStream(ReaderContext context, CacheConfig cacheConf,
122    Configuration conf) throws IOException {
123    HFileInfo fileInfo = new HFileInfo(context, conf);
124    Reader preadReader = HFile.createReader(context, fileInfo, cacheConf, conf);
125    fileInfo.initMetaAndIndex(preadReader);
126    preadReader.close();
127    context = new ReaderContextBuilder()
128      .withFileSystemAndPath(context.getFileSystem(), context.getFilePath())
129      .withReaderType(ReaderType.STREAM).build();
130    Reader streamReader = HFile.createReader(context, fileInfo, cacheConf, conf);
131    return streamReader;
132  }
133
134  private ByteBuffAllocator initAllocator(boolean reservoirEnabled, int bufSize, int bufCount,
135    int minAllocSize) {
136    Configuration that = HBaseConfiguration.create(conf);
137    that.setInt(BUFFER_SIZE_KEY, bufSize);
138    that.setInt(MAX_BUFFER_COUNT_KEY, bufCount);
139    // All ByteBuffers will be allocated from the buffers.
140    that.setInt(MIN_ALLOCATE_SIZE_KEY, minAllocSize);
141    return ByteBuffAllocator.create(that, reservoirEnabled);
142  }
143
144  private void fillByteBuffAllocator(ByteBuffAllocator alloc, int bufCount) {
145    // Fill the allocator with bufCount ByteBuffer
146    List<ByteBuff> buffs = new ArrayList<>();
147    for (int i = 0; i < bufCount; i++) {
148      buffs.add(alloc.allocateOneBuffer());
149      assertEquals(alloc.getFreeBufferCount(), 0);
150    }
151    buffs.forEach(ByteBuff::release);
152    assertEquals(alloc.getFreeBufferCount(), bufCount);
153  }
154
155  @Test
156  public void testReaderWithoutBlockCache() throws Exception {
157    int bufCount = 32;
158    // AllByteBuffers will be allocated from the buffers.
159    ByteBuffAllocator alloc = initAllocator(true, 64 * 1024, bufCount, 0);
160    fillByteBuffAllocator(alloc, bufCount);
161    // start write to store file.
162    Path path = writeStoreFile();
163    readStoreFile(path, conf, alloc);
164    assertEquals(bufCount, alloc.getFreeBufferCount());
165    alloc.clean();
166  }
167
168  /**
169   * Test case for HBASE-22127 in LruBlockCache.
170   */
171  @Test
172  public void testReaderWithLRUBlockCache() throws Exception {
173    int bufCount = 1024, blockSize = 64 * 1024;
174    ByteBuffAllocator alloc = initAllocator(true, bufCount, blockSize, 0);
175    fillByteBuffAllocator(alloc, bufCount);
176    Path storeFilePath = writeStoreFile();
177    // Open the file reader with LRUBlockCache
178    BlockCache lru = new LruBlockCache(1024 * 1024 * 32, blockSize, true, conf);
179    CacheConfig cacheConfig = new CacheConfig(conf, null, lru, alloc);
180    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
181    long offset = 0;
182    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
183      BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
184      HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null);
185      offset += block.getOnDiskSizeWithHeader();
186      // Ensure the block is an heap one.
187      Cacheable cachedBlock = lru.getBlock(key, false, false, true);
188      assertNotNull(cachedBlock);
189      assertTrue(cachedBlock instanceof HFileBlock);
190      assertFalse(((HFileBlock) cachedBlock).isSharedMem());
191      // Should never allocate off-heap block from allocator because ensure that it's LRU.
192      assertEquals(bufCount, alloc.getFreeBufferCount());
193      block.release(); // return back the ByteBuffer back to allocator.
194    }
195    reader.close();
196    assertEquals(bufCount, alloc.getFreeBufferCount());
197    alloc.clean();
198    lru.shutdown();
199  }
200
201  @Test
202  public void testWriterCacheOnWriteSkipDoesNotLeak() throws Exception {
203    int bufCount = 32;
204    int blockSize = 4 * 1024;
205    ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
206    fillByteBuffAllocator(alloc, bufCount);
207    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
208    Configuration myConf = HBaseConfiguration.create(conf);
209    myConf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
210    myConf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, false);
211    myConf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, false);
212    final AtomicInteger counter = new AtomicInteger();
213    RefCnt.detector.setLeakListener(new ResourceLeakDetector.LeakListener() {
214      @Override
215      public void onLeak(String s, String s1) {
216        counter.incrementAndGet();
217      }
218    });
219    BlockCache cache = Mockito.mock(BlockCache.class);
220    Mockito.when(cache.shouldCacheBlock(Mockito.any(), Mockito.anyLong(), Mockito.any()))
221      .thenReturn(Optional.of(false));
222    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testWriterCacheOnWriteSkipDoesNotLeak");
223    HFileContext context = new HFileContextBuilder().withBlockSize(blockSize).build();
224
225    try {
226      Writer writer = new HFile.WriterFactory(myConf, new CacheConfig(myConf, null, cache, alloc))
227        .withPath(fs, hfilePath).withFileContext(context).create();
228      try {
229        writer.append(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("q"),
230          HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value")));
231      } finally {
232        writer.close();
233      }
234
235      Mockito.verify(cache).shouldCacheBlock(Mockito.any(), Mockito.anyLong(), Mockito.any());
236      Mockito.verify(cache, Mockito.never()).cacheBlock(Mockito.any(), Mockito.any(),
237        Mockito.anyBoolean(), Mockito.anyBoolean());
238      for (int i = 0; i < 30 && counter.get() == 0; i++) {
239        System.gc();
240        try {
241          Thread.sleep(1000);
242        } catch (InterruptedException e) {
243          Thread.currentThread().interrupt();
244          break;
245        }
246        alloc.allocate(128 * 1024).release();
247      }
248      assertEquals(0, counter.get());
249    } finally {
250      fs.delete(hfilePath, false);
251      alloc.clean();
252    }
253  }
254
255  private void assertBytesReadFromCache(boolean isScanMetricsEnabled) throws Exception {
256    assertBytesReadFromCache(isScanMetricsEnabled, DataBlockEncoding.NONE);
257  }
258
259  private void assertBytesReadFromCache(boolean isScanMetricsEnabled, DataBlockEncoding encoding)
260    throws Exception {
261    // Write a store file
262    Path storeFilePath = writeStoreFile();
263
264    // Initialize the block cache and HFile reader
265    BlockCache lru = BlockCacheFactory.createBlockCache(conf);
266    assertTrue(lru instanceof LruBlockCache);
267    CacheConfig cacheConfig = new CacheConfig(conf, null, lru, ByteBuffAllocator.HEAP);
268    HFileReaderImpl reader =
269      (HFileReaderImpl) HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
270
271    // Read the first block in HFile from the block cache.
272    final int offset = 0;
273    BlockCacheKey cacheKey = new BlockCacheKey(storeFilePath.getName(), offset);
274    HFileBlock block = (HFileBlock) lru.getBlock(cacheKey, false, false, true);
275    assertNull(block);
276
277    // Assert that first block has not been cached in the block cache and no disk I/O happened to
278    // check that.
279    ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset();
280    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
281    block = reader.getCachedBlock(cacheKey, false, false, true, BlockType.DATA, null);
282    assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
283    assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
284
285    // Read the first block from the HFile.
286    block = reader.readBlock(offset, -1, true, true, false, true, BlockType.DATA, null);
287    assertNotNull(block);
288    int bytesReadFromFs = block.getOnDiskSizeWithHeader();
289    if (block.getNextBlockOnDiskSize() > 0) {
290      bytesReadFromFs += block.headerSize();
291    }
292    block.release();
293    // Assert that disk I/O happened to read the first block.
294    assertEquals(isScanMetricsEnabled ? bytesReadFromFs : 0,
295      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
296    assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
297
298    // Read the first block again and assert that it has been cached in the block cache.
299    block = reader.getCachedBlock(cacheKey, false, false, true, BlockType.DATA, encoding);
300    long bytesReadFromCache = 0;
301    if (encoding == DataBlockEncoding.NONE) {
302      assertNotNull(block);
303      bytesReadFromCache = block.getOnDiskSizeWithHeader();
304      if (block.getNextBlockOnDiskSize() > 0) {
305        bytesReadFromCache += block.headerSize();
306      }
307      block.release();
308      // Assert that bytes read from block cache account for same number of bytes that would have
309      // been read from FS if block cache wasn't there.
310      assertEquals(bytesReadFromFs, bytesReadFromCache);
311    } else {
312      assertNull(block);
313    }
314    assertEquals(isScanMetricsEnabled ? bytesReadFromCache : 0,
315      ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
316    assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
317
318    reader.close();
319  }
320
321  @Test
322  public void testBytesReadFromCache() throws Exception {
323    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
324    assertBytesReadFromCache(true);
325  }
326
327  @Test
328  public void testBytesReadFromCacheWithScanMetricsDisabled() throws Exception {
329    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(false);
330    assertBytesReadFromCache(false);
331  }
332
333  @Test
334  public void testBytesReadFromCacheWithInvalidDataEncoding() throws Exception {
335    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
336    assertBytesReadFromCache(true, DataBlockEncoding.FAST_DIFF);
337  }
338
339  private BlockCache initCombinedBlockCache(final String l1CachePolicy) {
340    Configuration that = HBaseConfiguration.create(conf);
341    that.setFloat(BUCKET_CACHE_SIZE_KEY, 32); // 32MB for bucket cache.
342    that.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
343    that.set(BLOCKCACHE_POLICY_KEY, l1CachePolicy);
344    BlockCache bc = BlockCacheFactory.createBlockCache(that);
345    assertNotNull(bc);
346    assertTrue(bc instanceof CombinedBlockCache);
347    return bc;
348  }
349
350  /**
351   * Test case for HBASE-22127 in CombinedBlockCache
352   */
353  @Test
354  public void testReaderWithCombinedBlockCache() throws Exception {
355    int bufCount = 1024, blockSize = 64 * 1024;
356    ByteBuffAllocator alloc = initAllocator(true, bufCount, blockSize, 0);
357    fillByteBuffAllocator(alloc, bufCount);
358    Path storeFilePath = writeStoreFile();
359    // Open the file reader with CombinedBlockCache
360    BlockCache combined = initCombinedBlockCache("LRU");
361    conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
362    CacheConfig cacheConfig = new CacheConfig(conf, null, combined, alloc);
363    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
364    long offset = 0;
365    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
366      BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
367      HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null);
368      offset += block.getOnDiskSizeWithHeader();
369      // Read the cached block.
370      Cacheable cachedBlock = combined.getBlock(key, false, false, true);
371      try {
372        assertNotNull(cachedBlock);
373        assertTrue(cachedBlock instanceof HFileBlock);
374        HFileBlock hfb = (HFileBlock) cachedBlock;
375        // Data block will be cached in BucketCache, so it should be an off-heap block.
376        if (hfb.getBlockType().isData()) {
377          assertTrue(hfb.isSharedMem());
378        } else {
379          // Non-data block will be cached in LRUBlockCache, so it must be an on-heap block.
380          assertFalse(hfb.isSharedMem());
381        }
382      } finally {
383        cachedBlock.release();
384      }
385      block.release(); // return back the ByteBuffer back to allocator.
386    }
387    reader.close();
388    combined.shutdown();
389    assertEquals(bufCount, alloc.getFreeBufferCount());
390    alloc.clean();
391  }
392
393  /**
394   * Tests that we properly allocate from the off-heap or on-heap when LRUCache is configured. In
395   * this case, the determining factor is whether we end up caching the block or not. So the below
396   * test cases try different permutations of enabling/disabling via CacheConfig and via user
397   * request (cacheblocks), along with different expected block types.
398   */
399  @Test
400  public void testReaderBlockAllocationWithLRUCache() throws IOException {
401    // false because caching is fully enabled
402    testReaderBlockAllocationWithLRUCache(true, true, null, false);
403    // false because we only look at cache config when expectedBlockType is non-null
404    testReaderBlockAllocationWithLRUCache(false, true, null, false);
405    // false because cacheBlock is true and even with cache config is disabled, we still cache
406    // important blocks like indexes
407    testReaderBlockAllocationWithLRUCache(false, true, BlockType.INTERMEDIATE_INDEX, false);
408    // true because since it's a DATA block, we honor the cache config
409    testReaderBlockAllocationWithLRUCache(false, true, BlockType.DATA, true);
410    // true for the following 2 because cacheBlock takes precedence over cache config
411    testReaderBlockAllocationWithLRUCache(true, false, null, true);
412    testReaderBlockAllocationWithLRUCache(true, false, BlockType.INTERMEDIATE_INDEX, false);
413    // false for the following 3 because both cache config and cacheBlock are false.
414    // per above, INDEX would supersede cache config, but not cacheBlock
415    testReaderBlockAllocationWithLRUCache(false, false, null, true);
416    testReaderBlockAllocationWithLRUCache(false, false, BlockType.INTERMEDIATE_INDEX, true);
417    testReaderBlockAllocationWithLRUCache(false, false, BlockType.DATA, true);
418  }
419
420  private void testReaderBlockAllocationWithLRUCache(boolean cacheConfigCacheBlockOnRead,
421    boolean cacheBlock, BlockType blockType, boolean expectSharedMem) throws IOException {
422    int bufCount = 1024, blockSize = 64 * 1024;
423    ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
424    fillByteBuffAllocator(alloc, bufCount);
425    Path storeFilePath = writeStoreFile();
426    Configuration myConf = new Configuration(conf);
427
428    myConf.setBoolean(CacheConfig.CACHE_DATA_ON_READ_KEY, cacheConfigCacheBlockOnRead);
429    // Open the file reader with LRUBlockCache
430    BlockCache lru = new LruBlockCache(1024 * 1024 * 32, blockSize, true, myConf);
431    CacheConfig cacheConfig = new CacheConfig(myConf, null, lru, alloc);
432    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, myConf);
433    long offset = 0;
434    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
435      long read = readAtOffsetWithAllocationAsserts(alloc, reader, offset, cacheBlock, blockType,
436        expectSharedMem);
437      if (read < 0) {
438        break;
439      }
440
441      offset += read;
442    }
443
444    reader.close();
445    assertEquals(bufCount, alloc.getFreeBufferCount());
446    alloc.clean();
447    lru.shutdown();
448  }
449
450  /**
451   * Tests that we properly allocate from the off-heap or on-heap when CombinedCache is configured.
452   * In this case, we should always use off-heap unless the block is an INDEX (which always goes to
453   * L1 cache which is on-heap)
454   */
455  @Test
456  public void testReaderBlockAllocationWithCombinedCache() throws IOException {
457    // true because caching is fully enabled and block type null
458    testReaderBlockAllocationWithCombinedCache(true, true, null, true);
459    // false because caching is fully enabled, index block type always goes to on-heap L1
460    testReaderBlockAllocationWithCombinedCache(true, true, BlockType.INTERMEDIATE_INDEX, false);
461    // true because cacheBlocks takes precedence over cache config which block type is null
462    testReaderBlockAllocationWithCombinedCache(false, true, null, true);
463    // false because caching is enabled and block type is index, which always goes to L1
464    testReaderBlockAllocationWithCombinedCache(false, true, BlockType.INTERMEDIATE_INDEX, false);
465    // true because since it's a DATA block, we honor the cache config
466    testReaderBlockAllocationWithCombinedCache(false, true, BlockType.DATA, true);
467    // true for the following 2 because cacheBlock takes precedence over cache config
468    // with caching disabled, we always go to off-heap
469    testReaderBlockAllocationWithCombinedCache(true, false, null, true);
470    testReaderBlockAllocationWithCombinedCache(true, false, BlockType.INTERMEDIATE_INDEX, false);
471    // true for the following 3, because with caching disabled we always go to off-heap
472    testReaderBlockAllocationWithCombinedCache(false, false, null, true);
473    testReaderBlockAllocationWithCombinedCache(false, false, BlockType.INTERMEDIATE_INDEX, true);
474    testReaderBlockAllocationWithCombinedCache(false, false, BlockType.DATA, true);
475  }
476
477  private void testReaderBlockAllocationWithCombinedCache(boolean cacheConfigCacheBlockOnRead,
478    boolean cacheBlock, BlockType blockType, boolean expectSharedMem) throws IOException {
479    int bufCount = 1024, blockSize = 64 * 1024;
480    ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
481    fillByteBuffAllocator(alloc, bufCount);
482    Path storeFilePath = writeStoreFile();
483    // Open the file reader with CombinedBlockCache
484    BlockCache combined = initCombinedBlockCache("LRU");
485    Configuration myConf = new Configuration(conf);
486
487    myConf.setBoolean(CacheConfig.CACHE_DATA_ON_READ_KEY, cacheConfigCacheBlockOnRead);
488    myConf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
489
490    CacheConfig cacheConfig = new CacheConfig(myConf, null, combined, alloc);
491    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, myConf);
492    long offset = 0;
493    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
494      long read = readAtOffsetWithAllocationAsserts(alloc, reader, offset, cacheBlock, blockType,
495        expectSharedMem);
496      if (read < 0) {
497        break;
498      }
499
500      offset += read;
501    }
502
503    reader.close();
504    combined.shutdown();
505    assertEquals(bufCount, alloc.getFreeBufferCount());
506    alloc.clean();
507  }
508
509  private long readAtOffsetWithAllocationAsserts(ByteBuffAllocator alloc, HFile.Reader reader,
510    long offset, boolean cacheBlock, BlockType blockType, boolean expectSharedMem)
511    throws IOException {
512    HFileBlock block;
513    try {
514      block = reader.readBlock(offset, -1, cacheBlock, true, false, true, blockType, null);
515    } catch (IOException e) {
516      if (e.getMessage().contains("Expected block type")) {
517        return -1;
518      }
519      throw e;
520    }
521
522    assertEquals(expectSharedMem, block.isSharedMem());
523
524    if (expectSharedMem) {
525      assertTrue(alloc.getFreeBufferCount() < alloc.getTotalBufferCount());
526    } else {
527      // Should never allocate off-heap block from allocator because ensure that it's LRU.
528      assertEquals(alloc.getTotalBufferCount(), alloc.getFreeBufferCount());
529    }
530
531    try {
532      return block.getOnDiskSizeWithHeader();
533    } finally {
534      block.release(); // return back the ByteBuffer back to allocator.
535    }
536  }
537
538  private void readStoreFile(Path storeFilePath, Configuration conf, ByteBuffAllocator alloc)
539    throws Exception {
540    // Open the file reader with block cache disabled.
541    CacheConfig cache = new CacheConfig(conf, null, null, alloc);
542    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cache, true, conf);
543    long offset = 0;
544    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
545      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
546      offset += block.getOnDiskSizeWithHeader();
547      block.release(); // return back the ByteBuffer back to allocator.
548    }
549    reader.close();
550  }
551
552  private Path writeStoreFile() throws IOException {
553    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile");
554    HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build();
555    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, fs).withOutputDir(storeFileParentDir)
556      .withFileContext(meta).build();
557    final int rowLen = 32;
558    Random rand = ThreadLocalRandom.current();
559    for (int i = 0; i < 1000; ++i) {
560      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
561      byte[] v = RandomKeyValueUtil.randomValue(rand);
562      int cfLen = rand.nextInt(k.length - rowLen + 1);
563      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
564        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
565      sfw.append(kv);
566    }
567
568    sfw.close();
569    return sfw.getPath();
570  }
571
572  public static KeyValue.Type generateKeyType(Random rand) {
573    if (rand.nextBoolean()) {
574      // Let's make half of KVs puts.
575      return KeyValue.Type.Put;
576    } else {
577      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
578      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
579        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
580          + "Probably the layout of KeyValue.Type has changed.");
581      }
582      return keyType;
583    }
584  }
585
586  /**
587   * Test empty HFile. Test all features work reasonably when hfile is empty of entries.
588   */
589  @Test
590  public void testEmptyHFile(TestInfo testInfo) throws IOException {
591    Path f = new Path(ROOT_DIR, testInfo.getTestMethod().get().getName());
592    HFileContext context = new HFileContextBuilder().withIncludesTags(false).build();
593    Writer w =
594      HFile.getWriterFactory(conf, cacheConf).withPath(fs, f).withFileContext(context).create();
595    w.close();
596    Reader r = HFile.createReader(fs, f, cacheConf, true, conf);
597    assertFalse(r.getFirstKey().isPresent());
598    assertFalse(r.getLastKey().isPresent());
599  }
600
601  /**
602   * Create 0-length hfile and show that it fails
603   */
604  @Test
605  public void testCorrupt0LengthHFile(TestInfo testInfo) throws IOException {
606    Path f = new Path(ROOT_DIR, testInfo.getTestMethod().get().getName());
607    FSDataOutputStream fsos = fs.create(f);
608    fsos.close();
609
610    try {
611      Reader r = HFile.createReader(fs, f, cacheConf, true, conf);
612    } catch (CorruptHFileException | IllegalArgumentException che) {
613      // Expected failure
614      return;
615    }
616    fail("Should have thrown exception");
617  }
618
619  @Test
620  public void testCorruptOutOfOrderHFileWrite(TestInfo testInfo) throws IOException {
621    Path path = new Path(ROOT_DIR, testInfo.getTestMethod().get().getName());
622    FSDataOutputStream mockedOutputStream = Mockito.mock(FSDataOutputStream.class);
623    String columnFamily = "MyColumnFamily";
624    String tableName = "MyTableName";
625    HFileContext fileContext =
626      new HFileContextBuilder().withHFileName(testInfo.getTestMethod().get().getName() + "HFile")
627        .withBlockSize(minBlockSize).withColumnFamily(Bytes.toBytes(columnFamily))
628        .withTableName(Bytes.toBytes(tableName)).withHBaseCheckSum(false)
629        .withCompression(Compression.Algorithm.NONE).withCompressTags(false).build();
630    HFileWriterImpl writer =
631      new HFileWriterImpl(conf, cacheConf, path, mockedOutputStream, fileContext);
632    ExtendedCellBuilder cellBuilder =
633      ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
634    byte[] row = Bytes.toBytes("foo");
635    byte[] qualifier = Bytes.toBytes("qualifier");
636    byte[] cf = Bytes.toBytes(columnFamily);
637    byte[] val = Bytes.toBytes("fooVal");
638    long firstTS = 100L;
639    long secondTS = 101L;
640    ExtendedCell firstCell = cellBuilder.setRow(row).setValue(val).setTimestamp(firstTS)
641      .setQualifier(qualifier).setFamily(cf).setType(Cell.Type.Put).build();
642    ExtendedCell secondCell = cellBuilder.setRow(row).setValue(val).setTimestamp(secondTS)
643      .setQualifier(qualifier).setFamily(cf).setType(Cell.Type.Put).build();
644    // second Cell will sort "higher" than the first because later timestamps should come first
645    writer.append(firstCell);
646    try {
647      writer.append(secondCell);
648    } catch (IOException ie) {
649      String message = ie.getMessage();
650      assertTrue(message.contains("not lexically larger"));
651      assertTrue(message.contains(tableName));
652      assertTrue(message.contains(columnFamily));
653      return;
654    }
655    fail("Exception wasn't thrown even though Cells were appended in the wrong order!");
656  }
657
658  public static void truncateFile(FileSystem fs, Path src, Path dst) throws IOException {
659    FileStatus fst = fs.getFileStatus(src);
660    long len = fst.getLen();
661    len = len / 2;
662
663    // create a truncated hfile
664    FSDataOutputStream fdos = fs.create(dst);
665    byte[] buf = new byte[(int) len];
666    FSDataInputStream fdis = fs.open(src);
667    fdis.read(buf);
668    fdos.write(buf);
669    fdis.close();
670    fdos.close();
671  }
672
673  /**
674   * Create a truncated hfile and verify that exception thrown.
675   */
676  @Test
677  public void testCorruptTruncatedHFile(TestInfo testInfo) throws IOException {
678    Path f = new Path(ROOT_DIR, testInfo.getTestMethod().get().getName());
679    HFileContext context = new HFileContextBuilder().build();
680    Writer w = HFile.getWriterFactory(conf, cacheConf).withPath(this.fs, f).withFileContext(context)
681      .create();
682    writeSomeRecords(w, 0, 100, false);
683    w.close();
684
685    Path trunc = new Path(f.getParent(), "trucated");
686    truncateFile(fs, w.getPath(), trunc);
687
688    try {
689      HFile.createReader(fs, trunc, cacheConf, true, conf);
690    } catch (CorruptHFileException | IllegalArgumentException che) {
691      // Expected failure
692      return;
693    }
694    fail("Should have thrown exception");
695  }
696
697  // write some records into the hfile
698  // write them twice
699  private int writeSomeRecords(Writer writer, int start, int n, boolean useTags)
700    throws IOException {
701    String value = "value";
702    KeyValue kv;
703    for (int i = start; i < (start + n); i++) {
704      String key = String.format(localFormatter, Integer.valueOf(i));
705      if (useTags) {
706        Tag t = new ArrayBackedTag((byte) 1, "myTag1");
707        Tag[] tags = new Tag[1];
708        tags[0] = t;
709        kv = new KeyValue(Bytes.toBytes(key), Bytes.toBytes("family"), Bytes.toBytes("qual"),
710          HConstants.LATEST_TIMESTAMP, Bytes.toBytes(value + key), tags);
711        writer.append(kv);
712      } else {
713        kv = new KeyValue(Bytes.toBytes(key), Bytes.toBytes("family"), Bytes.toBytes("qual"),
714          Bytes.toBytes(value + key));
715        writer.append(kv);
716      }
717    }
718    return (start + n);
719  }
720
721  private void readAllRecords(HFileScanner scanner) throws IOException {
722    readAndCheckbytes(scanner, 0, 100);
723  }
724
725  // read the records and check
726  private int readAndCheckbytes(HFileScanner scanner, int start, int n) throws IOException {
727    String value = "value";
728    int i = start;
729    for (; i < (start + n); i++) {
730      ByteBuffer key = ByteBuffer.wrap(((KeyValue) scanner.getKey()).getKey());
731      ByteBuffer val = scanner.getValue();
732      String keyStr = String.format(localFormatter, Integer.valueOf(i));
733      String valStr = value + keyStr;
734      KeyValue kv = new KeyValue(Bytes.toBytes(keyStr), Bytes.toBytes("family"),
735        Bytes.toBytes("qual"), Bytes.toBytes(valStr));
736      byte[] keyBytes =
737        new KeyValue.KeyOnlyKeyValue(Bytes.toBytes(key), 0, Bytes.toBytes(key).length).getKey();
738      assertTrue(Arrays.equals(kv.getKey(), keyBytes),
739        "bytes for keys do not match " + keyStr + " " + Bytes.toString(Bytes.toBytes(key)));
740      byte[] valBytes = Bytes.toBytes(val);
741      assertTrue(Arrays.equals(Bytes.toBytes(valStr), valBytes),
742        "bytes for vals do not match " + valStr + " " + Bytes.toString(valBytes));
743      if (!scanner.next()) {
744        break;
745      }
746    }
747    assertEquals(i, start + n - 1);
748    return (start + n);
749  }
750
751  private byte[] getSomeKey(int rowId) {
752    KeyValue kv = new KeyValue(Bytes.toBytes(String.format(localFormatter, Integer.valueOf(rowId))),
753      Bytes.toBytes("family"), Bytes.toBytes("qual"), HConstants.LATEST_TIMESTAMP, Type.Put);
754    return kv.getKey();
755  }
756
757  private void writeRecords(Writer writer, boolean useTags) throws IOException {
758    writeSomeRecords(writer, 0, 100, useTags);
759    writer.close();
760  }
761
762  private FSDataOutputStream createFSOutput(Path name) throws IOException {
763    // if (fs.exists(name)) fs.delete(name, true);
764    FSDataOutputStream fout = fs.create(name);
765    return fout;
766  }
767
768  /**
769   * test none codecs
770   */
771  void basicWithSomeCodec(String codec, boolean useTags) throws IOException {
772    if (useTags) {
773      conf.setInt("hfile.format.version", 3);
774    }
775    Path ncHFile = new Path(ROOT_DIR, "basic.hfile." + codec.toString() + useTags);
776    FSDataOutputStream fout = createFSOutput(ncHFile);
777    HFileContext meta = new HFileContextBuilder().withBlockSize(minBlockSize)
778      .withCompression(HFileWriterImpl.compressionByName(codec)).build();
779    Writer writer =
780      HFile.getWriterFactory(conf, cacheConf).withOutputStream(fout).withFileContext(meta).create();
781    LOG.info(Objects.toString(writer));
782    writeRecords(writer, useTags);
783    fout.close();
784    FSDataInputStream fin = fs.open(ncHFile);
785    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, ncHFile).build();
786    Reader reader = createReaderFromStream(context, cacheConf, conf);
787    LOG.info(cacheConf.toString());
788    // Load up the index.
789    // Get a scanner that caches and that does not use pread.
790    HFileScanner scanner = reader.getScanner(conf, true, false);
791    // Align scanner at start of the file.
792    scanner.seekTo();
793    readAllRecords(scanner);
794    int seekTo = scanner.seekTo(KeyValueUtil.createKeyValueFromKey(getSomeKey(50)));
795    LOG.info("" + seekTo);
796    assertTrue(scanner.seekTo(KeyValueUtil.createKeyValueFromKey(getSomeKey(50))) == 0,
797      "location lookup failed");
798    // read the key and see if it matches
799    ByteBuffer readKey = ByteBuffer.wrap(((KeyValue) scanner.getKey()).getKey());
800    assertTrue(Arrays.equals(getSomeKey(50), Bytes.toBytes(readKey)), "seeked key does not match");
801
802    scanner.seekTo(KeyValueUtil.createKeyValueFromKey(getSomeKey(0)));
803    ByteBuffer val1 = scanner.getValue();
804    scanner.seekTo(KeyValueUtil.createKeyValueFromKey(getSomeKey(0)));
805    ByteBuffer val2 = scanner.getValue();
806    assertTrue(Arrays.equals(Bytes.toBytes(val1), Bytes.toBytes(val2)));
807
808    reader.close();
809    fin.close();
810    fs.delete(ncHFile, true);
811  }
812
813  @Test
814  public void testTFileFeatures() throws IOException {
815    testHFilefeaturesInternals(false);
816    testHFilefeaturesInternals(true);
817  }
818
819  protected void testHFilefeaturesInternals(boolean useTags) throws IOException {
820    basicWithSomeCodec("none", useTags);
821    basicWithSomeCodec("gz", useTags);
822  }
823
824  private void writeNumMetablocks(Writer writer, int n) {
825    for (int i = 0; i < n; i++) {
826      writer.appendMetaBlock("HFileMeta" + i, new Writable() {
827        private int val;
828
829        public Writable setVal(int val) {
830          this.val = val;
831          return this;
832        }
833
834        @Override
835        public void write(DataOutput out) throws IOException {
836          out.write(Bytes.toBytes("something to test" + val));
837        }
838
839        @Override
840        public void readFields(DataInput in) throws IOException {
841        }
842      }.setVal(i));
843    }
844  }
845
846  private void someTestingWithMetaBlock(Writer writer) {
847    writeNumMetablocks(writer, 10);
848  }
849
850  private void readNumMetablocks(Reader reader, int n) throws IOException {
851    for (int i = 0; i < n; i++) {
852      ByteBuff actual = reader.getMetaBlock("HFileMeta" + i, false).getBufferWithoutHeader();
853      ByteBuffer expected = ByteBuffer.wrap(Bytes.toBytes("something to test" + i));
854      assertEquals(Bytes.toStringBinary(expected), Bytes.toStringBinary(actual.array(),
855        actual.arrayOffset() + actual.position(), actual.capacity()), "failed to match metadata");
856    }
857  }
858
859  private void someReadingWithMetaBlock(Reader reader) throws IOException {
860    readNumMetablocks(reader, 10);
861  }
862
863  private void metablocks(final String compress) throws Exception {
864    Path mFile = new Path(ROOT_DIR, "meta.hfile");
865    FSDataOutputStream fout = createFSOutput(mFile);
866    HFileContext meta =
867      new HFileContextBuilder().withCompression(HFileWriterImpl.compressionByName(compress))
868        .withBlockSize(minBlockSize).build();
869    Writer writer =
870      HFile.getWriterFactory(conf, cacheConf).withOutputStream(fout).withFileContext(meta).create();
871    someTestingWithMetaBlock(writer);
872    writer.close();
873    fout.close();
874    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, mFile).build();
875    Reader reader = createReaderFromStream(context, cacheConf, conf);
876    // No data -- this should return false.
877    assertFalse(reader.getScanner(conf, false, false).seekTo());
878    someReadingWithMetaBlock(reader);
879    fs.delete(mFile, true);
880    reader.close();
881  }
882
883  // test meta blocks for hfiles
884  @Test
885  public void testMetaBlocks() throws Exception {
886    metablocks("none");
887    metablocks("gz");
888  }
889
890  @Test
891  public void testNullMetaBlocks() throws Exception {
892    for (Compression.Algorithm compressAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
893      Path mFile = new Path(ROOT_DIR, "nometa_" + compressAlgo + ".hfile");
894      FSDataOutputStream fout = createFSOutput(mFile);
895      HFileContext meta =
896        new HFileContextBuilder().withCompression(compressAlgo).withBlockSize(minBlockSize).build();
897      Writer writer = HFile.getWriterFactory(conf, cacheConf).withOutputStream(fout)
898        .withFileContext(meta).create();
899      KeyValue kv =
900        new KeyValue(Bytes.toBytes("foo"), Bytes.toBytes("f1"), null, Bytes.toBytes("value"));
901      writer.append(kv);
902      writer.close();
903      fout.close();
904      Reader reader = HFile.createReader(fs, mFile, cacheConf, true, conf);
905      assertNull(reader.getMetaBlock("non-existant", false));
906    }
907  }
908
909  /**
910   * Make sure the ordinals for our compression algorithms do not change on us.
911   */
912  @Test
913  public void testCompressionOrdinance() {
914    assertTrue(Compression.Algorithm.LZO.ordinal() == 0);
915    assertTrue(Compression.Algorithm.GZ.ordinal() == 1);
916    assertTrue(Compression.Algorithm.NONE.ordinal() == 2);
917    assertTrue(Compression.Algorithm.SNAPPY.ordinal() == 3);
918    assertTrue(Compression.Algorithm.LZ4.ordinal() == 4);
919  }
920
921  @Test
922  public void testShortMidpointSameQual() {
923    ExtendedCell left =
924      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("a"))
925        .setFamily(Bytes.toBytes("a")).setQualifier(Bytes.toBytes("a")).setTimestamp(11)
926        .setType(Type.Maximum.getCode()).setValue(HConstants.EMPTY_BYTE_ARRAY).build();
927    ExtendedCell right =
928      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("a"))
929        .setFamily(Bytes.toBytes("a")).setQualifier(Bytes.toBytes("a")).setTimestamp(9)
930        .setType(Type.Maximum.getCode()).setValue(HConstants.EMPTY_BYTE_ARRAY).build();
931    ExtendedCell mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
932    assertTrue(
933      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) <= 0);
934    assertTrue(
935      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) == 0);
936  }
937
938  private ExtendedCell getCell(byte[] row, byte[] family, byte[] qualifier) {
939    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row)
940      .setFamily(family).setQualifier(qualifier).setTimestamp(HConstants.LATEST_TIMESTAMP)
941      .setType(KeyValue.Type.Maximum.getCode()).setValue(HConstants.EMPTY_BYTE_ARRAY).build();
942  }
943
944  @Test
945  public void testGetShortMidpoint() {
946    ExtendedCell left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
947    ExtendedCell right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
948    ExtendedCell mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
949    assertTrue(
950      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) <= 0);
951    assertTrue(
952      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0);
953    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
954    right = getCell(Bytes.toBytes("b"), Bytes.toBytes("a"), Bytes.toBytes("a"));
955    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
956    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
957    assertTrue(
958      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0);
959    left = getCell(Bytes.toBytes("g"), Bytes.toBytes("a"), Bytes.toBytes("a"));
960    right = getCell(Bytes.toBytes("i"), Bytes.toBytes("a"), Bytes.toBytes("a"));
961    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
962    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
963    assertTrue(
964      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0);
965    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
966    right = getCell(Bytes.toBytes("bbbbbbb"), Bytes.toBytes("a"), Bytes.toBytes("a"));
967    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
968    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
969    assertTrue(
970      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) < 0);
971    assertEquals(1, mid.getRowLength());
972    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
973    right = getCell(Bytes.toBytes("b"), Bytes.toBytes("a"), Bytes.toBytes("a"));
974    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
975    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
976    assertTrue(
977      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0);
978    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
979    right = getCell(Bytes.toBytes("a"), Bytes.toBytes("aaaaaaaa"), Bytes.toBytes("b"));
980    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
981    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
982    assertTrue(
983      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) < 0);
984    assertEquals(2, mid.getFamilyLength());
985    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
986    right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("aaaaaaaaa"));
987    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
988    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
989    assertTrue(
990      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) < 0);
991    assertEquals(2, mid.getQualifierLength());
992    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));
993    right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("b"));
994    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
995    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
996    assertTrue(
997      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) <= 0);
998    assertEquals(1, mid.getQualifierLength());
999
1000    // Verify boundary conditions
1001    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), new byte[] { 0x00, (byte) 0xFE });
1002    right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), new byte[] { 0x00, (byte) 0xFF });
1003    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
1004    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
1005    assertTrue(
1006      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) == 0);
1007    left = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), new byte[] { 0x00, 0x12 });
1008    right = getCell(Bytes.toBytes("a"), Bytes.toBytes("a"), new byte[] { 0x00, 0x12, 0x00 });
1009    mid = HFileWriterImpl.getMidpoint(CellComparatorImpl.COMPARATOR, left, right);
1010    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
1011    assertTrue(
1012      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) == 0);
1013
1014    // Assert that if meta comparator, it returns the right cell -- i.e. no
1015    // optimization done.
1016    left = getCell(Bytes.toBytes("g"), Bytes.toBytes("a"), Bytes.toBytes("a"));
1017    right = getCell(Bytes.toBytes("i"), Bytes.toBytes("a"), Bytes.toBytes("a"));
1018    mid = HFileWriterImpl.getMidpoint(MetaCellComparator.META_COMPARATOR, left, right);
1019    assertTrue(PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, left, mid) < 0);
1020    assertTrue(
1021      PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, mid, right) == 0);
1022    byte[] family = Bytes.toBytes("family");
1023    byte[] qualA = Bytes.toBytes("qfA");
1024    byte[] qualB = Bytes.toBytes("qfB");
1025    final CellComparatorImpl keyComparator = CellComparatorImpl.COMPARATOR;
1026    // verify that faked shorter rowkey could be generated
1027    long ts = 5;
1028    KeyValue kv1 = new KeyValue(Bytes.toBytes("the quick brown fox"), family, qualA, ts, Type.Put);
1029    KeyValue kv2 = new KeyValue(Bytes.toBytes("the who test text"), family, qualA, ts, Type.Put);
1030    ExtendedCell newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2);
1031    assertTrue(keyComparator.compare(kv1, newKey) < 0);
1032    assertTrue((keyComparator.compare(kv2, newKey)) > 0);
1033    byte[] expectedArray = Bytes.toBytes("the r");
1034    Bytes.equals(newKey.getRowArray(), newKey.getRowOffset(), newKey.getRowLength(), expectedArray,
1035      0, expectedArray.length);
1036
1037    // verify: same with "row + family + qualifier", return rightKey directly
1038    kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, 5, Type.Put);
1039    kv2 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, 0, Type.Put);
1040    assertTrue(keyComparator.compare(kv1, kv2) < 0);
1041    newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2);
1042    assertTrue(keyComparator.compare(kv1, newKey) < 0);
1043    assertTrue((keyComparator.compare(kv2, newKey)) == 0);
1044    kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, -5, Type.Put);
1045    kv2 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, -10, Type.Put);
1046    assertTrue(keyComparator.compare(kv1, kv2) < 0);
1047    newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2);
1048    assertTrue(keyComparator.compare(kv1, newKey) < 0);
1049    assertTrue((keyComparator.compare(kv2, newKey)) == 0);
1050
1051    // verify: same with row, different with qualifier
1052    kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, 5, Type.Put);
1053    kv2 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualB, 5, Type.Put);
1054    assertTrue(keyComparator.compare(kv1, kv2) < 0);
1055    newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2);
1056    assertTrue(keyComparator.compare(kv1, newKey) < 0);
1057    assertTrue((keyComparator.compare(kv2, newKey)) > 0);
1058    assertTrue(Arrays.equals(CellUtil.cloneFamily(newKey), family));
1059    assertTrue(Arrays.equals(CellUtil.cloneQualifier(newKey), qualB));
1060    assertTrue(newKey.getTimestamp() == HConstants.LATEST_TIMESTAMP);
1061    assertTrue(newKey.getTypeByte() == Type.Maximum.getCode());
1062
1063    // verify metaKeyComparator's getShortMidpointKey output
1064    final CellComparatorImpl metaKeyComparator = MetaCellComparator.META_COMPARATOR;
1065    kv1 = new KeyValue(Bytes.toBytes("ilovehbase123"), family, qualA, 5, Type.Put);
1066    kv2 = new KeyValue(Bytes.toBytes("ilovehbase234"), family, qualA, 0, Type.Put);
1067    newKey = HFileWriterImpl.getMidpoint(metaKeyComparator, kv1, kv2);
1068    assertTrue(metaKeyComparator.compare(kv1, newKey) < 0);
1069    assertTrue((metaKeyComparator.compare(kv2, newKey) == 0));
1070
1071    // verify common fix scenario
1072    kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, ts, Type.Put);
1073    kv2 = new KeyValue(Bytes.toBytes("ilovehbaseandhdfs"), family, qualA, ts, Type.Put);
1074    assertTrue(keyComparator.compare(kv1, kv2) < 0);
1075    newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2);
1076    assertTrue(keyComparator.compare(kv1, newKey) < 0);
1077    assertTrue((keyComparator.compare(kv2, newKey)) > 0);
1078    expectedArray = Bytes.toBytes("ilovehbasea");
1079    Bytes.equals(newKey.getRowArray(), newKey.getRowOffset(), newKey.getRowLength(), expectedArray,
1080      0, expectedArray.length);
1081    // verify only 1 offset scenario
1082    kv1 = new KeyValue(Bytes.toBytes("100abcdefg"), family, qualA, ts, Type.Put);
1083    kv2 = new KeyValue(Bytes.toBytes("101abcdefg"), family, qualA, ts, Type.Put);
1084    assertTrue(keyComparator.compare(kv1, kv2) < 0);
1085    newKey = HFileWriterImpl.getMidpoint(keyComparator, kv1, kv2);
1086    assertTrue(keyComparator.compare(kv1, newKey) < 0);
1087    assertTrue((keyComparator.compare(kv2, newKey)) > 0);
1088    expectedArray = Bytes.toBytes("101");
1089    Bytes.equals(newKey.getRowArray(), newKey.getRowOffset(), newKey.getRowLength(), expectedArray,
1090      0, expectedArray.length);
1091  }
1092
1093  @Test
1094  public void testDBEShipped(TestInfo testInfo) throws IOException {
1095    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
1096      DataBlockEncoder encoder = encoding.getEncoder();
1097      if (encoder == null) {
1098        continue;
1099      }
1100      Path f = new Path(ROOT_DIR, testInfo.getTestMethod().get().getName() + "_" + encoding);
1101      HFileContext context =
1102        new HFileContextBuilder().withIncludesTags(false).withDataBlockEncoding(encoding).build();
1103      HFileWriterImpl writer = (HFileWriterImpl) HFile.getWriterFactory(conf, cacheConf)
1104        .withPath(fs, f).withFileContext(context).create();
1105
1106      KeyValue kv = new KeyValue(Bytes.toBytes("testkey1"), Bytes.toBytes("family"),
1107        Bytes.toBytes("qual"), Bytes.toBytes("testvalue"));
1108      KeyValue kv2 = new KeyValue(Bytes.toBytes("testkey2"), Bytes.toBytes("family"),
1109        Bytes.toBytes("qual"), Bytes.toBytes("testvalue"));
1110      KeyValue kv3 = new KeyValue(Bytes.toBytes("testkey3"), Bytes.toBytes("family"),
1111        Bytes.toBytes("qual"), Bytes.toBytes("testvalue"));
1112
1113      ByteBuffer buffer = ByteBuffer.wrap(kv.getBuffer());
1114      ByteBuffer buffer2 = ByteBuffer.wrap(kv2.getBuffer());
1115      ByteBuffer buffer3 = ByteBuffer.wrap(kv3.getBuffer());
1116
1117      writer.append(new ByteBufferKeyValue(buffer, 0, buffer.remaining()));
1118      writer.beforeShipped();
1119
1120      // pollute first cell's backing ByteBuffer
1121      ByteBufferUtils.copyFromBufferToBuffer(buffer3, buffer);
1122
1123      // write another cell, if DBE not Shipped, test will fail
1124      writer.append(new ByteBufferKeyValue(buffer2, 0, buffer2.remaining()));
1125      writer.close();
1126    }
1127  }
1128
1129  /**
1130   * Test case for CombinedBlockCache with TinyLfu as L1 cache
1131   */
1132  @Test
1133  public void testReaderWithTinyLfuCombinedBlockCache() throws Exception {
1134    testReaderCombinedCache("TinyLfu");
1135  }
1136
1137  /**
1138   * Test case for CombinedBlockCache with AdaptiveLRU as L1 cache
1139   */
1140  @Test
1141  public void testReaderWithAdaptiveLruCombinedBlockCache() throws Exception {
1142    testReaderCombinedCache("AdaptiveLRU");
1143  }
1144
1145  /**
1146   * Test case for CombinedBlockCache with AdaptiveLRU as L1 cache
1147   */
1148  @Test
1149  public void testReaderWithLruCombinedBlockCache() throws Exception {
1150    testReaderCombinedCache("LRU");
1151  }
1152
1153  private void testReaderCombinedCache(final String l1CachePolicy) throws Exception {
1154    int bufCount = 1024;
1155    int blockSize = 64 * 1024;
1156    ByteBuffAllocator alloc = initAllocator(true, bufCount, blockSize, 0);
1157    fillByteBuffAllocator(alloc, bufCount);
1158    Path storeFilePath = writeStoreFile();
1159    // Open the file reader with CombinedBlockCache
1160    BlockCache combined = initCombinedBlockCache(l1CachePolicy);
1161    conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
1162    CacheConfig cacheConfig = new CacheConfig(conf, null, combined, alloc);
1163    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
1164    long offset = 0;
1165    Cacheable cachedBlock = null;
1166    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
1167      BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
1168      HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null);
1169      offset += block.getOnDiskSizeWithHeader();
1170      // Read the cached block.
1171      cachedBlock = combined.getBlock(key, false, false, true);
1172      try {
1173        assertNotNull(cachedBlock);
1174        assertTrue(cachedBlock instanceof HFileBlock);
1175        HFileBlock hfb = (HFileBlock) cachedBlock;
1176        // Data block will be cached in BucketCache, so it should be an off-heap block.
1177        if (hfb.getBlockType().isData()) {
1178          assertTrue(hfb.isSharedMem());
1179        } else if (!l1CachePolicy.equals("TinyLfu")) {
1180          assertFalse(hfb.isSharedMem());
1181        }
1182      } finally {
1183        cachedBlock.release();
1184      }
1185      block.release(); // return back the ByteBuffer back to allocator.
1186    }
1187    reader.close();
1188    combined.shutdown();
1189    if (cachedBlock != null) {
1190      assertEquals(0, cachedBlock.refCnt());
1191    }
1192    assertEquals(bufCount, alloc.getFreeBufferCount());
1193    alloc.clean();
1194  }
1195
1196  @Test
1197  public void testHFileContextBuilderWithIndexEncoding() throws IOException {
1198    HFileContext context =
1199      new HFileContextBuilder().withIndexBlockEncoding(IndexBlockEncoding.PREFIX_TREE).build();
1200    HFileContext newContext = new HFileContextBuilder(context).build();
1201    assertTrue(newContext.getIndexBlockEncoding() == IndexBlockEncoding.PREFIX_TREE);
1202  }
1203}