/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test HStoreFile
 */
@Category({RegionServerTests.class, SmallTests.class})
public class TestHStoreFile extends HBaseTestCase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHStoreFile.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHStoreFile.class);
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
  private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestStoreFile").toString();
  private static final ChecksumType CKTYPE = ChecksumType.CRC32C;
  private static final int CKBYTES = 512;
  private static String TEST_FAMILY = "cf";

  @Override
  @Before
  public void setUp() throws Exception {
    super.setUp();
  }

  @Override
  @After
  public void tearDown() throws Exception {
    super.tearDown();
  }

  /**
   * Write a file and then assert that we can read from top and bottom halves
   * using two HalfMapFiles.
   */
  @Test
  public void testBasicHalfMapFile() throws Exception {
    final HRegionInfo hri =
        new HRegionInfo(TableName.valueOf("testBasicHalfMapFileTb"));
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
      conf, fs, new Path(testDir, hri.getTable().getNameAsString()), hri);

    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
            .withFilePath(regionFs.createTempName())
            .withFileContext(meta)
            .build();
    writeStoreFile(writer);

    Path sfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
    HStoreFile sf = new HStoreFile(this.fs, sfPath, conf, cacheConf, BloomType.NONE, true);
    checkHalfHFile(regionFs, sf);
  }

  private void writeStoreFile(final StoreFileWriter writer) throws IOException {
    writeStoreFile(writer, Bytes.toBytes(getName()), Bytes.toBytes(getName()));
  }

  // Pick a split point (roughly halfway): with FIRST_CHAR = 'a' and LAST_CHAR = 'z'
  // inherited from HBaseTestCase, this is the two-byte row { 'm', 'a' }, near the
  // middle of the "aa".."zz" keyspace written by writeStoreFile below.
  byte[] SPLITKEY = new byte[] { (LAST_CHAR + FIRST_CHAR) / 2, FIRST_CHAR };

  /**
   * Writes KeyValue data (rows "aa" through "zz") to the passed writer and
   * then closes it.
   */
  public static void writeStoreFile(final StoreFileWriter writer, byte[] fam, byte[] qualifier)
      throws IOException {
    long now = System.currentTimeMillis();
    try {
      for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
        for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
          byte[] b = new byte[] { (byte) d, (byte) e };
          writer.append(new KeyValue(b, fam, qualifier, now, b));
        }
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Test that our mechanism of writing store files in one region to reference
   * store files in other regions works.
   */
  @Test
  public void testReference() throws IOException {
    final HRegionInfo hri = new HRegionInfo(TableName.valueOf("testReferenceTb"));
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
      conf, fs, new Path(testDir, hri.getTable().getNameAsString()), hri);

    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
            .withFilePath(regionFs.createTempName())
            .withFileContext(meta)
            .build();
    writeStoreFile(writer);

    Path hsfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
    HStoreFile hsf = new HStoreFile(this.fs, hsfPath, conf, cacheConf, BloomType.NONE, true);
    hsf.initReader();
    StoreFileReader reader = hsf.getReader();
    // Split on a row, not in middle of row.  Midkey returned by reader
    // may be in middle of row.  Create new one with empty column and
    // timestamp.
    byte[] midRow = CellUtil.cloneRow(reader.midKey().get());
    byte[] finalRow = CellUtil.cloneRow(reader.getLastKey().get());
    hsf.closeStoreFile(true);

    // Make a reference
    HRegionInfo splitHri = new HRegionInfo(hri.getTable(), null, midRow);
    Path refPath = splitStoreFile(regionFs, splitHri, TEST_FAMILY, hsf, midRow, true);
    HStoreFile refHsf = new HStoreFile(this.fs, refPath, conf, cacheConf, BloomType.NONE, true);
    refHsf.initReader();
    // Now confirm that I can read from the reference and that it only gets
    // keys from top half of the file.
    HFileScanner s = refHsf.getReader().getScanner(false, false);
    Cell kv = null;
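    // Loop idiom used throughout this test: the first pass seeks to the start
    // of the file, every later pass advances with next().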
    for (boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) {
      ByteBuffer bb = ByteBuffer.wrap(((KeyValue) s.getKey()).getKey());
      kv = KeyValueUtil.createKeyValueFromKey(bb);
      if (first) {
        assertTrue(Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), midRow, 0,
          midRow.length));
        first = false;
      }
    }
    assertTrue(Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), finalRow, 0,
      finalRow.length));
  }

  @Test
  public void testStoreFileReference() throws Exception {
    final RegionInfo hri =
        RegionInfoBuilder.newBuilder(TableName.valueOf("testStoreFileReference")).build();
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs,
      new Path(testDir, hri.getTable().getNameAsString()), hri);
    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();

    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
        .withFilePath(regionFs.createTempName()).withFileContext(meta).build();
    writeStoreFile(writer);
    Path hsfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
    writer.close();

    HStoreFile file = new HStoreFile(this.fs, hsfPath, conf, cacheConf, BloomType.NONE, true);
    file.initReader();
    StoreFileReader r = file.getReader();
    assertNotNull(r);
    StoreFileScanner scanner =
        new StoreFileScanner(r, mock(HFileScanner.class), false, false, 0, 0, false);

    // Verify after instantiating scanner refCount is increased
    assertTrue("Verify file is being referenced", file.isReferencedInReads());
    scanner.close();
    // Verify after closing scanner refCount is decreased
    assertFalse("Verify file is not being referenced", file.isReferencedInReads());
  }

  @Test
  public void testEmptyStoreFileRestrictKeyRanges() throws Exception {
    StoreFileReader reader = mock(StoreFileReader.class);
    HStore store = mock(HStore.class);
    byte[] cf = Bytes.toBytes("ty");
    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.of(cf);
    when(store.getColumnFamilyDescriptor()).thenReturn(cfd);
    StoreFileScanner scanner =
        new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0, 0, true);
    Scan scan = new Scan();
    scan.setColumnFamilyTimeRange(cf, 0, 1);
    assertFalse(scanner.shouldUseScanner(scan, store, 0));
  }

  @Test
  public void testHFileLink() throws IOException {
    final HRegionInfo hri = new HRegionInfo(TableName.valueOf("testHFileLinkTb"));
    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    FSUtils.setRootDir(testConf, testDir);
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
      testConf, fs, FSUtils.getTableDir(testDir, hri.getTable()), hri);
    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();

    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
            .withFilePath(regionFs.createTempName())
            .withFileContext(meta)
            .build();
    writeStoreFile(writer);

    Path storeFilePath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
    Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", TEST_FAMILY));
    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
    Path linkFilePath = new Path(dstPath,
                  HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
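    // The link name produced here follows the <tableName>=<encodedRegion>-<hfileName>
    // pattern; opening the link resolves back to the original store file.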

    // Try to open store file from link
    StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath);
    HStoreFile hsf =
        new HStoreFile(this.fs, storeFileInfo, testConf, cacheConf, BloomType.NONE, true);
    assertTrue(storeFileInfo.isLink());
    hsf.initReader();

    // Now confirm that I can read from the link
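    // seekTo() lands on the first cell, so the count starts at 1 and next()
    // accounts for the remaining 26 * 26 - 1 cells written by writeStoreFile().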
    int count = 1;
    HFileScanner s = hsf.getReader().getScanner(false, false);
    s.seekTo();
    while (s.next()) {
      count++;
    }
    assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
  }

  /**
   * This test creates an hfile and then the directory structures and files needed
   * to verify that references to hfilelinks (created by snapshot clones) can be
   * properly interpreted.
   */
  @Test
  public void testReferenceToHFileLink() throws IOException {
    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    FSUtils.setRootDir(testConf, testDir);

    // Add legal table-name chars to verify the regex handles them.
    HRegionInfo hri = new HRegionInfo(TableName.valueOf("_original-evil-name"));
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
      testConf, fs, FSUtils.getTableDir(testDir, hri.getTable()), hri);

    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
    // Make a store file and write data to it. <root>/<tablename>/<rgn>/<cf>/<file>
    StoreFileWriter writer = new StoreFileWriter.Builder(testConf, cacheConf, this.fs)
            .withFilePath(regionFs.createTempName())
            .withFileContext(meta)
            .build();
    writeStoreFile(writer);
    Path storeFilePath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());

    // create link to store file. <root>/clone/region/<cf>/<hfile>-<region>-<table>
    HRegionInfo hriClone = new HRegionInfo(TableName.valueOf("clone"));
    HRegionFileSystem cloneRegionFs = HRegionFileSystem.createRegionOnFileSystem(
      testConf, fs, FSUtils.getTableDir(testDir, hri.getTable()), hriClone);
    Path dstPath = cloneRegionFs.getStoreDir(TEST_FAMILY);
    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
    Path linkFilePath = new Path(dstPath,
                  HFileLink.createHFileLinkName(hri, storeFilePath.getName()));

    // create splits of the link.
    // <root>/clone/splitA/<cf>/<reftohfilelink>,
    // <root>/clone/splitB/<cf>/<reftohfilelink>
    HRegionInfo splitHriA = new HRegionInfo(hri.getTable(), null, SPLITKEY);
    HRegionInfo splitHriB = new HRegionInfo(hri.getTable(), SPLITKEY, null);
    HStoreFile f = new HStoreFile(fs, linkFilePath, testConf, cacheConf, BloomType.NONE, true);
    f.initReader();
    Path pathA = splitStoreFile(cloneRegionFs, splitHriA, TEST_FAMILY, f, SPLITKEY, true); // top
    Path pathB = splitStoreFile(cloneRegionFs, splitHriB, TEST_FAMILY, f, SPLITKEY, false); // bottom
    f.closeStoreFile(true);
    // OK test the thing
    FSUtils.logFileSystemState(fs, testDir, LOG);

    // There is a case where a file with the hfilelink pattern is actually a daughter
    // reference to an hfile link. There is code in StoreFile that handles this case.

    // Try to open store file from link
    HStoreFile hsfA = new HStoreFile(this.fs, pathA, testConf, cacheConf, BloomType.NONE, true);
    hsfA.initReader();

    // Now confirm that I can read from the ref to link
    int count = 1;
    HFileScanner s = hsfA.getReader().getScanner(false, false);
    s.seekTo();
    while (s.next()) {
      count++;
    }
    assertTrue(count > 0); // read some rows here

    // Try to open store file from link
    HStoreFile hsfB = new HStoreFile(this.fs, pathB, testConf, cacheConf, BloomType.NONE, true);
    hsfB.initReader();

    // Now confirm that I can read from the ref to link
    HFileScanner sB = hsfB.getReader().getScanner(false, false);
    sB.seekTo();

    // seekTo() positioned us on the first cell of the bottom half, so count it too.
    count++;
    while (sB.next()) {
      count++;
    }

    // Together the two halves should add up to all the rows written.
    assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
  }

  private void checkHalfHFile(final HRegionFileSystem regionFs, final HStoreFile f)
      throws IOException {
    f.initReader();
    Cell midkey = f.getReader().midKey().get();
    KeyValue midKV = (KeyValue) midkey;
    byte[] midRow = CellUtil.cloneRow(midKV);
    // Create top split.
    HRegionInfo topHri = new HRegionInfo(regionFs.getRegionInfo().getTable(),
        null, midRow);
    Path topPath = splitStoreFile(regionFs, topHri, TEST_FAMILY, f, midRow, true);
    // Create bottom split.
    HRegionInfo bottomHri = new HRegionInfo(regionFs.getRegionInfo().getTable(),
        midRow, null);
    Path bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, midRow, false);
    // Make readers on top and bottom.
    HStoreFile topF = new HStoreFile(this.fs, topPath, conf, cacheConf, BloomType.NONE, true);
    topF.initReader();
    StoreFileReader top = topF.getReader();
    HStoreFile bottomF = new HStoreFile(this.fs, bottomPath, conf, cacheConf, BloomType.NONE, true);
    bottomF.initReader();
    StoreFileReader bottom = bottomF.getReader();
    ByteBuffer previous = null;
    LOG.info("Midkey: " + midKV.toString());
    ByteBuffer bbMidkeyBytes = ByteBuffer.wrap(midKV.getKey());
    try {
      // Now make two HalfMapFiles and assert they can read the full backing
      // file, one from the top and the other from the bottom.
      // Test reading from the top first: every key must sort >= the midkey.
      boolean first = true;
      ByteBuffer key = null;
      HFileScanner topScanner = top.getScanner(false, false);
      while ((!topScanner.isSeeked() && topScanner.seekTo()) ||
             (topScanner.isSeeked() && topScanner.next())) {
        key = ByteBuffer.wrap(((KeyValue) topScanner.getKey()).getKey());

        if ((PrivateCellUtil.compare(topScanner.getReader().getComparator(), midKV, key.array(),
          key.arrayOffset(), key.limit())) > 0) {
          fail("key=" + Bytes.toStringBinary(key) + " < midkey=" + midkey);
        }
        if (first) {
          first = false;
          LOG.info("First in top: " + Bytes.toString(Bytes.toBytes(key)));
        }
      }
      LOG.info("Last in top: " + Bytes.toString(Bytes.toBytes(key)));

      // Then the bottom half: every key must sort < the midkey.
      first = true;
      HFileScanner bottomScanner = bottom.getScanner(false, false);
      while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
          bottomScanner.next()) {
        key = ByteBuffer.wrap(((KeyValue) bottomScanner.getKey()).getKey());
        previous = key;
        if (first) {
          first = false;
          LOG.info("First in bottom: " +
            Bytes.toString(Bytes.toBytes(previous)));
        }
        assertTrue(key.compareTo(bbMidkeyBytes) < 0);
      }
      if (previous != null) {
        LOG.info("Last in bottom: " + Bytes.toString(Bytes.toBytes(previous)));
      }
      // Remove references.
      regionFs.cleanupDaughterRegion(topHri);
      regionFs.cleanupDaughterRegion(bottomHri);

      // Next test using a midkey that does not exist in the file.
      // First, do a key that is less than the first key. Ensure splits behave
      // properly.
      byte[] badmidkey = Bytes.toBytes("  .");
      assertTrue(fs.exists(f.getPath()));
      topPath = splitStoreFile(regionFs, topHri, TEST_FAMILY, f, badmidkey, true);
      bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, badmidkey, false);

      // The split key sorts below every key in the file, so there is no bottom half.
      assertNull(bottomPath);

      topF = new HStoreFile(this.fs, topPath, conf, cacheConf, BloomType.NONE, true);
      topF.initReader();
      top = topF.getReader();
      // Now read from the top.
      first = true;
      topScanner = top.getScanner(false, false);
      KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
      while ((!topScanner.isSeeked() && topScanner.seekTo()) ||
          topScanner.next()) {
        key = ByteBuffer.wrap(((KeyValue) topScanner.getKey()).getKey());
        keyOnlyKV.setKey(key.array(), key.arrayOffset(), key.limit());
        assertTrue(PrivateCellUtil.compare(topScanner.getReader().getComparator(), keyOnlyKV,
          badmidkey, 0, badmidkey.length) >= 0);
        if (first) {
          first = false;
          KeyValue keyKV = KeyValueUtil.createKeyValueFromKey(key);
          LOG.info("First top when key < bottom: " + keyKV);
          String tmp =
              Bytes.toString(keyKV.getRowArray(), keyKV.getRowOffset(), keyKV.getRowLength());
          for (int i = 0; i < tmp.length(); i++) {
            assertTrue(tmp.charAt(i) == 'a');
          }
        }
      }
      KeyValue keyKV = KeyValueUtil.createKeyValueFromKey(key);
      LOG.info("Last top when key < bottom: " + keyKV);
      String tmp = Bytes.toString(keyKV.getRowArray(), keyKV.getRowOffset(), keyKV.getRowLength());
      for (int i = 0; i < tmp.length(); i++) {
        assertTrue(tmp.charAt(i) == 'z');
      }
      // Remove references.
      regionFs.cleanupDaughterRegion(topHri);
      regionFs.cleanupDaughterRegion(bottomHri);

      // Test when the bad midkey is greater than the last key in the file ('|||' > 'zz').
      badmidkey = Bytes.toBytes("|||");
      topPath = splitStoreFile(regionFs, topHri, TEST_FAMILY, f, badmidkey, true);
      bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, badmidkey, false);
      // This time there is no top half.
      assertNull(topPath);

      bottomF = new HStoreFile(this.fs, bottomPath, conf, cacheConf, BloomType.NONE, true);
      bottomF.initReader();
      bottom = bottomF.getReader();
      first = true;
      bottomScanner = bottom.getScanner(false, false);
      while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
          bottomScanner.next()) {
        key = ByteBuffer.wrap(((KeyValue) bottomScanner.getKey()).getKey());
        if (first) {
          first = false;
          keyKV = KeyValueUtil.createKeyValueFromKey(key);
          LOG.info("First bottom when key > top: " + keyKV);
          tmp = Bytes.toString(keyKV.getRowArray(), keyKV.getRowOffset(), keyKV.getRowLength());
          for (int i = 0; i < tmp.length(); i++) {
            assertTrue(tmp.charAt(i) == 'a');
          }
        }
      }
      keyKV = KeyValueUtil.createKeyValueFromKey(key);
      LOG.info("Last bottom when key > top: " + keyKV);
      for (int i = 0; i < tmp.length(); i++) {
        assertTrue(Bytes.toString(keyKV.getRowArray(), keyKV.getRowOffset(), keyKV.getRowLength())
            .charAt(i) == 'z');
      }
    } finally {
      if (top != null) {
        top.close(true); // evict since we are about to delete the file
      }
      if (bottom != null) {
        bottom.close(true); // evict since we are about to delete the file
      }
      fs.delete(f.getPath(), true);
    }
  }

  private static StoreFileScanner getStoreFileScanner(StoreFileReader reader, boolean cacheBlocks,
      boolean pread) {
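    // The trailing arguments are isCompaction = false, readPt = 0,
    // scannerOrder = 0 and canOptimizeForNonNullColumn = false (parameter
    // names as in this branch's StoreFileReader#getStoreFileScanner).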
    return reader.getStoreFileScanner(cacheBlocks, pread, false, 0, 0, false);
  }

  private static final String localFormatter = "%010d";

  private void bloomWriteRead(StoreFileWriter writer, FileSystem fs) throws Exception {
    float err = conf.getFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
    Path f = writer.getPath();
    long now = System.currentTimeMillis();
    // Write only the even rows, so half of the lookups below should miss.
    for (int i = 0; i < 2000; i += 2) {
      String row = String.format(localFormatter, i);
      KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"),
        Bytes.toBytes("col"), now, Bytes.toBytes("value"));
      writer.append(kv);
    }
    writer.close();

    StoreFileReader reader =
        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
    reader.loadFileInfo();
    reader.loadBloomfilter();
    StoreFileScanner scanner = getStoreFileScanner(reader, false, false);

    // check false positives rate
    int falsePos = 0;
    int falseNeg = 0;
    for (int i = 0; i < 2000; i++) {
      String row = String.format(localFormatter, i);
      TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
      columns.add(Bytes.toBytes("family:col"));

      Scan scan = new Scan(Bytes.toBytes(row), Bytes.toBytes(row));
      scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes("family:col"));
      HStore store = mock(HStore.class);
      when(store.getColumnFamilyDescriptor())
          .thenReturn(ColumnFamilyDescriptorBuilder.of("family"));
      boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
      if (i % 2 == 0) {
        if (!exists) falseNeg++;
      } else {
        if (exists) falsePos++;
      }
    }
    reader.close(true); // evict because we are about to delete the file
    fs.delete(f, true);
    assertEquals("False negatives: " + falseNeg, 0, falseNeg);
    int maxFalsePos = (int) (2 * 2000 * err);
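    // With the 1% error rate set by testBloomFilter this allows
    // 2 * 2000 * 0.01 = 40 false positives across the ~1000 absent-row checks.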
    assertTrue("Too many false positives: " + falsePos + " (err=" + err + ", expected no more than "
            + maxFalsePos + ")", falsePos <= maxFalsePos);
  }

  private static final int BLOCKSIZE_SMALL = 8192;

  @Test
  public void testBloomFilter() throws Exception {
    FileSystem fs = FileSystem.getLocal(conf);
    conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);

    // write the file
    Path f = new Path(ROOT_DIR, getName());
    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
                        .withChecksumType(CKTYPE)
                        .withBytesPerCheckSum(CKBYTES).build();
    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
            .withFilePath(f)
            .withBloomType(BloomType.ROW)
            .withMaxKeyCount(2000)
            .withFileContext(meta)
            .build();
    bloomWriteRead(writer, fs);
  }

  @Test
  public void testDeleteFamilyBloomFilter() throws Exception {
    FileSystem fs = FileSystem.getLocal(conf);
    conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
    float err = conf.getFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0);

    // write the file
    Path f = new Path(ROOT_DIR, getName());

    HFileContext meta = new HFileContextBuilder()
                        .withBlockSize(BLOCKSIZE_SMALL)
                        .withChecksumType(CKTYPE)
                        .withBytesPerCheckSum(CKBYTES).build();
    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
            .withFilePath(f)
            .withMaxKeyCount(2000)
            .withFileContext(meta)
            .build();

    // add delete family
    long now = System.currentTimeMillis();
    for (int i = 0; i < 2000; i += 2) {
      String row = String.format(localFormatter, i);
      KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"),
          Bytes.toBytes("col"), now, KeyValue.Type.DeleteFamily, Bytes.toBytes("value"));
      writer.append(kv);
    }
    writer.close();

    StoreFileReader reader =
        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
    reader.loadFileInfo();
    reader.loadBloomfilter();

    // check false positives rate
    int falsePos = 0;
    int falseNeg = 0;
    for (int i = 0; i < 2000; i++) {
      String row = String.format(localFormatter, i);
      byte[] rowKey = Bytes.toBytes(row);
      boolean exists = reader.passesDeleteFamilyBloomFilter(rowKey, 0, rowKey.length);
      if (i % 2 == 0) {
        if (!exists) {
          falseNeg++;
        }
      } else {
        if (exists) {
          falsePos++;
        }
      }
    }
    assertEquals(1000, reader.getDeleteFamilyCnt());
    reader.close(true); // evict because we are about to delete the file
    fs.delete(f, true);
    assertEquals("False negatives: " + falseNeg, 0, falseNeg);
    int maxFalsePos = (int) (2 * 2000 * err);
    assertTrue("Too many false positives: " + falsePos + " (err=" + err
        + ", expected no more than " + maxFalsePos + ")", falsePos <= maxFalsePos);
  }

  /**
   * Test for HBASE-8012
   */
  @Test
  public void testReseek() throws Exception {
    // write the file
    Path f = new Path(ROOT_DIR, getName());
    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
            .withFilePath(f)
            .withFileContext(meta)
            .build();

    writeStoreFile(writer);
    writer.close();

    StoreFileReader reader =
        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);

    // Now do reseek with empty KV to position to the beginning of the file

    KeyValue k = KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY);
    StoreFileScanner s = getStoreFileScanner(reader, false, false);
    s.reseek(k);

    assertNotNull("Initial reseek should position at the beginning of the file", s.peek());
  }

  @Test
  public void testBloomTypes() throws Exception {
    float err = (float) 0.01;
    FileSystem fs = FileSystem.getLocal(conf);
    conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, err);
    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);

    int rowCount = 50;
    int colCount = 10;
    int versions = 2;

    // run once using columns and once using rows
    BloomType[] bt = { BloomType.ROWCOL, BloomType.ROW };
    int[] expKeys = { rowCount * colCount, rowCount };
    // The line below deserves commentary: it is the expected number of bloom
    // false positives.
    //   ROWCOL: rowCount * colCount keys are inserted, checked over 2x the rows
    //           and 2x the columns.
    //   ROW:    only rowCount keys are inserted, but each row-level false positive
    //           is magnified by the inner column loop (2 * colCount checks per row).
    float[] expErr = { 2 * rowCount * colCount * err, 2 * rowCount * 2 * colCount * err };
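    // Worked out with rowCount = 50, colCount = 10 and err = 0.01 this is
    // expErr = { 10.0f, 20.0f }; the assertion at the end of this test allows
    // up to twice these values.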

    for (int x : new int[] { 0, 1 }) {
      // write the file
      Path f = new Path(ROOT_DIR, getName() + x);
      HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
          .withChecksumType(CKTYPE)
          .withBytesPerCheckSum(CKBYTES).build();
      // Make a store file and write data to it.
      StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
              .withFilePath(f)
              .withBloomType(bt[x])
              .withMaxKeyCount(expKeys[x])
              .withFileContext(meta)
              .build();

      long now = System.currentTimeMillis();
      for (int i = 0; i < rowCount * 2; i += 2) { // rows
        for (int j = 0; j < colCount * 2; j += 2) { // column qualifiers
          String row = String.format(localFormatter, i);
          String col = String.format(localFormatter, j);
          for (int k = 0; k < versions; ++k) { // versions
            KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"),
                Bytes.toBytes("col" + col), now - k, Bytes.toBytes(-1L));
            writer.append(kv);
          }
        }
      }
      writer.close();

      StoreFileReader reader =
          new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
      reader.loadFileInfo();
      reader.loadBloomfilter();
      StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
      assertEquals(expKeys[x], reader.getGeneralBloomFilter().getKeyCount());

      HStore store = mock(HStore.class);
      when(store.getColumnFamilyDescriptor())
          .thenReturn(ColumnFamilyDescriptorBuilder.of("family"));
      // check false positives rate
      int falsePos = 0;
      int falseNeg = 0;
      for (int i = 0; i < rowCount * 2; ++i) { // rows
        for (int j = 0; j < colCount * 2; ++j) { // column qualifiers
          String row = String.format(localFormatter, i);
          String col = String.format(localFormatter, j);
          TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
          columns.add(Bytes.toBytes("col" + col));

          Scan scan = new Scan(Bytes.toBytes(row), Bytes.toBytes(row));
          scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes(("col" + col)));

          boolean exists =
              scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
          boolean shouldRowExist = i % 2 == 0;
          boolean shouldColExist = j % 2 == 0;
          shouldColExist = shouldColExist || bt[x] == BloomType.ROW;
          if (shouldRowExist && shouldColExist) {
            if (!exists) falseNeg++;
          } else {
            if (exists) falsePos++;
          }
        }
      }
      reader.close(true); // evict because we are about to delete the file
      fs.delete(f, true);
      System.out.println(bt[x].toString());
      System.out.println("  False negatives: " + falseNeg);
      System.out.println("  False positives: " + falsePos);
      assertEquals(0, falseNeg);
      assertTrue(falsePos < 2 * expErr[x]);
    }
  }

  @Test
  public void testSeqIdComparator() {
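    // The files below are listed in the expected SEQ_ID order: the bulk-loaded
    // files (mocked here with seqId -1) sort first, then the remaining files
    // by ascending max sequence id.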
    assertOrdering(StoreFileComparators.SEQ_ID, mockStoreFile(true, 100, 1000, -1, "/foo/123"),
        mockStoreFile(true, 100, 1000, -1, "/foo/124"),
        mockStoreFile(true, 99, 1000, -1, "/foo/126"),
        mockStoreFile(true, 98, 2000, -1, "/foo/126"), mockStoreFile(false, 3453, -1, 1, "/foo/1"),
        mockStoreFile(false, 2, -1, 3, "/foo/2"), mockStoreFile(false, 1000, -1, 5, "/foo/2"),
        mockStoreFile(false, 76, -1, 5, "/foo/3"));
  }

  /**
   * Assert that the given comparator orders the given storefiles in the
   * same way that they're passed.
   */
  private void assertOrdering(Comparator<? super HStoreFile> comparator, HStoreFile... sfs) {
    ArrayList<HStoreFile> sorted = Lists.newArrayList(sfs);
    Collections.shuffle(sorted);
    Collections.sort(sorted, comparator);
    LOG.debug("sfs: " + Joiner.on(",").join(sfs));
    LOG.debug("sorted: " + Joiner.on(",").join(sorted));
    assertTrue(Iterables.elementsEqual(Arrays.asList(sfs), sorted));
  }

  /**
   * Create a mock StoreFile with the given attributes.
   */
  private HStoreFile mockStoreFile(boolean bulkLoad,
                                  long size,
                                  long bulkTimestamp,
                                  long seqId,
                                  String path) {
    HStoreFile mock = Mockito.mock(HStoreFile.class);
    StoreFileReader reader = Mockito.mock(StoreFileReader.class);

    Mockito.doReturn(size).when(reader).length();

    Mockito.doReturn(reader).when(mock).getReader();
    Mockito.doReturn(bulkLoad).when(mock).isBulkLoadResult();
    Mockito.doReturn(OptionalLong.of(bulkTimestamp)).when(mock).getBulkLoadTimestamp();
    Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
    Mockito.doReturn(new Path(path)).when(mock).getPath();
    String name = "mock storefile, bulkLoad=" + bulkLoad +
      " bulkTimestamp=" + bulkTimestamp +
      " seqId=" + seqId +
      " path=" + path;
    Mockito.doReturn(name).when(mock).toString();
    return mock;
  }

  /**
   * Generate a list of KeyValues for testing based on the given parameters.
   * @param timestamps timestamps to use; one KeyValue is created per timestamp per row
   * @param numRows number of rows to generate
   * @param qualifier column qualifier to use
   * @param family column family to use
   * @return the generated key-value list
   */
  List<KeyValue> getKeyValueSet(long[] timestamps, int numRows,
      byte[] qualifier, byte[] family) {
    List<KeyValue> kvList = new ArrayList<>();
    for (int i = 1; i <= numRows; i++) {
      byte[] b = Bytes.toBytes(i);
      LOG.info(Bytes.toString(b));
      for (long timestamp : timestamps) {
        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
      }
    }
    return kvList;
  }

  /**
   * Test to ensure correctness when using StoreFile with multiple timestamps.
   */
  @Test
  public void testMultipleTimestamps() throws IOException {
    byte[] family = Bytes.toBytes("familyname");
    byte[] qualifier = Bytes.toBytes("qualifier");
    int numRows = 10;
    long[] timestamps = new long[] { 20, 10, 5, 1 };
    Scan scan = new Scan();

    // Make up a directory hierarchy that has a regiondir ("7e0102") and familyname.
    Path storedir = new Path(new Path(testDir, "7e0102"), Bytes.toString(family));
    Path dir = new Path(storedir, "1234567890");
    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
            .withOutputDir(dir)
            .withFileContext(meta)
            .build();

    List<KeyValue> kvList = getKeyValueSet(timestamps, numRows,
        qualifier, family);

    for (KeyValue kv : kvList) {
      writer.append(kv);
    }
    writer.appendMetadata(0, false);
    writer.close();

    HStoreFile hsf = new HStoreFile(this.fs, writer.getPath(), conf, cacheConf,
      BloomType.NONE, true);
    HStore store = mock(HStore.class);
    when(store.getColumnFamilyDescriptor()).thenReturn(ColumnFamilyDescriptorBuilder.of(family));
    hsf.initReader();
    StoreFileReader reader = hsf.getReader();
    StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
    TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    columns.add(qualifier);

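    // shouldUseScanner() checks overlap against the file's overall timestamp
    // range, here [1, 20], not individual cell timestamps; so for example the
    // [8, 10) range below passes even though no cell carries timestamp 8 or 9.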
    scan.setTimeRange(20, 100);
    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));

    scan.setTimeRange(1, 2);
    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));

    scan.setTimeRange(8, 10);
    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));

    // Let's make sure it still works with column family time ranges.
    scan.setColumnFamilyTimeRange(family, 7, 50);
    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));

    // This test relies on the timestamp range optimization:
    // [27, 50) does not overlap the file's [1, 20] range, so skip the scanner.
    scan = new Scan();
    scan.setTimeRange(27, 50);
    assertTrue(!scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));

    // Should still use the scanner because we override the family time range.
    scan = new Scan();
    scan.setTimeRange(27, 50);
    scan.setColumnFamilyTimeRange(family, 7, 50);
    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
  }

  @Test
  public void testCacheOnWriteEvictOnClose() throws Exception {
    Configuration conf = this.conf;

    // Find a home for our files (regiondir ("7e0102") and familyname).
    Path baseDir = new Path(new Path(testDir, "7e0102"), "twoCOWEOC");

    // Grab the block cache and get the initial hit/miss counts
    BlockCache bc = BlockCacheFactory.createBlockCache(conf);
    assertNotNull(bc);
    CacheStats cs = bc.getStats();
    long startHit = cs.getHitCount();
    long startMiss = cs.getMissCount();
    long startEvicted = cs.getEvictedCount();

    // Let's write a StoreFile with three blocks, with cache on write off
    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, false);
    CacheConfig cacheConf = new CacheConfig(conf, bc);
    Path pathCowOff = new Path(baseDir, "123456789");
    StoreFileWriter writer = writeStoreFile(conf, cacheConf, pathCowOff, 3);
    HStoreFile hsf = new HStoreFile(this.fs, writer.getPath(), conf, cacheConf,
      BloomType.NONE, true);
    LOG.debug(hsf.getPath().toString());

    // Read this file, we should see 3 misses
    hsf.initReader();
    StoreFileReader reader = hsf.getReader();
    reader.loadFileInfo();
    StoreFileScanner scanner = getStoreFileScanner(reader, true, true);
    scanner.seek(KeyValue.LOWESTKEY);
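    // Drain the scanner so every data block of the file is touched and
    // therefore counted by the cache stats checked below.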
    while (scanner.next() != null);
    assertEquals(startHit, cs.getHitCount());
    assertEquals(startMiss + 3, cs.getMissCount());
    assertEquals(startEvicted, cs.getEvictedCount());
    startMiss += 3;
    scanner.close();
    reader.close(cacheConf.shouldEvictOnClose());

    // Now write a StoreFile with three blocks, with cache on write on
    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
    cacheConf = new CacheConfig(conf, bc);
    Path pathCowOn = new Path(baseDir, "123456788");
    writer = writeStoreFile(conf, cacheConf, pathCowOn, 3);
    hsf = new HStoreFile(this.fs, writer.getPath(), conf, cacheConf,
      BloomType.NONE, true);

    // Read this file, we should see 3 hits
    hsf.initReader();
    reader = hsf.getReader();
    scanner = getStoreFileScanner(reader, true, true);
    scanner.seek(KeyValue.LOWESTKEY);
    while (scanner.next() != null);
    assertEquals(startHit + 3, cs.getHitCount());
    assertEquals(startMiss, cs.getMissCount());
    assertEquals(startEvicted, cs.getEvictedCount());
    startHit += 3;
    scanner.close();
    reader.close(cacheConf.shouldEvictOnClose());

    // Let's read back the two files to ensure the blocks exactly match
    hsf = new HStoreFile(this.fs, pathCowOff, conf, cacheConf, BloomType.NONE, true);
    hsf.initReader();
    StoreFileReader readerOne = hsf.getReader();
    readerOne.loadFileInfo();
    StoreFileScanner scannerOne = getStoreFileScanner(readerOne, true, true);
    scannerOne.seek(KeyValue.LOWESTKEY);
    hsf = new HStoreFile(this.fs, pathCowOn, conf, cacheConf, BloomType.NONE, true);
    hsf.initReader();
    StoreFileReader readerTwo = hsf.getReader();
    readerTwo.loadFileInfo();
    StoreFileScanner scannerTwo = getStoreFileScanner(readerTwo, true, true);
    scannerTwo.seek(KeyValue.LOWESTKEY);
    Cell kv1 = null;
    Cell kv2 = null;
    while ((kv1 = scannerOne.next()) != null) {
      kv2 = scannerTwo.next();
      assertTrue(kv1.equals(kv2));
      KeyValue keyv1 = KeyValueUtil.ensureKeyValue(kv1);
      KeyValue keyv2 = KeyValueUtil.ensureKeyValue(kv2);
      assertTrue(Bytes.compareTo(
          keyv1.getBuffer(), keyv1.getKeyOffset(), keyv1.getKeyLength(),
          keyv2.getBuffer(), keyv2.getKeyOffset(), keyv2.getKeyLength()) == 0);
      assertTrue(Bytes.compareTo(
          kv1.getValueArray(), kv1.getValueOffset(), kv1.getValueLength(),
          kv2.getValueArray(), kv2.getValueOffset(), kv2.getValueLength()) == 0);
    }
    assertNull(scannerTwo.next());
    assertEquals(startHit + 6, cs.getHitCount());
    assertEquals(startMiss, cs.getMissCount());
    assertEquals(startEvicted, cs.getEvictedCount());
    startHit += 6;
    scannerOne.close();
    readerOne.close(cacheConf.shouldEvictOnClose());
    scannerTwo.close();
    readerTwo.close(cacheConf.shouldEvictOnClose());

    // Let's close the first file with evict on close turned on
    conf.setBoolean("hbase.rs.evictblocksonclose", true);
    cacheConf = new CacheConfig(conf, bc);
    hsf = new HStoreFile(this.fs, pathCowOff, conf, cacheConf, BloomType.NONE, true);
    hsf.initReader();
    reader = hsf.getReader();
    reader.close(cacheConf.shouldEvictOnClose());

    // We should have 3 new evictions but the evict count stat should not change. Eviction because
    // of HFile invalidation is not counted along with normal evictions
    assertEquals(startHit, cs.getHitCount());
    assertEquals(startMiss, cs.getMissCount());
    assertEquals(startEvicted, cs.getEvictedCount());

    // Let's close the second file with evict on close turned off
    conf.setBoolean("hbase.rs.evictblocksonclose", false);
    cacheConf = new CacheConfig(conf, bc);
    hsf = new HStoreFile(this.fs, pathCowOn, conf, cacheConf, BloomType.NONE, true);
    hsf.initReader();
    reader = hsf.getReader();
    reader.close(cacheConf.shouldEvictOnClose());

    // We expect no changes
    assertEquals(startHit, cs.getHitCount());
    assertEquals(startMiss, cs.getMissCount());
    assertEquals(startEvicted, cs.getEvictedCount());
  }

  private Path splitStoreFile(final HRegionFileSystem regionFs, final HRegionInfo hri,
      final String family, final HStoreFile sf, final byte[] splitKey, boolean isTopRef)
      throws IOException {
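    // regionFs.splitStoreFile() returns null when the file lies entirely on one
    // side of the split key, meaning no reference file is needed for this half;
    // checkHalfHFile relies on this for its out-of-range split keys.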
    Path path = regionFs.splitStoreFile(hri, family, sf, splitKey, isTopRef, null);
    if (null == path) {
      return null;
    }
    Path regionDir = regionFs.commitDaughterRegion(hri);
    return new Path(new Path(regionDir, family), path.getName());
  }

  private StoreFileWriter writeStoreFile(Configuration conf,
      CacheConfig cacheConf, Path path, int numBlocks)
      throws IOException {
    // Let's put ~5 small KVs in each block, so let's make 5*numBlocks KVs
    int numKVs = 5 * numBlocks;
    List<KeyValue> kvs = new ArrayList<>(numKVs);
    byte[] b = Bytes.toBytes("x");
    int totalSize = 0;
    for (int i = numKVs; i > 0; i--) {
      KeyValue kv = new KeyValue(b, b, b, i, b);
      kvs.add(kv);
      // kv has memstoreTS 0, which takes 1 byte to store.
      totalSize += kv.getLength() + 1;
    }
    int blockSize = totalSize / numBlocks;
    HFileContext meta = new HFileContextBuilder().withBlockSize(blockSize)
                        .withChecksumType(CKTYPE)
                        .withBytesPerCheckSum(CKBYTES)
                        .build();
    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
            .withFilePath(path)
            .withMaxKeyCount(2000)
            .withFileContext(meta)
            .build();
    // We'll write N-1 KVs to ensure we don't write an extra block
    kvs.remove(kvs.size() - 1);
    for (KeyValue kv : kvs) {
      writer.append(kv);
    }
    writer.appendMetadata(0, false);
    writer.close();
    return writer;
  }

  /**
   * Check if data block encoding information is saved correctly in HFile's
   * file info.
   */
  @Test
  public void testDataBlockEncodingMetaData() throws IOException {
    // Make up a directory hierarchy that has a regiondir ("7e0102") and familyname.
    Path dir = new Path(new Path(testDir, "7e0102"), "familyname");
    Path path = new Path(dir, "1234567890");

    DataBlockEncoding dataBlockEncoderAlgo = DataBlockEncoding.FAST_DIFF;
    HFileDataBlockEncoder dataBlockEncoder =
        new HFileDataBlockEncoderImpl(dataBlockEncoderAlgo);
    cacheConf = new CacheConfig(conf);
    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
        .withChecksumType(CKTYPE)
        .withBytesPerCheckSum(CKBYTES)
        .withDataBlockEncoding(dataBlockEncoderAlgo)
        .build();
    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
            .withFilePath(path)
            .withMaxKeyCount(2000)
            .withFileContext(meta)
            .build();
    writer.close();

    HStoreFile storeFile =
        new HStoreFile(fs, writer.getPath(), conf, cacheConf, BloomType.NONE, true);
    storeFile.initReader();
    StoreFileReader reader = storeFile.getReader();

    Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
    byte[] value = fileInfo.get(HFileDataBlockEncoder.DATA_BLOCK_ENCODING);
    // Compare array contents, not references: assertEquals on two byte[]
    // falls back to Object.equals and fails even for identical bytes.
    assertTrue(Bytes.equals(dataBlockEncoderAlgo.getNameInBytes(), value));
  }
}