001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.io.hfile;
020
021import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
022import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
023import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
024import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
025import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
026import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
027import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MAX_CHUNK_SIZE_KEY;
028import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY;
029import static org.junit.Assert.assertEquals;
030
031import java.io.IOException;
032import java.util.Arrays;
033import java.util.Collection;
034import java.util.Random;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseTestingUtil;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.KeyValue;
044import org.apache.hadoop.hbase.io.ByteBuffAllocator;
045import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
046import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
047import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.HFileScannerImpl;
048import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
049import org.apache.hadoop.hbase.io.hfile.bucket.TestBucketCache;
050import org.apache.hadoop.hbase.testclassification.IOTests;
051import org.apache.hadoop.hbase.testclassification.LargeTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
054import org.junit.After;
055import org.junit.Assert;
056import org.junit.Before;
057import org.junit.BeforeClass;
058import org.junit.ClassRule;
059import org.junit.Rule;
060import org.junit.Test;
061import org.junit.experimental.categories.Category;
062import org.junit.rules.TestName;
063import org.junit.runner.RunWith;
064import org.junit.runners.Parameterized;
065import org.junit.runners.Parameterized.Parameter;
066import org.junit.runners.Parameterized.Parameters;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070@RunWith(Parameterized.class)
071@Category({ IOTests.class, LargeTests.class })
072public class TestHFileScannerImplReferenceCount {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076      HBaseClassTestRule.forClass(TestHFileScannerImplReferenceCount.class);
077
078  @Rule
079  public TestName CASE = new TestName();
080
081  @Parameters(name = "{index}: ioengine={0}")
082  public static Collection<Object[]> data() {
083    return Arrays.asList(new Object[] { "file" }, new Object[] { "offheap" },
084      new Object[] { "mmap" }, new Object[] { "pmem" });
085  }
086
087  @Parameter
088  public String ioengine;
089
090  private static final Logger LOG =
091      LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
092  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
093  private static final byte[] FAMILY = Bytes.toBytes("f");
094  private static final byte[] QUALIFIER = Bytes.toBytes("q");
095  private static final byte[] SUFFIX = randLongBytes();
096  private static final int CELL_COUNT = 1000;
097
098  private static byte[] randLongBytes() {
099    Random rand = new Random();
100    byte[] keys = new byte[30];
101    rand.nextBytes(keys);
102    return keys;
103  }
104
  // A deep copy of UTIL's configuration. Never alias UTIL's own instance (a shallow copy) here,
  // because individual test cases modify this copy.
106  private Configuration conf;
107  private Path workDir;
108  private FileSystem fs;
109  private Path hfilePath;
110  private Cell firstCell = null;
111  private Cell secondCell = null;
112  private ByteBuffAllocator allocator;
113
114  @BeforeClass
115  public static void setUpBeforeClass() {
116    Configuration conf = UTIL.getConfiguration();
117    // Set the max chunk size and min entries key to be very small for index block, so that we can
118    // create an index block tree with level >= 2.
119    conf.setInt(MAX_CHUNK_SIZE_KEY, 10);
120    conf.setInt(MIN_INDEX_NUM_ENTRIES_KEY, 2);
121    // Create a bucket cache with 32MB.
122    conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
123    conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
124    conf.setInt(BUFFER_SIZE_KEY, 1024);
125    conf.setInt(MAX_BUFFER_COUNT_KEY, 32 * 1024);
126    // All allocated ByteBuff are pooled ByteBuff.
127    conf.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
128  }
129
130  @Before
131  public void setUp() throws IOException {
132    String caseName = CASE.getMethodName().replaceAll("[^a-zA-Z0-9]", "_");
133    this.workDir = UTIL.getDataTestDir(caseName);
134    if (!"offheap".equals(ioengine)) {
135      ioengine = ioengine + ":" + workDir.toString() + "/cachedata";
136    }
137    UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, ioengine);
138    this.firstCell = null;
139    this.secondCell = null;
140    this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
141    this.conf = new Configuration(UTIL.getConfiguration());
142    this.fs = this.workDir.getFileSystem(conf);
143    this.hfilePath = new Path(this.workDir, caseName + EnvironmentEdgeManager.currentTime());
144    LOG.info("Start to write {} cells into hfile: {}, case:{}", CELL_COUNT, hfilePath, caseName);
145  }
146
147  @After
148  public void tearDown() throws IOException {
149    this.allocator.clean();
150    this.fs.delete(this.workDir, true);
151  }
152
153  private void waitBucketCacheFlushed(BlockCache cache) throws InterruptedException {
154    Assert.assertTrue(cache instanceof CombinedBlockCache);
155    BlockCache[] blockCaches = cache.getBlockCaches();
156    Assert.assertEquals(blockCaches.length, 2);
157    Assert.assertTrue(blockCaches[1] instanceof BucketCache);
158    TestBucketCache.waitUntilAllFlushedToBucket((BucketCache) blockCaches[1]);
159  }
160
161  private void writeHFile(Configuration conf, FileSystem fs, Path hfilePath, Algorithm compression,
162      DataBlockEncoding encoding, int cellCount) throws IOException {
163    HFileContext context =
164        new HFileContextBuilder().withBlockSize(1).withDataBlockEncoding(DataBlockEncoding.NONE)
165            .withCompression(compression).withDataBlockEncoding(encoding).build();
166    try (HFile.Writer writer =
167        new HFile.WriterFactory(conf, new CacheConfig(conf)).withPath(fs, hfilePath)
168            .withFileContext(context).create()) {
169      Random rand = new Random(9713312); // Just a fixed seed.
170      for (int i = 0; i < cellCount; ++i) {
171        byte[] keyBytes = Bytes.add(Bytes.toBytes(i), SUFFIX);
172
173        // A random-length random value.
174        byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
175        KeyValue keyValue =
176            new KeyValue(keyBytes, FAMILY, QUALIFIER, HConstants.LATEST_TIMESTAMP, valueBytes);
177        if (firstCell == null) {
178          firstCell = keyValue;
179        } else if (secondCell == null) {
180          secondCell = keyValue;
181        }
182        writer.append(keyValue);
183      }
184    }
185  }
186
187  /**
188   * A careful UT for validating the reference count mechanism, if want to change this UT please
189   * read the design doc in HBASE-21879 firstly and make sure that understand the refCnt design.
190   */
191  private void testReleaseBlock(Algorithm compression, DataBlockEncoding encoding)
192      throws Exception {
193    writeHFile(conf, fs, hfilePath, compression, encoding, CELL_COUNT);
194    HFileBlock curBlock, prevBlock;
195    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
196    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
197    Assert.assertNotNull(defaultBC);
198    Assert.assertTrue(cacheConfig.isCombinedBlockCache());
199    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
200    Assert.assertTrue(reader instanceof HFileReaderImpl);
201    // We've build a HFile tree with index = 16.
202    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
203
204    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
205    HFileBlock block1 = reader.getDataBlockIndexReader()
206        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
207            DataBlockEncoding.NONE, reader).getHFileBlock();
208    waitBucketCacheFlushed(defaultBC);
209    Assert.assertTrue(block1.getBlockType().isData());
210    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
211
212    HFileBlock block2 = reader.getDataBlockIndexReader()
213        .loadDataBlockWithScanInfo(secondCell, null, true, true, false,
214            DataBlockEncoding.NONE, reader).getHFileBlock();
215    waitBucketCacheFlushed(defaultBC);
216    Assert.assertTrue(block2.getBlockType().isData());
217    Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
218    // Only one refCnt for RPC path.
219    Assert.assertEquals(block1.refCnt(), 1);
220    Assert.assertEquals(block2.refCnt(), 1);
221    Assert.assertFalse(block1 == block2);
222
223    scanner.seekTo(firstCell);
224    curBlock = scanner.curBlock;
225    this.assertRefCnt(curBlock, 2);
226
227    // Seek to the block again, the curBlock won't change and won't read from BlockCache. so
228    // refCnt should be unchanged.
229    scanner.seekTo(firstCell);
230    Assert.assertTrue(curBlock == scanner.curBlock);
231    this.assertRefCnt(curBlock, 2);
232    prevBlock = curBlock;
233
234    scanner.seekTo(secondCell);
235    curBlock = scanner.curBlock;
236    this.assertRefCnt(prevBlock, 2);
237    this.assertRefCnt(curBlock, 2);
238
239    // After shipped, the prevBlock will be release, but curBlock is still referenced by the
240    // curBlock.
241    scanner.shipped();
242    this.assertRefCnt(prevBlock, 1);
243    this.assertRefCnt(curBlock, 2);
244
245    // Try to ship again, though with nothing to client.
246    scanner.shipped();
247    this.assertRefCnt(prevBlock, 1);
248    this.assertRefCnt(curBlock, 2);
249
250    // The curBlock will also be released.
251    scanner.close();
252    this.assertRefCnt(curBlock, 1);
253
254    // Finish the block & block2 RPC path
255    Assert.assertTrue(block1.release());
256    Assert.assertTrue(block2.release());
257
258    // Evict the LRUBlockCache
259    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
260    Assert.assertEquals(prevBlock.refCnt(), 0);
261    Assert.assertEquals(curBlock.refCnt(), 0);
262
263    int count = 0;
264    Assert.assertTrue(scanner.seekTo());
265    ++count;
266    while (scanner.next()) {
267      count++;
268    }
269    assertEquals(CELL_COUNT, count);
270  }
271
272  /**
273   * See HBASE-22480
274   */
275  @Test
276  public void testSeekBefore() throws Exception {
277    HFileBlock curBlock, prevBlock;
278    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
279    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
280    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
281    Assert.assertNotNull(defaultBC);
282    Assert.assertTrue(cacheConfig.isCombinedBlockCache());
283    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
284    Assert.assertTrue(reader instanceof HFileReaderImpl);
285    // We've build a HFile tree with index = 16.
286    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
287
288    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
289    HFileBlock block1 = reader.getDataBlockIndexReader()
290        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
291            DataBlockEncoding.NONE, reader).getHFileBlock();
292    Assert.assertTrue(block1.getBlockType().isData());
293    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
294    HFileBlock block2 = reader.getDataBlockIndexReader()
295        .loadDataBlockWithScanInfo(secondCell, null, true, true, false,
296            DataBlockEncoding.NONE, reader).getHFileBlock();
297    Assert.assertTrue(block2.getBlockType().isData());
298    Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
299    // Wait until flushed to IOEngine;
300    waitBucketCacheFlushed(defaultBC);
301    // One RPC reference path.
302    Assert.assertEquals(block1.refCnt(), 1);
303    Assert.assertEquals(block2.refCnt(), 1);
304
305    // Let the curBlock refer to block2.
306    scanner.seekTo(secondCell);
307    curBlock = scanner.curBlock;
308    Assert.assertFalse(curBlock == block2);
309    Assert.assertEquals(1, block2.refCnt());
310    this.assertRefCnt(curBlock, 2);
311    prevBlock = scanner.curBlock;
312
313    // Release the block1, no other reference.
314    Assert.assertTrue(block1.release());
315    Assert.assertEquals(0, block1.refCnt());
316    // Release the block2, no other reference.
317    Assert.assertTrue(block2.release());
318    Assert.assertEquals(0, block2.refCnt());
319
320    // Do the seekBefore: the newBlock will be the previous block of curBlock.
321    Assert.assertTrue(scanner.seekBefore(secondCell));
322    Assert.assertEquals(scanner.prevBlocks.size(), 1);
323    Assert.assertTrue(scanner.prevBlocks.get(0) == prevBlock);
324    curBlock = scanner.curBlock;
325    // the curBlock is read from IOEngine, so a different block.
326    Assert.assertFalse(curBlock == block1);
327    // Two reference for curBlock: 1. scanner; 2. blockCache.
328    this.assertRefCnt(curBlock, 2);
329    // Reference count of prevBlock must be unchanged because we haven't shipped.
330    this.assertRefCnt(prevBlock, 2);
331
332    // Do the shipped
333    scanner.shipped();
334    Assert.assertEquals(scanner.prevBlocks.size(), 0);
335    Assert.assertNotNull(scanner.curBlock);
336    this.assertRefCnt(curBlock, 2);
337    this.assertRefCnt(prevBlock, 1);
338
339    // Do the close
340    scanner.close();
341    Assert.assertNull(scanner.curBlock);
342    this.assertRefCnt(curBlock, 1);
343    this.assertRefCnt(prevBlock, 1);
344
345    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
346    Assert.assertEquals(0, curBlock.refCnt());
347    Assert.assertEquals(0, prevBlock.refCnt());
348
349    // Reload the block1 again.
350    block1 = reader.getDataBlockIndexReader()
351        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
352            DataBlockEncoding.NONE, reader).getHFileBlock();
353    // Wait until flushed to IOEngine;
354    waitBucketCacheFlushed(defaultBC);
355    Assert.assertTrue(block1.getBlockType().isData());
356    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
357    Assert.assertTrue(block1.release());
358    Assert.assertEquals(0, block1.refCnt());
359    // Re-seek to the begin.
360    Assert.assertTrue(scanner.seekTo());
361    curBlock = scanner.curBlock;
362    Assert.assertFalse(curBlock == block1);
363    this.assertRefCnt(curBlock, 2);
364    // Return false because firstCell <= c[0]
365    Assert.assertFalse(scanner.seekBefore(firstCell));
366    // The block1 shouldn't be released because we still don't do the shipped or close.
367    this.assertRefCnt(curBlock, 2);
368
369    scanner.close();
370    this.assertRefCnt(curBlock, 1);
371    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
372    Assert.assertEquals(0, curBlock.refCnt());
373  }
374
375  private void assertRefCnt(HFileBlock block, int value) {
376    if (ioengine.startsWith("offheap") || ioengine.startsWith("pmem")) {
377      Assert.assertEquals(value, block.refCnt());
378    } else {
379      Assert.assertEquals(value - 1, block.refCnt());
380    }
381  }
382
383  @Test
384  public void testDefault() throws Exception {
385    testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE);
386  }
387
388  @Test
389  public void testCompression() throws Exception {
390    testReleaseBlock(Algorithm.GZ, DataBlockEncoding.NONE);
391  }
392
393  @Test
394  public void testDataBlockEncoding() throws Exception {
395    testReleaseBlock(Algorithm.NONE, DataBlockEncoding.ROW_INDEX_V1);
396  }
397
398  @Test
399  public void testDataBlockEncodingAndCompression() throws Exception {
400    testReleaseBlock(Algorithm.GZ, DataBlockEncoding.ROW_INDEX_V1);
401  }
402
403  @Test
404  public void testWithLruBlockCache() throws Exception {
405    HFileBlock curBlock;
406    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
407    // Set LruBlockCache
408    conf.set(BUCKET_CACHE_IOENGINE_KEY, "");
409    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
410    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
411    Assert.assertNotNull(defaultBC);
412    Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // Must be LruBlockCache.
413    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
414    Assert.assertTrue(reader instanceof HFileReaderImpl);
415    // We've build a HFile tree with index = 16.
416    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
417
418    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
419    HFileBlock block1 = reader.getDataBlockIndexReader()
420        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
421            DataBlockEncoding.NONE, reader).getHFileBlock();
422    Assert.assertTrue(block1.getBlockType().isData());
423    Assert.assertTrue(block1 instanceof ExclusiveMemHFileBlock);
424    HFileBlock block2 = reader.getDataBlockIndexReader()
425        .loadDataBlockWithScanInfo(secondCell, null, true, true, false,
426            DataBlockEncoding.NONE, reader).getHFileBlock();
427    Assert.assertTrue(block2.getBlockType().isData());
428    Assert.assertTrue(block2 instanceof ExclusiveMemHFileBlock);
429    // One RPC reference path.
430    Assert.assertEquals(block1.refCnt(), 0);
431    Assert.assertEquals(block2.refCnt(), 0);
432
433    scanner.seekTo(firstCell);
434    curBlock = scanner.curBlock;
435    Assert.assertTrue(curBlock == block1);
436    Assert.assertEquals(curBlock.refCnt(), 0);
437    Assert.assertTrue(scanner.prevBlocks.isEmpty());
438
439    // Switch to next block
440    scanner.seekTo(secondCell);
441    curBlock = scanner.curBlock;
442    Assert.assertTrue(curBlock == block2);
443    Assert.assertEquals(curBlock.refCnt(), 0);
444    Assert.assertEquals(curBlock.retain().refCnt(), 0);
445    // Only pooled HFileBlock will be kept in prevBlocks and ExclusiveMemHFileBlock will never keep
446    // in prevBlocks.
447    Assert.assertTrue(scanner.prevBlocks.isEmpty());
448
449    // close the scanner
450    scanner.close();
451    Assert.assertNull(scanner.curBlock);
452    Assert.assertTrue(scanner.prevBlocks.isEmpty());
453  }
454
455  @Test
456  public void testDisabledBlockCache() throws Exception {
457    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
458    // Set LruBlockCache
459    conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
460    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
461    Assert.assertNull(defaultBC);
462    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
463    Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // Must be LruBlockCache.
464    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
465    Assert.assertTrue(reader instanceof HFileReaderImpl);
466    // We've build a HFile tree with index = 16.
467    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
468
469    HFileBlock block1 = reader.getDataBlockIndexReader()
470        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
471            DataBlockEncoding.NONE, reader).getHFileBlock();
472
473    Assert.assertTrue(block1.isSharedMem());
474    Assert.assertTrue(block1 instanceof SharedMemHFileBlock);
475    Assert.assertEquals(1, block1.refCnt());
476    Assert.assertTrue(block1.release());
477  }
478}