001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
021import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
022import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
023import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
024import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
025import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
026import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MAX_CHUNK_SIZE_KEY;
027import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY;
028import static org.junit.jupiter.api.Assertions.assertEquals;
029import static org.junit.jupiter.api.Assertions.assertFalse;
030import static org.junit.jupiter.api.Assertions.assertNotNull;
031import static org.junit.jupiter.api.Assertions.assertNull;
032import static org.junit.jupiter.api.Assertions.assertTrue;
033
034import java.io.IOException;
035import java.util.Random;
036import java.util.stream.Stream;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.ExtendedCell;
041import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
042import org.apache.hadoop.hbase.HBaseTestingUtil;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.KeyValue;
045import org.apache.hadoop.hbase.io.ByteBuffAllocator;
046import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
047import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
048import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.HFileScannerImpl;
049import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
050import org.apache.hadoop.hbase.io.hfile.bucket.TestBucketCache;
051import org.apache.hadoop.hbase.testclassification.IOTests;
052import org.apache.hadoop.hbase.testclassification.LargeTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
055import org.junit.jupiter.api.AfterEach;
056import org.junit.jupiter.api.BeforeAll;
057import org.junit.jupiter.api.BeforeEach;
058import org.junit.jupiter.api.Tag;
059import org.junit.jupiter.api.TestInfo;
060import org.junit.jupiter.api.TestTemplate;
061import org.junit.jupiter.params.provider.Arguments;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065@Tag(IOTests.TAG)
066@Tag(LargeTests.TAG)
067@HBaseParameterizedTestTemplate(name = "{index}: ioengine={0}")
068public class TestHFileScannerImplReferenceCount {
069
070  public static Stream<Arguments> parameters() {
071    return Stream.of(Arguments.of("file"), Arguments.of("offheap"), Arguments.of("mmap"),
072      Arguments.of("pmem"));
073  }
074
075  private String ioengine;
076
077  public TestHFileScannerImplReferenceCount(String ioengine) {
078    this.ioengine = ioengine;
079  }
080
081  private static final Logger LOG =
082    LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
083  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
084  private static final Random RNG = new Random(9713312); // Just a fixed seed.
085  private static final byte[] FAMILY = Bytes.toBytes("f");
086  private static final byte[] QUALIFIER = Bytes.toBytes("q");
087  private static final byte[] SUFFIX = randLongBytes();
088  private static final int CELL_COUNT = 1000;
089
090  private static byte[] randLongBytes() {
091    byte[] keys = new byte[30];
092    Bytes.random(keys);
093    return keys;
094  }
095
096  // It's a deep copy of configuration of UTIL, DON'T use shallow copy.
097  private Configuration conf;
098  private Path workDir;
099  private FileSystem fs;
100  private Path hfilePath;
101  private ExtendedCell firstCell = null;
102  private ExtendedCell secondCell = null;
103  private ByteBuffAllocator allocator;
104
105  @BeforeAll
106  public static void setUpBeforeClass() {
107    Configuration conf = UTIL.getConfiguration();
108    // Set the max chunk size and min entries key to be very small for index block, so that we can
109    // create an index block tree with level >= 2.
110    conf.setInt(MAX_CHUNK_SIZE_KEY, 10);
111    conf.setInt(MIN_INDEX_NUM_ENTRIES_KEY, 2);
112    // Create a bucket cache with 32MB.
113    conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
114    conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
115    conf.setInt(BUFFER_SIZE_KEY, 1024);
116    conf.setInt(MAX_BUFFER_COUNT_KEY, 32 * 1024);
117    // All allocated ByteBuff are pooled ByteBuff.
118    conf.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
119  }
120
121  @BeforeEach
122  public void setUp(TestInfo testInfo) throws IOException {
123    String caseName = testInfo.getDisplayName().replaceAll("[^a-zA-Z0-9]", "_");
124    this.workDir = UTIL.getDataTestDir(caseName);
125    if (!"offheap".equals(ioengine)) {
126      ioengine = ioengine + ":" + workDir.toString() + "/cachedata";
127    }
128    UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, ioengine);
129    this.firstCell = null;
130    this.secondCell = null;
131    this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
132    this.conf = new Configuration(UTIL.getConfiguration());
133    this.fs = this.workDir.getFileSystem(conf);
134    this.hfilePath = new Path(this.workDir, caseName + EnvironmentEdgeManager.currentTime());
135    LOG.info("Start to write {} cells into hfile: {}, case:{}", CELL_COUNT, hfilePath, caseName);
136  }
137
138  @AfterEach
139  public void tearDown() throws IOException {
140    this.allocator.clean();
141    this.fs.delete(this.workDir, true);
142  }
143
144  private void waitBucketCacheFlushed(BlockCache cache) throws InterruptedException {
145    assertTrue(cache instanceof CombinedBlockCache);
146    BlockCache[] blockCaches = cache.getBlockCaches();
147    assertEquals(blockCaches.length, 2);
148    assertTrue(blockCaches[1] instanceof BucketCache);
149    TestBucketCache.waitUntilAllFlushedToBucket((BucketCache) blockCaches[1]);
150  }
151
152  private void writeHFile(Configuration conf, FileSystem fs, Path hfilePath, Algorithm compression,
153    DataBlockEncoding encoding, int cellCount) throws IOException {
154    HFileContext context =
155      new HFileContextBuilder().withBlockSize(1).withDataBlockEncoding(DataBlockEncoding.NONE)
156        .withCompression(compression).withDataBlockEncoding(encoding).build();
157    try (HFile.Writer writer = new HFile.WriterFactory(conf, new CacheConfig(conf))
158      .withPath(fs, hfilePath).withFileContext(context).create()) {
159      for (int i = 0; i < cellCount; ++i) {
160        byte[] keyBytes = Bytes.add(Bytes.toBytes(i), SUFFIX);
161        // A random-length random value.
162        byte[] valueBytes = RandomKeyValueUtil.randomValue(RNG);
163        KeyValue keyValue =
164          new KeyValue(keyBytes, FAMILY, QUALIFIER, HConstants.LATEST_TIMESTAMP, valueBytes);
165        if (firstCell == null) {
166          firstCell = keyValue;
167        } else if (secondCell == null) {
168          secondCell = keyValue;
169        }
170        writer.append(keyValue);
171      }
172    }
173  }
174
175  /**
176   * A careful UT for validating the reference count mechanism, if want to change this UT please
177   * read the design doc in HBASE-21879 firstly and make sure that understand the refCnt design.
178   */
179  private void testReleaseBlock(Algorithm compression, DataBlockEncoding encoding)
180    throws Exception {
181    writeHFile(conf, fs, hfilePath, compression, encoding, CELL_COUNT);
182    HFileBlock curBlock, prevBlock;
183    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
184    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
185    assertNotNull(defaultBC);
186    assertTrue(cacheConfig.isCombinedBlockCache());
187    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
188    assertTrue(reader instanceof HFileReaderImpl);
189    // We've build a HFile tree with index = 16.
190    assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
191
192    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
193    HFileBlock block1 = reader.getDataBlockIndexReader()
194      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE, reader)
195      .getHFileBlock();
196    waitBucketCacheFlushed(defaultBC);
197    assertTrue(block1.getBlockType().isData());
198    assertFalse(block1 instanceof ExclusiveMemHFileBlock);
199
200    HFileBlock block2 = reader.getDataBlockIndexReader().loadDataBlockWithScanInfo(secondCell, null,
201      true, true, false, DataBlockEncoding.NONE, reader).getHFileBlock();
202    waitBucketCacheFlushed(defaultBC);
203    assertTrue(block2.getBlockType().isData());
204    assertFalse(block2 instanceof ExclusiveMemHFileBlock);
205    // Only one refCnt for RPC path.
206    assertEquals(block1.refCnt(), 1);
207    assertEquals(block2.refCnt(), 1);
208    assertFalse(block1 == block2);
209
210    scanner.seekTo(firstCell);
211    curBlock = scanner.curBlock;
212    this.assertRefCnt(curBlock, 2);
213
214    // Seek to the block again, the curBlock won't change and won't read from BlockCache. so
215    // refCnt should be unchanged.
216    scanner.seekTo(firstCell);
217    assertTrue(curBlock == scanner.curBlock);
218    this.assertRefCnt(curBlock, 2);
219    prevBlock = curBlock;
220
221    scanner.seekTo(secondCell);
222    curBlock = scanner.curBlock;
223    this.assertRefCnt(prevBlock, 2);
224    this.assertRefCnt(curBlock, 2);
225
226    // After shipped, the prevBlock will be release, but curBlock is still referenced by the
227    // curBlock.
228    scanner.shipped();
229    this.assertRefCnt(prevBlock, 1);
230    this.assertRefCnt(curBlock, 2);
231
232    // Try to ship again, though with nothing to client.
233    scanner.shipped();
234    this.assertRefCnt(prevBlock, 1);
235    this.assertRefCnt(curBlock, 2);
236
237    // The curBlock will also be released.
238    scanner.close();
239    this.assertRefCnt(curBlock, 1);
240
241    // Finish the block & block2 RPC path
242    assertTrue(block1.release());
243    assertTrue(block2.release());
244
245    // Evict the LRUBlockCache
246    assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
247    assertEquals(prevBlock.refCnt(), 0);
248    assertEquals(curBlock.refCnt(), 0);
249
250    int count = 0;
251    assertTrue(scanner.seekTo());
252    ++count;
253    while (scanner.next()) {
254      count++;
255    }
256    assertEquals(CELL_COUNT, count);
257  }
258
259  /**
260   * See HBASE-22480
261   */
262  @TestTemplate
263  public void testSeekBefore() throws Exception {
264    HFileBlock curBlock, prevBlock;
265    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
266    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
267    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
268    assertNotNull(defaultBC);
269    assertTrue(cacheConfig.isCombinedBlockCache());
270    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
271    assertTrue(reader instanceof HFileReaderImpl);
272    // We've build a HFile tree with index = 16.
273    assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
274
275    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
276    HFileBlock block1 = reader.getDataBlockIndexReader()
277      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE, reader)
278      .getHFileBlock();
279    assertTrue(block1.getBlockType().isData());
280    assertFalse(block1 instanceof ExclusiveMemHFileBlock);
281    HFileBlock block2 = reader.getDataBlockIndexReader().loadDataBlockWithScanInfo(secondCell, null,
282      true, true, false, DataBlockEncoding.NONE, reader).getHFileBlock();
283    assertTrue(block2.getBlockType().isData());
284    assertFalse(block2 instanceof ExclusiveMemHFileBlock);
285    // Wait until flushed to IOEngine;
286    waitBucketCacheFlushed(defaultBC);
287    // One RPC reference path.
288    assertEquals(block1.refCnt(), 1);
289    assertEquals(block2.refCnt(), 1);
290
291    // Let the curBlock refer to block2.
292    scanner.seekTo(secondCell);
293    curBlock = scanner.curBlock;
294    assertFalse(curBlock == block2);
295    assertEquals(1, block2.refCnt());
296    this.assertRefCnt(curBlock, 2);
297    prevBlock = scanner.curBlock;
298
299    // Release the block1, no other reference.
300    assertTrue(block1.release());
301    assertEquals(0, block1.refCnt());
302    // Release the block2, no other reference.
303    assertTrue(block2.release());
304    assertEquals(0, block2.refCnt());
305
306    // Do the seekBefore: the newBlock will be the previous block of curBlock.
307    assertTrue(scanner.seekBefore(secondCell));
308    assertEquals(scanner.prevBlocks.size(), 1);
309    assertTrue(scanner.prevBlocks.get(0) == prevBlock);
310    curBlock = scanner.curBlock;
311    // the curBlock is read from IOEngine, so a different block.
312    assertFalse(curBlock == block1);
313    // Two reference for curBlock: 1. scanner; 2. blockCache.
314    this.assertRefCnt(curBlock, 2);
315    // Reference count of prevBlock must be unchanged because we haven't shipped.
316    this.assertRefCnt(prevBlock, 2);
317
318    // Do the shipped
319    scanner.shipped();
320    assertEquals(scanner.prevBlocks.size(), 0);
321    assertNotNull(scanner.curBlock);
322    this.assertRefCnt(curBlock, 2);
323    this.assertRefCnt(prevBlock, 1);
324
325    // Do the close
326    scanner.close();
327    assertNull(scanner.curBlock);
328    this.assertRefCnt(curBlock, 1);
329    this.assertRefCnt(prevBlock, 1);
330
331    assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
332    assertEquals(0, curBlock.refCnt());
333    assertEquals(0, prevBlock.refCnt());
334
335    // Reload the block1 again.
336    block1 = reader.getDataBlockIndexReader()
337      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE, reader)
338      .getHFileBlock();
339    // Wait until flushed to IOEngine;
340    waitBucketCacheFlushed(defaultBC);
341    assertTrue(block1.getBlockType().isData());
342    assertFalse(block1 instanceof ExclusiveMemHFileBlock);
343    assertTrue(block1.release());
344    assertEquals(0, block1.refCnt());
345    // Re-seek to the begin.
346    assertTrue(scanner.seekTo());
347    curBlock = scanner.curBlock;
348    assertFalse(curBlock == block1);
349    this.assertRefCnt(curBlock, 2);
350    // Return false because firstCell <= c[0]
351    assertFalse(scanner.seekBefore(firstCell));
352    // The block1 shouldn't be released because we still don't do the shipped or close.
353    this.assertRefCnt(curBlock, 2);
354
355    scanner.close();
356    this.assertRefCnt(curBlock, 1);
357    assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
358    assertEquals(0, curBlock.refCnt());
359  }
360
361  private void assertRefCnt(HFileBlock block, int value) {
362    if (ioengine.startsWith("offheap") || ioengine.startsWith("pmem")) {
363      assertEquals(value, block.refCnt());
364    } else {
365      assertEquals(value - 1, block.refCnt());
366    }
367  }
368
369  @TestTemplate
370  public void testDefault() throws Exception {
371    testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE);
372  }
373
374  @TestTemplate
375  public void testCompression() throws Exception {
376    testReleaseBlock(Algorithm.GZ, DataBlockEncoding.NONE);
377  }
378
379  @TestTemplate
380  public void testDataBlockEncoding() throws Exception {
381    testReleaseBlock(Algorithm.NONE, DataBlockEncoding.ROW_INDEX_V1);
382  }
383
384  @TestTemplate
385  public void testDataBlockEncodingAndCompression() throws Exception {
386    testReleaseBlock(Algorithm.GZ, DataBlockEncoding.ROW_INDEX_V1);
387  }
388
389  @TestTemplate
390  public void testWithLruBlockCache() throws Exception {
391    HFileBlock curBlock;
392    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
393    // Set LruBlockCache
394    conf.set(BUCKET_CACHE_IOENGINE_KEY, "");
395    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
396    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
397    assertNotNull(defaultBC);
398    assertFalse(cacheConfig.isCombinedBlockCache()); // Must be LruBlockCache.
399    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
400    assertTrue(reader instanceof HFileReaderImpl);
401    // We've build a HFile tree with index = 16.
402    assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
403
404    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
405    HFileBlock block1 = reader.getDataBlockIndexReader()
406      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE, reader)
407      .getHFileBlock();
408    assertTrue(block1.getBlockType().isData());
409    assertTrue(block1 instanceof ExclusiveMemHFileBlock);
410    HFileBlock block2 = reader.getDataBlockIndexReader().loadDataBlockWithScanInfo(secondCell, null,
411      true, true, false, DataBlockEncoding.NONE, reader).getHFileBlock();
412    assertTrue(block2.getBlockType().isData());
413    assertTrue(block2 instanceof ExclusiveMemHFileBlock);
414    // One RPC reference path.
415    assertEquals(block1.refCnt(), 0);
416    assertEquals(block2.refCnt(), 0);
417
418    scanner.seekTo(firstCell);
419    curBlock = scanner.curBlock;
420    assertTrue(curBlock == block1);
421    assertEquals(curBlock.refCnt(), 0);
422    assertTrue(scanner.prevBlocks.isEmpty());
423
424    // Switch to next block
425    scanner.seekTo(secondCell);
426    curBlock = scanner.curBlock;
427    assertTrue(curBlock == block2);
428    assertEquals(curBlock.refCnt(), 0);
429    assertEquals(curBlock.retain().refCnt(), 0);
430    // Only pooled HFileBlock will be kept in prevBlocks and ExclusiveMemHFileBlock will never keep
431    // in prevBlocks.
432    assertTrue(scanner.prevBlocks.isEmpty());
433
434    // close the scanner
435    scanner.close();
436    assertNull(scanner.curBlock);
437    assertTrue(scanner.prevBlocks.isEmpty());
438  }
439
440  @TestTemplate
441  public void testDisabledBlockCache() throws Exception {
442    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
443    // Set LruBlockCache
444    conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
445    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
446    assertNull(defaultBC);
447    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
448    assertFalse(cacheConfig.isCombinedBlockCache()); // Must be LruBlockCache.
449    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
450    assertTrue(reader instanceof HFileReaderImpl);
451    // We've build a HFile tree with index = 16.
452    assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
453
454    HFileBlock block1 = reader.getDataBlockIndexReader()
455      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE, reader)
456      .getHFileBlock();
457
458    assertTrue(block1.isSharedMem());
459    assertTrue(block1 instanceof SharedMemHFileBlock);
460    assertEquals(1, block1.refCnt());
461    assertTrue(block1.release());
462  }
463}