/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MAX_CHUNK_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.HFileScannerImpl;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.TestBucketCache;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
@Category({ IOTests.class, LargeTests.class })
public class TestHFileScannerImplReferenceCount {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHFileScannerImplReferenceCount.class);

  @Rule
  public TestName CASE = new TestName();

  @Parameters(name = "{index}: ioengine={0}")
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[] { "file" }, new Object[] { "offheap" },
      new Object[] { "mmap" }, new Object[] { "pmem" });
  }

  @Parameter
  public String ioengine;

  private static final Logger LOG =
      LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static final byte[] FAMILY = Bytes.toBytes("f");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  private static final byte[] SUFFIX = randLongBytes();
  private static final int CELL_COUNT = 1000;

  private static byte[] randLongBytes() {
    Random rand = new Random();
    byte[] keys = new byte[30];
    rand.nextBytes(keys);
    return keys;
  }
103
  // A deep copy of UTIL's configuration; DON'T use a shallow copy.
  private Configuration conf;
  private Path workDir;
  private FileSystem fs;
  private Path hfilePath;
  private Cell firstCell = null;
  private Cell secondCell = null;
  private ByteBuffAllocator allocator;
112
  @BeforeClass
  public static void setUpBeforeClass() {
    Configuration conf = UTIL.getConfiguration();
    // Set the index block max chunk size and min entry count to very small values, so that
    // the written HFile gets an index block tree with depth >= 2.
    conf.setInt(MAX_CHUNK_SIZE_KEY, 10);
    conf.setInt(MIN_INDEX_NUM_ENTRIES_KEY, 2);
    // Create a 32MB bucket cache.
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
    conf.setInt(BUFFER_SIZE_KEY, 1024);
    conf.setInt(MAX_BUFFER_COUNT_KEY, 32 * 1024);
    // With a min allocation size of 0, every allocated ByteBuff is a pooled ByteBuff.
    conf.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
  }
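
  // A minimal illustrative sketch (unused by the tests) of what MIN_ALLOCATE_SIZE_KEY = 0
  // buys us given the configuration above: every allocation, however small, is served from
  // the pooled reservoir, so each ByteBuff is reference counted and must be released
  // explicitly. Uses only the ByteBuffAllocator API already exercised in this class.
  private static void demoPooledByteBuff(Configuration conf) {
    ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true);
    org.apache.hadoop.hbase.nio.ByteBuff buff = alloc.allocate(16);
    Assert.assertEquals(1, buff.refCnt()); // a pooled buffer starts with one reference
    Assert.assertTrue(buff.release());     // releasing the last reference recycles it
  }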

  @Before
  public void setUp() throws IOException {
    String caseName = CASE.getMethodName().replaceAll("[^a-zA-Z0-9]", "_");
    this.workDir = UTIL.getDataTestDir(caseName);
    if (!"offheap".equals(ioengine)) {
      ioengine = ioengine + ":" + workDir.toString() + "/cachedata";
    }
    UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, ioengine);
    this.firstCell = null;
    this.secondCell = null;
    this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
    this.conf = new Configuration(UTIL.getConfiguration());
    this.fs = this.workDir.getFileSystem(conf);
    this.hfilePath = new Path(this.workDir, caseName + System.currentTimeMillis());
    LOG.info("Start to write {} cells into hfile: {}, case: {}", CELL_COUNT, hfilePath, caseName);
  }

  @After
  public void tearDown() throws IOException {
    this.allocator.clean();
    this.fs.delete(this.workDir, true);
  }

  private void waitBucketCacheFlushed(BlockCache cache) throws InterruptedException {
    Assert.assertTrue(cache instanceof CombinedBlockCache);
    BlockCache[] blockCaches = cache.getBlockCaches();
    Assert.assertEquals(2, blockCaches.length);
    Assert.assertTrue(blockCaches[1] instanceof BucketCache);
    TestBucketCache.waitUntilAllFlushedToBucket((BucketCache) blockCaches[1]);
  }

  private void writeHFile(Configuration conf, FileSystem fs, Path hfilePath, Algorithm compression,
      DataBlockEncoding encoding, int cellCount) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(1).withCompression(compression)
        .withDataBlockEncoding(encoding).build();
    try (HFile.Writer writer =
        new HFile.WriterFactory(conf, new CacheConfig(conf)).withPath(fs, hfilePath)
            .withFileContext(context).create()) {
      Random rand = new Random(9713312); // Just a fixed seed.
      for (int i = 0; i < cellCount; ++i) {
        byte[] keyBytes = Bytes.add(Bytes.toBytes(i), SUFFIX);

        // A random-length random value.
        byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
        KeyValue keyValue =
            new KeyValue(keyBytes, FAMILY, QUALIFIER, HConstants.LATEST_TIMESTAMP, valueBytes);
        if (firstCell == null) {
          firstCell = keyValue;
        } else if (secondCell == null) {
          secondCell = keyValue;
        }
        writer.append(keyValue);
      }
    }
  }
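
  // An illustrative sketch (not invoked by the tests below) showing how the cells written by
  // writeHFile() can be read back with the same scanner API the tests exercise: seekTo()
  // positions on the first cell and next() advances until the file is exhausted.
  private int scanCellCount(HFile.Reader reader) throws IOException {
    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
    try {
      int count = 0;
      if (scanner.seekTo()) { // seekTo() returns false for an empty file
        ++count;
        while (scanner.next()) {
          ++count;
        }
      }
      return count;
    } finally {
      scanner.close(); // releases curBlock and any blocks queued in prevBlocks
    }
  }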

  /**
   * A careful UT for validating the reference count mechanism. If you want to change this UT,
   * please read the design doc in HBASE-21879 first and make sure you understand the refCnt
   * design.
   */
  private void testReleaseBlock(Algorithm compression, DataBlockEncoding encoding)
      throws Exception {
    writeHFile(conf, fs, hfilePath, compression, encoding, CELL_COUNT);
    HFileBlock curBlock, prevBlock;
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    Assert.assertNotNull(defaultBC);
    Assert.assertTrue(cacheConfig.isCombinedBlockCache());
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    Assert.assertTrue(reader instanceof HFileReaderImpl);
    // We've built an HFile with a 16-level data block index.
    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
    HFileBlock block1 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
            DataBlockEncoding.NONE, reader).getHFileBlock();
    waitBucketCacheFlushed(defaultBC);
    Assert.assertTrue(block1.getBlockType().isData());
    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);

    HFileBlock block2 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(secondCell, null, true, true, false,
            DataBlockEncoding.NONE, reader).getHFileBlock();
    waitBucketCacheFlushed(defaultBC);
    Assert.assertTrue(block2.getBlockType().isData());
    Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
    // Only one reference each, held by the RPC path.
    Assert.assertEquals(1, block1.refCnt());
    Assert.assertEquals(1, block2.refCnt());
    Assert.assertFalse(block1 == block2);

    scanner.seekTo(firstCell);
    curBlock = scanner.curBlock;
    this.assertRefCnt(curBlock, 2);

    // Seek to the same block again; curBlock won't change and nothing is read from the
    // BlockCache, so the refCnt should be unchanged.
    scanner.seekTo(firstCell);
    Assert.assertTrue(curBlock == scanner.curBlock);
    this.assertRefCnt(curBlock, 2);
    prevBlock = curBlock;

    scanner.seekTo(secondCell);
    curBlock = scanner.curBlock;
    this.assertRefCnt(prevBlock, 2);
    this.assertRefCnt(curBlock, 2);

    // After shipped(), prevBlock will be released, but curBlock is still referenced by the
    // scanner.
    scanner.shipped();
    this.assertRefCnt(prevBlock, 1);
    this.assertRefCnt(curBlock, 2);

    // Ship again, though there is nothing new to send to the client.
    scanner.shipped();
    this.assertRefCnt(prevBlock, 1);
    this.assertRefCnt(curBlock, 2);

    // Closing the scanner releases curBlock as well.
    scanner.close();
    this.assertRefCnt(curBlock, 1);

    // Finish the block1 & block2 RPC paths.
    Assert.assertTrue(block1.release());
    Assert.assertTrue(block2.release());

    // Evict the blocks from the block cache.
    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
    Assert.assertEquals(0, prevBlock.refCnt());
    Assert.assertEquals(0, curBlock.refCnt());

    int count = 0;
    Assert.assertTrue(scanner.seekTo());
    ++count;
    while (scanner.next()) {
      count++;
    }
    assertEquals(CELL_COUNT, count);
  }

  /**
   * See HBASE-22480.
   */
  @Test
  public void testSeekBefore() throws Exception {
    HFileBlock curBlock, prevBlock;
    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    Assert.assertNotNull(defaultBC);
    Assert.assertTrue(cacheConfig.isCombinedBlockCache());
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    Assert.assertTrue(reader instanceof HFileReaderImpl);
    // We've built an HFile with a 16-level data block index.
    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
    HFileBlock block1 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
            DataBlockEncoding.NONE, reader).getHFileBlock();
    Assert.assertTrue(block1.getBlockType().isData());
    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
    HFileBlock block2 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(secondCell, null, true, true, false,
            DataBlockEncoding.NONE, reader).getHFileBlock();
    Assert.assertTrue(block2.getBlockType().isData());
    Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
    // Wait until flushed to the IOEngine.
    waitBucketCacheFlushed(defaultBC);
    // One RPC reference path each.
    Assert.assertEquals(1, block1.refCnt());
    Assert.assertEquals(1, block2.refCnt());

    // Let curBlock refer to block2.
    scanner.seekTo(secondCell);
    curBlock = scanner.curBlock;
    Assert.assertFalse(curBlock == block2);
    Assert.assertEquals(1, block2.refCnt());
    this.assertRefCnt(curBlock, 2);
    prevBlock = scanner.curBlock;

    // Release block1; no other references remain.
    Assert.assertTrue(block1.release());
    Assert.assertEquals(0, block1.refCnt());
    // Release block2; no other references remain.
    Assert.assertTrue(block2.release());
    Assert.assertEquals(0, block2.refCnt());

    // Do the seekBefore: the new current block will be the block preceding curBlock.
    Assert.assertTrue(scanner.seekBefore(secondCell));
    Assert.assertEquals(1, scanner.prevBlocks.size());
    Assert.assertTrue(scanner.prevBlocks.get(0) == prevBlock);
    curBlock = scanner.curBlock;
    // The curBlock is read from the IOEngine, so it is a different block instance.
    Assert.assertFalse(curBlock == block1);
    // Two references for curBlock: 1. the scanner; 2. the block cache.
    this.assertRefCnt(curBlock, 2);
    // The refCnt of prevBlock must be unchanged because we haven't called shipped() yet.
    this.assertRefCnt(prevBlock, 2);

    // Call shipped().
    scanner.shipped();
    Assert.assertEquals(0, scanner.prevBlocks.size());
    Assert.assertNotNull(scanner.curBlock);
    this.assertRefCnt(curBlock, 2);
    this.assertRefCnt(prevBlock, 1);

    // Close the scanner.
    scanner.close();
    Assert.assertNull(scanner.curBlock);
    this.assertRefCnt(curBlock, 1);
    this.assertRefCnt(prevBlock, 1);

    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
    Assert.assertEquals(0, curBlock.refCnt());
    Assert.assertEquals(0, prevBlock.refCnt());

    // Reload block1.
    block1 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
            DataBlockEncoding.NONE, reader).getHFileBlock();
    // Wait until flushed to the IOEngine.
    waitBucketCacheFlushed(defaultBC);
    Assert.assertTrue(block1.getBlockType().isData());
    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
    Assert.assertTrue(block1.release());
    Assert.assertEquals(0, block1.refCnt());
    // Re-seek to the beginning.
    Assert.assertTrue(scanner.seekTo());
    curBlock = scanner.curBlock;
    Assert.assertFalse(curBlock == block1);
    this.assertRefCnt(curBlock, 2);
    // Returns false because there is no cell before firstCell.
    Assert.assertFalse(scanner.seekBefore(firstCell));
    // The curBlock shouldn't be released because we haven't called shipped() or close() yet.
    this.assertRefCnt(curBlock, 2);

    scanner.close();
    this.assertRefCnt(curBlock, 1);
    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
    Assert.assertEquals(0, curBlock.refCnt());
  }

  private void assertRefCnt(HFileBlock block, int value) {
    // The expected refCnt depends on the IOEngine: the shared-memory engines (offheap, pmem)
    // hand the reader a block backed by the cache's own buffer, so the cache holds one extra
    // reference; file and mmap engines serve a separately allocated copy, hence one fewer.
    if (ioengine.startsWith("offheap") || ioengine.startsWith("pmem")) {
      Assert.assertEquals(value, block.refCnt());
    } else {
      Assert.assertEquals(value - 1, block.refCnt());
    }
  }
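
  // A minimal sketch (an illustrative helper, not called by any test here) of the
  // retain/release contract that assertRefCnt() verifies indirectly, assuming a
  // reference-counted (shared memory) block: each retain() adds one reference, each
  // release() drops one, and release() returns true only once the count reaches zero.
  private static void demoRetainRelease(HFileBlock block) {
    int before = block.refCnt();
    block.retain(); // one more reference
    Assert.assertEquals(before + 1, block.refCnt());
    block.release(); // back to the original count
    Assert.assertEquals(before, block.refCnt());
  }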

  @Test
  public void testDefault() throws Exception {
    testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE);
  }

  @Test
  public void testCompression() throws Exception {
    testReleaseBlock(Algorithm.GZ, DataBlockEncoding.NONE);
  }

  @Test
  public void testDataBlockEncoding() throws Exception {
    testReleaseBlock(Algorithm.NONE, DataBlockEncoding.ROW_INDEX_V1);
  }

  @Test
  public void testDataBlockEncodingAndCompression() throws Exception {
    testReleaseBlock(Algorithm.GZ, DataBlockEncoding.ROW_INDEX_V1);
  }
401
  @Test
  public void testWithLruBlockCache() throws Exception {
    HFileBlock curBlock;
    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
    // Use an LruBlockCache only (no bucket cache).
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "");
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    Assert.assertNotNull(defaultBC);
    Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // Must be LruBlockCache.
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    Assert.assertTrue(reader instanceof HFileReaderImpl);
    // We've built an HFile with a 16-level data block index.
    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
    HFileBlock block1 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
            DataBlockEncoding.NONE, reader).getHFileBlock();
    Assert.assertTrue(block1.getBlockType().isData());
    Assert.assertTrue(block1 instanceof ExclusiveMemHFileBlock);
    HFileBlock block2 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(secondCell, null, true, true, false,
            DataBlockEncoding.NONE, reader).getHFileBlock();
    Assert.assertTrue(block2.getBlockType().isData());
    Assert.assertTrue(block2 instanceof ExclusiveMemHFileBlock);
    // An ExclusiveMemHFileBlock lives on-heap and is not reference counted, so its refCnt
    // stays 0.
    Assert.assertEquals(0, block1.refCnt());
    Assert.assertEquals(0, block2.refCnt());

    scanner.seekTo(firstCell);
    curBlock = scanner.curBlock;
    Assert.assertTrue(curBlock == block1);
    Assert.assertEquals(0, curBlock.refCnt());
    Assert.assertTrue(scanner.prevBlocks.isEmpty());

    // Switch to the next block.
    scanner.seekTo(secondCell);
    curBlock = scanner.curBlock;
    Assert.assertTrue(curBlock == block2);
    Assert.assertEquals(0, curBlock.refCnt());
    Assert.assertEquals(0, curBlock.retain().refCnt()); // retain() is a no-op here
    // Only pooled HFileBlocks are kept in prevBlocks; an ExclusiveMemHFileBlock is never
    // kept there.
    Assert.assertTrue(scanner.prevBlocks.isEmpty());

    // Close the scanner.
    scanner.close();
    Assert.assertNull(scanner.curBlock);
    Assert.assertTrue(scanner.prevBlocks.isEmpty());
  }

  @Test
  public void testDisabledBlockCache() throws Exception {
    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
    // Disable the block cache entirely.
    conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    Assert.assertNull(defaultBC);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // There is no block cache at all.
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    Assert.assertTrue(reader instanceof HFileReaderImpl);
    // We've built an HFile with a 16-level data block index.
    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileBlock block1 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
            DataBlockEncoding.NONE, reader).getHFileBlock();

    Assert.assertTrue(block1.isSharedMem());
    Assert.assertTrue(block1 instanceof SharedMemHFileBlock);
    Assert.assertEquals(1, block1.refCnt());
    Assert.assertTrue(block1.release());
  }
}