001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.mockito.Mockito.mock;
021import static org.mockito.Mockito.when;
022
023import java.io.IOException;
024import java.nio.ByteBuffer;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.Comparator;
029import java.util.List;
030import java.util.Map;
031import java.util.OptionalLong;
032import java.util.TreeSet;
033import java.util.concurrent.atomic.AtomicInteger;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.CellUtil;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestCase;
041import org.apache.hadoop.hbase.HBaseTestingUtility;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.HRegionInfo;
044import org.apache.hadoop.hbase.KeyValue;
045import org.apache.hadoop.hbase.KeyValueUtil;
046import org.apache.hadoop.hbase.PrivateCellUtil;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
050import org.apache.hadoop.hbase.client.RegionInfo;
051import org.apache.hadoop.hbase.client.RegionInfoBuilder;
052import org.apache.hadoop.hbase.client.Scan;
053import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
054import org.apache.hadoop.hbase.io.HFileLink;
055import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
056import org.apache.hadoop.hbase.io.hfile.BlockCache;
057import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
058import org.apache.hadoop.hbase.io.hfile.CacheConfig;
059import org.apache.hadoop.hbase.io.hfile.CacheStats;
060import org.apache.hadoop.hbase.io.hfile.HFileContext;
061import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
062import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
063import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
064import org.apache.hadoop.hbase.io.hfile.HFileInfo;
065import org.apache.hadoop.hbase.io.hfile.HFileScanner;
066import org.apache.hadoop.hbase.io.hfile.ReaderContext;
067import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
068import org.apache.hadoop.hbase.testclassification.MediumTests;
069import org.apache.hadoop.hbase.testclassification.RegionServerTests;
070import org.apache.hadoop.hbase.util.BloomFilterFactory;
071import org.apache.hadoop.hbase.util.Bytes;
072import org.apache.hadoop.hbase.util.ChecksumType;
073import org.apache.hadoop.hbase.util.CommonFSUtils;
074import org.junit.After;
075import org.junit.Before;
076import org.junit.ClassRule;
077import org.junit.Test;
078import org.junit.experimental.categories.Category;
079import org.mockito.Mockito;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082
083import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
084import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
085import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
086
087/**
088 * Test HStoreFile
089 */
090@Category({RegionServerTests.class, MediumTests.class})
091public class TestHStoreFile extends HBaseTestCase {
092
  // Class-level JUnit rule built for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHStoreFile.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHStoreFile.class);
  // Shared testing utility; supplies the configuration and the data test directory used below.
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Cache configuration handed to the store file writers and readers created by the tests.
  private CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
  // Parent directory for tests that build their own file path from the test name.
  private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestStoreFile").toString();
  // Checksum type and bytes-per-checksum applied to the HFileContext in the bloom filter tests.
  private static final ChecksumType CKTYPE = ChecksumType.CRC32C;
  private static final int CKBYTES = 512;
  // Column family name under which store files are committed and split.
  private static String TEST_FAMILY = "cf";
104
  /**
   * Delegates to the superclass {@code setUp()}; this class adds no fixture of its own here.
   */
  @Override
  @Before
  public void setUp() throws Exception {
    super.setUp();
  }
110
  /**
   * Delegates to the superclass {@code tearDown()}; this class adds no cleanup of its own here.
   */
  @Override
  @After
  public void tearDown() throws Exception {
    super.tearDown();
  }
116
117  /**
118   * Write a file and then assert that we can read from top and bottom halves
119   * using two HalfMapFiles.
120   */
121  @Test
122  public void testBasicHalfMapFile() throws Exception {
123    final HRegionInfo hri =
124        new HRegionInfo(TableName.valueOf("testBasicHalfMapFileTb"));
125    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
126      conf, fs, new Path(testDir, hri.getTable().getNameAsString()), hri);
127
128    HFileContext meta = new HFileContextBuilder().withBlockSize(2*1024).build();
129    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
130            .withFilePath(regionFs.createTempName())
131            .withFileContext(meta)
132            .build();
133    writeStoreFile(writer);
134
135    Path sfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
136    HStoreFile sf = new HStoreFile(this.fs, sfPath, conf, cacheConf, BloomType.NONE, true);
137    checkHalfHFile(regionFs, sf);
138  }
139
140  private void writeStoreFile(final StoreFileWriter writer) throws IOException {
141    writeStoreFile(writer, Bytes.toBytes(getName()), Bytes.toBytes(getName()));
142  }
143
  // Pick a split point (roughly halfway): the first byte is the midpoint of FIRST_CHAR and
  // LAST_CHAR, the second byte is FIRST_CHAR.
  byte[] SPLITKEY = new byte[] { (LAST_CHAR + FIRST_CHAR)/2, FIRST_CHAR};
146
147  /*
148   * Writes HStoreKey and ImmutableBytes data to passed writer and
149   * then closes it.
150   * @param writer
151   * @throws IOException
152   */
153  public static void writeStoreFile(final StoreFileWriter writer, byte[] fam, byte[] qualifier)
154      throws IOException {
155    long now = System.currentTimeMillis();
156    try {
157      for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
158        for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
159          byte[] b = new byte[] { (byte) d, (byte) e };
160          writer.append(new KeyValue(b, fam, qualifier, now, b));
161        }
162      }
163    } finally {
164      writer.close();
165    }
166  }
167
168  /**
169   * Test that our mechanism of writing store files in one region to reference
170   * store files in other regions works.
171   */
172  @Test
173  public void testReference() throws IOException {
174    final HRegionInfo hri = new HRegionInfo(TableName.valueOf("testReferenceTb"));
175    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
176      conf, fs, new Path(testDir, hri.getTable().getNameAsString()), hri);
177
178    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
179    // Make a store file and write data to it.
180    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
181            .withFilePath(regionFs.createTempName())
182            .withFileContext(meta)
183            .build();
184    writeStoreFile(writer);
185
186    Path hsfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
187    HStoreFile hsf = new HStoreFile(this.fs, hsfPath, conf, cacheConf, BloomType.NONE, true);
188    hsf.initReader();
189    StoreFileReader reader = hsf.getReader();
190    // Split on a row, not in middle of row.  Midkey returned by reader
191    // may be in middle of row.  Create new one with empty column and
192    // timestamp.
193    byte [] midRow = CellUtil.cloneRow(reader.midKey().get());
194    byte [] finalRow = CellUtil.cloneRow(reader.getLastKey().get());
195    hsf.closeStoreFile(true);
196
197    // Make a reference
198    HRegionInfo splitHri = new HRegionInfo(hri.getTable(), null, midRow);
199    Path refPath = splitStoreFile(regionFs, splitHri, TEST_FAMILY, hsf, midRow, true);
200    HStoreFile refHsf = new HStoreFile(this.fs, refPath, conf, cacheConf, BloomType.NONE, true);
201    refHsf.initReader();
202    // Now confirm that I can read from the reference and that it only gets
203    // keys from top half of the file.
204    HFileScanner s = refHsf.getReader().getScanner(false, false);
205    Cell kv = null;
206    for (boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) {
207      ByteBuffer bb = ByteBuffer.wrap(((KeyValue) s.getKey()).getKey());
208      kv = KeyValueUtil.createKeyValueFromKey(bb);
209      if (first) {
210        assertTrue(Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), midRow, 0,
211          midRow.length));
212        first = false;
213      }
214    }
215    assertTrue(Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), finalRow, 0,
216      finalRow.length));
217  }
218
219  @Test
220  public void testStoreFileReference() throws Exception {
221    final RegionInfo hri =
222        RegionInfoBuilder.newBuilder(TableName.valueOf("testStoreFileReference")).build();
223    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs,
224      new Path(testDir, hri.getTable().getNameAsString()), hri);
225    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
226
227    // Make a store file and write data to it.
228    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
229        .withFilePath(regionFs.createTempName()).withFileContext(meta).build();
230    writeStoreFile(writer);
231    Path hsfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
232    writer.close();
233
234    HStoreFile file = new HStoreFile(this.fs, hsfPath, conf, cacheConf, BloomType.NONE, true);
235    file.initReader();
236    StoreFileReader r = file.getReader();
237    assertNotNull(r);
238    StoreFileScanner scanner =
239        new StoreFileScanner(r, mock(HFileScanner.class), false, false, 0, 0, false);
240
241    // Verify after instantiating scanner refCount is increased
242    assertTrue("Verify file is being referenced", file.isReferencedInReads());
243    scanner.close();
244    // Verify after closing scanner refCount is decreased
245    assertFalse("Verify file is not being referenced", file.isReferencedInReads());
246  }
247
248  @Test
249  public void testEmptyStoreFileRestrictKeyRanges() throws Exception {
250    StoreFileReader reader = mock(StoreFileReader.class);
251    HStore store = mock(HStore.class);
252    byte[] cf = Bytes.toBytes("ty");
253    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.of(cf);
254    when(store.getColumnFamilyDescriptor()).thenReturn(cfd);
255    StoreFileScanner scanner =
256        new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0, 0, true);
257    Scan scan = new Scan();
258    scan.setColumnFamilyTimeRange(cf, 0, 1);
259    assertFalse(scanner.shouldUseScanner(scan, store, 0));
260  }
261
262  @Test
263  public void testHFileLink() throws IOException {
264    final HRegionInfo hri = new HRegionInfo(TableName.valueOf("testHFileLinkTb"));
265    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
266    Configuration testConf = new Configuration(this.conf);
267    CommonFSUtils.setRootDir(testConf, testDir);
268    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
269      testConf, fs, CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);
270    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
271
272    // Make a store file and write data to it.
273    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
274            .withFilePath(regionFs.createTempName())
275            .withFileContext(meta)
276            .build();
277    writeStoreFile(writer);
278
279    Path storeFilePath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
280    Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", TEST_FAMILY));
281    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
282    Path linkFilePath = new Path(dstPath,
283                  HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
284
285    // Try to open store file from link
286    StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true);
287    HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
288    assertTrue(storeFileInfo.isLink());
289    hsf.initReader();
290
291    // Now confirm that I can read from the link
292    int count = 1;
293    HFileScanner s = hsf.getReader().getScanner(false, false);
294    s.seekTo();
295    while (s.next()) {
296      count++;
297    }
298    assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
299  }
300
301  /**
302   * This test creates an hfile and then the dir structures and files to verify that references
303   * to hfilelinks (created by snapshot clones) can be properly interpreted.
304   */
305  @Test
306  public void testReferenceToHFileLink() throws IOException {
307    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
308    Configuration testConf = new Configuration(this.conf);
309    CommonFSUtils.setRootDir(testConf, testDir);
310
311    // adding legal table name chars to verify regex handles it.
312    HRegionInfo hri = new HRegionInfo(TableName.valueOf("_original-evil-name"));
313    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
314      testConf, fs, CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);
315
316    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
317    // Make a store file and write data to it. <root>/<tablename>/<rgn>/<cf>/<file>
318    StoreFileWriter writer = new StoreFileWriter.Builder(testConf, cacheConf, this.fs)
319            .withFilePath(regionFs.createTempName())
320            .withFileContext(meta)
321            .build();
322    writeStoreFile(writer);
323    Path storeFilePath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
324
325    // create link to store file. <root>/clone/region/<cf>/<hfile>-<region>-<table>
326    HRegionInfo hriClone = new HRegionInfo(TableName.valueOf("clone"));
327    HRegionFileSystem cloneRegionFs = HRegionFileSystem.createRegionOnFileSystem(
328      testConf, fs, CommonFSUtils.getTableDir(testDir, hri.getTable()),
329        hriClone);
330    Path dstPath = cloneRegionFs.getStoreDir(TEST_FAMILY);
331    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
332    Path linkFilePath = new Path(dstPath,
333                  HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
334
335    // create splits of the link.
336    // <root>/clone/splitA/<cf>/<reftohfilelink>,
337    // <root>/clone/splitB/<cf>/<reftohfilelink>
338    HRegionInfo splitHriA = new HRegionInfo(hri.getTable(), null, SPLITKEY);
339    HRegionInfo splitHriB = new HRegionInfo(hri.getTable(), SPLITKEY, null);
340    HStoreFile f = new HStoreFile(fs, linkFilePath, testConf, cacheConf, BloomType.NONE, true);
341    f.initReader();
342    Path pathA = splitStoreFile(cloneRegionFs, splitHriA, TEST_FAMILY, f, SPLITKEY, true); // top
343    Path pathB = splitStoreFile(cloneRegionFs, splitHriB, TEST_FAMILY, f, SPLITKEY, false);// bottom
344    f.closeStoreFile(true);
345    // OK test the thing
346    CommonFSUtils.logFileSystemState(fs, testDir, LOG);
347
348    // There is a case where a file with the hfilelink pattern is actually a daughter
349    // reference to a hfile link.  This code in StoreFile that handles this case.
350
351    // Try to open store file from link
352    HStoreFile hsfA = new HStoreFile(this.fs, pathA, testConf, cacheConf, BloomType.NONE, true);
353    hsfA.initReader();
354
355    // Now confirm that I can read from the ref to link
356    int count = 1;
357    HFileScanner s = hsfA.getReader().getScanner(false, false);
358    s.seekTo();
359    while (s.next()) {
360      count++;
361    }
362    assertTrue(count > 0); // read some rows here
363
364    // Try to open store file from link
365    HStoreFile hsfB = new HStoreFile(this.fs, pathB, testConf, cacheConf, BloomType.NONE, true);
366    hsfB.initReader();
367
368    // Now confirm that I can read from the ref to link
369    HFileScanner sB = hsfB.getReader().getScanner(false, false);
370    sB.seekTo();
371
372    //count++ as seekTo() will advance the scanner
373    count++;
374    while (sB.next()) {
375      count++;
376    }
377
378    // read the rest of the rows
379    assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
380  }
381
382  private void checkHalfHFile(final HRegionFileSystem regionFs, final HStoreFile f)
383      throws IOException {
384    f.initReader();
385    Cell midkey = f.getReader().midKey().get();
386    KeyValue midKV = (KeyValue)midkey;
387    byte [] midRow = CellUtil.cloneRow(midKV);
388    // Create top split.
389    HRegionInfo topHri = new HRegionInfo(regionFs.getRegionInfo().getTable(), null, midRow);
390    Path topPath = splitStoreFile(regionFs, topHri, TEST_FAMILY, f, midRow, true);
391    // Create bottom split.
392    HRegionInfo bottomHri = new HRegionInfo(regionFs.getRegionInfo().getTable(),
393        midRow, null);
394    Path bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, midRow, false);
395    // Make readers on top and bottom.
396    HStoreFile topF = new HStoreFile(this.fs, topPath, conf, cacheConf, BloomType.NONE, true);
397    topF.initReader();
398    StoreFileReader top = topF.getReader();
399    HStoreFile bottomF = new HStoreFile(this.fs, bottomPath, conf, cacheConf, BloomType.NONE, true);
400    bottomF.initReader();
401    StoreFileReader bottom = bottomF.getReader();
402    ByteBuffer previous = null;
403    LOG.info("Midkey: " + midKV.toString());
404    ByteBuffer bbMidkeyBytes = ByteBuffer.wrap(midKV.getKey());
405    try {
406      // Now make two HalfMapFiles and assert they can read the full backing
407      // file, one from the top and the other from the bottom.
408      // Test bottom half first.
409      // Now test reading from the top.
410      boolean first = true;
411      ByteBuffer key = null;
412      HFileScanner topScanner = top.getScanner(false, false);
413      while ((!topScanner.isSeeked() && topScanner.seekTo()) ||
414             (topScanner.isSeeked() && topScanner.next())) {
415        key = ByteBuffer.wrap(((KeyValue) topScanner.getKey()).getKey());
416
417        if ((PrivateCellUtil.compare(topScanner.getReader().getComparator(), midKV, key.array(),
418          key.arrayOffset(), key.limit())) > 0) {
419          fail("key=" + Bytes.toStringBinary(key) + " < midkey=" +
420              midkey);
421        }
422        if (first) {
423          first = false;
424          LOG.info("First in top: " + Bytes.toString(Bytes.toBytes(key)));
425        }
426      }
427      LOG.info("Last in top: " + Bytes.toString(Bytes.toBytes(key)));
428
429      first = true;
430      HFileScanner bottomScanner = bottom.getScanner(false, false);
431      while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
432          bottomScanner.next()) {
433        previous = ByteBuffer.wrap(((KeyValue) bottomScanner.getKey()).getKey());
434        key = ByteBuffer.wrap(((KeyValue) bottomScanner.getKey()).getKey());
435        if (first) {
436          first = false;
437          LOG.info("First in bottom: " +
438            Bytes.toString(Bytes.toBytes(previous)));
439        }
440        assertTrue(key.compareTo(bbMidkeyBytes) < 0);
441      }
442      if (previous != null) {
443        LOG.info("Last in bottom: " + Bytes.toString(Bytes.toBytes(previous)));
444      }
445      // Remove references.
446      regionFs.cleanupDaughterRegion(topHri);
447      regionFs.cleanupDaughterRegion(bottomHri);
448
449      // Next test using a midkey that does not exist in the file.
450      // First, do a key that is < than first key. Ensure splits behave
451      // properly.
452      byte [] badmidkey = Bytes.toBytes("  .");
453      assertTrue(fs.exists(f.getPath()));
454      topPath = splitStoreFile(regionFs, topHri, TEST_FAMILY, f, badmidkey, true);
455      bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, badmidkey, false);
456
457      assertNull(bottomPath);
458
459      topF = new HStoreFile(this.fs, topPath, conf, cacheConf, BloomType.NONE, true);
460      topF.initReader();
461      top = topF.getReader();
462      // Now read from the top.
463      first = true;
464      topScanner = top.getScanner(false, false);
465      KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
466      while ((!topScanner.isSeeked() && topScanner.seekTo()) ||
467          topScanner.next()) {
468        key = ByteBuffer.wrap(((KeyValue) topScanner.getKey()).getKey());
469        keyOnlyKV.setKey(key.array(), 0 + key.arrayOffset(), key.limit());
470        assertTrue(PrivateCellUtil.compare(topScanner.getReader().getComparator(), keyOnlyKV,
471          badmidkey, 0, badmidkey.length) >= 0);
472        if (first) {
473          first = false;
474          KeyValue keyKV = KeyValueUtil.createKeyValueFromKey(key);
475          LOG.info("First top when key < bottom: " + keyKV);
476          String tmp =
477              Bytes.toString(keyKV.getRowArray(), keyKV.getRowOffset(), keyKV.getRowLength());
478          for (int i = 0; i < tmp.length(); i++) {
479            assertTrue(tmp.charAt(i) == 'a');
480          }
481        }
482      }
483      KeyValue keyKV = KeyValueUtil.createKeyValueFromKey(key);
484      LOG.info("Last top when key < bottom: " + keyKV);
485      String tmp = Bytes.toString(keyKV.getRowArray(), keyKV.getRowOffset(), keyKV.getRowLength());
486      for (int i = 0; i < tmp.length(); i++) {
487        assertTrue(tmp.charAt(i) == 'z');
488      }
489      // Remove references.
490      regionFs.cleanupDaughterRegion(topHri);
491      regionFs.cleanupDaughterRegion(bottomHri);
492
493      // Test when badkey is > than last key in file ('||' > 'zz').
494      badmidkey = Bytes.toBytes("|||");
495      topPath = splitStoreFile(regionFs,topHri, TEST_FAMILY, f, badmidkey, true);
496      bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, badmidkey, false);
497      assertNull(topPath);
498
499      bottomF = new HStoreFile(this.fs, bottomPath, conf, cacheConf, BloomType.NONE, true);
500      bottomF.initReader();
501      bottom = bottomF.getReader();
502      first = true;
503      bottomScanner = bottom.getScanner(false, false);
504      while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
505          bottomScanner.next()) {
506        key = ByteBuffer.wrap(((KeyValue) bottomScanner.getKey()).getKey());
507        if (first) {
508          first = false;
509          keyKV = KeyValueUtil.createKeyValueFromKey(key);
510          LOG.info("First bottom when key > top: " + keyKV);
511          tmp = Bytes.toString(keyKV.getRowArray(), keyKV.getRowOffset(), keyKV.getRowLength());
512          for (int i = 0; i < tmp.length(); i++) {
513            assertTrue(tmp.charAt(i) == 'a');
514          }
515        }
516      }
517      keyKV = KeyValueUtil.createKeyValueFromKey(key);
518      LOG.info("Last bottom when key > top: " + keyKV);
519      for (int i = 0; i < tmp.length(); i++) {
520        assertTrue(Bytes.toString(keyKV.getRowArray(), keyKV.getRowOffset(), keyKV.getRowLength())
521            .charAt(i) == 'z');
522      }
523    } finally {
524      if (top != null) {
525        top.close(true); // evict since we are about to delete the file
526      }
527      if (bottom != null) {
528        bottom.close(true); // evict since we are about to delete the file
529      }
530      fs.delete(f.getPath(), true);
531    }
532  }
533
534  private static StoreFileScanner getStoreFileScanner(StoreFileReader reader, boolean cacheBlocks,
535      boolean pread) {
536    return reader.getStoreFileScanner(cacheBlocks, pread, false, 0, 0, false);
537  }
538
  // Format for the row and column names built by the bloom tests: a zero-padded, ten-digit
  // decimal number.
  private static final String localFormatter = "%010d";
540
541  private void bloomWriteRead(StoreFileWriter writer, FileSystem fs) throws Exception {
542    float err = conf.getFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
543    Path f = writer.getPath();
544    long now = System.currentTimeMillis();
545    for (int i = 0; i < 2000; i += 2) {
546      String row = String.format(localFormatter, i);
547      KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"),
548        Bytes.toBytes("col"), now, Bytes.toBytes("value"));
549      writer.append(kv);
550    }
551    writer.close();
552
553    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
554    HFileInfo fileInfo = new HFileInfo(context, conf);
555    StoreFileReader reader =
556        new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf);
557    fileInfo.initMetaAndIndex(reader.getHFileReader());
558    reader.loadFileInfo();
559    reader.loadBloomfilter();
560    StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
561
562    // check false positives rate
563    int falsePos = 0;
564    int falseNeg = 0;
565    for (int i = 0; i < 2000; i++) {
566      String row = String.format(localFormatter, i);
567      TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
568      columns.add(Bytes.toBytes("family:col"));
569
570      Scan scan = new Scan(Bytes.toBytes(row),Bytes.toBytes(row));
571      scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes("family:col"));
572      HStore store = mock(HStore.class);
573      when(store.getColumnFamilyDescriptor())
574          .thenReturn(ColumnFamilyDescriptorBuilder.of("family"));
575      boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
576      if (i % 2 == 0) {
577        if (!exists) {
578          falseNeg++;
579        }
580      } else {
581        if (exists) {
582          falsePos++;
583        }
584      }
585    }
586    reader.close(true); // evict because we are about to delete the file
587    fs.delete(f, true);
588    assertEquals("False negatives: " + falseNeg, 0, falseNeg);
589    int maxFalsePos = (int) (2 * 2000 * err);
590    assertTrue("Too many false positives: " + falsePos + " (err=" + err + ", expected no more than "
591            + maxFalsePos + ")", falsePos <= maxFalsePos);
592  }
593
  // Block size (8192) passed to HFileContextBuilder.withBlockSize by the bloom filter tests.
  private static final int BLOCKSIZE_SMALL = 8192;
595
596  @Test
597  public void testBloomFilter() throws Exception {
598    FileSystem fs = FileSystem.getLocal(conf);
599    conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
600    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
601
602    // write the file
603    Path f = new Path(ROOT_DIR, getName());
604    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
605                        .withChecksumType(CKTYPE)
606                        .withBytesPerCheckSum(CKBYTES).build();
607    // Make a store file and write data to it.
608    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
609            .withFilePath(f)
610            .withBloomType(BloomType.ROW)
611            .withMaxKeyCount(2000)
612            .withFileContext(meta)
613            .build();
614    bloomWriteRead(writer, fs);
615  }
616
617  @Test
618  public void testDeleteFamilyBloomFilter() throws Exception {
619    FileSystem fs = FileSystem.getLocal(conf);
620    conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
621    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
622    float err = conf.getFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
623
624    // write the file
625    Path f = new Path(ROOT_DIR, getName());
626
627    HFileContext meta = new HFileContextBuilder()
628                        .withBlockSize(BLOCKSIZE_SMALL)
629                        .withChecksumType(CKTYPE)
630                        .withBytesPerCheckSum(CKBYTES).build();
631    // Make a store file and write data to it.
632    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
633            .withFilePath(f)
634            .withMaxKeyCount(2000)
635            .withFileContext(meta)
636            .build();
637
638    // add delete family
639    long now = System.currentTimeMillis();
640    for (int i = 0; i < 2000; i += 2) {
641      String row = String.format(localFormatter, i);
642      KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"),
643          Bytes.toBytes("col"), now, KeyValue.Type.DeleteFamily, Bytes.toBytes("value"));
644      writer.append(kv);
645    }
646    writer.close();
647
648    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
649    HFileInfo fileInfo = new HFileInfo(context, conf);
650    StoreFileReader reader =
651        new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf);
652    fileInfo.initMetaAndIndex(reader.getHFileReader());
653    reader.loadFileInfo();
654    reader.loadBloomfilter();
655
656    // check false positives rate
657    int falsePos = 0;
658    int falseNeg = 0;
659    for (int i = 0; i < 2000; i++) {
660      String row = String.format(localFormatter, i);
661      byte[] rowKey = Bytes.toBytes(row);
662      boolean exists = reader.passesDeleteFamilyBloomFilter(rowKey, 0, rowKey.length);
663      if (i % 2 == 0) {
664        if (!exists) {
665          falseNeg++;
666        }
667      } else {
668        if (exists) {
669          falsePos++;
670        }
671      }
672    }
673    assertEquals(1000, reader.getDeleteFamilyCnt());
674    reader.close(true); // evict because we are about to delete the file
675    fs.delete(f, true);
676    assertEquals("False negatives: " + falseNeg, 0, falseNeg);
677    int maxFalsePos = (int) (2 * 2000 * err);
678    assertTrue("Too many false positives: " + falsePos + " (err=" + err
679        + ", expected no more than " + maxFalsePos, falsePos <= maxFalsePos);
680  }
681
682  /**
683   * Test for HBASE-8012
684   */
685  @Test
686  public void testReseek() throws Exception {
687    // write the file
688    Path f = new Path(ROOT_DIR, getName());
689    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
690    // Make a store file and write data to it.
691    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
692            .withFilePath(f)
693            .withFileContext(meta)
694            .build();
695
696    writeStoreFile(writer);
697    writer.close();
698
699    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
700    HFileInfo fileInfo = new HFileInfo(context, conf);
701    StoreFileReader reader =
702        new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf);
703    fileInfo.initMetaAndIndex(reader.getHFileReader());
704
705    // Now do reseek with empty KV to position to the beginning of the file
706
707    KeyValue k = KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY);
708    StoreFileScanner s = getStoreFileScanner(reader, false, false);
709    s.reseek(k);
710
711    assertNotNull("Intial reseek should position at the beginning of the file", s.peek());
712  }
713
714  @Test
715  public void testBloomTypes() throws Exception {
716    float err = (float) 0.01;
717    FileSystem fs = FileSystem.getLocal(conf);
718    conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, err);
719    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
720
721    int rowCount = 50;
722    int colCount = 10;
723    int versions = 2;
724
725    // run once using columns and once using rows
726    BloomType[] bt = {BloomType.ROWCOL, BloomType.ROW};
727    int[] expKeys  = {rowCount*colCount, rowCount};
728    // below line deserves commentary.  it is expected bloom false positives
729    //  column = rowCount*2*colCount inserts
730    //  row-level = only rowCount*2 inserts, but failures will be magnified by
731    //              2nd for loop for every column (2*colCount)
732    float[] expErr   = {2*rowCount*colCount*err, 2*rowCount*2*colCount*err};
733
734    for (int x : new int[]{0,1}) {
735      // write the file
736      Path f = new Path(ROOT_DIR, getName() + x);
737      HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
738          .withChecksumType(CKTYPE)
739          .withBytesPerCheckSum(CKBYTES).build();
740      // Make a store file and write data to it.
741      StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
742              .withFilePath(f)
743              .withBloomType(bt[x])
744              .withMaxKeyCount(expKeys[x])
745              .withFileContext(meta)
746              .build();
747
748      long now = System.currentTimeMillis();
749      for (int i = 0; i < rowCount*2; i += 2) { // rows
750        for (int j = 0; j < colCount*2; j += 2) {   // column qualifiers
751          String row = String.format(localFormatter, i);
752          String col = String.format(localFormatter, j);
753          for (int k= 0; k < versions; ++k) { // versions
754            KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"),
755                Bytes.toBytes("col" + col), now-k, Bytes.toBytes(-1L));
756            writer.append(kv);
757          }
758        }
759      }
760      writer.close();
761
762      ReaderContext context = new ReaderContextBuilder()
763          .withFilePath(f)
764          .withFileSize(fs.getFileStatus(f).getLen())
765          .withFileSystem(fs)
766          .withInputStreamWrapper(new FSDataInputStreamWrapper(fs, f))
767          .build();
768      HFileInfo fileInfo = new HFileInfo(context, conf);
769      StoreFileReader reader =
770          new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf);
771      fileInfo.initMetaAndIndex(reader.getHFileReader());
772      reader.loadFileInfo();
773      reader.loadBloomfilter();
774      StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
775      assertEquals(expKeys[x], reader.getGeneralBloomFilter().getKeyCount());
776
777      HStore store = mock(HStore.class);
778      when(store.getColumnFamilyDescriptor())
779          .thenReturn(ColumnFamilyDescriptorBuilder.of("family"));
780      // check false positives rate
781      int falsePos = 0;
782      int falseNeg = 0;
783      for (int i = 0; i < rowCount*2; ++i) { // rows
784        for (int j = 0; j < colCount*2; ++j) {   // column qualifiers
785          String row = String.format(localFormatter, i);
786          String col = String.format(localFormatter, j);
787          TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
788          columns.add(Bytes.toBytes("col" + col));
789
790          Scan scan = new Scan(Bytes.toBytes(row),Bytes.toBytes(row));
791          scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes(("col"+col)));
792
793          boolean exists =
794              scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
795          boolean shouldRowExist = i % 2 == 0;
796          boolean shouldColExist = j % 2 == 0;
797          shouldColExist = shouldColExist || bt[x] == BloomType.ROW;
798          if (shouldRowExist && shouldColExist) {
799            if (!exists) {
800              falseNeg++;
801            }
802          } else {
803            if (exists) {
804              falsePos++;
805            }
806          }
807        }
808      }
809      reader.close(true); // evict because we are about to delete the file
810      fs.delete(f, true);
811      System.out.println(bt[x].toString());
812      System.out.println("  False negatives: " + falseNeg);
813      System.out.println("  False positives: " + falsePos);
814      assertEquals(0, falseNeg);
815      assertTrue(falsePos < 2*expErr[x]);
816    }
817  }
818
819  @Test
820  public void testSeqIdComparator() {
821    assertOrdering(StoreFileComparators.SEQ_ID, mockStoreFile(true, 100, 1000, -1, "/foo/123"),
822        mockStoreFile(true, 100, 1000, -1, "/foo/124"),
823        mockStoreFile(true, 99, 1000, -1, "/foo/126"),
824        mockStoreFile(true, 98, 2000, -1, "/foo/126"), mockStoreFile(false, 3453, -1, 1, "/foo/1"),
825        mockStoreFile(false, 2, -1, 3, "/foo/2"), mockStoreFile(false, 1000, -1, 5, "/foo/2"),
826        mockStoreFile(false, 76, -1, 5, "/foo/3"));
827  }
828
829  /**
830   * Assert that the given comparator orders the given storefiles in the
831   * same way that they're passed.
832   */
833  private void assertOrdering(Comparator<? super HStoreFile> comparator, HStoreFile ... sfs) {
834    ArrayList<HStoreFile> sorted = Lists.newArrayList(sfs);
835    Collections.shuffle(sorted);
836    Collections.sort(sorted, comparator);
837    LOG.debug("sfs: " + Joiner.on(",").join(sfs));
838    LOG.debug("sorted: " + Joiner.on(",").join(sorted));
839    assertTrue(Iterables.elementsEqual(Arrays.asList(sfs), sorted));
840  }
841
842  /**
843   * Create a mock StoreFile with the given attributes.
844   */
845  private HStoreFile mockStoreFile(boolean bulkLoad,
846                                  long size,
847                                  long bulkTimestamp,
848                                  long seqId,
849                                  String path) {
850    HStoreFile mock = Mockito.mock(HStoreFile.class);
851    StoreFileReader reader = Mockito.mock(StoreFileReader.class);
852
853    Mockito.doReturn(size).when(reader).length();
854
855    Mockito.doReturn(reader).when(mock).getReader();
856    Mockito.doReturn(bulkLoad).when(mock).isBulkLoadResult();
857    Mockito.doReturn(OptionalLong.of(bulkTimestamp)).when(mock).getBulkLoadTimestamp();
858    Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
859    Mockito.doReturn(new Path(path)).when(mock).getPath();
860    String name = "mock storefile, bulkLoad=" + bulkLoad +
861      " bulkTimestamp=" + bulkTimestamp +
862      " seqId=" + seqId +
863      " path=" + path;
864    Mockito.doReturn(name).when(mock).toString();
865    return mock;
866  }
867
868  /**
869   * Generate a list of KeyValues for testing based on given parameters
870   * @return the rows key-value list
871   */
872  List<KeyValue> getKeyValueSet(long[] timestamps, int numRows,
873      byte[] qualifier, byte[] family) {
874    List<KeyValue> kvList = new ArrayList<>();
875    for (int i=1;i<=numRows;i++) {
876      byte[] b = Bytes.toBytes(i) ;
877      LOG.info(Bytes.toString(b));
878      LOG.info(Bytes.toString(b));
879      for (long timestamp: timestamps) {
880        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
881      }
882    }
883    return kvList;
884  }
885
886  /**
887   * Test to ensure correctness when using StoreFile with multiple timestamps
888   */
889  @Test
890  public void testMultipleTimestamps() throws IOException {
891    byte[] family = Bytes.toBytes("familyname");
892    byte[] qualifier = Bytes.toBytes("qualifier");
893    int numRows = 10;
894    long[] timestamps = new long[] {20,10,5,1};
895    Scan scan = new Scan();
896
897    // Make up a directory hierarchy that has a regiondir ("7e0102") and familyname.
898    Path storedir = new Path(new Path(testDir, "7e0102"), Bytes.toString(family));
899    Path dir = new Path(storedir, "1234567890");
900    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
901    // Make a store file and write data to it.
902    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
903            .withOutputDir(dir)
904            .withFileContext(meta)
905            .build();
906
907    List<KeyValue> kvList = getKeyValueSet(timestamps,numRows,
908        qualifier, family);
909
910    for (KeyValue kv : kvList) {
911      writer.append(kv);
912    }
913    writer.appendMetadata(0, false);
914    writer.close();
915
916    HStoreFile hsf = new HStoreFile(this.fs, writer.getPath(), conf, cacheConf,
917      BloomType.NONE, true);
918    HStore store = mock(HStore.class);
919    when(store.getColumnFamilyDescriptor()).thenReturn(ColumnFamilyDescriptorBuilder.of(family));
920    hsf.initReader();
921    StoreFileReader reader = hsf.getReader();
922    StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
923    TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
924    columns.add(qualifier);
925
926    scan.setTimeRange(20, 100);
927    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
928
929    scan.setTimeRange(1, 2);
930    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
931
932    scan.setTimeRange(8, 10);
933    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
934
935    // lets make sure it still works with column family time ranges
936    scan.setColumnFamilyTimeRange(family, 7, 50);
937    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
938
939    // This test relies on the timestamp range optimization
940    scan = new Scan();
941    scan.setTimeRange(27, 50);
942    assertTrue(!scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
943
944    // should still use the scanner because we override the family time range
945    scan = new Scan();
946    scan.setTimeRange(27, 50);
947    scan.setColumnFamilyTimeRange(family, 7, 50);
948    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
949  }
950
951  @Test
952  public void testCacheOnWriteEvictOnClose() throws Exception {
953    Configuration conf = this.conf;
954
955    // Find a home for our files (regiondir ("7e0102") and familyname).
956    Path baseDir = new Path(new Path(testDir, "7e0102"),"twoCOWEOC");
957
958    // Grab the block cache and get the initial hit/miss counts
959    BlockCache bc = BlockCacheFactory.createBlockCache(conf);
960    assertNotNull(bc);
961    CacheStats cs = bc.getStats();
962    long startHit = cs.getHitCount();
963    long startMiss = cs.getMissCount();
964    long startEvicted = cs.getEvictedCount();
965
966    // Let's write a StoreFile with three blocks, with cache on write off
967    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, false);
968    CacheConfig cacheConf = new CacheConfig(conf, bc);
969    Path pathCowOff = new Path(baseDir, "123456789");
970    StoreFileWriter writer = writeStoreFile(conf, cacheConf, pathCowOff, 3);
971    HStoreFile hsf = new HStoreFile(this.fs, writer.getPath(), conf, cacheConf,
972      BloomType.NONE, true);
973    LOG.debug(hsf.getPath().toString());
974
975    // Read this file, we should see 3 misses
976    hsf.initReader();
977    StoreFileReader reader = hsf.getReader();
978    reader.loadFileInfo();
979    StoreFileScanner scanner = getStoreFileScanner(reader, true, true);
980    scanner.seek(KeyValue.LOWESTKEY);
981    while (scanner.next() != null) {
982      continue;
983    }
984    assertEquals(startHit, cs.getHitCount());
985    assertEquals(startMiss + 3, cs.getMissCount());
986    assertEquals(startEvicted, cs.getEvictedCount());
987    startMiss += 3;
988    scanner.close();
989    reader.close(cacheConf.shouldEvictOnClose());
990
991    // Now write a StoreFile with three blocks, with cache on write on
992    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
993    cacheConf = new CacheConfig(conf, bc);
994    Path pathCowOn = new Path(baseDir, "123456788");
995    writer = writeStoreFile(conf, cacheConf, pathCowOn, 3);
996    hsf = new HStoreFile(this.fs, writer.getPath(), conf, cacheConf,
997      BloomType.NONE, true);
998
999    // Read this file, we should see 3 hits
1000    hsf.initReader();
1001    reader = hsf.getReader();
1002    scanner = getStoreFileScanner(reader, true, true);
1003    scanner.seek(KeyValue.LOWESTKEY);
1004    while (scanner.next() != null) {
1005      continue;
1006    }
1007    assertEquals(startHit + 3, cs.getHitCount());
1008    assertEquals(startMiss, cs.getMissCount());
1009    assertEquals(startEvicted, cs.getEvictedCount());
1010    startHit += 3;
1011    scanner.close();
1012    reader.close(cacheConf.shouldEvictOnClose());
1013
1014    // Let's read back the two files to ensure the blocks exactly match
1015    hsf = new HStoreFile(this.fs, pathCowOff, conf, cacheConf, BloomType.NONE, true);
1016    hsf.initReader();
1017    StoreFileReader readerOne = hsf.getReader();
1018    readerOne.loadFileInfo();
1019    StoreFileScanner scannerOne = getStoreFileScanner(readerOne, true, true);
1020    scannerOne.seek(KeyValue.LOWESTKEY);
1021    hsf = new HStoreFile(this.fs, pathCowOn, conf, cacheConf, BloomType.NONE, true);
1022    hsf.initReader();
1023    StoreFileReader readerTwo = hsf.getReader();
1024    readerTwo.loadFileInfo();
1025    StoreFileScanner scannerTwo = getStoreFileScanner(readerTwo, true, true);
1026    scannerTwo.seek(KeyValue.LOWESTKEY);
1027    Cell kv1 = null;
1028    Cell kv2 = null;
1029    while ((kv1 = scannerOne.next()) != null) {
1030      kv2 = scannerTwo.next();
1031      assertTrue(kv1.equals(kv2));
1032      KeyValue keyv1 = KeyValueUtil.ensureKeyValue(kv1);
1033      KeyValue keyv2 = KeyValueUtil.ensureKeyValue(kv2);
1034      assertTrue(Bytes.compareTo(
1035          keyv1.getBuffer(), keyv1.getKeyOffset(), keyv1.getKeyLength(),
1036          keyv2.getBuffer(), keyv2.getKeyOffset(), keyv2.getKeyLength()) == 0);
1037      assertTrue(Bytes.compareTo(
1038          kv1.getValueArray(), kv1.getValueOffset(), kv1.getValueLength(),
1039          kv2.getValueArray(), kv2.getValueOffset(), kv2.getValueLength()) == 0);
1040    }
1041    assertNull(scannerTwo.next());
1042    assertEquals(startHit + 6, cs.getHitCount());
1043    assertEquals(startMiss, cs.getMissCount());
1044    assertEquals(startEvicted, cs.getEvictedCount());
1045    startHit += 6;
1046    scannerOne.close();
1047    readerOne.close(cacheConf.shouldEvictOnClose());
1048    scannerTwo.close();
1049    readerTwo.close(cacheConf.shouldEvictOnClose());
1050
1051    // Let's close the first file with evict on close turned on
1052    conf.setBoolean("hbase.rs.evictblocksonclose", true);
1053    cacheConf = new CacheConfig(conf, bc);
1054    hsf = new HStoreFile(this.fs, pathCowOff, conf, cacheConf, BloomType.NONE, true);
1055    hsf.initReader();
1056    reader = hsf.getReader();
1057    reader.close(cacheConf.shouldEvictOnClose());
1058
1059    // We should have 3 new evictions but the evict count stat should not change. Eviction because
1060    // of HFile invalidation is not counted along with normal evictions
1061    assertEquals(startHit, cs.getHitCount());
1062    assertEquals(startMiss, cs.getMissCount());
1063    assertEquals(startEvicted, cs.getEvictedCount());
1064
1065    // Let's close the second file with evict on close turned off
1066    conf.setBoolean("hbase.rs.evictblocksonclose", false);
1067    cacheConf = new CacheConfig(conf, bc);
1068    hsf = new HStoreFile(this.fs, pathCowOn, conf, cacheConf, BloomType.NONE, true);
1069    hsf.initReader();
1070    reader = hsf.getReader();
1071    reader.close(cacheConf.shouldEvictOnClose());
1072
1073    // We expect no changes
1074    assertEquals(startHit, cs.getHitCount());
1075    assertEquals(startMiss, cs.getMissCount());
1076    assertEquals(startEvicted, cs.getEvictedCount());
1077  }
1078
1079  private Path splitStoreFile(final HRegionFileSystem regionFs, final HRegionInfo hri,
1080      final String family, final HStoreFile sf, final byte[] splitKey, boolean isTopRef)
1081      throws IOException {
1082    FileSystem fs = regionFs.getFileSystem();
1083    Path path = regionFs.splitStoreFile(hri, family, sf, splitKey, isTopRef, null);
1084    if (null == path) {
1085      return null;
1086    }
1087    Path regionDir = regionFs.commitDaughterRegion(hri);
1088    return new Path(new Path(regionDir, family), path.getName());
1089  }
1090
1091  private StoreFileWriter writeStoreFile(Configuration conf, CacheConfig cacheConf, Path path,
1092      int numBlocks) throws IOException {
1093    // Let's put ~5 small KVs in each block, so let's make 5*numBlocks KVs
1094    int numKVs = 5 * numBlocks;
1095    List<KeyValue> kvs = new ArrayList<>(numKVs);
1096    byte [] b = Bytes.toBytes("x");
1097    int totalSize = 0;
1098    for (int i=numKVs;i>0;i--) {
1099      KeyValue kv = new KeyValue(b, b, b, i, b);
1100      kvs.add(kv);
1101      // kv has memstoreTS 0, which takes 1 byte to store.
1102      totalSize += kv.getLength() + 1;
1103    }
1104    int blockSize = totalSize / numBlocks;
1105    HFileContext meta = new HFileContextBuilder().withBlockSize(blockSize)
1106                        .withChecksumType(CKTYPE)
1107                        .withBytesPerCheckSum(CKBYTES)
1108                        .build();
1109    // Make a store file and write data to it.
1110    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
1111            .withFilePath(path)
1112            .withMaxKeyCount(2000)
1113            .withFileContext(meta)
1114            .build();
1115    // We'll write N-1 KVs to ensure we don't write an extra block
1116    kvs.remove(kvs.size()-1);
1117    for (KeyValue kv : kvs) {
1118      writer.append(kv);
1119    }
1120    writer.appendMetadata(0, false);
1121    writer.close();
1122    return writer;
1123  }
1124
1125  /**
1126   * Check if data block encoding information is saved correctly in HFile's
1127   * file info.
1128   */
1129  @Test
1130  public void testDataBlockEncodingMetaData() throws IOException {
1131    // Make up a directory hierarchy that has a regiondir ("7e0102") and familyname.
1132    Path dir = new Path(new Path(testDir, "7e0102"), "familyname");
1133    Path path = new Path(dir, "1234567890");
1134
1135    DataBlockEncoding dataBlockEncoderAlgo =
1136        DataBlockEncoding.FAST_DIFF;
1137    HFileDataBlockEncoder dataBlockEncoder =
1138        new HFileDataBlockEncoderImpl(
1139            dataBlockEncoderAlgo);
1140    cacheConf = new CacheConfig(conf);
1141    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
1142        .withChecksumType(CKTYPE)
1143        .withBytesPerCheckSum(CKBYTES)
1144        .withDataBlockEncoding(dataBlockEncoderAlgo)
1145        .build();
1146    // Make a store file and write data to it.
1147    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
1148            .withFilePath(path)
1149            .withMaxKeyCount(2000)
1150            .withFileContext(meta)
1151            .build();
1152    writer.close();
1153
1154    HStoreFile storeFile =
1155        new HStoreFile(fs, writer.getPath(), conf, cacheConf, BloomType.NONE, true);
1156    storeFile.initReader();
1157    StoreFileReader reader = storeFile.getReader();
1158
1159    Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
1160    byte[] value = fileInfo.get(HFileDataBlockEncoder.DATA_BLOCK_ENCODING);
1161    assertEquals(dataBlockEncoderAlgo.getNameInBytes(), value);
1162  }
1163}