001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
021import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertNull;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025
026import java.io.IOException;
027import java.nio.file.Paths;
028import java.util.ArrayList;
029import java.util.List;
030import java.util.concurrent.atomic.AtomicInteger;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.CellComparatorImpl;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.ExtendedCell;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.KeyValueUtil;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.RegionInfoBuilder;
046import org.apache.hadoop.hbase.io.hfile.CacheConfig;
047import org.apache.hadoop.hbase.io.hfile.HFile;
048import org.apache.hadoop.hbase.io.hfile.HFileContext;
049import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
050import org.apache.hadoop.hbase.io.hfile.HFileScanner;
051import org.apache.hadoop.hbase.io.hfile.ReaderContext;
052import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
053import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
054import org.apache.hadoop.hbase.nio.RefCnt;
055import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
056import org.apache.hadoop.hbase.regionserver.StoreContext;
057import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
058import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
059import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
060import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
061import org.apache.hadoop.hbase.testclassification.IOTests;
062import org.apache.hadoop.hbase.testclassification.SmallTests;
063import org.apache.hadoop.hbase.util.Bytes;
064import org.junit.jupiter.api.AfterAll;
065import org.junit.jupiter.api.BeforeAll;
066import org.junit.jupiter.api.Tag;
067import org.junit.jupiter.api.Test;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
072
073@Tag(IOTests.TAG)
074@Tag(SmallTests.TAG)
075public class TestHalfStoreFileReader {
076
077  private static final Logger LOG = LoggerFactory.getLogger(TestHalfStoreFileReader.class);
078
079  private static HBaseTestingUtil TEST_UTIL;
080
081  @BeforeAll
082  public static void setupBeforeClass() throws Exception {
083    TEST_UTIL = new HBaseTestingUtil();
084  }
085
086  @AfterAll
087  public static void tearDownAfterClass() throws Exception {
088    TEST_UTIL.cleanupTestDir();
089  }
090
091  /**
092   * Test the scanner and reseek of a half hfile scanner. The scanner API demands that seekTo and
093   * reseekTo() only return < 0 if the key lies before the start of the file (with no position on
094   * the scanner). Returning 0 if perfect match (rare), and return > 1 if we got an imperfect match.
095   * The latter case being the most common, we should generally be returning 1, and if we do, there
096   * may or may not be a 'next' in the scanner/file. A bug in the half file scanner was returning -1
097   * at the end of the bottom half, and that was causing the infrastructure above to go null causing
098   * NPEs and other problems. This test reproduces that failure, and also tests both the bottom and
099   * top of the file while we are at it.
100   */
101  @Test
102  public void testHalfScanAndReseek() throws Exception {
103    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
104    Configuration conf = TEST_UTIL.getConfiguration();
105    FileSystem fs = FileSystem.get(conf);
106    String root_dir = TEST_UTIL.getDataTestDir().toString();
107    Path parentPath = new Path(new Path(root_dir, "parent"), "CF");
108    fs.mkdirs(parentPath);
109    String tableName = Paths.get(root_dir).getFileName().toString();
110    RegionInfo splitAHri = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
111    Thread.sleep(1000);
112    RegionInfo splitBHri = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
113    Path splitAPath = new Path(new Path(root_dir, splitAHri.getRegionNameAsString()), "CF");
114    Path splitBPath = new Path(new Path(root_dir, splitBHri.getRegionNameAsString()), "CF");
115    Path filePath = StoreFileWriter.getUniqueFile(fs, parentPath);
116    String ioEngineName = "file:" + TEST_UTIL.getDataTestDir() + "/bucketNoRecycler.cache";
117    BucketCache bucketCache = new BucketCache(ioEngineName, 32 * 1024 * 1024, 1024,
118      new int[] { 4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024 }, 1, 1, null);
119    conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true);
120    conf.setInt(BUFFER_SIZE_KEY, 1024);
121    ByteBuffAllocator allocator = ByteBuffAllocator.create(conf, true);
122
123    final AtomicInteger counter = new AtomicInteger();
124    RefCnt.detector.setLeakListener(new ResourceLeakDetector.LeakListener() {
125      @Override
126      public void onLeak(String s, String s1) {
127        counter.incrementAndGet();
128      }
129    });
130
131    ColumnFamilyDescriptorBuilder cfBuilder =
132      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("CF"));
133    CacheConfig cacheConf = new CacheConfig(conf, cfBuilder.build(), bucketCache, allocator);
134
135    HFileContext meta = new HFileContextBuilder().withBlockSize(1024).build();
136    HFile.Writer w =
137      HFile.getWriterFactory(conf, cacheConf).withPath(fs, filePath).withFileContext(meta).create();
138
139    // write some things.
140    List<KeyValue> items = genSomeKeys();
141    for (KeyValue kv : items) {
142      w.append(kv);
143    }
144    w.close();
145
146    HFile.Reader r = HFile.createReader(fs, filePath, cacheConf, true, conf);
147    Cell midKV = r.midKey().get();
148    byte[] midkey = CellUtil.cloneRow(midKV);
149
150    Path splitFileA = new Path(splitAPath, filePath.getName() + ".parent");
151    Path splitFileB = new Path(splitBPath, filePath.getName() + ".parent");
152
153    HRegionFileSystem splitAregionFS =
154      HRegionFileSystem.create(conf, fs, new Path(root_dir), splitAHri);
155    StoreContext splitAStoreContext =
156      StoreContext.getBuilder().withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("CF"))
157        .withFamilyStoreDirectoryPath(splitAPath).withRegionFileSystem(splitAregionFS).build();
158    StoreFileTracker splitAsft = StoreFileTrackerFactory.create(conf, false, splitAStoreContext);
159    Reference bottom = new Reference(midkey, Reference.Range.bottom);
160    splitAsft.createReference(bottom, splitFileA);
161    doTestOfScanAndReseek(splitFileA, fs, bottom, cacheConf);
162
163    HRegionFileSystem splitBregionFS =
164      HRegionFileSystem.create(conf, fs, new Path(root_dir), splitBHri);
165    StoreContext splitBStoreContext =
166      StoreContext.getBuilder().withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("CF"))
167        .withFamilyStoreDirectoryPath(splitBPath).withRegionFileSystem(splitBregionFS).build();
168    StoreFileTracker splitBsft = StoreFileTrackerFactory.create(conf, false, splitBStoreContext);
169    Reference top = new Reference(midkey, Reference.Range.top);
170    splitBsft.createReference(top, splitFileB);
171    doTestOfScanAndReseek(splitFileB, fs, top, cacheConf);
172
173    r.close();
174
175    assertEquals(0, counter.get());
176  }
177
178  private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, CacheConfig cacheConf)
179    throws Exception {
180    Path referencePath = StoreFileInfo.getReferredToFile(p);
181    FSDataInputStreamWrapper in = new FSDataInputStreamWrapper(fs, referencePath, false, 0);
182    FileStatus status = fs.getFileStatus(referencePath);
183    long length = status.getLen();
184    ReaderContextBuilder contextBuilder =
185      new ReaderContextBuilder().withInputStreamWrapper(in).withFileSize(length)
186        .withReaderType(ReaderContext.ReaderType.PREAD).withFileSystem(fs).withFilePath(p);
187    ReaderContext context = contextBuilder.build();
188    StoreFileInfo storeFileInfo =
189      new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, fs.getFileStatus(p), bottom);
190    storeFileInfo.initHFileInfo(context);
191    final HalfStoreFileReader halfreader =
192      (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
193    storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
194    halfreader.loadFileInfo();
195    try (HFileScanner scanner = halfreader.getScanner(false, false, false)) {
196
197      scanner.seekTo();
198      Cell curr;
199      do {
200        curr = scanner.getCell();
201        KeyValue reseekKv = getLastOnCol(curr);
202        int ret = scanner.reseekTo(reseekKv);
203        assertTrue(ret > 0, "reseek to returned: " + ret);
204      } while (scanner.next());
205
206      int ret = scanner.reseekTo(getLastOnCol(curr));
207      assertTrue(ret > 0);
208    }
209
210    halfreader.close(true);
211
212    System.gc();
213    Thread.sleep(1000);
214  }
215
216  // Tests the scanner on an HFile that is backed by HalfStoreFiles
217  @Test
218  public void testHalfScanner() throws IOException {
219    String root_dir = TEST_UTIL.getDataTestDir().toString();
220    Path p = new Path(root_dir, "test");
221    Configuration conf = TEST_UTIL.getConfiguration();
222    FileSystem fs = FileSystem.get(conf);
223    CacheConfig cacheConf = new CacheConfig(conf);
224    HFileContext meta = new HFileContextBuilder().withBlockSize(1024).build();
225    HFile.Writer w =
226      HFile.getWriterFactory(conf, cacheConf).withPath(fs, p).withFileContext(meta).create();
227
228    // write some things.
229    List<KeyValue> items = genSomeKeys();
230    for (KeyValue kv : items) {
231      w.append(kv);
232    }
233    w.close();
234
235    HFile.Reader r = HFile.createReader(fs, p, cacheConf, true, conf);
236    ExtendedCell midKV = r.midKey().get();
237    byte[] midkey = CellUtil.cloneRow(midKV);
238
239    Reference bottom = new Reference(midkey, Reference.Range.bottom);
240    Reference top = new Reference(midkey, Reference.Range.top);
241
242    // Ugly code to get the item before the midkey
243    KeyValue beforeMidKey = null;
244    for (KeyValue item : items) {
245      if (CellComparatorImpl.COMPARATOR.compare(item, midKV) >= 0) {
246        break;
247      }
248      beforeMidKey = item;
249    }
250    LOG.info("midkey: " + midKV + " or: " + Bytes.toStringBinary(midkey));
251    LOG.info("beforeMidKey: " + beforeMidKey);
252
253    // Seek on the splitKey, should be in top, not in bottom
254    Cell foundKeyValue = doTestOfSeekBefore(p, fs, bottom, midKV, cacheConf);
255    assertEquals(beforeMidKey, foundKeyValue);
256
257    // Seek tot the last thing should be the penultimate on the top, the one before the midkey on
258    // the bottom.
259    foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(items.size() - 1), cacheConf);
260    assertEquals(items.get(items.size() - 2), foundKeyValue);
261
262    foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(items.size() - 1), cacheConf);
263    assertEquals(beforeMidKey, foundKeyValue);
264
265    // Try and seek before something that is in the bottom.
266    foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(0), cacheConf);
267    assertNull(foundKeyValue);
268
269    // Try and seek before the first thing.
270    foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(0), cacheConf);
271    assertNull(foundKeyValue);
272
273    // Try and seek before the second thing in the top and bottom.
274    foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(1), cacheConf);
275    assertNull(foundKeyValue);
276
277    foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(1), cacheConf);
278    assertEquals(items.get(0), foundKeyValue);
279
280    // Try to seek before the splitKey in the top file
281    foundKeyValue = doTestOfSeekBefore(p, fs, top, midKV, cacheConf);
282    assertNull(foundKeyValue);
283  }
284
285  private Cell doTestOfSeekBefore(Path p, FileSystem fs, Reference bottom, ExtendedCell seekBefore,
286    CacheConfig cacheConfig) throws IOException {
287    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build();
288    StoreFileInfo storeFileInfo =
289      new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, fs.getFileStatus(p), bottom);
290    storeFileInfo.initHFileInfo(context);
291    final HalfStoreFileReader halfreader =
292      (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConfig);
293    storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
294    halfreader.loadFileInfo();
295    try (HFileScanner scanner = halfreader.getScanner(false, false, false)) {
296      scanner.seekBefore(seekBefore);
297      if (scanner.getCell() != null) {
298        return KeyValueUtil.copyToNewKeyValue(scanner.getCell());
299      } else {
300        return null;
301      }
302    }
303  }
304
305  private KeyValue getLastOnCol(Cell curr) {
306    return KeyValueUtil.createLastOnRow(curr.getRowArray(), curr.getRowOffset(),
307      curr.getRowLength(), curr.getFamilyArray(), curr.getFamilyOffset(), curr.getFamilyLength(),
308      curr.getQualifierArray(), curr.getQualifierOffset(), curr.getQualifierLength());
309  }
310
311  static final int SIZE = 1000;
312
313  static byte[] _b(String s) {
314    return Bytes.toBytes(s);
315  }
316
317  List<KeyValue> genSomeKeys() {
318    List<KeyValue> ret = new ArrayList<>(SIZE);
319    for (int i = 0; i < SIZE; i++) {
320      KeyValue kv =
321        new KeyValue(_b(String.format("row_%04d", i)), _b("family"), _b("qualifier"), 1000, // timestamp
322          _b("value"));
323      ret.add(kv);
324    }
325    return ret;
326  }
327}