001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.mockito.Mockito.mock;
021import static org.mockito.Mockito.when;
022
023import java.io.IOException;
024import java.nio.ByteBuffer;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.Comparator;
029import java.util.List;
030import java.util.Map;
031import java.util.OptionalLong;
032import java.util.TreeSet;
033import java.util.concurrent.atomic.AtomicInteger;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseTestCase;
042import org.apache.hadoop.hbase.HBaseTestingUtility;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.HRegionInfo;
045import org.apache.hadoop.hbase.KeyValue;
046import org.apache.hadoop.hbase.KeyValueUtil;
047import org.apache.hadoop.hbase.PrivateCellUtil;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
051import org.apache.hadoop.hbase.client.RegionInfo;
052import org.apache.hadoop.hbase.client.RegionInfoBuilder;
053import org.apache.hadoop.hbase.client.Scan;
054import org.apache.hadoop.hbase.io.HFileLink;
055import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
056import org.apache.hadoop.hbase.io.hfile.BlockCache;
057import org.apache.hadoop.hbase.io.hfile.CacheConfig;
058import org.apache.hadoop.hbase.io.hfile.CacheStats;
059import org.apache.hadoop.hbase.io.hfile.HFileContext;
060import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
061import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
062import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
063import org.apache.hadoop.hbase.io.hfile.HFileScanner;
064import org.apache.hadoop.hbase.testclassification.RegionServerTests;
065import org.apache.hadoop.hbase.testclassification.SmallTests;
066import org.apache.hadoop.hbase.util.BloomFilterFactory;
067import org.apache.hadoop.hbase.util.Bytes;
068import org.apache.hadoop.hbase.util.ChecksumType;
069import org.apache.hadoop.hbase.util.FSUtils;
070import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
071import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
072import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
073import org.junit.After;
074import org.junit.Before;
075import org.junit.ClassRule;
076import org.junit.Test;
077import org.junit.experimental.categories.Category;
078import org.mockito.Mockito;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082/**
083 * Test HStoreFile
084 */
085@Category({RegionServerTests.class, SmallTests.class})
086public class TestHStoreFile extends HBaseTestCase {
087
088  @ClassRule
089  public static final HBaseClassTestRule CLASS_RULE =
090      HBaseClassTestRule.forClass(TestHStoreFile.class);
091
092  private static final Logger LOG = LoggerFactory.getLogger(TestHStoreFile.class);
093  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
094  private CacheConfig cacheConf =  new CacheConfig(TEST_UTIL.getConfiguration());
095  private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestStoreFile").toString();
096  private static final ChecksumType CKTYPE = ChecksumType.CRC32C;
097  private static final int CKBYTES = 512;
098  private static String TEST_FAMILY = "cf";
099
100  @Override
101  @Before
102  public void setUp() throws Exception {
103    super.setUp();
104  }
105
106  @Override
107  @After
108  public void tearDown() throws Exception {
109    super.tearDown();
110  }
111
112  /**
113   * Write a file and then assert that we can read from top and bottom halves
114   * using two HalfMapFiles.
115   * @throws Exception
116   */
117  @Test
118  public void testBasicHalfMapFile() throws Exception {
119    final HRegionInfo hri =
120        new HRegionInfo(TableName.valueOf("testBasicHalfMapFileTb"));
121    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
122      conf, fs, new Path(testDir, hri.getTable().getNameAsString()), hri);
123
124    HFileContext meta = new HFileContextBuilder().withBlockSize(2*1024).build();
125    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
126            .withFilePath(regionFs.createTempName())
127            .withFileContext(meta)
128            .build();
129    writeStoreFile(writer);
130
131    Path sfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
132    HStoreFile sf = new HStoreFile(this.fs, sfPath, conf, cacheConf, BloomType.NONE, true);
133    checkHalfHFile(regionFs, sf);
134  }
135
136  private void writeStoreFile(final StoreFileWriter writer) throws IOException {
137    writeStoreFile(writer, Bytes.toBytes(getName()), Bytes.toBytes(getName()));
138  }
139
140  // pick an split point (roughly halfway)
141  byte[] SPLITKEY = new byte[] { (LAST_CHAR + FIRST_CHAR)/2, FIRST_CHAR};
142
143  /*
144   * Writes HStoreKey and ImmutableBytes data to passed writer and
145   * then closes it.
146   * @param writer
147   * @throws IOException
148   */
149  public static void writeStoreFile(final StoreFileWriter writer, byte[] fam, byte[] qualifier)
150  throws IOException {
151    long now = System.currentTimeMillis();
152    try {
153      for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
154        for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
155          byte[] b = new byte[] { (byte) d, (byte) e };
156          writer.append(new KeyValue(b, fam, qualifier, now, b));
157        }
158      }
159    } finally {
160      writer.close();
161    }
162  }
163
164  /**
165   * Test that our mechanism of writing store files in one region to reference
166   * store files in other regions works.
167   * @throws IOException
168   */
169  @Test
170  public void testReference() throws IOException {
171    final HRegionInfo hri = new HRegionInfo(TableName.valueOf("testReferenceTb"));
172    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
173      conf, fs, new Path(testDir, hri.getTable().getNameAsString()), hri);
174
175    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
176    // Make a store file and write data to it.
177    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
178            .withFilePath(regionFs.createTempName())
179            .withFileContext(meta)
180            .build();
181    writeStoreFile(writer);
182
183    Path hsfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
184    HStoreFile hsf = new HStoreFile(this.fs, hsfPath, conf, cacheConf, BloomType.NONE, true);
185    hsf.initReader();
186    StoreFileReader reader = hsf.getReader();
187    // Split on a row, not in middle of row.  Midkey returned by reader
188    // may be in middle of row.  Create new one with empty column and
189    // timestamp.
190    byte [] midRow = CellUtil.cloneRow(reader.midKey().get());
191    byte [] finalRow = CellUtil.cloneRow(reader.getLastKey().get());
192    hsf.closeStoreFile(true);
193
194    // Make a reference
195    HRegionInfo splitHri = new HRegionInfo(hri.getTable(), null, midRow);
196    Path refPath = splitStoreFile(regionFs, splitHri, TEST_FAMILY, hsf, midRow, true);
197    HStoreFile refHsf = new HStoreFile(this.fs, refPath, conf, cacheConf, BloomType.NONE, true);
198    refHsf.initReader();
199    // Now confirm that I can read from the reference and that it only gets
200    // keys from top half of the file.
201    HFileScanner s = refHsf.getReader().getScanner(false, false);
202    Cell kv = null;
203    for (boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) {
204      ByteBuffer bb = ByteBuffer.wrap(((KeyValue) s.getKey()).getKey());
205      kv = KeyValueUtil.createKeyValueFromKey(bb);
206      if (first) {
207        assertTrue(Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), midRow, 0,
208          midRow.length));
209        first = false;
210      }
211    }
212    assertTrue(Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), finalRow, 0,
213      finalRow.length));
214  }
215
216  @Test
217  public void testStoreFileReference() throws Exception {
218    final RegionInfo hri =
219        RegionInfoBuilder.newBuilder(TableName.valueOf("testStoreFileReference")).build();
220    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs,
221      new Path(testDir, hri.getTable().getNameAsString()), hri);
222    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
223
224    // Make a store file and write data to it.
225    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
226        .withFilePath(regionFs.createTempName()).withFileContext(meta).build();
227    writeStoreFile(writer);
228    Path hsfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
229    writer.close();
230
231    HStoreFile file = new HStoreFile(this.fs, hsfPath, conf, cacheConf, BloomType.NONE, true);
232    file.initReader();
233    StoreFileReader r = file.getReader();
234    assertNotNull(r);
235    StoreFileScanner scanner =
236        new StoreFileScanner(r, mock(HFileScanner.class), false, false, 0, 0, false);
237
238    // Verify after instantiating scanner refCount is increased
239    assertTrue("Verify file is being referenced", file.isReferencedInReads());
240    scanner.close();
241    // Verify after closing scanner refCount is decreased
242    assertFalse("Verify file is not being referenced", file.isReferencedInReads());
243  }
244
245  @Test
246  public void testEmptyStoreFileRestrictKeyRanges() throws Exception {
247    StoreFileReader reader = mock(StoreFileReader.class);
248    HStore store = mock(HStore.class);
249    byte[] cf = Bytes.toBytes("ty");
250    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.of(cf);
251    when(store.getColumnFamilyDescriptor()).thenReturn(cfd);
252    StoreFileScanner scanner =
253        new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0, 0, true);
254    Scan scan = new Scan();
255    scan.setColumnFamilyTimeRange(cf, 0, 1);
256    assertFalse(scanner.shouldUseScanner(scan, store, 0));
257  }
258
259  @Test
260  public void testHFileLink() throws IOException {
261    final HRegionInfo hri = new HRegionInfo(TableName.valueOf("testHFileLinkTb"));
262    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
263    Configuration testConf = new Configuration(this.conf);
264    FSUtils.setRootDir(testConf, testDir);
265    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
266      testConf, fs, FSUtils.getTableDir(testDir, hri.getTable()), hri);
267    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
268
269    // Make a store file and write data to it.
270    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
271            .withFilePath(regionFs.createTempName())
272            .withFileContext(meta)
273            .build();
274    writeStoreFile(writer);
275
276    Path storeFilePath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
277    Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", TEST_FAMILY));
278    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
279    Path linkFilePath = new Path(dstPath,
280                  HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
281
282    // Try to open store file from link
283    StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath);
284    HStoreFile hsf =
285        new HStoreFile(this.fs, storeFileInfo, testConf, cacheConf, BloomType.NONE, true);
286    assertTrue(storeFileInfo.isLink());
287    hsf.initReader();
288
289    // Now confirm that I can read from the link
290    int count = 1;
291    HFileScanner s = hsf.getReader().getScanner(false, false);
292    s.seekTo();
293    while (s.next()) {
294      count++;
295    }
296    assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
297  }
298
299  /**
300   * This test creates an hfile and then the dir structures and files to verify that references
301   * to hfilelinks (created by snapshot clones) can be properly interpreted.
302   */
303  @Test
304  public void testReferenceToHFileLink() throws IOException {
305    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
306    Configuration testConf = new Configuration(this.conf);
307    FSUtils.setRootDir(testConf, testDir);
308
309    // adding legal table name chars to verify regex handles it.
310    HRegionInfo hri = new HRegionInfo(TableName.valueOf("_original-evil-name"));
311    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
312      testConf, fs, FSUtils.getTableDir(testDir, hri.getTable()), hri);
313
314    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
315    // Make a store file and write data to it. <root>/<tablename>/<rgn>/<cf>/<file>
316    StoreFileWriter writer = new StoreFileWriter.Builder(testConf, cacheConf, this.fs)
317            .withFilePath(regionFs.createTempName())
318            .withFileContext(meta)
319            .build();
320    writeStoreFile(writer);
321    Path storeFilePath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
322
323    // create link to store file. <root>/clone/region/<cf>/<hfile>-<region>-<table>
324    HRegionInfo hriClone = new HRegionInfo(TableName.valueOf("clone"));
325    HRegionFileSystem cloneRegionFs = HRegionFileSystem.createRegionOnFileSystem(
326      testConf, fs, FSUtils.getTableDir(testDir, hri.getTable()),
327        hriClone);
328    Path dstPath = cloneRegionFs.getStoreDir(TEST_FAMILY);
329    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
330    Path linkFilePath = new Path(dstPath,
331                  HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
332
333    // create splits of the link.
334    // <root>/clone/splitA/<cf>/<reftohfilelink>,
335    // <root>/clone/splitB/<cf>/<reftohfilelink>
336    HRegionInfo splitHriA = new HRegionInfo(hri.getTable(), null, SPLITKEY);
337    HRegionInfo splitHriB = new HRegionInfo(hri.getTable(), SPLITKEY, null);
338    HStoreFile f = new HStoreFile(fs, linkFilePath, testConf, cacheConf, BloomType.NONE, true);
339    f.initReader();
340    Path pathA = splitStoreFile(cloneRegionFs, splitHriA, TEST_FAMILY, f, SPLITKEY, true); // top
341    Path pathB = splitStoreFile(cloneRegionFs, splitHriB, TEST_FAMILY, f, SPLITKEY, false);// bottom
342    f.closeStoreFile(true);
343    // OK test the thing
344    FSUtils.logFileSystemState(fs, testDir, LOG);
345
346    // There is a case where a file with the hfilelink pattern is actually a daughter
347    // reference to a hfile link.  This code in StoreFile that handles this case.
348
349    // Try to open store file from link
350    HStoreFile hsfA = new HStoreFile(this.fs, pathA, testConf, cacheConf, BloomType.NONE, true);
351    hsfA.initReader();
352
353    // Now confirm that I can read from the ref to link
354    int count = 1;
355    HFileScanner s = hsfA.getReader().getScanner(false, false);
356    s.seekTo();
357    while (s.next()) {
358      count++;
359    }
360    assertTrue(count > 0); // read some rows here
361
362    // Try to open store file from link
363    HStoreFile hsfB = new HStoreFile(this.fs, pathB, testConf, cacheConf, BloomType.NONE, true);
364    hsfB.initReader();
365
366    // Now confirm that I can read from the ref to link
367    HFileScanner sB = hsfB.getReader().getScanner(false, false);
368    sB.seekTo();
369
370    //count++ as seekTo() will advance the scanner
371    count++;
372    while (sB.next()) {
373      count++;
374    }
375
376    // read the rest of the rows
377    assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
378  }
379
380  private void checkHalfHFile(final HRegionFileSystem regionFs, final HStoreFile f)
381      throws IOException {
382    f.initReader();
383    Cell midkey = f.getReader().midKey().get();
384    KeyValue midKV = (KeyValue)midkey;
385    byte [] midRow = CellUtil.cloneRow(midKV);
386    // Create top split.
387    HRegionInfo topHri = new HRegionInfo(regionFs.getRegionInfo().getTable(),
388        null, midRow);
389    Path topPath = splitStoreFile(regionFs, topHri, TEST_FAMILY, f, midRow, true);
390    // Create bottom split.
391    HRegionInfo bottomHri = new HRegionInfo(regionFs.getRegionInfo().getTable(),
392        midRow, null);
393    Path bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, midRow, false);
394    // Make readers on top and bottom.
395    HStoreFile topF = new HStoreFile(this.fs, topPath, conf, cacheConf, BloomType.NONE, true);
396    topF.initReader();
397    StoreFileReader top = topF.getReader();
398    HStoreFile bottomF = new HStoreFile(this.fs, bottomPath, conf, cacheConf, BloomType.NONE, true);
399    bottomF.initReader();
400    StoreFileReader bottom = bottomF.getReader();
401    ByteBuffer previous = null;
402    LOG.info("Midkey: " + midKV.toString());
403    ByteBuffer bbMidkeyBytes = ByteBuffer.wrap(midKV.getKey());
404    try {
405      // Now make two HalfMapFiles and assert they can read the full backing
406      // file, one from the top and the other from the bottom.
407      // Test bottom half first.
408      // Now test reading from the top.
409      boolean first = true;
410      ByteBuffer key = null;
411      HFileScanner topScanner = top.getScanner(false, false);
412      while ((!topScanner.isSeeked() && topScanner.seekTo()) ||
413             (topScanner.isSeeked() && topScanner.next())) {
414        key = ByteBuffer.wrap(((KeyValue) topScanner.getKey()).getKey());
415
416        if ((PrivateCellUtil.compare(topScanner.getReader().getComparator(), midKV, key.array(),
417          key.arrayOffset(), key.limit())) > 0) {
418          fail("key=" + Bytes.toStringBinary(key) + " < midkey=" +
419              midkey);
420        }
421        if (first) {
422          first = false;
423          LOG.info("First in top: " + Bytes.toString(Bytes.toBytes(key)));
424        }
425      }
426      LOG.info("Last in top: " + Bytes.toString(Bytes.toBytes(key)));
427
428      first = true;
429      HFileScanner bottomScanner = bottom.getScanner(false, false);
430      while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
431          bottomScanner.next()) {
432        previous = ByteBuffer.wrap(((KeyValue) bottomScanner.getKey()).getKey());
433        key = ByteBuffer.wrap(((KeyValue) bottomScanner.getKey()).getKey());
434        if (first) {
435          first = false;
436          LOG.info("First in bottom: " +
437            Bytes.toString(Bytes.toBytes(previous)));
438        }
439        assertTrue(key.compareTo(bbMidkeyBytes) < 0);
440      }
441      if (previous != null) {
442        LOG.info("Last in bottom: " + Bytes.toString(Bytes.toBytes(previous)));
443      }
444      // Remove references.
445      regionFs.cleanupDaughterRegion(topHri);
446      regionFs.cleanupDaughterRegion(bottomHri);
447
448      // Next test using a midkey that does not exist in the file.
449      // First, do a key that is < than first key. Ensure splits behave
450      // properly.
451      byte [] badmidkey = Bytes.toBytes("  .");
452      assertTrue(fs.exists(f.getPath()));
453      topPath = splitStoreFile(regionFs, topHri, TEST_FAMILY, f, badmidkey, true);
454      bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, badmidkey, false);
455
456      assertNull(bottomPath);
457
458      topF = new HStoreFile(this.fs, topPath, conf, cacheConf, BloomType.NONE, true);
459      topF.initReader();
460      top = topF.getReader();
461      // Now read from the top.
462      first = true;
463      topScanner = top.getScanner(false, false);
464      KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
465      while ((!topScanner.isSeeked() && topScanner.seekTo()) ||
466          topScanner.next()) {
467        key = ByteBuffer.wrap(((KeyValue) topScanner.getKey()).getKey());
468        keyOnlyKV.setKey(key.array(), 0 + key.arrayOffset(), key.limit());
469        assertTrue(PrivateCellUtil.compare(topScanner.getReader().getComparator(), keyOnlyKV,
470          badmidkey, 0, badmidkey.length) >= 0);
471        if (first) {
472          first = false;
473          KeyValue keyKV = KeyValueUtil.createKeyValueFromKey(key);
474          LOG.info("First top when key < bottom: " + keyKV);
475          String tmp =
476              Bytes.toString(keyKV.getRowArray(), keyKV.getRowOffset(), keyKV.getRowLength());
477          for (int i = 0; i < tmp.length(); i++) {
478            assertTrue(tmp.charAt(i) == 'a');
479          }
480        }
481      }
482      KeyValue keyKV = KeyValueUtil.createKeyValueFromKey(key);
483      LOG.info("Last top when key < bottom: " + keyKV);
484      String tmp = Bytes.toString(keyKV.getRowArray(), keyKV.getRowOffset(), keyKV.getRowLength());
485      for (int i = 0; i < tmp.length(); i++) {
486        assertTrue(tmp.charAt(i) == 'z');
487      }
488      // Remove references.
489      regionFs.cleanupDaughterRegion(topHri);
490      regionFs.cleanupDaughterRegion(bottomHri);
491
492      // Test when badkey is > than last key in file ('||' > 'zz').
493      badmidkey = Bytes.toBytes("|||");
494      topPath = splitStoreFile(regionFs,topHri, TEST_FAMILY, f, badmidkey, true);
495      bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, badmidkey, false);
496      assertNull(topPath);
497
498      bottomF = new HStoreFile(this.fs, bottomPath, conf, cacheConf, BloomType.NONE, true);
499      bottomF.initReader();
500      bottom = bottomF.getReader();
501      first = true;
502      bottomScanner = bottom.getScanner(false, false);
503      while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
504          bottomScanner.next()) {
505        key = ByteBuffer.wrap(((KeyValue) bottomScanner.getKey()).getKey());
506        if (first) {
507          first = false;
508          keyKV = KeyValueUtil.createKeyValueFromKey(key);
509          LOG.info("First bottom when key > top: " + keyKV);
510          tmp = Bytes.toString(keyKV.getRowArray(), keyKV.getRowOffset(), keyKV.getRowLength());
511          for (int i = 0; i < tmp.length(); i++) {
512            assertTrue(tmp.charAt(i) == 'a');
513          }
514        }
515      }
516      keyKV = KeyValueUtil.createKeyValueFromKey(key);
517      LOG.info("Last bottom when key > top: " + keyKV);
518      for (int i = 0; i < tmp.length(); i++) {
519        assertTrue(Bytes.toString(keyKV.getRowArray(), keyKV.getRowOffset(), keyKV.getRowLength())
520            .charAt(i) == 'z');
521      }
522    } finally {
523      if (top != null) {
524        top.close(true); // evict since we are about to delete the file
525      }
526      if (bottom != null) {
527        bottom.close(true); // evict since we are about to delete the file
528      }
529      fs.delete(f.getPath(), true);
530    }
531  }
532
533  private static StoreFileScanner getStoreFileScanner(StoreFileReader reader, boolean cacheBlocks,
534      boolean pread) {
535    return reader.getStoreFileScanner(cacheBlocks, pread, false, 0, 0, false);
536  }
537
538  private static final String localFormatter = "%010d";
539
540  private void bloomWriteRead(StoreFileWriter writer, FileSystem fs) throws Exception {
541    float err = conf.getFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
542    Path f = writer.getPath();
543    long now = System.currentTimeMillis();
544    for (int i = 0; i < 2000; i += 2) {
545      String row = String.format(localFormatter, i);
546      KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"),
547        Bytes.toBytes("col"), now, Bytes.toBytes("value"));
548      writer.append(kv);
549    }
550    writer.close();
551
552    StoreFileReader reader =
553        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
554    reader.loadFileInfo();
555    reader.loadBloomfilter();
556    StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
557
558    // check false positives rate
559    int falsePos = 0;
560    int falseNeg = 0;
561    for (int i = 0; i < 2000; i++) {
562      String row = String.format(localFormatter, i);
563      TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
564      columns.add(Bytes.toBytes("family:col"));
565
566      Scan scan = new Scan(Bytes.toBytes(row),Bytes.toBytes(row));
567      scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes("family:col"));
568      HStore store = mock(HStore.class);
569      when(store.getColumnFamilyDescriptor())
570          .thenReturn(ColumnFamilyDescriptorBuilder.of("family"));
571      boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
572      if (i % 2 == 0) {
573        if (!exists) falseNeg++;
574      } else {
575        if (exists) falsePos++;
576      }
577    }
578    reader.close(true); // evict because we are about to delete the file
579    fs.delete(f, true);
580    assertEquals("False negatives: " + falseNeg, 0, falseNeg);
581    int maxFalsePos = (int) (2 * 2000 * err);
582    assertTrue("Too many false positives: " + falsePos + " (err=" + err + ", expected no more than "
583            + maxFalsePos + ")", falsePos <= maxFalsePos);
584  }
585
586  private static final int BLOCKSIZE_SMALL = 8192;
587
588  @Test
589  public void testBloomFilter() throws Exception {
590    FileSystem fs = FileSystem.getLocal(conf);
591    conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
592    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
593
594    // write the file
595    Path f = new Path(ROOT_DIR, getName());
596    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
597                        .withChecksumType(CKTYPE)
598                        .withBytesPerCheckSum(CKBYTES).build();
599    // Make a store file and write data to it.
600    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
601            .withFilePath(f)
602            .withBloomType(BloomType.ROW)
603            .withMaxKeyCount(2000)
604            .withFileContext(meta)
605            .build();
606    bloomWriteRead(writer, fs);
607  }
608
609  @Test
610  public void testDeleteFamilyBloomFilter() throws Exception {
611    FileSystem fs = FileSystem.getLocal(conf);
612    conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
613    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
614    float err = conf.getFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
615
616    // write the file
617    Path f = new Path(ROOT_DIR, getName());
618
619    HFileContext meta = new HFileContextBuilder()
620                        .withBlockSize(BLOCKSIZE_SMALL)
621                        .withChecksumType(CKTYPE)
622                        .withBytesPerCheckSum(CKBYTES).build();
623    // Make a store file and write data to it.
624    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
625            .withFilePath(f)
626            .withMaxKeyCount(2000)
627            .withFileContext(meta)
628            .build();
629
630    // add delete family
631    long now = System.currentTimeMillis();
632    for (int i = 0; i < 2000; i += 2) {
633      String row = String.format(localFormatter, i);
634      KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"),
635          Bytes.toBytes("col"), now, KeyValue.Type.DeleteFamily, Bytes.toBytes("value"));
636      writer.append(kv);
637    }
638    writer.close();
639
640    StoreFileReader reader =
641        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
642    reader.loadFileInfo();
643    reader.loadBloomfilter();
644
645    // check false positives rate
646    int falsePos = 0;
647    int falseNeg = 0;
648    for (int i = 0; i < 2000; i++) {
649      String row = String.format(localFormatter, i);
650      byte[] rowKey = Bytes.toBytes(row);
651      boolean exists = reader.passesDeleteFamilyBloomFilter(rowKey, 0, rowKey.length);
652      if (i % 2 == 0) {
653        if (!exists)
654          falseNeg++;
655      } else {
656        if (exists)
657          falsePos++;
658      }
659    }
660    assertEquals(1000, reader.getDeleteFamilyCnt());
661    reader.close(true); // evict because we are about to delete the file
662    fs.delete(f, true);
663    assertEquals("False negatives: " + falseNeg, 0, falseNeg);
664    int maxFalsePos = (int) (2 * 2000 * err);
665    assertTrue("Too many false positives: " + falsePos + " (err=" + err
666        + ", expected no more than " + maxFalsePos, falsePos <= maxFalsePos);
667  }
668
669  /**
670   * Test for HBASE-8012
671   */
672  @Test
673  public void testReseek() throws Exception {
674    // write the file
675    Path f = new Path(ROOT_DIR, getName());
676    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
677    // Make a store file and write data to it.
678    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
679            .withFilePath(f)
680            .withFileContext(meta)
681            .build();
682
683    writeStoreFile(writer);
684    writer.close();
685
686    StoreFileReader reader =
687        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
688
689    // Now do reseek with empty KV to position to the beginning of the file
690
691    KeyValue k = KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY);
692    StoreFileScanner s = getStoreFileScanner(reader, false, false);
693    s.reseek(k);
694
695    assertNotNull("Intial reseek should position at the beginning of the file", s.peek());
696  }
697
698  @Test
699  public void testBloomTypes() throws Exception {
700    float err = (float) 0.01;
701    FileSystem fs = FileSystem.getLocal(conf);
702    conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, err);
703    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
704
705    int rowCount = 50;
706    int colCount = 10;
707    int versions = 2;
708
709    // run once using columns and once using rows
710    BloomType[] bt = {BloomType.ROWCOL, BloomType.ROW};
711    int[] expKeys  = {rowCount*colCount, rowCount};
712    // below line deserves commentary.  it is expected bloom false positives
713    //  column = rowCount*2*colCount inserts
714    //  row-level = only rowCount*2 inserts, but failures will be magnified by
715    //              2nd for loop for every column (2*colCount)
716    float[] expErr   = {2*rowCount*colCount*err, 2*rowCount*2*colCount*err};
717
718    for (int x : new int[]{0,1}) {
719      // write the file
720      Path f = new Path(ROOT_DIR, getName() + x);
721      HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
722          .withChecksumType(CKTYPE)
723          .withBytesPerCheckSum(CKBYTES).build();
724      // Make a store file and write data to it.
725      StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
726              .withFilePath(f)
727              .withBloomType(bt[x])
728              .withMaxKeyCount(expKeys[x])
729              .withFileContext(meta)
730              .build();
731
732      long now = System.currentTimeMillis();
733      for (int i = 0; i < rowCount*2; i += 2) { // rows
734        for (int j = 0; j < colCount*2; j += 2) {   // column qualifiers
735          String row = String.format(localFormatter, i);
736          String col = String.format(localFormatter, j);
737          for (int k= 0; k < versions; ++k) { // versions
738            KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"),
739                Bytes.toBytes("col" + col), now-k, Bytes.toBytes(-1L));
740            writer.append(kv);
741          }
742        }
743      }
744      writer.close();
745
746      StoreFileReader reader =
747          new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
748      reader.loadFileInfo();
749      reader.loadBloomfilter();
750      StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
751      assertEquals(expKeys[x], reader.generalBloomFilter.getKeyCount());
752
753      HStore store = mock(HStore.class);
754      when(store.getColumnFamilyDescriptor())
755          .thenReturn(ColumnFamilyDescriptorBuilder.of("family"));
756      // check false positives rate
757      int falsePos = 0;
758      int falseNeg = 0;
759      for (int i = 0; i < rowCount*2; ++i) { // rows
760        for (int j = 0; j < colCount*2; ++j) {   // column qualifiers
761          String row = String.format(localFormatter, i);
762          String col = String.format(localFormatter, j);
763          TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
764          columns.add(Bytes.toBytes("col" + col));
765
766          Scan scan = new Scan(Bytes.toBytes(row),Bytes.toBytes(row));
767          scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes(("col"+col)));
768
769          boolean exists =
770              scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
771          boolean shouldRowExist = i % 2 == 0;
772          boolean shouldColExist = j % 2 == 0;
773          shouldColExist = shouldColExist || bt[x] == BloomType.ROW;
774          if (shouldRowExist && shouldColExist) {
775            if (!exists) falseNeg++;
776          } else {
777            if (exists) falsePos++;
778          }
779        }
780      }
781      reader.close(true); // evict because we are about to delete the file
782      fs.delete(f, true);
783      System.out.println(bt[x].toString());
784      System.out.println("  False negatives: " + falseNeg);
785      System.out.println("  False positives: " + falsePos);
786      assertEquals(0, falseNeg);
787      assertTrue(falsePos < 2*expErr[x]);
788    }
789  }
790
791  @Test
792  public void testSeqIdComparator() {
793    assertOrdering(StoreFileComparators.SEQ_ID, mockStoreFile(true, 100, 1000, -1, "/foo/123"),
794        mockStoreFile(true, 100, 1000, -1, "/foo/124"),
795        mockStoreFile(true, 99, 1000, -1, "/foo/126"),
796        mockStoreFile(true, 98, 2000, -1, "/foo/126"), mockStoreFile(false, 3453, -1, 1, "/foo/1"),
797        mockStoreFile(false, 2, -1, 3, "/foo/2"), mockStoreFile(false, 1000, -1, 5, "/foo/2"),
798        mockStoreFile(false, 76, -1, 5, "/foo/3"));
799  }
800
801  /**
802   * Assert that the given comparator orders the given storefiles in the
803   * same way that they're passed.
804   */
805  private void assertOrdering(Comparator<? super HStoreFile> comparator, HStoreFile ... sfs) {
806    ArrayList<HStoreFile> sorted = Lists.newArrayList(sfs);
807    Collections.shuffle(sorted);
808    Collections.sort(sorted, comparator);
809    LOG.debug("sfs: " + Joiner.on(",").join(sfs));
810    LOG.debug("sorted: " + Joiner.on(",").join(sorted));
811    assertTrue(Iterables.elementsEqual(Arrays.asList(sfs), sorted));
812  }
813
814  /**
815   * Create a mock StoreFile with the given attributes.
816   */
817  private HStoreFile mockStoreFile(boolean bulkLoad,
818                                  long size,
819                                  long bulkTimestamp,
820                                  long seqId,
821                                  String path) {
822    HStoreFile mock = Mockito.mock(HStoreFile.class);
823    StoreFileReader reader = Mockito.mock(StoreFileReader.class);
824
825    Mockito.doReturn(size).when(reader).length();
826
827    Mockito.doReturn(reader).when(mock).getReader();
828    Mockito.doReturn(bulkLoad).when(mock).isBulkLoadResult();
829    Mockito.doReturn(OptionalLong.of(bulkTimestamp)).when(mock).getBulkLoadTimestamp();
830    Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
831    Mockito.doReturn(new Path(path)).when(mock).getPath();
832    String name = "mock storefile, bulkLoad=" + bulkLoad +
833      " bulkTimestamp=" + bulkTimestamp +
834      " seqId=" + seqId +
835      " path=" + path;
836    Mockito.doReturn(name).when(mock).toString();
837    return mock;
838  }
839
840  /**
841   * Generate a list of KeyValues for testing based on given parameters
842   * @param timestamps
843   * @param numRows
844   * @param qualifier
845   * @param family
846   * @return the rows key-value list
847   */
848  List<KeyValue> getKeyValueSet(long[] timestamps, int numRows,
849      byte[] qualifier, byte[] family) {
850    List<KeyValue> kvList = new ArrayList<>();
851    for (int i=1;i<=numRows;i++) {
852      byte[] b = Bytes.toBytes(i) ;
853      LOG.info(Bytes.toString(b));
854      LOG.info(Bytes.toString(b));
855      for (long timestamp: timestamps)
856      {
857        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
858      }
859    }
860    return kvList;
861  }
862
863  /**
864   * Test to ensure correctness when using StoreFile with multiple timestamps
865   * @throws IOException
866   */
867  @Test
868  public void testMultipleTimestamps() throws IOException {
869    byte[] family = Bytes.toBytes("familyname");
870    byte[] qualifier = Bytes.toBytes("qualifier");
871    int numRows = 10;
872    long[] timestamps = new long[] {20,10,5,1};
873    Scan scan = new Scan();
874
875    // Make up a directory hierarchy that has a regiondir ("7e0102") and familyname.
876    Path storedir = new Path(new Path(testDir, "7e0102"), Bytes.toString(family));
877    Path dir = new Path(storedir, "1234567890");
878    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
879    // Make a store file and write data to it.
880    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
881            .withOutputDir(dir)
882            .withFileContext(meta)
883            .build();
884
885    List<KeyValue> kvList = getKeyValueSet(timestamps,numRows,
886        qualifier, family);
887
888    for (KeyValue kv : kvList) {
889      writer.append(kv);
890    }
891    writer.appendMetadata(0, false);
892    writer.close();
893
894    HStoreFile hsf = new HStoreFile(this.fs, writer.getPath(), conf, cacheConf,
895      BloomType.NONE, true);
896    HStore store = mock(HStore.class);
897    when(store.getColumnFamilyDescriptor()).thenReturn(ColumnFamilyDescriptorBuilder.of(family));
898    hsf.initReader();
899    StoreFileReader reader = hsf.getReader();
900    StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
901    TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
902    columns.add(qualifier);
903
904    scan.setTimeRange(20, 100);
905    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
906
907    scan.setTimeRange(1, 2);
908    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
909
910    scan.setTimeRange(8, 10);
911    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
912
913    // lets make sure it still works with column family time ranges
914    scan.setColumnFamilyTimeRange(family, 7, 50);
915    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
916
917    // This test relies on the timestamp range optimization
918    scan = new Scan();
919    scan.setTimeRange(27, 50);
920    assertTrue(!scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
921
922    // should still use the scanner because we override the family time range
923    scan = new Scan();
924    scan.setTimeRange(27, 50);
925    scan.setColumnFamilyTimeRange(family, 7, 50);
926    assertTrue(scanner.shouldUseScanner(scan, store, Long.MIN_VALUE));
927  }
928
929  @Test
930  public void testCacheOnWriteEvictOnClose() throws Exception {
931    Configuration conf = this.conf;
932
933    // Find a home for our files (regiondir ("7e0102") and familyname).
934    Path baseDir = new Path(new Path(testDir, "7e0102"),"twoCOWEOC");
935
936    // Grab the block cache and get the initial hit/miss counts
937    CacheConfig.instantiateBlockCache(conf);
938    BlockCache bc = new CacheConfig(conf).getBlockCache();
939    assertNotNull(bc);
940    CacheStats cs = bc.getStats();
941    long startHit = cs.getHitCount();
942    long startMiss = cs.getMissCount();
943    long startEvicted = cs.getEvictedCount();
944
945    // Let's write a StoreFile with three blocks, with cache on write off
946    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, false);
947    CacheConfig cacheConf = new CacheConfig(conf);
948    Path pathCowOff = new Path(baseDir, "123456789");
949    StoreFileWriter writer = writeStoreFile(conf, cacheConf, pathCowOff, 3);
950    HStoreFile hsf = new HStoreFile(this.fs, writer.getPath(), conf, cacheConf,
951      BloomType.NONE, true);
952    LOG.debug(hsf.getPath().toString());
953
954    // Read this file, we should see 3 misses
955    hsf.initReader();
956    StoreFileReader reader = hsf.getReader();
957    reader.loadFileInfo();
958    StoreFileScanner scanner = getStoreFileScanner(reader, true, true);
959    scanner.seek(KeyValue.LOWESTKEY);
960    while (scanner.next() != null);
961    assertEquals(startHit, cs.getHitCount());
962    assertEquals(startMiss + 3, cs.getMissCount());
963    assertEquals(startEvicted, cs.getEvictedCount());
964    startMiss += 3;
965    scanner.close();
966    reader.close(cacheConf.shouldEvictOnClose());
967
968    // Now write a StoreFile with three blocks, with cache on write on
969    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
970    cacheConf = new CacheConfig(conf);
971    Path pathCowOn = new Path(baseDir, "123456788");
972    writer = writeStoreFile(conf, cacheConf, pathCowOn, 3);
973    hsf = new HStoreFile(this.fs, writer.getPath(), conf, cacheConf,
974      BloomType.NONE, true);
975
976    // Read this file, we should see 3 hits
977    hsf.initReader();
978    reader = hsf.getReader();
979    scanner = getStoreFileScanner(reader, true, true);
980    scanner.seek(KeyValue.LOWESTKEY);
981    while (scanner.next() != null);
982    assertEquals(startHit + 3, cs.getHitCount());
983    assertEquals(startMiss, cs.getMissCount());
984    assertEquals(startEvicted, cs.getEvictedCount());
985    startHit += 3;
986    scanner.close();
987    reader.close(cacheConf.shouldEvictOnClose());
988
989    // Let's read back the two files to ensure the blocks exactly match
990    hsf = new HStoreFile(this.fs, pathCowOff, conf, cacheConf, BloomType.NONE, true);
991    hsf.initReader();
992    StoreFileReader readerOne = hsf.getReader();
993    readerOne.loadFileInfo();
994    StoreFileScanner scannerOne = getStoreFileScanner(readerOne, true, true);
995    scannerOne.seek(KeyValue.LOWESTKEY);
996    hsf = new HStoreFile(this.fs, pathCowOn, conf, cacheConf, BloomType.NONE, true);
997    hsf.initReader();
998    StoreFileReader readerTwo = hsf.getReader();
999    readerTwo.loadFileInfo();
1000    StoreFileScanner scannerTwo = getStoreFileScanner(readerTwo, true, true);
1001    scannerTwo.seek(KeyValue.LOWESTKEY);
1002    Cell kv1 = null;
1003    Cell kv2 = null;
1004    while ((kv1 = scannerOne.next()) != null) {
1005      kv2 = scannerTwo.next();
1006      assertTrue(kv1.equals(kv2));
1007      KeyValue keyv1 = KeyValueUtil.ensureKeyValue(kv1);
1008      KeyValue keyv2 = KeyValueUtil.ensureKeyValue(kv2);
1009      assertTrue(Bytes.compareTo(
1010          keyv1.getBuffer(), keyv1.getKeyOffset(), keyv1.getKeyLength(),
1011          keyv2.getBuffer(), keyv2.getKeyOffset(), keyv2.getKeyLength()) == 0);
1012      assertTrue(Bytes.compareTo(
1013          kv1.getValueArray(), kv1.getValueOffset(), kv1.getValueLength(),
1014          kv2.getValueArray(), kv2.getValueOffset(), kv2.getValueLength()) == 0);
1015    }
1016    assertNull(scannerTwo.next());
1017    assertEquals(startHit + 6, cs.getHitCount());
1018    assertEquals(startMiss, cs.getMissCount());
1019    assertEquals(startEvicted, cs.getEvictedCount());
1020    startHit += 6;
1021    scannerOne.close();
1022    readerOne.close(cacheConf.shouldEvictOnClose());
1023    scannerTwo.close();
1024    readerTwo.close(cacheConf.shouldEvictOnClose());
1025
1026    // Let's close the first file with evict on close turned on
1027    conf.setBoolean("hbase.rs.evictblocksonclose", true);
1028    cacheConf = new CacheConfig(conf);
1029    hsf = new HStoreFile(this.fs, pathCowOff, conf, cacheConf, BloomType.NONE, true);
1030    hsf.initReader();
1031    reader = hsf.getReader();
1032    reader.close(cacheConf.shouldEvictOnClose());
1033
1034    // We should have 3 new evictions but the evict count stat should not change. Eviction because
1035    // of HFile invalidation is not counted along with normal evictions
1036    assertEquals(startHit, cs.getHitCount());
1037    assertEquals(startMiss, cs.getMissCount());
1038    assertEquals(startEvicted, cs.getEvictedCount());
1039
1040    // Let's close the second file with evict on close turned off
1041    conf.setBoolean("hbase.rs.evictblocksonclose", false);
1042    cacheConf = new CacheConfig(conf);
1043    hsf = new HStoreFile(this.fs, pathCowOn, conf, cacheConf, BloomType.NONE, true);
1044    hsf.initReader();
1045    reader = hsf.getReader();
1046    reader.close(cacheConf.shouldEvictOnClose());
1047
1048    // We expect no changes
1049    assertEquals(startHit, cs.getHitCount());
1050    assertEquals(startMiss, cs.getMissCount());
1051    assertEquals(startEvicted, cs.getEvictedCount());
1052  }
1053
1054  private Path splitStoreFile(final HRegionFileSystem regionFs, final HRegionInfo hri,
1055      final String family, final HStoreFile sf, final byte[] splitKey, boolean isTopRef)
1056      throws IOException {
1057    FileSystem fs = regionFs.getFileSystem();
1058    Path path = regionFs.splitStoreFile(hri, family, sf, splitKey, isTopRef, null);
1059    if (null == path) {
1060      return null;
1061    }
1062    Path regionDir = regionFs.commitDaughterRegion(hri);
1063    return new Path(new Path(regionDir, family), path.getName());
1064  }
1065
1066  private StoreFileWriter writeStoreFile(Configuration conf,
1067      CacheConfig cacheConf, Path path, int numBlocks)
1068  throws IOException {
1069    // Let's put ~5 small KVs in each block, so let's make 5*numBlocks KVs
1070    int numKVs = 5 * numBlocks;
1071    List<KeyValue> kvs = new ArrayList<>(numKVs);
1072    byte [] b = Bytes.toBytes("x");
1073    int totalSize = 0;
1074    for (int i=numKVs;i>0;i--) {
1075      KeyValue kv = new KeyValue(b, b, b, i, b);
1076      kvs.add(kv);
1077      // kv has memstoreTS 0, which takes 1 byte to store.
1078      totalSize += kv.getLength() + 1;
1079    }
1080    int blockSize = totalSize / numBlocks;
1081    HFileContext meta = new HFileContextBuilder().withBlockSize(blockSize)
1082                        .withChecksumType(CKTYPE)
1083                        .withBytesPerCheckSum(CKBYTES)
1084                        .build();
1085    // Make a store file and write data to it.
1086    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
1087            .withFilePath(path)
1088            .withMaxKeyCount(2000)
1089            .withFileContext(meta)
1090            .build();
1091    // We'll write N-1 KVs to ensure we don't write an extra block
1092    kvs.remove(kvs.size()-1);
1093    for (KeyValue kv : kvs) {
1094      writer.append(kv);
1095    }
1096    writer.appendMetadata(0, false);
1097    writer.close();
1098    return writer;
1099  }
1100
1101  /**
1102   * Check if data block encoding information is saved correctly in HFile's
1103   * file info.
1104   */
1105  @Test
1106  public void testDataBlockEncodingMetaData() throws IOException {
1107    // Make up a directory hierarchy that has a regiondir ("7e0102") and familyname.
1108    Path dir = new Path(new Path(testDir, "7e0102"), "familyname");
1109    Path path = new Path(dir, "1234567890");
1110
1111    DataBlockEncoding dataBlockEncoderAlgo =
1112        DataBlockEncoding.FAST_DIFF;
1113    HFileDataBlockEncoder dataBlockEncoder =
1114        new HFileDataBlockEncoderImpl(
1115            dataBlockEncoderAlgo);
1116    cacheConf = new CacheConfig(conf);
1117    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
1118        .withChecksumType(CKTYPE)
1119        .withBytesPerCheckSum(CKBYTES)
1120        .withDataBlockEncoding(dataBlockEncoderAlgo)
1121        .build();
1122    // Make a store file and write data to it.
1123    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
1124            .withFilePath(path)
1125            .withMaxKeyCount(2000)
1126            .withFileContext(meta)
1127            .build();
1128    writer.close();
1129
1130    HStoreFile storeFile =
1131        new HStoreFile(fs, writer.getPath(), conf, cacheConf, BloomType.NONE, true);
1132    storeFile.initReader();
1133    StoreFileReader reader = storeFile.getReader();
1134
1135    Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
1136    byte[] value = fileInfo.get(HFileDataBlockEncoder.DATA_BLOCK_ENCODING);
1137    assertEquals(dataBlockEncoderAlgo.getNameInBytes(), value);
1138  }
1139}