001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025import static org.mockito.ArgumentMatchers.any;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.spy;
028import static org.mockito.Mockito.times;
029import static org.mockito.Mockito.verify;
030import static org.mockito.Mockito.when;
031
032import java.io.IOException;
033import java.lang.ref.SoftReference;
034import java.security.PrivilegedExceptionAction;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Collection;
038import java.util.Collections;
039import java.util.Iterator;
040import java.util.List;
041import java.util.ListIterator;
042import java.util.NavigableSet;
043import java.util.TreeSet;
044import java.util.concurrent.ConcurrentSkipListSet;
045import java.util.concurrent.CountDownLatch;
046import java.util.concurrent.ExecutorService;
047import java.util.concurrent.Executors;
048import java.util.concurrent.ThreadPoolExecutor;
049import java.util.concurrent.TimeUnit;
050import java.util.concurrent.atomic.AtomicBoolean;
051import java.util.concurrent.atomic.AtomicInteger;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.fs.FSDataOutputStream;
054import org.apache.hadoop.fs.FileStatus;
055import org.apache.hadoop.fs.FileSystem;
056import org.apache.hadoop.fs.FilterFileSystem;
057import org.apache.hadoop.fs.LocalFileSystem;
058import org.apache.hadoop.fs.Path;
059import org.apache.hadoop.fs.permission.FsPermission;
060import org.apache.hadoop.hbase.Cell;
061import org.apache.hadoop.hbase.CellBuilderFactory;
062import org.apache.hadoop.hbase.CellBuilderType;
063import org.apache.hadoop.hbase.CellComparator;
064import org.apache.hadoop.hbase.CellComparatorImpl;
065import org.apache.hadoop.hbase.CellUtil;
066import org.apache.hadoop.hbase.HBaseClassTestRule;
067import org.apache.hadoop.hbase.HBaseConfiguration;
068import org.apache.hadoop.hbase.HBaseTestingUtility;
069import org.apache.hadoop.hbase.HConstants;
070import org.apache.hadoop.hbase.KeyValue;
071import org.apache.hadoop.hbase.MemoryCompactionPolicy;
072import org.apache.hadoop.hbase.NamespaceDescriptor;
073import org.apache.hadoop.hbase.PrivateCellUtil;
074import org.apache.hadoop.hbase.TableName;
075import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
076import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
077import org.apache.hadoop.hbase.client.Get;
078import org.apache.hadoop.hbase.client.RegionInfo;
079import org.apache.hadoop.hbase.client.RegionInfoBuilder;
080import org.apache.hadoop.hbase.client.Scan;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
083import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
084import org.apache.hadoop.hbase.filter.Filter;
085import org.apache.hadoop.hbase.filter.FilterBase;
086import org.apache.hadoop.hbase.io.compress.Compression;
087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
088import org.apache.hadoop.hbase.io.hfile.CacheConfig;
089import org.apache.hadoop.hbase.io.hfile.HFile;
090import org.apache.hadoop.hbase.io.hfile.HFileContext;
091import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
092import org.apache.hadoop.hbase.monitoring.MonitoredTask;
093import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
094import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
095import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
096import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
097import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
098import org.apache.hadoop.hbase.security.User;
099import org.apache.hadoop.hbase.testclassification.MediumTests;
100import org.apache.hadoop.hbase.testclassification.RegionServerTests;
101import org.apache.hadoop.hbase.util.Bytes;
102import org.apache.hadoop.hbase.util.CommonFSUtils;
103import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
104import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
105import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
106import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
107import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
108import org.apache.hadoop.hbase.wal.WALFactory;
109import org.apache.hadoop.util.Progressable;
110import org.junit.After;
111import org.junit.AfterClass;
112import org.junit.Before;
113import org.junit.ClassRule;
114import org.junit.Rule;
115import org.junit.Test;
116import org.junit.experimental.categories.Category;
117import org.junit.rules.TestName;
118import org.mockito.Mockito;
119import org.slf4j.Logger;
120import org.slf4j.LoggerFactory;
121
122import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
123
124/**
125 * Test class for the HStore
126 */
127@Category({ RegionServerTests.class, MediumTests.class })
128public class TestHStore {
129
130  @ClassRule
131  public static final HBaseClassTestRule CLASS_RULE =
132      HBaseClassTestRule.forClass(TestHStore.class);
133
134  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
135  @Rule
136  public TestName name = new TestName();
137
138  HRegion region;
139  HStore store;
140  byte [] table = Bytes.toBytes("table");
141  byte [] family = Bytes.toBytes("family");
142
143  byte [] row = Bytes.toBytes("row");
144  byte [] row2 = Bytes.toBytes("row2");
145  byte [] qf1 = Bytes.toBytes("qf1");
146  byte [] qf2 = Bytes.toBytes("qf2");
147  byte [] qf3 = Bytes.toBytes("qf3");
148  byte [] qf4 = Bytes.toBytes("qf4");
149  byte [] qf5 = Bytes.toBytes("qf5");
150  byte [] qf6 = Bytes.toBytes("qf6");
151
152  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);
153
154  List<Cell> expected = new ArrayList<>();
155  List<Cell> result = new ArrayList<>();
156
157  long id = System.currentTimeMillis();
158  Get get = new Get(row);
159
160  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
161  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
162
163
164  /**
165   * Setup
166   * @throws IOException
167   */
168  @Before
169  public void setUp() throws IOException {
170    qualifiers.clear();
171    qualifiers.add(qf1);
172    qualifiers.add(qf3);
173    qualifiers.add(qf5);
174
175    Iterator<byte[]> iter = qualifiers.iterator();
176    while(iter.hasNext()){
177      byte [] next = iter.next();
178      expected.add(new KeyValue(row, family, next, 1, (byte[])null));
179      get.addColumn(family, next);
180    }
181  }
182
183  private void init(String methodName) throws IOException {
184    init(methodName, TEST_UTIL.getConfiguration());
185  }
186
187  private HStore init(String methodName, Configuration conf) throws IOException {
188    // some of the tests write 4 versions and then flush
189    // (with HBASE-4241, lower versions are collected on flush)
190    return init(methodName, conf,
191      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
192  }
193
194  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
195      throws IOException {
196    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
197  }
198
199  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
200      ColumnFamilyDescriptor hcd) throws IOException {
201    return init(methodName, conf, builder, hcd, null);
202  }
203
204  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
205      ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
206    return init(methodName, conf, builder, hcd, hook, false);
207  }
208
209  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
210      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
211    TableDescriptor htd = builder.setColumnFamily(hcd).build();
212    Path basedir = new Path(DIR + methodName);
213    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
214    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
215
216    FileSystem fs = FileSystem.get(conf);
217
218    fs.delete(logdir, true);
219    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
220      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
221    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
222    Configuration walConf = new Configuration(conf);
223    CommonFSUtils.setRootDir(walConf, basedir);
224    WALFactory wals = new WALFactory(walConf, methodName);
225    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
226        htd, null);
227    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
228    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
229    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
230  }
231
232  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
233      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
234    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
235    if (hook == null) {
236      store = new HStore(region, hcd, conf, false);
237    } else {
238      store = new MyStore(region, hcd, conf, hook, switchToPread);
239    }
240    return store;
241  }
242
243  /**
244   * Test we do not lose data if we fail a flush and then close.
245   * Part of HBase-10466
246   * @throws Exception
247   */
248  @Test
249  public void testFlushSizeSizing() throws Exception {
250    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
251    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
252    // Only retry once.
253    conf.setInt("hbase.hstore.flush.retries.number", 1);
254    User user = User.createUserForTesting(conf, this.name.getMethodName(),
255      new String[]{"foo"});
256    // Inject our faulty LocalFileSystem
257    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
258    user.runAs(new PrivilegedExceptionAction<Object>() {
259      @Override
260      public Object run() throws Exception {
261        // Make sure it worked (above is sensitive to caching details in hadoop core)
262        FileSystem fs = FileSystem.get(conf);
263        assertEquals(FaultyFileSystem.class, fs.getClass());
264        FaultyFileSystem ffs = (FaultyFileSystem)fs;
265
266        // Initialize region
267        init(name.getMethodName(), conf);
268
269        MemStoreSize mss = store.memstore.getFlushableSize();
270        assertEquals(0, mss.getDataSize());
271        LOG.info("Adding some data");
272        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
273        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
274        // add the heap size of active (mutable) segment
275        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
276        mss = store.memstore.getFlushableSize();
277        assertEquals(kvSize.getMemStoreSize(), mss);
278        // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
279        try {
280          LOG.info("Flushing");
281          flushStore(store, id++);
282          fail("Didn't bubble up IOE!");
283        } catch (IOException ioe) {
284          assertTrue(ioe.getMessage().contains("Fault injected"));
285        }
286        // due to snapshot, change mutable to immutable segment
287        kvSize.incMemStoreSize(0,
288          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
289        mss = store.memstore.getFlushableSize();
290        assertEquals(kvSize.getMemStoreSize(), mss);
291        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
292        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
293        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
294        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
295        // not yet cleared the snapshot -- the above flush failed.
296        assertEquals(kvSize.getMemStoreSize(), mss);
297        ffs.fault.set(false);
298        flushStore(store, id++);
299        mss = store.memstore.getFlushableSize();
300        // Size should be the foreground kv size.
301        assertEquals(kvSize2.getMemStoreSize(), mss);
302        flushStore(store, id++);
303        mss = store.memstore.getFlushableSize();
304        assertEquals(0, mss.getDataSize());
305        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
306        return null;
307      }
308    });
309  }
310
311  /**
312   * Verify that compression and data block encoding are respected by the
313   * Store.createWriterInTmp() method, used on store flush.
314   */
315  @Test
316  public void testCreateWriter() throws Exception {
317    Configuration conf = HBaseConfiguration.create();
318    FileSystem fs = FileSystem.get(conf);
319
320    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
321        .setCompressionType(Compression.Algorithm.GZ).setDataBlockEncoding(DataBlockEncoding.DIFF)
322        .build();
323    init(name.getMethodName(), conf, hcd);
324
325    // Test createWriterInTmp()
326    StoreFileWriter writer =
327        store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false);
328    Path path = writer.getPath();
329    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
330    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
331    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
332    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
333    writer.close();
334
335    // Verify that compression and encoding settings are respected
336    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
337    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
338    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
339    reader.close();
340  }
341
342  @Test
343  public void testDeleteExpiredStoreFiles() throws Exception {
344    testDeleteExpiredStoreFiles(0);
345    testDeleteExpiredStoreFiles(1);
346  }
347
348  /*
349   * @param minVersions the MIN_VERSIONS for the column family
350   */
351  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
352    int storeFileNum = 4;
353    int ttl = 4;
354    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
355    EnvironmentEdgeManagerTestHelper.injectEdge(edge);
356
357    Configuration conf = HBaseConfiguration.create();
358    // Enable the expired store file deletion
359    conf.setBoolean("hbase.store.delete.expired.storefile", true);
360    // Set the compaction threshold higher to avoid normal compactions.
361    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
362
363    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
364        .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());
365
366    long storeTtl = this.store.getScanInfo().getTtl();
367    long sleepTime = storeTtl / storeFileNum;
368    long timeStamp;
369    // There are 4 store files and the max time stamp difference among these
370    // store files will be (this.store.ttl / storeFileNum)
371    for (int i = 1; i <= storeFileNum; i++) {
372      LOG.info("Adding some data for the store file #" + i);
373      timeStamp = EnvironmentEdgeManager.currentTime();
374      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
375      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
376      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
377      flush(i);
378      edge.incrementTime(sleepTime);
379    }
380
381    // Verify the total number of store files
382    assertEquals(storeFileNum, this.store.getStorefiles().size());
383
384     // Each call will find one expired store file and delete it before compaction happens.
385     // There will be no compaction due to threshold above. Last file will not be replaced.
386    for (int i = 1; i <= storeFileNum - 1; i++) {
387      // verify the expired store file.
388      assertFalse(this.store.requestCompaction().isPresent());
389      Collection<HStoreFile> sfs = this.store.getStorefiles();
390      // Ensure i files are gone.
391      if (minVersions == 0) {
392        assertEquals(storeFileNum - i, sfs.size());
393        // Ensure only non-expired files remain.
394        for (HStoreFile sf : sfs) {
395          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
396        }
397      } else {
398        assertEquals(storeFileNum, sfs.size());
399      }
400      // Let the next store file expired.
401      edge.incrementTime(sleepTime);
402    }
403    assertFalse(this.store.requestCompaction().isPresent());
404
405    Collection<HStoreFile> sfs = this.store.getStorefiles();
406    // Assert the last expired file is not removed.
407    if (minVersions == 0) {
408      assertEquals(1, sfs.size());
409    }
410    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
411    assertTrue(ts < (edge.currentTime() - storeTtl));
412
413    for (HStoreFile sf : sfs) {
414      sf.closeStoreFile(true);
415    }
416  }
417
418  @Test
419  public void testLowestModificationTime() throws Exception {
420    Configuration conf = HBaseConfiguration.create();
421    FileSystem fs = FileSystem.get(conf);
422    // Initialize region
423    init(name.getMethodName(), conf);
424
425    int storeFileNum = 4;
426    for (int i = 1; i <= storeFileNum; i++) {
427      LOG.info("Adding some data for the store file #"+i);
428      this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null);
429      this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null);
430      this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null);
431      flush(i);
432    }
433    // after flush; check the lowest time stamp
434    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
435    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
436    assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
437
438    // after compact; check the lowest time stamp
439    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
440    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
441    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
442    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
443  }
444
445  private static long getLowestTimeStampFromFS(FileSystem fs,
446      final Collection<HStoreFile> candidates) throws IOException {
447    long minTs = Long.MAX_VALUE;
448    if (candidates.isEmpty()) {
449      return minTs;
450    }
451    Path[] p = new Path[candidates.size()];
452    int i = 0;
453    for (HStoreFile sf : candidates) {
454      p[i] = sf.getPath();
455      ++i;
456    }
457
458    FileStatus[] stats = fs.listStatus(p);
459    if (stats == null || stats.length == 0) {
460      return minTs;
461    }
462    for (FileStatus s : stats) {
463      minTs = Math.min(minTs, s.getModificationTime());
464    }
465    return minTs;
466  }
467
468  //////////////////////////////////////////////////////////////////////////////
469  // Get tests
470  //////////////////////////////////////////////////////////////////////////////
471
472  private static final int BLOCKSIZE_SMALL = 8192;
473  /**
474   * Test for hbase-1686.
475   * @throws IOException
476   */
477  @Test
478  public void testEmptyStoreFile() throws IOException {
479    init(this.name.getMethodName());
480    // Write a store file.
481    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
482    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
483    flush(1);
484    // Now put in place an empty store file.  Its a little tricky.  Have to
485    // do manually with hacked in sequence id.
486    HStoreFile f = this.store.getStorefiles().iterator().next();
487    Path storedir = f.getPath().getParent();
488    long seqid = f.getMaxSequenceId();
489    Configuration c = HBaseConfiguration.create();
490    FileSystem fs = FileSystem.get(c);
491    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
492    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
493        fs)
494            .withOutputDir(storedir)
495            .withFileContext(meta)
496            .build();
497    w.appendMetadata(seqid + 1, false);
498    w.close();
499    this.store.close();
500    // Reopen it... should pick up two files
501    this.store =
502        new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
503    assertEquals(2, this.store.getStorefilesCount());
504
505    result = HBaseTestingUtility.getFromStoreFile(store,
506        get.getRow(),
507        qualifiers);
508    assertEquals(1, result.size());
509  }
510
511  /**
512   * Getting data from memstore only
513   * @throws IOException
514   */
515  @Test
516  public void testGet_FromMemStoreOnly() throws IOException {
517    init(this.name.getMethodName());
518
519    //Put data in memstore
520    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
521    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
522    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
523    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
524    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
525    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
526
527    //Get
528    result = HBaseTestingUtility.getFromStoreFile(store,
529        get.getRow(), qualifiers);
530
531    //Compare
532    assertCheck();
533  }
534
535  @Test
536  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
537    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
538    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
539    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
540  }
541
542  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
543    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
544    ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
545    long currentTs = 100;
546    long minTs = currentTs;
547    // the extra cell won't be flushed to disk,
548    // so the min of timerange will be different between memStore and hfile.
549    for (int i = 0; i != (maxVersion + 1); ++i) {
550      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
551      if (i == 1) {
552        minTs = currentTs;
553      }
554    }
555    flushStore(store, id++);
556
557    Collection<HStoreFile> files = store.getStorefiles();
558    assertEquals(1, files.size());
559    HStoreFile f = files.iterator().next();
560    f.initReader();
561    StoreFileReader reader = f.getReader();
562    assertEquals(minTs, reader.timeRange.getMin());
563    assertEquals(currentTs, reader.timeRange.getMax());
564  }
565
566  /**
567   * Getting data from files only
568   * @throws IOException
569   */
570  @Test
571  public void testGet_FromFilesOnly() throws IOException {
572    init(this.name.getMethodName());
573
574    //Put data in memstore
575    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
576    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
577    //flush
578    flush(1);
579
580    //Add more data
581    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
582    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
583    //flush
584    flush(2);
585
586    //Add more data
587    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
588    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
589    //flush
590    flush(3);
591
592    //Get
593    result = HBaseTestingUtility.getFromStoreFile(store,
594        get.getRow(),
595        qualifiers);
596    //this.store.get(get, qualifiers, result);
597
598    //Need to sort the result since multiple files
599    Collections.sort(result, CellComparatorImpl.COMPARATOR);
600
601    //Compare
602    assertCheck();
603  }
604
605  /**
606   * Getting data from memstore and files
607   * @throws IOException
608   */
609  @Test
610  public void testGet_FromMemStoreAndFiles() throws IOException {
611    init(this.name.getMethodName());
612
613    //Put data in memstore
614    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
615    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
616    //flush
617    flush(1);
618
619    //Add more data
620    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
621    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
622    //flush
623    flush(2);
624
625    //Add more data
626    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
627    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
628
629    //Get
630    result = HBaseTestingUtility.getFromStoreFile(store,
631        get.getRow(), qualifiers);
632
633    //Need to sort the result since multiple files
634    Collections.sort(result, CellComparatorImpl.COMPARATOR);
635
636    //Compare
637    assertCheck();
638  }
639
640  private void flush(int storeFilessize) throws IOException{
641    this.store.snapshot();
642    flushStore(store, id++);
643    assertEquals(storeFilessize, this.store.getStorefiles().size());
644    assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
645  }
646
647  private void assertCheck() {
648    assertEquals(expected.size(), result.size());
649    for(int i=0; i<expected.size(); i++) {
650      assertEquals(expected.get(i), result.get(i));
651    }
652  }
653
654  @After
655  public void tearDown() throws Exception {
656    EnvironmentEdgeManagerTestHelper.reset();
657    if (store != null) {
658      try {
659        store.close();
660      } catch (IOException e) {
661      }
662      store = null;
663    }
664    if (region != null) {
665      region.close();
666      region = null;
667    }
668  }
669
670  @AfterClass
671  public static void tearDownAfterClass() throws IOException {
672    TEST_UTIL.cleanupTestDir();
673  }
674
675  @Test
676  public void testHandleErrorsInFlush() throws Exception {
677    LOG.info("Setting up a faulty file system that cannot write");
678
679    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
680    User user = User.createUserForTesting(conf,
681        "testhandleerrorsinflush", new String[]{"foo"});
682    // Inject our faulty LocalFileSystem
683    conf.setClass("fs.file.impl", FaultyFileSystem.class,
684        FileSystem.class);
685    user.runAs(new PrivilegedExceptionAction<Object>() {
686      @Override
687      public Object run() throws Exception {
688        // Make sure it worked (above is sensitive to caching details in hadoop core)
689        FileSystem fs = FileSystem.get(conf);
690        assertEquals(FaultyFileSystem.class, fs.getClass());
691
692        // Initialize region
693        init(name.getMethodName(), conf);
694
695        LOG.info("Adding some data");
696        store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
697        store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
698        store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
699
700        LOG.info("Before flush, we should have no files");
701
702        Collection<StoreFileInfo> files =
703          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
704        assertEquals(0, files != null ? files.size() : 0);
705
706        //flush
707        try {
708          LOG.info("Flushing");
709          flush(1);
710          fail("Didn't bubble up IOE!");
711        } catch (IOException ioe) {
712          assertTrue(ioe.getMessage().contains("Fault injected"));
713        }
714
715        LOG.info("After failed flush, we should still have no files!");
716        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
717        assertEquals(0, files != null ? files.size() : 0);
718        store.getHRegion().getWAL().close();
719        return null;
720      }
721    });
722    FileSystem.closeAllForUGI(user.getUGI());
723  }
724
725  /**
726   * Faulty file system that will fail if you write past its fault position the FIRST TIME
727   * only; thereafter it will succeed.  Used by {@link TestHRegion} too.
728   */
729  static class FaultyFileSystem extends FilterFileSystem {
730    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
731    private long faultPos = 200;
732    AtomicBoolean fault = new AtomicBoolean(true);
733
734    public FaultyFileSystem() {
735      super(new LocalFileSystem());
736      System.err.println("Creating faulty!");
737    }
738
739    @Override
740    public FSDataOutputStream create(Path p) throws IOException {
741      return new FaultyOutputStream(super.create(p), faultPos, fault);
742    }
743
744    @Override
745    public FSDataOutputStream create(Path f, FsPermission permission,
746        boolean overwrite, int bufferSize, short replication, long blockSize,
747        Progressable progress) throws IOException {
748      return new FaultyOutputStream(super.create(f, permission,
749          overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
750    }
751
752    @Override
753    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
754        int bufferSize, short replication, long blockSize, Progressable progress)
755    throws IOException {
756      // Fake it.  Call create instead.  The default implementation throws an IOE
757      // that this is not supported.
758      return create(f, overwrite, bufferSize, replication, blockSize, progress);
759    }
760  }
761
762  static class FaultyOutputStream extends FSDataOutputStream {
763    volatile long faultPos = Long.MAX_VALUE;
764    private final AtomicBoolean fault;
765
766    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
767    throws IOException {
768      super(out, null);
769      this.faultPos = faultPos;
770      this.fault = fault;
771    }
772
773    @Override
774    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
775      System.err.println("faulty stream write at pos " + getPos());
776      injectFault();
777      super.write(buf, offset, length);
778    }
779
780    private void injectFault() throws IOException {
781      if (this.fault.get() && getPos() >= faultPos) {
782        throw new IOException("Fault injected");
783      }
784    }
785  }
786
787  private static void flushStore(HStore store, long id) throws IOException {
788    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
789    storeFlushCtx.prepare();
790    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
791    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
792  }
793
794  /**
795   * Generate a list of KeyValues for testing based on given parameters
796   * @param timestamps
797   * @param numRows
798   * @param qualifier
799   * @param family
800   * @return the rows key-value list
801   */
802  List<Cell> getKeyValueSet(long[] timestamps, int numRows,
803      byte[] qualifier, byte[] family) {
804    List<Cell> kvList = new ArrayList<>();
805    for (int i=1;i<=numRows;i++) {
806      byte[] b = Bytes.toBytes(i);
807      for (long timestamp: timestamps) {
808        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
809      }
810    }
811    return kvList;
812  }
813
814  /**
815   * Test to ensure correctness when using Stores with multiple timestamps
816   * @throws IOException
817   */
818  @Test
819  public void testMultipleTimestamps() throws IOException {
820    int numRows = 1;
821    long[] timestamps1 = new long[] {1,5,10,20};
822    long[] timestamps2 = new long[] {30,80};
823
824    init(this.name.getMethodName());
825
826    List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
827    for (Cell kv : kvList1) {
828      this.store.add(kv, null);
829    }
830
831    this.store.snapshot();
832    flushStore(store, id++);
833
834    List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
835    for(Cell kv : kvList2) {
836      this.store.add(kv, null);
837    }
838
839    List<Cell> result;
840    Get get = new Get(Bytes.toBytes(1));
841    get.addColumn(family,qf1);
842
843    get.setTimeRange(0,15);
844    result = HBaseTestingUtility.getFromStoreFile(store, get);
845    assertTrue(result.size()>0);
846
847    get.setTimeRange(40,90);
848    result = HBaseTestingUtility.getFromStoreFile(store, get);
849    assertTrue(result.size()>0);
850
851    get.setTimeRange(10,45);
852    result = HBaseTestingUtility.getFromStoreFile(store, get);
853    assertTrue(result.size()>0);
854
855    get.setTimeRange(80,145);
856    result = HBaseTestingUtility.getFromStoreFile(store, get);
857    assertTrue(result.size()>0);
858
859    get.setTimeRange(1,2);
860    result = HBaseTestingUtility.getFromStoreFile(store, get);
861    assertTrue(result.size()>0);
862
863    get.setTimeRange(90,200);
864    result = HBaseTestingUtility.getFromStoreFile(store, get);
865    assertTrue(result.size()==0);
866  }
867
868  /**
869   * Test for HBASE-3492 - Test split on empty colfam (no store files).
870   *
871   * @throws IOException When the IO operations fail.
872   */
873  @Test
874  public void testSplitWithEmptyColFam() throws IOException {
875    init(this.name.getMethodName());
876    assertFalse(store.getSplitPoint().isPresent());
877    store.getHRegion().forceSplit(null);
878    assertFalse(store.getSplitPoint().isPresent());
879    store.getHRegion().clearSplit();
880  }
881
882  @Test
883  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
884    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
885    long anyValue = 10;
886
887    // We'll check that it uses correct config and propagates it appropriately by going thru
888    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
889    // a number we pass in is higher than some config value, inside compactionPolicy.
890    Configuration conf = HBaseConfiguration.create();
891    conf.setLong(CONFIG_KEY, anyValue);
892    init(name.getMethodName() + "-xml", conf);
893    assertTrue(store.throttleCompaction(anyValue + 1));
894    assertFalse(store.throttleCompaction(anyValue));
895
896    // HTD overrides XML.
897    --anyValue;
898    init(name.getMethodName() + "-htd", conf, TableDescriptorBuilder
899        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
900      ColumnFamilyDescriptorBuilder.of(family));
901    assertTrue(store.throttleCompaction(anyValue + 1));
902    assertFalse(store.throttleCompaction(anyValue));
903
904    // HCD overrides them both.
905    --anyValue;
906    init(name.getMethodName() + "-hcd", conf,
907      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
908        Long.toString(anyValue)),
909      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
910          .build());
911    assertTrue(store.throttleCompaction(anyValue + 1));
912    assertFalse(store.throttleCompaction(anyValue));
913  }
914
915  public static class DummyStoreEngine extends DefaultStoreEngine {
916    public static DefaultCompactor lastCreatedCompactor = null;
917
918    @Override
919    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
920        throws IOException {
921      super.createComponents(conf, store, comparator);
922      lastCreatedCompactor = this.compactor;
923    }
924  }
925
926  @Test
927  public void testStoreUsesSearchEngineOverride() throws Exception {
928    Configuration conf = HBaseConfiguration.create();
929    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
930    init(this.name.getMethodName(), conf);
931    assertEquals(DummyStoreEngine.lastCreatedCompactor,
932      this.store.storeEngine.getCompactor());
933  }
934
935  private void addStoreFile() throws IOException {
936    HStoreFile f = this.store.getStorefiles().iterator().next();
937    Path storedir = f.getPath().getParent();
938    long seqid = this.store.getMaxSequenceId().orElse(0L);
939    Configuration c = TEST_UTIL.getConfiguration();
940    FileSystem fs = FileSystem.get(c);
941    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
942    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
943        fs)
944            .withOutputDir(storedir)
945            .withFileContext(fileContext)
946            .build();
947    w.appendMetadata(seqid + 1, false);
948    w.close();
949    LOG.info("Added store file:" + w.getPath());
950  }
951
952  private void archiveStoreFile(int index) throws IOException {
953    Collection<HStoreFile> files = this.store.getStorefiles();
954    HStoreFile sf = null;
955    Iterator<HStoreFile> it = files.iterator();
956    for (int i = 0; i <= index; i++) {
957      sf = it.next();
958    }
959    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
960  }
961
962  private void closeCompactedFile(int index) throws IOException {
963    Collection<HStoreFile> files =
964        this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
965    HStoreFile sf = null;
966    Iterator<HStoreFile> it = files.iterator();
967    for (int i = 0; i <= index; i++) {
968      sf = it.next();
969    }
970    sf.closeStoreFile(true);
971    store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
972  }
973
974  @Test
975  public void testRefreshStoreFiles() throws Exception {
976    init(name.getMethodName());
977
978    assertEquals(0, this.store.getStorefilesCount());
979
980    // Test refreshing store files when no store files are there
981    store.refreshStoreFiles();
982    assertEquals(0, this.store.getStorefilesCount());
983
984    // add some data, flush
985    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
986    flush(1);
987    assertEquals(1, this.store.getStorefilesCount());
988
989    // add one more file
990    addStoreFile();
991
992    assertEquals(1, this.store.getStorefilesCount());
993    store.refreshStoreFiles();
994    assertEquals(2, this.store.getStorefilesCount());
995
996    // add three more files
997    addStoreFile();
998    addStoreFile();
999    addStoreFile();
1000
1001    assertEquals(2, this.store.getStorefilesCount());
1002    store.refreshStoreFiles();
1003    assertEquals(5, this.store.getStorefilesCount());
1004
1005    closeCompactedFile(0);
1006    archiveStoreFile(0);
1007
1008    assertEquals(5, this.store.getStorefilesCount());
1009    store.refreshStoreFiles();
1010    assertEquals(4, this.store.getStorefilesCount());
1011
1012    archiveStoreFile(0);
1013    archiveStoreFile(1);
1014    archiveStoreFile(2);
1015
1016    assertEquals(4, this.store.getStorefilesCount());
1017    store.refreshStoreFiles();
1018    assertEquals(1, this.store.getStorefilesCount());
1019
1020    archiveStoreFile(0);
1021    store.refreshStoreFiles();
1022    assertEquals(0, this.store.getStorefilesCount());
1023  }
1024
1025  @Test
1026  public void testRefreshStoreFilesNotChanged() throws IOException {
1027    init(name.getMethodName());
1028
1029    assertEquals(0, this.store.getStorefilesCount());
1030
1031    // add some data, flush
1032    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
1033    flush(1);
1034    // add one more file
1035    addStoreFile();
1036
1037    HStore spiedStore = spy(store);
1038
1039    // call first time after files changed
1040    spiedStore.refreshStoreFiles();
1041    assertEquals(2, this.store.getStorefilesCount());
1042    verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
1043
1044    // call second time
1045    spiedStore.refreshStoreFiles();
1046
1047    //ensure that replaceStoreFiles is not called if files are not refreshed
1048    verify(spiedStore, times(0)).replaceStoreFiles(null, null);
1049  }
1050
1051  private long countMemStoreScanner(StoreScanner scanner) {
1052    if (scanner.currentScanners == null) {
1053      return 0;
1054    }
1055    return scanner.currentScanners.stream()
1056            .filter(s -> !s.isFileScanner())
1057            .count();
1058  }
1059
1060  @Test
1061  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1062    long seqId = 100;
1063    long timestamp = System.currentTimeMillis();
1064    Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1065        .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put)
1066        .setValue(qf1).build();
1067    PrivateCellUtil.setSequenceId(cell0, seqId);
1068    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1069
1070    Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1071        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
1072        .setValue(qf1).build();
1073    PrivateCellUtil.setSequenceId(cell1, seqId);
1074    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1075
1076    seqId = 101;
1077    timestamp = System.currentTimeMillis();
1078    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1079        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
1080        .setValue(qf1).build();
1081    PrivateCellUtil.setSequenceId(cell2, seqId);
1082    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1083  }
1084
1085  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
1086      List<Cell> inputCellsAfterSnapshot) throws IOException {
1087    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
1088    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1089    long seqId = Long.MIN_VALUE;
1090    for (Cell c : inputCellsBeforeSnapshot) {
1091      quals.add(CellUtil.cloneQualifier(c));
1092      seqId = Math.max(seqId, c.getSequenceId());
1093    }
1094    for (Cell c : inputCellsAfterSnapshot) {
1095      quals.add(CellUtil.cloneQualifier(c));
1096      seqId = Math.max(seqId, c.getSequenceId());
1097    }
1098    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
1099    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1100    storeFlushCtx.prepare();
1101    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
1102    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
1103    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
1104      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
1105      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
1106      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1107      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1108      // snapshot has no data after flush
1109      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
1110      boolean more;
1111      int cellCount = 0;
1112      do {
1113        List<Cell> cells = new ArrayList<>();
1114        more = s.next(cells);
1115        cellCount += cells.size();
1116        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
1117      } while (more);
1118      assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
1119          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
1120          inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
1121      // the current scanners is cleared
1122      assertEquals(0, countMemStoreScanner(s));
1123    }
1124  }
1125
1126  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
1127      throws IOException {
1128    return createCell(row, qualifier, ts, sequenceId, value);
1129  }
1130
1131  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
1132      throws IOException {
1133    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1134        .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
1135        .setValue(value).build();
1136    PrivateCellUtil.setSequenceId(c, sequenceId);
1137    return c;
1138  }
1139
1140  @Test
1141  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1142    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1143    final int expectedSize = 3;
1144    testFlushBeforeCompletingScan(new MyListHook() {
1145      @Override
1146      public void hook(int currentSize) {
1147        if (currentSize == expectedSize - 1) {
1148          try {
1149            flushStore(store, id++);
1150            timeToGoNextRow.set(true);
1151          } catch (IOException e) {
1152            throw new RuntimeException(e);
1153          }
1154        }
1155      }
1156    }, new FilterBase() {
1157      @Override
1158      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1159        return ReturnCode.INCLUDE;
1160      }
1161    }, expectedSize);
1162  }
1163
1164  @Test
1165  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
1166    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1167    final int expectedSize = 2;
1168    testFlushBeforeCompletingScan(new MyListHook() {
1169      @Override
1170      public void hook(int currentSize) {
1171        if (currentSize == expectedSize - 1) {
1172          try {
1173            flushStore(store, id++);
1174            timeToGoNextRow.set(true);
1175          } catch (IOException e) {
1176            throw new RuntimeException(e);
1177          }
1178        }
1179      }
1180    }, new FilterBase() {
1181      @Override
1182      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1183        if (timeToGoNextRow.get()) {
1184          timeToGoNextRow.set(false);
1185          return ReturnCode.NEXT_ROW;
1186        } else {
1187          return ReturnCode.INCLUDE;
1188        }
1189      }
1190    }, expectedSize);
1191  }
1192
1193  @Test
1194  public void testFlushBeforeCompletingScanWithFilterHint() throws IOException,
1195      InterruptedException {
1196    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
1197    final int expectedSize = 2;
1198    testFlushBeforeCompletingScan(new MyListHook() {
1199      @Override
1200      public void hook(int currentSize) {
1201        if (currentSize == expectedSize - 1) {
1202          try {
1203            flushStore(store, id++);
1204            timeToGetHint.set(true);
1205          } catch (IOException e) {
1206            throw new RuntimeException(e);
1207          }
1208        }
1209      }
1210    }, new FilterBase() {
1211      @Override
1212      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1213        if (timeToGetHint.get()) {
1214          timeToGetHint.set(false);
1215          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
1216        } else {
1217          return Filter.ReturnCode.INCLUDE;
1218        }
1219      }
1220      @Override
1221      public Cell getNextCellHint(Cell currentCell) throws IOException {
1222        return currentCell;
1223      }
1224    }, expectedSize);
1225  }
1226
1227  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
1228          throws IOException, InterruptedException {
1229    Configuration conf = HBaseConfiguration.create();
1230    byte[] r0 = Bytes.toBytes("row0");
1231    byte[] r1 = Bytes.toBytes("row1");
1232    byte[] r2 = Bytes.toBytes("row2");
1233    byte[] value0 = Bytes.toBytes("value0");
1234    byte[] value1 = Bytes.toBytes("value1");
1235    byte[] value2 = Bytes.toBytes("value2");
1236    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1237    long ts = EnvironmentEdgeManager.currentTime();
1238    long seqId = 100;
1239    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1240      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1241      new MyStoreHook() {
1242        @Override
1243        public long getSmallestReadPoint(HStore store) {
1244          return seqId + 3;
1245        }
1246      });
1247    // The cells having the value0 won't be flushed to disk because the value of max version is 1
1248    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
1249    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
1250    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
1251    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
1252    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
1253    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
1254    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
1255    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
1256    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
1257    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1258    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1259    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1260    List<Cell> myList = new MyList<>(hook);
1261    Scan scan = new Scan()
1262            .withStartRow(r1)
1263            .setFilter(filter);
1264    try (InternalScanner scanner = (InternalScanner) store.getScanner(
1265          scan, null, seqId + 3)){
1266      // r1
1267      scanner.next(myList);
1268      assertEquals(expectedSize, myList.size());
1269      for (Cell c : myList) {
1270        byte[] actualValue = CellUtil.cloneValue(c);
1271        assertTrue("expected:" + Bytes.toStringBinary(value1)
1272          + ", actual:" + Bytes.toStringBinary(actualValue)
1273          , Bytes.equals(actualValue, value1));
1274      }
1275      List<Cell> normalList = new ArrayList<>(3);
1276      // r2
1277      scanner.next(normalList);
1278      assertEquals(3, normalList.size());
1279      for (Cell c : normalList) {
1280        byte[] actualValue = CellUtil.cloneValue(c);
1281        assertTrue("expected:" + Bytes.toStringBinary(value2)
1282          + ", actual:" + Bytes.toStringBinary(actualValue)
1283          , Bytes.equals(actualValue, value2));
1284      }
1285    }
1286  }
1287
1288  @Test
1289  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
1290    Configuration conf = HBaseConfiguration.create();
1291    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
1292    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1293        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1294    byte[] value = Bytes.toBytes("value");
1295    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1296    long ts = EnvironmentEdgeManager.currentTime();
1297    long seqId = 100;
1298    // older data whihc shouldn't be "seen" by client
1299    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1300    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1301    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1302    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1303    quals.add(qf1);
1304    quals.add(qf2);
1305    quals.add(qf3);
1306    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1307    MyCompactingMemStore.START_TEST.set(true);
1308    Runnable flush = () -> {
1309      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
1310      // recreate the active memstore -- phase (4/5)
1311      storeFlushCtx.prepare();
1312    };
1313    ExecutorService service = Executors.newSingleThreadExecutor();
1314    service.submit(flush);
1315    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
1316    // this is blocked until we recreate the active memstore -- phase (3/5)
1317    // we get scanner from active memstore but it is empty -- phase (5/5)
1318    InternalScanner scanner = (InternalScanner) store.getScanner(
1319          new Scan(new Get(row)), quals, seqId + 1);
1320    service.shutdown();
1321    service.awaitTermination(20, TimeUnit.SECONDS);
1322    try {
1323      try {
1324        List<Cell> results = new ArrayList<>();
1325        scanner.next(results);
1326        assertEquals(3, results.size());
1327        for (Cell c : results) {
1328          byte[] actualValue = CellUtil.cloneValue(c);
1329          assertTrue("expected:" + Bytes.toStringBinary(value)
1330            + ", actual:" + Bytes.toStringBinary(actualValue)
1331            , Bytes.equals(actualValue, value));
1332        }
1333      } finally {
1334        scanner.close();
1335      }
1336    } finally {
1337      MyCompactingMemStore.START_TEST.set(false);
1338      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1339      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1340    }
1341  }
1342
1343  @Test
1344  public void testScanWithDoubleFlush() throws IOException {
1345    Configuration conf = HBaseConfiguration.create();
1346    // Initialize region
1347    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){
1348      @Override
1349      public void getScanners(MyStore store) throws IOException {
1350        final long tmpId = id++;
1351        ExecutorService s = Executors.newSingleThreadExecutor();
1352        s.submit(() -> {
1353          try {
1354            // flush the store before storescanner updates the scanners from store.
1355            // The current data will be flushed into files, and the memstore will
1356            // be clear.
1357            // -- phase (4/4)
1358            flushStore(store, tmpId);
1359          }catch (IOException ex) {
1360            throw new RuntimeException(ex);
1361          }
1362        });
1363        s.shutdown();
1364        try {
1365          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1366          s.awaitTermination(3, TimeUnit.SECONDS);
1367        } catch (InterruptedException ex) {
1368        }
1369      }
1370    });
1371    byte[] oldValue = Bytes.toBytes("oldValue");
1372    byte[] currentValue = Bytes.toBytes("currentValue");
1373    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1374    long ts = EnvironmentEdgeManager.currentTime();
1375    long seqId = 100;
1376    // older data whihc shouldn't be "seen" by client
1377    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1378    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1379    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1380    long snapshotId = id++;
1381    // push older data into snapshot -- phase (1/4)
1382    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker
1383        .DUMMY);
1384    storeFlushCtx.prepare();
1385
1386    // insert current data into active -- phase (2/4)
1387    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1388    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1389    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1390    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1391    quals.add(qf1);
1392    quals.add(qf2);
1393    quals.add(qf3);
1394    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
1395        new Scan(new Get(row)), quals, seqId + 1)) {
1396      // complete the flush -- phase (3/4)
1397      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1398      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1399
1400      List<Cell> results = new ArrayList<>();
1401      scanner.next(results);
1402      assertEquals(3, results.size());
1403      for (Cell c : results) {
1404        byte[] actualValue = CellUtil.cloneValue(c);
1405        assertTrue("expected:" + Bytes.toStringBinary(currentValue)
1406          + ", actual:" + Bytes.toStringBinary(actualValue)
1407          , Bytes.equals(actualValue, currentValue));
1408      }
1409    }
1410  }
1411
1412  @Test
1413  public void testReclaimChunkWhenScaning() throws IOException {
1414    init("testReclaimChunkWhenScaning");
1415    long ts = EnvironmentEdgeManager.currentTime();
1416    long seqId = 100;
1417    byte[] value = Bytes.toBytes("value");
1418    // older data whihc shouldn't be "seen" by client
1419    store.add(createCell(qf1, ts, seqId, value), null);
1420    store.add(createCell(qf2, ts, seqId, value), null);
1421    store.add(createCell(qf3, ts, seqId, value), null);
1422    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1423    quals.add(qf1);
1424    quals.add(qf2);
1425    quals.add(qf3);
1426    try (InternalScanner scanner = (InternalScanner) store.getScanner(
1427        new Scan(new Get(row)), quals, seqId)) {
1428      List<Cell> results = new MyList<>(size -> {
1429        switch (size) {
1430          // 1) we get the first cell (qf1)
1431          // 2) flush the data to have StoreScanner update inner scanners
1432          // 3) the chunk will be reclaimed after updaing
1433          case 1:
1434            try {
1435              flushStore(store, id++);
1436            } catch (IOException e) {
1437              throw new RuntimeException(e);
1438            }
1439            break;
1440          // 1) we get the second cell (qf2)
1441          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
1442          case 2:
1443            try {
1444              byte[] newValue = Bytes.toBytes("newValue");
1445              // older data whihc shouldn't be "seen" by client
1446              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1447              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1448              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1449            } catch (IOException e) {
1450              throw new RuntimeException(e);
1451            }
1452            break;
1453          default:
1454            break;
1455        }
1456      });
1457      scanner.next(results);
1458      assertEquals(3, results.size());
1459      for (Cell c : results) {
1460        byte[] actualValue = CellUtil.cloneValue(c);
1461        assertTrue("expected:" + Bytes.toStringBinary(value)
1462          + ", actual:" + Bytes.toStringBinary(actualValue)
1463          , Bytes.equals(actualValue, value));
1464      }
1465    }
1466  }
1467
1468  /**
1469   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable
1470   * may change the versionedList. And the first InMemoryFlushRunnable will use the chagned
1471   * versionedList to remove the corresponding segments.
1472   * In short, there will be some segements which isn't in merge are removed.
1473   * @throws IOException
1474   * @throws InterruptedException
1475   */
1476  @Test
  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
    // Scenario: while the first in-memory compaction is still blocked, further writes must not
    // start a second one; once it has finished, the next batch of writes must start another.
    // MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT is incremented every time
    // setInMemoryCompactionFlag() returns true, so it counts the in-memory flushes started.
    int flushSize = 500;
    Configuration conf = HBaseConfiguration.create();
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
    // The counter is static, so start this test from zero.
    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
    // Set the lower threshold to invoke the "MERGE" policy
    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
    byte[] value = Bytes.toBytes("thisisavarylargevalue");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // First batch of cells: older data which shouldn't be "seen" by client. The assertion below
    // expects exactly one in-memory flush to have been started by these writes.
    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    // This shouldn't invoke another in-memory flush because the first compactor thread
    // hasn't accomplished the in-memory compaction.
    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
    // Okay, let the compaction complete: the first MyMemStoreCompactor.start() call is waiting
    // on this latch.
    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
    CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore;
    // Poll until the memstore no longer reports an in-memory flush in progress.
    while (mem.isMemStoreFlushingInMemory()) {
      TimeUnit.SECONDS.sleep(1);
    }
    // This should invoke another in-memory flush.
    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
    // Put the default flush size back on the conf before finishing the flush started above.
    // NOTE(review): it is not visible here whether the store re-reads this value - confirm.
    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
  }
1521
1522  @Test
1523  public void testAge() throws IOException {
1524    long currentTime = System.currentTimeMillis();
1525    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1526    edge.setValue(currentTime);
1527    EnvironmentEdgeManager.injectEdge(edge);
1528    Configuration conf = TEST_UTIL.getConfiguration();
1529    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1530    initHRegion(name.getMethodName(), conf,
1531      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1532    HStore store = new HStore(region, hcd, conf, false) {
1533
1534      @Override
1535      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1536          CellComparator kvComparator) throws IOException {
1537        List<HStoreFile> storefiles =
1538            Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1539              mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1540        StoreFileManager sfm = mock(StoreFileManager.class);
1541        when(sfm.getStorefiles()).thenReturn(storefiles);
1542        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1543        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1544        return storeEngine;
1545      }
1546    };
1547    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1548    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1549    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1550  }
1551
1552  private HStoreFile mockStoreFile(long createdTime) {
1553    StoreFileInfo info = mock(StoreFileInfo.class);
1554    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1555    HStoreFile sf = mock(HStoreFile.class);
1556    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1557    when(sf.isHFile()).thenReturn(true);
1558    when(sf.getFileInfo()).thenReturn(info);
1559    return sf;
1560  }
1561
1562  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1563      throws IOException {
1564    return (MyStore) init(methodName, conf,
1565      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1566      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1567  }
1568
1569  private static class MyStore extends HStore {
1570    private final MyStoreHook hook;
1571
1572    MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration
1573        confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1574      super(region, family, confParam, false);
1575      this.hook = hook;
1576    }
1577
1578    @Override
1579    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1580        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1581        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1582        boolean includeMemstoreScanner) throws IOException {
1583      hook.getScanners(this);
1584      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1585        stopRow, false, readPt, includeMemstoreScanner);
1586    }
1587
1588    @Override
1589    public long getSmallestReadPoint() {
1590      return hook.getSmallestReadPoint(this);
1591    }
1592  }
1593
1594  private abstract static class MyStoreHook {
1595
1596    void getScanners(MyStore store) throws IOException {
1597    }
1598
1599    long getSmallestReadPoint(HStore store) {
1600      return store.getHRegion().getSmallestReadPoint();
1601    }
1602  }
1603
1604  @Test
1605  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
1606    Configuration conf = HBaseConfiguration.create();
1607    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1608    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1609    // Set the lower threshold to invoke the "MERGE" policy
1610    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
1611    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1612    long ts = System.currentTimeMillis();
1613    long seqID = 1L;
1614    // Add some data to the region and do some flushes
1615    for (int i = 1; i < 10; i++) {
1616      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1617        memStoreSizing);
1618    }
1619    // flush them
1620    flushStore(store, seqID);
1621    for (int i = 11; i < 20; i++) {
1622      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1623        memStoreSizing);
1624    }
1625    // flush them
1626    flushStore(store, seqID);
1627    for (int i = 21; i < 30; i++) {
1628      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1629        memStoreSizing);
1630    }
1631    // flush them
1632    flushStore(store, seqID);
1633
1634    assertEquals(3, store.getStorefilesCount());
1635    Scan scan = new Scan();
1636    scan.addFamily(family);
1637    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1638    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1639    StoreScanner storeScanner =
1640        (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1641    // get the current heap
1642    KeyValueHeap heap = storeScanner.heap;
1643    // create more store files
1644    for (int i = 31; i < 40; i++) {
1645      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1646        memStoreSizing);
1647    }
1648    // flush them
1649    flushStore(store, seqID);
1650
1651    for (int i = 41; i < 50; i++) {
1652      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1653        memStoreSizing);
1654    }
1655    // flush them
1656    flushStore(store, seqID);
1657    storefiles2 = store.getStorefiles();
1658    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1659    actualStorefiles1.removeAll(actualStorefiles);
1660    // Do compaction
1661    MyThread thread = new MyThread(storeScanner);
1662    thread.start();
1663    store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
1664    thread.join();
1665    KeyValueHeap heap2 = thread.getHeap();
1666    assertFalse(heap.equals(heap2));
1667  }
1668
1669  @Test
1670  public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException {
1671    Configuration conf = HBaseConfiguration.create();
1672    conf.set("hbase.systemtables.compacting.memstore.type", "eager");
1673    init(name.getMethodName(), conf,
1674      TableDescriptorBuilder.newBuilder(
1675        TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())),
1676      ColumnFamilyDescriptorBuilder.newBuilder(family)
1677        .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build());
1678    assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString()
1679      .startsWith("eager".toUpperCase()));
1680  }
1681
1682  @Test
1683  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
1684    final TableName tn = TableName.valueOf(name.getMethodName());
1685    init(name.getMethodName());
1686
1687    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
1688
1689    HStoreFile sf1 = mockStoreFileWithLength(1024L);
1690    HStoreFile sf2 = mockStoreFileWithLength(2048L);
1691    HStoreFile sf3 = mockStoreFileWithLength(4096L);
1692    HStoreFile sf4 = mockStoreFileWithLength(8192L);
1693
1694    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
1695        .setEndKey(Bytes.toBytes("b")).build();
1696
1697    // Compacting two files down to one, reducing size
1698    sizeStore.put(regionInfo, 1024L + 4096L);
1699    store.updateSpaceQuotaAfterFileReplacement(
1700        sizeStore, regionInfo, Arrays.asList(sf1, sf3), Arrays.asList(sf2));
1701
1702    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1703
1704    // The same file length in and out should have no change
1705    store.updateSpaceQuotaAfterFileReplacement(
1706        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf2));
1707
1708    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1709
1710    // Increase the total size used
1711    store.updateSpaceQuotaAfterFileReplacement(
1712        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf3));
1713
1714    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
1715
1716    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
1717        .setEndKey(Bytes.toBytes("c")).build();
1718    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
1719
1720    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
1721  }
1722
1723  @Test
1724  public void testHFileContextSetWithCFAndTable() throws Exception {
1725    init(this.name.getMethodName());
1726    StoreFileWriter writer = store.createWriterInTmp(10000L,
1727        Compression.Algorithm.NONE, false, true, false, true);
1728    HFileContext hFileContext = writer.getHFileWriter().getFileContext();
1729    assertArrayEquals(family, hFileContext.getColumnFamily());
1730    assertArrayEquals(table, hFileContext.getTableName());
1731  }
1732
1733  private HStoreFile mockStoreFileWithLength(long length) {
1734    HStoreFile sf = mock(HStoreFile.class);
1735    StoreFileReader sfr = mock(StoreFileReader.class);
1736    when(sf.isHFile()).thenReturn(true);
1737    when(sf.getReader()).thenReturn(sfr);
1738    when(sfr.length()).thenReturn(length);
1739    return sf;
1740  }
1741
1742  private static class MyThread extends Thread {
1743    private StoreScanner scanner;
1744    private KeyValueHeap heap;
1745
1746    public MyThread(StoreScanner scanner) {
1747      this.scanner = scanner;
1748    }
1749
1750    public KeyValueHeap getHeap() {
1751      return this.heap;
1752    }
1753
1754    @Override
1755    public void run() {
1756      scanner.trySwitchToStreamRead();
1757      heap = scanner.heap;
1758    }
1759  }
1760
1761  private static class MyMemStoreCompactor extends MemStoreCompactor {
1762    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
1763    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
1764    public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy
1765        compactionPolicy) throws IllegalArgumentIOException {
1766      super(compactingMemStore, compactionPolicy);
1767    }
1768
1769    @Override
1770    public boolean start() throws IOException {
1771      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
1772      if (isFirst) {
1773        try {
1774          START_COMPACTOR_LATCH.await();
1775          return super.start();
1776        } catch (InterruptedException ex) {
1777          throw new RuntimeException(ex);
1778        }
1779      }
1780      return super.start();
1781    }
1782  }
1783
1784  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
1785    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
1786    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
1787        HStore store, RegionServicesForStores regionServices,
1788        MemoryCompactionPolicy compactionPolicy) throws IOException {
1789      super(conf, c, store, regionServices, compactionPolicy);
1790    }
1791
1792    @Override
1793    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
1794        throws IllegalArgumentIOException {
1795      return new MyMemStoreCompactor(this, compactionPolicy);
1796    }
1797
1798    @Override
1799    protected boolean setInMemoryCompactionFlag() {
1800      boolean rval = super.setInMemoryCompactionFlag();
1801      if (rval) {
1802        RUNNER_COUNT.incrementAndGet();
1803        if (LOG.isDebugEnabled()) {
1804          LOG.debug("runner count: " + RUNNER_COUNT.get());
1805        }
1806      }
1807      return rval;
1808    }
1809  }
1810
1811  public static class MyCompactingMemStore extends CompactingMemStore {
1812    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
1813    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
1814    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
1815    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c,
1816        HStore store, RegionServicesForStores regionServices,
1817        MemoryCompactionPolicy compactionPolicy) throws IOException {
1818      super(conf, c, store, regionServices, compactionPolicy);
1819    }
1820
1821    @Override
1822    protected List<KeyValueScanner> createList(int capacity) {
1823      if (START_TEST.get()) {
1824        try {
1825          getScannerLatch.countDown();
1826          snapshotLatch.await();
1827        } catch (InterruptedException e) {
1828          throw new RuntimeException(e);
1829        }
1830      }
1831      return new ArrayList<>(capacity);
1832    }
1833    @Override
1834    protected void pushActiveToPipeline(MutableSegment active) {
1835      if (START_TEST.get()) {
1836        try {
1837          getScannerLatch.await();
1838        } catch (InterruptedException e) {
1839          throw new RuntimeException(e);
1840        }
1841      }
1842
1843      super.pushActiveToPipeline(active);
1844      if (START_TEST.get()) {
1845        snapshotLatch.countDown();
1846      }
1847    }
1848  }
1849
1850  interface MyListHook {
1851    void hook(int currentSize);
1852  }
1853
1854  private static class MyList<T> implements List<T> {
1855    private final List<T> delegatee = new ArrayList<>();
1856    private final MyListHook hookAtAdd;
1857    MyList(final MyListHook hookAtAdd) {
1858      this.hookAtAdd = hookAtAdd;
1859    }
1860    @Override
1861    public int size() {return delegatee.size();}
1862
1863    @Override
1864    public boolean isEmpty() {return delegatee.isEmpty();}
1865
1866    @Override
1867    public boolean contains(Object o) {return delegatee.contains(o);}
1868
1869    @Override
1870    public Iterator<T> iterator() {return delegatee.iterator();}
1871
1872    @Override
1873    public Object[] toArray() {return delegatee.toArray();}
1874
1875    @Override
1876    public <R> R[] toArray(R[] a) {return delegatee.toArray(a);}
1877
1878    @Override
1879    public boolean add(T e) {
1880      hookAtAdd.hook(size());
1881      return delegatee.add(e);
1882    }
1883
1884    @Override
1885    public boolean remove(Object o) {return delegatee.remove(o);}
1886
1887    @Override
1888    public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);}
1889
1890    @Override
1891    public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);}
1892
1893    @Override
1894    public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);}
1895
1896    @Override
1897    public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);}
1898
1899    @Override
1900    public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);}
1901
1902    @Override
1903    public void clear() {delegatee.clear();}
1904
1905    @Override
1906    public T get(int index) {return delegatee.get(index);}
1907
1908    @Override
1909    public T set(int index, T element) {return delegatee.set(index, element);}
1910
1911    @Override
1912    public void add(int index, T element) {delegatee.add(index, element);}
1913
1914    @Override
1915    public T remove(int index) {return delegatee.remove(index);}
1916
1917    @Override
1918    public int indexOf(Object o) {return delegatee.indexOf(o);}
1919
1920    @Override
1921    public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);}
1922
1923    @Override
1924    public ListIterator<T> listIterator() {return delegatee.listIterator();}
1925
1926    @Override
1927    public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);}
1928
1929    @Override
1930    public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
1931  }
1932}