001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.spy;
027import static org.mockito.Mockito.times;
028import static org.mockito.Mockito.verify;
029import static org.mockito.Mockito.when;
030
031import java.io.IOException;
032import java.lang.ref.SoftReference;
033import java.security.PrivilegedExceptionAction;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.Iterator;
039import java.util.List;
040import java.util.ListIterator;
041import java.util.NavigableSet;
042import java.util.TreeSet;
043import java.util.concurrent.ConcurrentSkipListSet;
044import java.util.concurrent.CountDownLatch;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.ThreadPoolExecutor;
048import java.util.concurrent.TimeUnit;
049import java.util.concurrent.atomic.AtomicBoolean;
050import java.util.concurrent.atomic.AtomicInteger;
051import org.apache.hadoop.conf.Configuration;
052import org.apache.hadoop.fs.FSDataOutputStream;
053import org.apache.hadoop.fs.FileStatus;
054import org.apache.hadoop.fs.FileSystem;
055import org.apache.hadoop.fs.FilterFileSystem;
056import org.apache.hadoop.fs.LocalFileSystem;
057import org.apache.hadoop.fs.Path;
058import org.apache.hadoop.fs.permission.FsPermission;
059import org.apache.hadoop.hbase.Cell;
060import org.apache.hadoop.hbase.CellBuilderFactory;
061import org.apache.hadoop.hbase.CellBuilderType;
062import org.apache.hadoop.hbase.CellComparator;
063import org.apache.hadoop.hbase.CellComparatorImpl;
064import org.apache.hadoop.hbase.CellUtil;
065import org.apache.hadoop.hbase.HBaseClassTestRule;
066import org.apache.hadoop.hbase.HBaseConfiguration;
067import org.apache.hadoop.hbase.HBaseTestingUtility;
068import org.apache.hadoop.hbase.HConstants;
069import org.apache.hadoop.hbase.KeyValue;
070import org.apache.hadoop.hbase.MemoryCompactionPolicy;
071import org.apache.hadoop.hbase.PrivateCellUtil;
072import org.apache.hadoop.hbase.TableName;
073import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
074import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
075import org.apache.hadoop.hbase.client.Get;
076import org.apache.hadoop.hbase.client.RegionInfo;
077import org.apache.hadoop.hbase.client.RegionInfoBuilder;
078import org.apache.hadoop.hbase.client.Scan;
079import org.apache.hadoop.hbase.client.TableDescriptor;
080import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
081import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
082import org.apache.hadoop.hbase.filter.Filter;
083import org.apache.hadoop.hbase.filter.FilterBase;
084import org.apache.hadoop.hbase.io.compress.Compression;
085import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
086import org.apache.hadoop.hbase.io.hfile.CacheConfig;
087import org.apache.hadoop.hbase.io.hfile.HFile;
088import org.apache.hadoop.hbase.io.hfile.HFileContext;
089import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
090import org.apache.hadoop.hbase.monitoring.MonitoredTask;
091import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
092import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
093import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
094import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
095import org.apache.hadoop.hbase.security.User;
096import org.apache.hadoop.hbase.testclassification.MediumTests;
097import org.apache.hadoop.hbase.testclassification.RegionServerTests;
098import org.apache.hadoop.hbase.util.Bytes;
099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
101import org.apache.hadoop.hbase.util.FSUtils;
102import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
103import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
104import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
105import org.apache.hadoop.hbase.wal.WALFactory;
106import org.apache.hadoop.util.Progressable;
107import org.junit.After;
108import org.junit.AfterClass;
109import org.junit.Before;
110import org.junit.ClassRule;
111import org.junit.Rule;
112import org.junit.Test;
113import org.junit.experimental.categories.Category;
114import org.junit.rules.TestName;
115import org.mockito.Mockito;
116import org.slf4j.Logger;
117import org.slf4j.LoggerFactory;
118
119import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
120
121/**
122 * Test class for the HStore
123 */
124@Category({ RegionServerTests.class, MediumTests.class })
125public class TestHStore {
126
  // Class-level JUnit rule obtained from HBaseClassTestRule.forClass for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
  // Gives tests the name of the running method; the init helpers use it to name directories.
  @Rule
  public TestName name = new TestName();

  // Region and store under test; assigned by the init(...) helpers and released in tearDown().
  HRegion region;
  HStore store;
  // Table and column family names shared by every test.
  byte [] table = Bytes.toBytes("table");
  byte [] family = Bytes.toBytes("family");

  // Row keys and column qualifiers the tests build their cells from.
  byte [] row = Bytes.toBytes("row");
  byte [] row2 = Bytes.toBytes("row2");
  byte [] qf1 = Bytes.toBytes("qf1");
  byte [] qf2 = Bytes.toBytes("qf2");
  byte [] qf3 = Bytes.toBytes("qf3");
  byte [] qf4 = Bytes.toBytes("qf4");
  byte [] qf5 = Bytes.toBytes("qf5");
  byte [] qf6 = Bytes.toBytes("qf6");

  // Qualifiers the get-style tests ask for; setUp() fills it with qf1, qf3 and qf5.
  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);

  // Cells a get-style test should read back (built in setUp()) and the cells it actually read;
  // assertCheck() compares the two lists.
  List<Cell> expected = new ArrayList<>();
  List<Cell> result = new ArrayList<>();

  // Flush id counter: each flushStore(store, id++) call consumes one value.
  long id = System.currentTimeMillis();
  // Get on 'row'; setUp() adds one column per entry of 'qualifiers'.
  Get get = new Get(row);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Base directory string; initHRegion() appends the method name to it to form the test root.
  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
159
160
161  /**
162   * Setup
163   * @throws IOException
164   */
165  @Before
166  public void setUp() throws IOException {
167    qualifiers.add(qf1);
168    qualifiers.add(qf3);
169    qualifiers.add(qf5);
170
171    Iterator<byte[]> iter = qualifiers.iterator();
172    while(iter.hasNext()){
173      byte [] next = iter.next();
174      expected.add(new KeyValue(row, family, next, 1, (byte[])null));
175      get.addColumn(family, next);
176    }
177  }
178
179  private void init(String methodName) throws IOException {
180    init(methodName, TEST_UTIL.getConfiguration());
181  }
182
183  private HStore init(String methodName, Configuration conf) throws IOException {
184    // some of the tests write 4 versions and then flush
185    // (with HBASE-4241, lower versions are collected on flush)
186    return init(methodName, conf,
187      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
188  }
189
190  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
191      throws IOException {
192    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
193  }
194
195  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
196      ColumnFamilyDescriptor hcd) throws IOException {
197    return init(methodName, conf, builder, hcd, null);
198  }
199
200  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
201      ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
202    return init(methodName, conf, builder, hcd, hook, false);
203  }
204
205  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
206      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
207    TableDescriptor htd = builder.setColumnFamily(hcd).build();
208    Path basedir = new Path(DIR + methodName);
209    Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
210    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
211
212    FileSystem fs = FileSystem.get(conf);
213
214    fs.delete(logdir, true);
215    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
216      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
217    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
218    Configuration walConf = new Configuration(conf);
219    FSUtils.setRootDir(walConf, basedir);
220    WALFactory wals = new WALFactory(walConf, methodName);
221    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
222        htd, null);
223    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
224    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
225    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
226  }
227
228  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
229      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
230    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
231    if (hook == null) {
232      store = new HStore(region, hcd, conf);
233    } else {
234      store = new MyStore(region, hcd, conf, hook, switchToPread);
235    }
236    return store;
237  }
238
239  /**
240   * Test we do not lose data if we fail a flush and then close.
241   * Part of HBase-10466
242   * @throws Exception
243   */
244  @Test
245  public void testFlushSizeSizing() throws Exception {
246    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
247    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
248    // Only retry once.
249    conf.setInt("hbase.hstore.flush.retries.number", 1);
250    User user = User.createUserForTesting(conf, this.name.getMethodName(),
251      new String[]{"foo"});
252    // Inject our faulty LocalFileSystem
253    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
254    user.runAs(new PrivilegedExceptionAction<Object>() {
255      @Override
256      public Object run() throws Exception {
257        // Make sure it worked (above is sensitive to caching details in hadoop core)
258        FileSystem fs = FileSystem.get(conf);
259        assertEquals(FaultyFileSystem.class, fs.getClass());
260        FaultyFileSystem ffs = (FaultyFileSystem)fs;
261
262        // Initialize region
263        init(name.getMethodName(), conf);
264
265        MemStoreSize mss = store.memstore.getFlushableSize();
266        assertEquals(0, mss.getDataSize());
267        LOG.info("Adding some data");
268        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
269        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
270        // add the heap size of active (mutable) segment
271        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
272        mss = store.memstore.getFlushableSize();
273        assertEquals(kvSize.getMemStoreSize(), mss);
274        // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
275        try {
276          LOG.info("Flushing");
277          flushStore(store, id++);
278          fail("Didn't bubble up IOE!");
279        } catch (IOException ioe) {
280          assertTrue(ioe.getMessage().contains("Fault injected"));
281        }
282        // due to snapshot, change mutable to immutable segment
283        kvSize.incMemStoreSize(0,
284          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
285        mss = store.memstore.getFlushableSize();
286        assertEquals(kvSize.getMemStoreSize(), mss);
287        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
288        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
289        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
290        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
291        // not yet cleared the snapshot -- the above flush failed.
292        assertEquals(kvSize.getMemStoreSize(), mss);
293        ffs.fault.set(false);
294        flushStore(store, id++);
295        mss = store.memstore.getFlushableSize();
296        // Size should be the foreground kv size.
297        assertEquals(kvSize2.getMemStoreSize(), mss);
298        flushStore(store, id++);
299        mss = store.memstore.getFlushableSize();
300        assertEquals(0, mss.getDataSize());
301        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
302        return null;
303      }
304    });
305  }
306
307  /**
308   * Verify that compression and data block encoding are respected by the
309   * Store.createWriterInTmp() method, used on store flush.
310   */
311  @Test
312  public void testCreateWriter() throws Exception {
313    Configuration conf = HBaseConfiguration.create();
314    FileSystem fs = FileSystem.get(conf);
315
316    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
317        .setCompressionType(Compression.Algorithm.GZ).setDataBlockEncoding(DataBlockEncoding.DIFF)
318        .build();
319    init(name.getMethodName(), conf, hcd);
320
321    // Test createWriterInTmp()
322    StoreFileWriter writer =
323        store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false);
324    Path path = writer.getPath();
325    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
326    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
327    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
328    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
329    writer.close();
330
331    // Verify that compression and encoding settings are respected
332    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
333    assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
334    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
335    reader.close();
336  }
337
338  @Test
339  public void testDeleteExpiredStoreFiles() throws Exception {
340    testDeleteExpiredStoreFiles(0);
341    testDeleteExpiredStoreFiles(1);
342  }
343
344  /*
345   * @param minVersions the MIN_VERSIONS for the column family
346   */
347  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
348    int storeFileNum = 4;
349    int ttl = 4;
350    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
351    EnvironmentEdgeManagerTestHelper.injectEdge(edge);
352
353    Configuration conf = HBaseConfiguration.create();
354    // Enable the expired store file deletion
355    conf.setBoolean("hbase.store.delete.expired.storefile", true);
356    // Set the compaction threshold higher to avoid normal compactions.
357    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
358
359    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
360        .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());
361
362    long storeTtl = this.store.getScanInfo().getTtl();
363    long sleepTime = storeTtl / storeFileNum;
364    long timeStamp;
365    // There are 4 store files and the max time stamp difference among these
366    // store files will be (this.store.ttl / storeFileNum)
367    for (int i = 1; i <= storeFileNum; i++) {
368      LOG.info("Adding some data for the store file #" + i);
369      timeStamp = EnvironmentEdgeManager.currentTime();
370      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
371      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
372      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
373      flush(i);
374      edge.incrementTime(sleepTime);
375    }
376
377    // Verify the total number of store files
378    assertEquals(storeFileNum, this.store.getStorefiles().size());
379
380     // Each call will find one expired store file and delete it before compaction happens.
381     // There will be no compaction due to threshold above. Last file will not be replaced.
382    for (int i = 1; i <= storeFileNum - 1; i++) {
383      // verify the expired store file.
384      assertFalse(this.store.requestCompaction().isPresent());
385      Collection<HStoreFile> sfs = this.store.getStorefiles();
386      // Ensure i files are gone.
387      if (minVersions == 0) {
388        assertEquals(storeFileNum - i, sfs.size());
389        // Ensure only non-expired files remain.
390        for (HStoreFile sf : sfs) {
391          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
392        }
393      } else {
394        assertEquals(storeFileNum, sfs.size());
395      }
396      // Let the next store file expired.
397      edge.incrementTime(sleepTime);
398    }
399    assertFalse(this.store.requestCompaction().isPresent());
400
401    Collection<HStoreFile> sfs = this.store.getStorefiles();
402    // Assert the last expired file is not removed.
403    if (minVersions == 0) {
404      assertEquals(1, sfs.size());
405    }
406    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
407    assertTrue(ts < (edge.currentTime() - storeTtl));
408
409    for (HStoreFile sf : sfs) {
410      sf.closeStoreFile(true);
411    }
412  }
413
414  @Test
415  public void testLowestModificationTime() throws Exception {
416    Configuration conf = HBaseConfiguration.create();
417    FileSystem fs = FileSystem.get(conf);
418    // Initialize region
419    init(name.getMethodName(), conf);
420
421    int storeFileNum = 4;
422    for (int i = 1; i <= storeFileNum; i++) {
423      LOG.info("Adding some data for the store file #"+i);
424      this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null);
425      this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null);
426      this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null);
427      flush(i);
428    }
429    // after flush; check the lowest time stamp
430    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
431    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
432    assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
433
434    // after compact; check the lowest time stamp
435    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
436    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
437    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
438    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
439  }
440
441  private static long getLowestTimeStampFromFS(FileSystem fs,
442      final Collection<HStoreFile> candidates) throws IOException {
443    long minTs = Long.MAX_VALUE;
444    if (candidates.isEmpty()) {
445      return minTs;
446    }
447    Path[] p = new Path[candidates.size()];
448    int i = 0;
449    for (HStoreFile sf : candidates) {
450      p[i] = sf.getPath();
451      ++i;
452    }
453
454    FileStatus[] stats = fs.listStatus(p);
455    if (stats == null || stats.length == 0) {
456      return minTs;
457    }
458    for (FileStatus s : stats) {
459      minTs = Math.min(minTs, s.getModificationTime());
460    }
461    return minTs;
462  }
463
464  //////////////////////////////////////////////////////////////////////////////
465  // Get tests
466  //////////////////////////////////////////////////////////////////////////////
467
  // Block size given to the HFileContext of the hand-built empty store file in
  // testEmptyStoreFile().
  private static final int BLOCKSIZE_SMALL = 8192;
469  /**
470   * Test for hbase-1686.
471   * @throws IOException
472   */
473  @Test
474  public void testEmptyStoreFile() throws IOException {
475    init(this.name.getMethodName());
476    // Write a store file.
477    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
478    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
479    flush(1);
480    // Now put in place an empty store file.  Its a little tricky.  Have to
481    // do manually with hacked in sequence id.
482    HStoreFile f = this.store.getStorefiles().iterator().next();
483    Path storedir = f.getPath().getParent();
484    long seqid = f.getMaxSequenceId();
485    Configuration c = HBaseConfiguration.create();
486    FileSystem fs = FileSystem.get(c);
487    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
488    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
489        fs)
490            .withOutputDir(storedir)
491            .withFileContext(meta)
492            .build();
493    w.appendMetadata(seqid + 1, false);
494    w.close();
495    this.store.close();
496    // Reopen it... should pick up two files
497    this.store = new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c);
498    assertEquals(2, this.store.getStorefilesCount());
499
500    result = HBaseTestingUtility.getFromStoreFile(store,
501        get.getRow(),
502        qualifiers);
503    assertEquals(1, result.size());
504  }
505
506  /**
507   * Getting data from memstore only
508   * @throws IOException
509   */
510  @Test
511  public void testGet_FromMemStoreOnly() throws IOException {
512    init(this.name.getMethodName());
513
514    //Put data in memstore
515    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
516    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
517    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
518    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
519    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
520    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
521
522    //Get
523    result = HBaseTestingUtility.getFromStoreFile(store,
524        get.getRow(), qualifiers);
525
526    //Compare
527    assertCheck();
528  }
529
530  @Test
531  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
532    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
533    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
534    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
535  }
536
537  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
538    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
539    ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
540    long currentTs = 100;
541    long minTs = currentTs;
542    // the extra cell won't be flushed to disk,
543    // so the min of timerange will be different between memStore and hfile.
544    for (int i = 0; i != (maxVersion + 1); ++i) {
545      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
546      if (i == 1) {
547        minTs = currentTs;
548      }
549    }
550    flushStore(store, id++);
551
552    Collection<HStoreFile> files = store.getStorefiles();
553    assertEquals(1, files.size());
554    HStoreFile f = files.iterator().next();
555    f.initReader();
556    StoreFileReader reader = f.getReader();
557    assertEquals(minTs, reader.timeRange.getMin());
558    assertEquals(currentTs, reader.timeRange.getMax());
559  }
560
561  /**
562   * Getting data from files only
563   * @throws IOException
564   */
565  @Test
566  public void testGet_FromFilesOnly() throws IOException {
567    init(this.name.getMethodName());
568
569    //Put data in memstore
570    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
571    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
572    //flush
573    flush(1);
574
575    //Add more data
576    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
577    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
578    //flush
579    flush(2);
580
581    //Add more data
582    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
583    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
584    //flush
585    flush(3);
586
587    //Get
588    result = HBaseTestingUtility.getFromStoreFile(store,
589        get.getRow(),
590        qualifiers);
591    //this.store.get(get, qualifiers, result);
592
593    //Need to sort the result since multiple files
594    Collections.sort(result, CellComparatorImpl.COMPARATOR);
595
596    //Compare
597    assertCheck();
598  }
599
600  /**
601   * Getting data from memstore and files
602   * @throws IOException
603   */
604  @Test
605  public void testGet_FromMemStoreAndFiles() throws IOException {
606    init(this.name.getMethodName());
607
608    //Put data in memstore
609    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
610    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
611    //flush
612    flush(1);
613
614    //Add more data
615    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
616    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
617    //flush
618    flush(2);
619
620    //Add more data
621    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
622    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
623
624    //Get
625    result = HBaseTestingUtility.getFromStoreFile(store,
626        get.getRow(), qualifiers);
627
628    //Need to sort the result since multiple files
629    Collections.sort(result, CellComparatorImpl.COMPARATOR);
630
631    //Compare
632    assertCheck();
633  }
634
635  private void flush(int storeFilessize) throws IOException{
636    this.store.snapshot();
637    flushStore(store, id++);
638    assertEquals(storeFilessize, this.store.getStorefiles().size());
639    assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
640  }
641
642  private void assertCheck() {
643    assertEquals(expected.size(), result.size());
644    for(int i=0; i<expected.size(); i++) {
645      assertEquals(expected.get(i), result.get(i));
646    }
647  }
648
649  @After
650  public void tearDown() throws Exception {
651    EnvironmentEdgeManagerTestHelper.reset();
652    if (store != null) {
653      try {
654        store.close();
655      } catch (IOException e) {
656      }
657      store = null;
658    }
659    if (region != null) {
660      region.close();
661      region = null;
662    }
663  }
664
  /**
   * Asks the shared testing utility to clean up its test directory once all tests have run.
   * @throws IOException if the cleanup fails
   */
  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }
669
670  @Test
671  public void testHandleErrorsInFlush() throws Exception {
672    LOG.info("Setting up a faulty file system that cannot write");
673
674    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
675    User user = User.createUserForTesting(conf,
676        "testhandleerrorsinflush", new String[]{"foo"});
677    // Inject our faulty LocalFileSystem
678    conf.setClass("fs.file.impl", FaultyFileSystem.class,
679        FileSystem.class);
680    user.runAs(new PrivilegedExceptionAction<Object>() {
681      @Override
682      public Object run() throws Exception {
683        // Make sure it worked (above is sensitive to caching details in hadoop core)
684        FileSystem fs = FileSystem.get(conf);
685        assertEquals(FaultyFileSystem.class, fs.getClass());
686
687        // Initialize region
688        init(name.getMethodName(), conf);
689
690        LOG.info("Adding some data");
691        store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
692        store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
693        store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
694
695        LOG.info("Before flush, we should have no files");
696
697        Collection<StoreFileInfo> files =
698          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
699        assertEquals(0, files != null ? files.size() : 0);
700
701        //flush
702        try {
703          LOG.info("Flushing");
704          flush(1);
705          fail("Didn't bubble up IOE!");
706        } catch (IOException ioe) {
707          assertTrue(ioe.getMessage().contains("Fault injected"));
708        }
709
710        LOG.info("After failed flush, we should still have no files!");
711        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
712        assertEquals(0, files != null ? files.size() : 0);
713        store.getHRegion().getWAL().close();
714        return null;
715      }
716    });
717    FileSystem.closeAllForUGI(user.getUGI());
718  }
719
720  /**
721   * Faulty file system that will fail if you write past its fault position the FIRST TIME
722   * only; thereafter it will succeed.  Used by {@link TestHRegion} too.
723   */
724  static class FaultyFileSystem extends FilterFileSystem {
725    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
726    private long faultPos = 200;
727    AtomicBoolean fault = new AtomicBoolean(true);
728
729    public FaultyFileSystem() {
730      super(new LocalFileSystem());
731      System.err.println("Creating faulty!");
732    }
733
734    @Override
735    public FSDataOutputStream create(Path p) throws IOException {
736      return new FaultyOutputStream(super.create(p), faultPos, fault);
737    }
738
739    @Override
740    public FSDataOutputStream create(Path f, FsPermission permission,
741        boolean overwrite, int bufferSize, short replication, long blockSize,
742        Progressable progress) throws IOException {
743      return new FaultyOutputStream(super.create(f, permission,
744          overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
745    }
746
747    @Override
748    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
749        int bufferSize, short replication, long blockSize, Progressable progress)
750    throws IOException {
751      // Fake it.  Call create instead.  The default implementation throws an IOE
752      // that this is not supported.
753      return create(f, overwrite, bufferSize, replication, blockSize, progress);
754    }
755  }
756
757  static class FaultyOutputStream extends FSDataOutputStream {
758    volatile long faultPos = Long.MAX_VALUE;
759    private final AtomicBoolean fault;
760
761    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
762    throws IOException {
763      super(out, null);
764      this.faultPos = faultPos;
765      this.fault = fault;
766    }
767
768    @Override
769    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
770      System.err.println("faulty stream write at pos " + getPos());
771      injectFault();
772      super.write(buf, offset, length);
773    }
774
775    private void injectFault() throws IOException {
776      if (this.fault.get() && getPos() >= faultPos) {
777        throw new IOException("Fault injected");
778      }
779    }
780  }
781
782  private static void flushStore(HStore store, long id) throws IOException {
783    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
784    storeFlushCtx.prepare();
785    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
786    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
787  }
788
789  /**
790   * Generate a list of KeyValues for testing based on given parameters
791   * @param timestamps
792   * @param numRows
793   * @param qualifier
794   * @param family
795   * @return
796   */
797  List<Cell> getKeyValueSet(long[] timestamps, int numRows,
798      byte[] qualifier, byte[] family) {
799    List<Cell> kvList = new ArrayList<>();
800    for (int i=1;i<=numRows;i++) {
801      byte[] b = Bytes.toBytes(i);
802      for (long timestamp: timestamps) {
803        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
804      }
805    }
806    return kvList;
807  }
808
809  /**
810   * Test to ensure correctness when using Stores with multiple timestamps
811   * @throws IOException
812   */
813  @Test
814  public void testMultipleTimestamps() throws IOException {
815    int numRows = 1;
816    long[] timestamps1 = new long[] {1,5,10,20};
817    long[] timestamps2 = new long[] {30,80};
818
819    init(this.name.getMethodName());
820
821    List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
822    for (Cell kv : kvList1) {
823      this.store.add(kv, null);
824    }
825
826    this.store.snapshot();
827    flushStore(store, id++);
828
829    List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
830    for(Cell kv : kvList2) {
831      this.store.add(kv, null);
832    }
833
834    List<Cell> result;
835    Get get = new Get(Bytes.toBytes(1));
836    get.addColumn(family,qf1);
837
838    get.setTimeRange(0,15);
839    result = HBaseTestingUtility.getFromStoreFile(store, get);
840    assertTrue(result.size()>0);
841
842    get.setTimeRange(40,90);
843    result = HBaseTestingUtility.getFromStoreFile(store, get);
844    assertTrue(result.size()>0);
845
846    get.setTimeRange(10,45);
847    result = HBaseTestingUtility.getFromStoreFile(store, get);
848    assertTrue(result.size()>0);
849
850    get.setTimeRange(80,145);
851    result = HBaseTestingUtility.getFromStoreFile(store, get);
852    assertTrue(result.size()>0);
853
854    get.setTimeRange(1,2);
855    result = HBaseTestingUtility.getFromStoreFile(store, get);
856    assertTrue(result.size()>0);
857
858    get.setTimeRange(90,200);
859    result = HBaseTestingUtility.getFromStoreFile(store, get);
860    assertTrue(result.size()==0);
861  }
862
863  /**
864   * Test for HBASE-3492 - Test split on empty colfam (no store files).
865   *
866   * @throws IOException When the IO operations fail.
867   */
868  @Test
869  public void testSplitWithEmptyColFam() throws IOException {
870    init(this.name.getMethodName());
871    assertFalse(store.getSplitPoint().isPresent());
872    store.getHRegion().forceSplit(null);
873    assertFalse(store.getSplitPoint().isPresent());
874    store.getHRegion().clearSplit();
875  }
876
877  @Test
878  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
879    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
880    long anyValue = 10;
881
882    // We'll check that it uses correct config and propagates it appropriately by going thru
883    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
884    // a number we pass in is higher than some config value, inside compactionPolicy.
885    Configuration conf = HBaseConfiguration.create();
886    conf.setLong(CONFIG_KEY, anyValue);
887    init(name.getMethodName() + "-xml", conf);
888    assertTrue(store.throttleCompaction(anyValue + 1));
889    assertFalse(store.throttleCompaction(anyValue));
890
891    // HTD overrides XML.
892    --anyValue;
893    init(name.getMethodName() + "-htd", conf, TableDescriptorBuilder
894        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
895      ColumnFamilyDescriptorBuilder.of(family));
896    assertTrue(store.throttleCompaction(anyValue + 1));
897    assertFalse(store.throttleCompaction(anyValue));
898
899    // HCD overrides them both.
900    --anyValue;
901    init(name.getMethodName() + "-hcd", conf,
902      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
903        Long.toString(anyValue)),
904      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
905          .build());
906    assertTrue(store.throttleCompaction(anyValue + 1));
907    assertFalse(store.throttleCompaction(anyValue));
908  }
909
910  public static class DummyStoreEngine extends DefaultStoreEngine {
911    public static DefaultCompactor lastCreatedCompactor = null;
912
913    @Override
914    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
915        throws IOException {
916      super.createComponents(conf, store, comparator);
917      lastCreatedCompactor = this.compactor;
918    }
919  }
920
921  @Test
922  public void testStoreUsesSearchEngineOverride() throws Exception {
923    Configuration conf = HBaseConfiguration.create();
924    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
925    init(this.name.getMethodName(), conf);
926    assertEquals(DummyStoreEngine.lastCreatedCompactor,
927      this.store.storeEngine.getCompactor());
928  }
929
930  private void addStoreFile() throws IOException {
931    HStoreFile f = this.store.getStorefiles().iterator().next();
932    Path storedir = f.getPath().getParent();
933    long seqid = this.store.getMaxSequenceId().orElse(0L);
934    Configuration c = TEST_UTIL.getConfiguration();
935    FileSystem fs = FileSystem.get(c);
936    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
937    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
938        fs)
939            .withOutputDir(storedir)
940            .withFileContext(fileContext)
941            .build();
942    w.appendMetadata(seqid + 1, false);
943    w.close();
944    LOG.info("Added store file:" + w.getPath());
945  }
946
947  private void archiveStoreFile(int index) throws IOException {
948    Collection<HStoreFile> files = this.store.getStorefiles();
949    HStoreFile sf = null;
950    Iterator<HStoreFile> it = files.iterator();
951    for (int i = 0; i <= index; i++) {
952      sf = it.next();
953    }
954    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
955  }
956
957  private void closeCompactedFile(int index) throws IOException {
958    Collection<HStoreFile> files =
959        this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
960    HStoreFile sf = null;
961    Iterator<HStoreFile> it = files.iterator();
962    for (int i = 0; i <= index; i++) {
963      sf = it.next();
964    }
965    sf.closeStoreFile(true);
966    store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
967  }
968
969  @Test
970  public void testRefreshStoreFiles() throws Exception {
971    init(name.getMethodName());
972
973    assertEquals(0, this.store.getStorefilesCount());
974
975    // Test refreshing store files when no store files are there
976    store.refreshStoreFiles();
977    assertEquals(0, this.store.getStorefilesCount());
978
979    // add some data, flush
980    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
981    flush(1);
982    assertEquals(1, this.store.getStorefilesCount());
983
984    // add one more file
985    addStoreFile();
986
987    assertEquals(1, this.store.getStorefilesCount());
988    store.refreshStoreFiles();
989    assertEquals(2, this.store.getStorefilesCount());
990
991    // add three more files
992    addStoreFile();
993    addStoreFile();
994    addStoreFile();
995
996    assertEquals(2, this.store.getStorefilesCount());
997    store.refreshStoreFiles();
998    assertEquals(5, this.store.getStorefilesCount());
999
1000    closeCompactedFile(0);
1001    archiveStoreFile(0);
1002
1003    assertEquals(5, this.store.getStorefilesCount());
1004    store.refreshStoreFiles();
1005    assertEquals(4, this.store.getStorefilesCount());
1006
1007    archiveStoreFile(0);
1008    archiveStoreFile(1);
1009    archiveStoreFile(2);
1010
1011    assertEquals(4, this.store.getStorefilesCount());
1012    store.refreshStoreFiles();
1013    assertEquals(1, this.store.getStorefilesCount());
1014
1015    archiveStoreFile(0);
1016    store.refreshStoreFiles();
1017    assertEquals(0, this.store.getStorefilesCount());
1018  }
1019
1020  @Test
1021  public void testRefreshStoreFilesNotChanged() throws IOException {
1022    init(name.getMethodName());
1023
1024    assertEquals(0, this.store.getStorefilesCount());
1025
1026    // add some data, flush
1027    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
1028    flush(1);
1029    // add one more file
1030    addStoreFile();
1031
1032    HStore spiedStore = spy(store);
1033
1034    // call first time after files changed
1035    spiedStore.refreshStoreFiles();
1036    assertEquals(2, this.store.getStorefilesCount());
1037    verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
1038
1039    // call second time
1040    spiedStore.refreshStoreFiles();
1041
1042    //ensure that replaceStoreFiles is not called if files are not refreshed
1043    verify(spiedStore, times(0)).replaceStoreFiles(null, null);
1044  }
1045
1046  private long countMemStoreScanner(StoreScanner scanner) {
1047    if (scanner.currentScanners == null) {
1048      return 0;
1049    }
1050    return scanner.currentScanners.stream()
1051            .filter(s -> !s.isFileScanner())
1052            .count();
1053  }
1054
1055  @Test
1056  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1057    long seqId = 100;
1058    long timestamp = System.currentTimeMillis();
1059    Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1060        .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put)
1061        .setValue(qf1).build();
1062    PrivateCellUtil.setSequenceId(cell0, seqId);
1063    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1064
1065    Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1066        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
1067        .setValue(qf1).build();
1068    PrivateCellUtil.setSequenceId(cell1, seqId);
1069    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1070
1071    seqId = 101;
1072    timestamp = System.currentTimeMillis();
1073    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1074        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
1075        .setValue(qf1).build();
1076    PrivateCellUtil.setSequenceId(cell2, seqId);
1077    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1078  }
1079
1080  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
1081      List<Cell> inputCellsAfterSnapshot) throws IOException {
1082    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
1083    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1084    long seqId = Long.MIN_VALUE;
1085    for (Cell c : inputCellsBeforeSnapshot) {
1086      quals.add(CellUtil.cloneQualifier(c));
1087      seqId = Math.max(seqId, c.getSequenceId());
1088    }
1089    for (Cell c : inputCellsAfterSnapshot) {
1090      quals.add(CellUtil.cloneQualifier(c));
1091      seqId = Math.max(seqId, c.getSequenceId());
1092    }
1093    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
1094    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1095    storeFlushCtx.prepare();
1096    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
1097    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
1098    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
1099      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
1100      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
1101      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1102      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1103      // snapshot has no data after flush
1104      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
1105      boolean more;
1106      int cellCount = 0;
1107      do {
1108        List<Cell> cells = new ArrayList<>();
1109        more = s.next(cells);
1110        cellCount += cells.size();
1111        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
1112      } while (more);
1113      assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
1114          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
1115          inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
1116      // the current scanners is cleared
1117      assertEquals(0, countMemStoreScanner(s));
1118    }
1119  }
1120
1121  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
1122      throws IOException {
1123    return createCell(row, qualifier, ts, sequenceId, value);
1124  }
1125
1126  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
1127      throws IOException {
1128    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1129        .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
1130        .setValue(value).build();
1131    PrivateCellUtil.setSequenceId(c, sequenceId);
1132    return c;
1133  }
1134
1135  @Test
1136  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1137    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1138    final int expectedSize = 3;
1139    testFlushBeforeCompletingScan(new MyListHook() {
1140      @Override
1141      public void hook(int currentSize) {
1142        if (currentSize == expectedSize - 1) {
1143          try {
1144            flushStore(store, id++);
1145            timeToGoNextRow.set(true);
1146          } catch (IOException e) {
1147            throw new RuntimeException(e);
1148          }
1149        }
1150      }
1151    }, new FilterBase() {
1152      @Override
1153      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1154        return ReturnCode.INCLUDE;
1155      }
1156    }, expectedSize);
1157  }
1158
1159  @Test
1160  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
1161    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1162    final int expectedSize = 2;
1163    testFlushBeforeCompletingScan(new MyListHook() {
1164      @Override
1165      public void hook(int currentSize) {
1166        if (currentSize == expectedSize - 1) {
1167          try {
1168            flushStore(store, id++);
1169            timeToGoNextRow.set(true);
1170          } catch (IOException e) {
1171            throw new RuntimeException(e);
1172          }
1173        }
1174      }
1175    }, new FilterBase() {
1176      @Override
1177      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1178        if (timeToGoNextRow.get()) {
1179          timeToGoNextRow.set(false);
1180          return ReturnCode.NEXT_ROW;
1181        } else {
1182          return ReturnCode.INCLUDE;
1183        }
1184      }
1185    }, expectedSize);
1186  }
1187
1188  @Test
1189  public void testFlushBeforeCompletingScanWithFilterHint() throws IOException,
1190      InterruptedException {
1191    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
1192    final int expectedSize = 2;
1193    testFlushBeforeCompletingScan(new MyListHook() {
1194      @Override
1195      public void hook(int currentSize) {
1196        if (currentSize == expectedSize - 1) {
1197          try {
1198            flushStore(store, id++);
1199            timeToGetHint.set(true);
1200          } catch (IOException e) {
1201            throw new RuntimeException(e);
1202          }
1203        }
1204      }
1205    }, new FilterBase() {
1206      @Override
1207      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1208        if (timeToGetHint.get()) {
1209          timeToGetHint.set(false);
1210          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
1211        } else {
1212          return Filter.ReturnCode.INCLUDE;
1213        }
1214      }
1215      @Override
1216      public Cell getNextCellHint(Cell currentCell) throws IOException {
1217        return currentCell;
1218      }
1219    }, expectedSize);
1220  }
1221
1222  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
1223          throws IOException, InterruptedException {
1224    Configuration conf = HBaseConfiguration.create();
1225    byte[] r0 = Bytes.toBytes("row0");
1226    byte[] r1 = Bytes.toBytes("row1");
1227    byte[] r2 = Bytes.toBytes("row2");
1228    byte[] value0 = Bytes.toBytes("value0");
1229    byte[] value1 = Bytes.toBytes("value1");
1230    byte[] value2 = Bytes.toBytes("value2");
1231    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1232    long ts = EnvironmentEdgeManager.currentTime();
1233    long seqId = 100;
1234    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1235      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1236      new MyStoreHook() {
1237        @Override
1238        public long getSmallestReadPoint(HStore store) {
1239          return seqId + 3;
1240        }
1241      });
1242    // The cells having the value0 won't be flushed to disk because the value of max version is 1
1243    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
1244    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
1245    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
1246    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
1247    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
1248    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
1249    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
1250    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
1251    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
1252    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1253    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1254    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1255    List<Cell> myList = new MyList<>(hook);
1256    Scan scan = new Scan()
1257            .withStartRow(r1)
1258            .setFilter(filter);
1259    try (InternalScanner scanner = (InternalScanner) store.getScanner(
1260          scan, null, seqId + 3)){
1261      // r1
1262      scanner.next(myList);
1263      assertEquals(expectedSize, myList.size());
1264      for (Cell c : myList) {
1265        byte[] actualValue = CellUtil.cloneValue(c);
1266        assertTrue("expected:" + Bytes.toStringBinary(value1)
1267          + ", actual:" + Bytes.toStringBinary(actualValue)
1268          , Bytes.equals(actualValue, value1));
1269      }
1270      List<Cell> normalList = new ArrayList<>(3);
1271      // r2
1272      scanner.next(normalList);
1273      assertEquals(3, normalList.size());
1274      for (Cell c : normalList) {
1275        byte[] actualValue = CellUtil.cloneValue(c);
1276        assertTrue("expected:" + Bytes.toStringBinary(value2)
1277          + ", actual:" + Bytes.toStringBinary(actualValue)
1278          , Bytes.equals(actualValue, value2));
1279      }
1280    }
1281  }
1282
1283  @Test
1284  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
1285    Configuration conf = HBaseConfiguration.create();
1286    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
1287    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1288        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1289    byte[] value = Bytes.toBytes("value");
1290    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1291    long ts = EnvironmentEdgeManager.currentTime();
1292    long seqId = 100;
1293    // older data whihc shouldn't be "seen" by client
1294    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1295    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1296    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1297    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1298    quals.add(qf1);
1299    quals.add(qf2);
1300    quals.add(qf3);
1301    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1302    MyCompactingMemStore.START_TEST.set(true);
1303    Runnable flush = () -> {
1304      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
1305      // recreate the active memstore -- phase (4/5)
1306      storeFlushCtx.prepare();
1307    };
1308    ExecutorService service = Executors.newSingleThreadExecutor();
1309    service.submit(flush);
1310    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
1311    // this is blocked until we recreate the active memstore -- phase (3/5)
1312    // we get scanner from active memstore but it is empty -- phase (5/5)
1313    InternalScanner scanner = (InternalScanner) store.getScanner(
1314          new Scan(new Get(row)), quals, seqId + 1);
1315    service.shutdown();
1316    service.awaitTermination(20, TimeUnit.SECONDS);
1317    try {
1318      try {
1319        List<Cell> results = new ArrayList<>();
1320        scanner.next(results);
1321        assertEquals(3, results.size());
1322        for (Cell c : results) {
1323          byte[] actualValue = CellUtil.cloneValue(c);
1324          assertTrue("expected:" + Bytes.toStringBinary(value)
1325            + ", actual:" + Bytes.toStringBinary(actualValue)
1326            , Bytes.equals(actualValue, value));
1327        }
1328      } finally {
1329        scanner.close();
1330      }
1331    } finally {
1332      MyCompactingMemStore.START_TEST.set(false);
1333      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1334      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1335    }
1336  }
1337
1338  @Test
1339  public void testScanWithDoubleFlush() throws IOException {
1340    Configuration conf = HBaseConfiguration.create();
1341    // Initialize region
1342    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){
1343      @Override
1344      public void getScanners(MyStore store) throws IOException {
1345        final long tmpId = id++;
1346        ExecutorService s = Executors.newSingleThreadExecutor();
1347        s.submit(() -> {
1348          try {
1349            // flush the store before storescanner updates the scanners from store.
1350            // The current data will be flushed into files, and the memstore will
1351            // be clear.
1352            // -- phase (4/4)
1353            flushStore(store, tmpId);
1354          }catch (IOException ex) {
1355            throw new RuntimeException(ex);
1356          }
1357        });
1358        s.shutdown();
1359        try {
1360          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1361          s.awaitTermination(3, TimeUnit.SECONDS);
1362        } catch (InterruptedException ex) {
1363        }
1364      }
1365    });
1366    byte[] oldValue = Bytes.toBytes("oldValue");
1367    byte[] currentValue = Bytes.toBytes("currentValue");
1368    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1369    long ts = EnvironmentEdgeManager.currentTime();
1370    long seqId = 100;
1371    // older data whihc shouldn't be "seen" by client
1372    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1373    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1374    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1375    long snapshotId = id++;
1376    // push older data into snapshot -- phase (1/4)
1377    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker
1378        .DUMMY);
1379    storeFlushCtx.prepare();
1380
1381    // insert current data into active -- phase (2/4)
1382    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1383    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1384    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1385    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1386    quals.add(qf1);
1387    quals.add(qf2);
1388    quals.add(qf3);
1389    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
1390        new Scan(new Get(row)), quals, seqId + 1)) {
1391      // complete the flush -- phase (3/4)
1392      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1393      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1394
1395      List<Cell> results = new ArrayList<>();
1396      scanner.next(results);
1397      assertEquals(3, results.size());
1398      for (Cell c : results) {
1399        byte[] actualValue = CellUtil.cloneValue(c);
1400        assertTrue("expected:" + Bytes.toStringBinary(currentValue)
1401          + ", actual:" + Bytes.toStringBinary(actualValue)
1402          , Bytes.equals(actualValue, currentValue));
1403      }
1404    }
1405  }
1406
1407  @Test
1408  public void testReclaimChunkWhenScaning() throws IOException {
1409    init("testReclaimChunkWhenScaning");
1410    long ts = EnvironmentEdgeManager.currentTime();
1411    long seqId = 100;
1412    byte[] value = Bytes.toBytes("value");
1413    // older data whihc shouldn't be "seen" by client
1414    store.add(createCell(qf1, ts, seqId, value), null);
1415    store.add(createCell(qf2, ts, seqId, value), null);
1416    store.add(createCell(qf3, ts, seqId, value), null);
1417    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1418    quals.add(qf1);
1419    quals.add(qf2);
1420    quals.add(qf3);
1421    try (InternalScanner scanner = (InternalScanner) store.getScanner(
1422        new Scan(new Get(row)), quals, seqId)) {
1423      List<Cell> results = new MyList<>(size -> {
1424        switch (size) {
1425          // 1) we get the first cell (qf1)
1426          // 2) flush the data to have StoreScanner update inner scanners
1427          // 3) the chunk will be reclaimed after updaing
1428          case 1:
1429            try {
1430              flushStore(store, id++);
1431            } catch (IOException e) {
1432              throw new RuntimeException(e);
1433            }
1434            break;
1435          // 1) we get the second cell (qf2)
1436          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
1437          case 2:
1438            try {
1439              byte[] newValue = Bytes.toBytes("newValue");
1440              // older data whihc shouldn't be "seen" by client
1441              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1442              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1443              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1444            } catch (IOException e) {
1445              throw new RuntimeException(e);
1446            }
1447            break;
1448          default:
1449            break;
1450        }
1451      });
1452      scanner.next(results);
1453      assertEquals(3, results.size());
1454      for (Cell c : results) {
1455        byte[] actualValue = CellUtil.cloneValue(c);
1456        assertTrue("expected:" + Bytes.toStringBinary(value)
1457          + ", actual:" + Bytes.toStringBinary(actualValue)
1458          , Bytes.equals(actualValue, value));
1459      }
1460    }
1461  }
1462
1463  /**
1464   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable
1465   * may change the versionedList. And the first InMemoryFlushRunnable will use the chagned
1466   * versionedList to remove the corresponding segments.
1467   * In short, there will be some segements which isn't in merge are removed.
1468   * @throws IOException
1469   * @throws InterruptedException
1470   */
1471  @Test
1472  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1473    int flushSize = 500;
1474    Configuration conf = HBaseConfiguration.create();
1475    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1476    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1477    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1478    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1479    // Set the lower threshold to invoke the "MERGE" policy
1480    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1481    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1482        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1483    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1484    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1485    long ts = EnvironmentEdgeManager.currentTime();
1486    long seqId = 100;
1487    // older data whihc shouldn't be "seen" by client
1488    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1489    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1490    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1491    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1492    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1493    storeFlushCtx.prepare();
1494    // This shouldn't invoke another in-memory flush because the first compactor thread
1495    // hasn't accomplished the in-memory compaction.
1496    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1497    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1498    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1499    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1500    //okay. Let the compaction be completed
1501    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1502    CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore;
1503    while (mem.isMemStoreFlushingInMemory()) {
1504      TimeUnit.SECONDS.sleep(1);
1505    }
1506    // This should invoke another in-memory flush.
1507    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1508    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1509    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1510    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1511    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1512      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1513    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1514    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1515  }
1516
1517  @Test
1518  public void testAge() throws IOException {
1519    long currentTime = System.currentTimeMillis();
1520    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1521    edge.setValue(currentTime);
1522    EnvironmentEdgeManager.injectEdge(edge);
1523    Configuration conf = TEST_UTIL.getConfiguration();
1524    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1525    initHRegion(name.getMethodName(), conf,
1526      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1527    HStore store = new HStore(region, hcd, conf) {
1528
1529      @Override
1530      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1531          CellComparator kvComparator) throws IOException {
1532        List<HStoreFile> storefiles =
1533            Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1534              mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1535        StoreFileManager sfm = mock(StoreFileManager.class);
1536        when(sfm.getStorefiles()).thenReturn(storefiles);
1537        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1538        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1539        return storeEngine;
1540      }
1541    };
1542    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1543    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1544    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1545  }
1546
1547  private HStoreFile mockStoreFile(long createdTime) {
1548    StoreFileInfo info = mock(StoreFileInfo.class);
1549    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1550    HStoreFile sf = mock(HStoreFile.class);
1551    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1552    when(sf.isHFile()).thenReturn(true);
1553    when(sf.getFileInfo()).thenReturn(info);
1554    return sf;
1555  }
1556
1557  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1558      throws IOException {
1559    return (MyStore) init(methodName, conf,
1560      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1561      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1562  }
1563
1564  private static class MyStore extends HStore {
1565    private final MyStoreHook hook;
1566
1567    MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration
1568        confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1569      super(region, family, confParam);
1570      this.hook = hook;
1571    }
1572
1573    @Override
1574    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1575        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1576        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1577        boolean includeMemstoreScanner) throws IOException {
1578      hook.getScanners(this);
1579      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1580        stopRow, false, readPt, includeMemstoreScanner);
1581    }
1582
1583    @Override
1584    public long getSmallestReadPoint() {
1585      return hook.getSmallestReadPoint(this);
1586    }
1587  }
1588
1589  private abstract static class MyStoreHook {
1590
1591    void getScanners(MyStore store) throws IOException {
1592    }
1593
1594    long getSmallestReadPoint(HStore store) {
1595      return store.getHRegion().getSmallestReadPoint();
1596    }
1597  }
1598
1599  @Test
1600  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
1601    Configuration conf = HBaseConfiguration.create();
1602    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1603    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1604    // Set the lower threshold to invoke the "MERGE" policy
1605    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
1606    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1607    long ts = System.currentTimeMillis();
1608    long seqID = 1L;
1609    // Add some data to the region and do some flushes
1610    for (int i = 1; i < 10; i++) {
1611      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1612        memStoreSizing);
1613    }
1614    // flush them
1615    flushStore(store, seqID);
1616    for (int i = 11; i < 20; i++) {
1617      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1618        memStoreSizing);
1619    }
1620    // flush them
1621    flushStore(store, seqID);
1622    for (int i = 21; i < 30; i++) {
1623      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1624        memStoreSizing);
1625    }
1626    // flush them
1627    flushStore(store, seqID);
1628
1629    assertEquals(3, store.getStorefilesCount());
1630    Scan scan = new Scan();
1631    scan.addFamily(family);
1632    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1633    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1634    StoreScanner storeScanner =
1635        (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1636    // get the current heap
1637    KeyValueHeap heap = storeScanner.heap;
1638    // create more store files
1639    for (int i = 31; i < 40; i++) {
1640      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1641        memStoreSizing);
1642    }
1643    // flush them
1644    flushStore(store, seqID);
1645
1646    for (int i = 41; i < 50; i++) {
1647      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1648        memStoreSizing);
1649    }
1650    // flush them
1651    flushStore(store, seqID);
1652    storefiles2 = store.getStorefiles();
1653    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1654    actualStorefiles1.removeAll(actualStorefiles);
1655    // Do compaction
1656    MyThread thread = new MyThread(storeScanner);
1657    thread.start();
1658    store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
1659    thread.join();
1660    KeyValueHeap heap2 = thread.getHeap();
1661    assertFalse(heap.equals(heap2));
1662  }
1663
1664  private static class MyThread extends Thread {
1665    private StoreScanner scanner;
1666    private KeyValueHeap heap;
1667
1668    public MyThread(StoreScanner scanner) {
1669      this.scanner = scanner;
1670    }
1671
1672    public KeyValueHeap getHeap() {
1673      return this.heap;
1674    }
1675
1676    @Override
1677    public void run() {
1678      scanner.trySwitchToStreamRead();
1679      heap = scanner.heap;
1680    }
1681  }
1682
1683  private static class MyMemStoreCompactor extends MemStoreCompactor {
1684    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
1685    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
1686    public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy
1687        compactionPolicy) throws IllegalArgumentIOException {
1688      super(compactingMemStore, compactionPolicy);
1689    }
1690
1691    @Override
1692    public boolean start() throws IOException {
1693      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
1694      boolean rval = super.start();
1695      if (isFirst) {
1696        try {
1697          START_COMPACTOR_LATCH.await();
1698        } catch (InterruptedException ex) {
1699          throw new RuntimeException(ex);
1700        }
1701      }
1702      return rval;
1703    }
1704  }
1705
1706  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
1707    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
1708    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
1709        HStore store, RegionServicesForStores regionServices,
1710        MemoryCompactionPolicy compactionPolicy) throws IOException {
1711      super(conf, c, store, regionServices, compactionPolicy);
1712    }
1713
1714    @Override
1715    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
1716        throws IllegalArgumentIOException {
1717      return new MyMemStoreCompactor(this, compactionPolicy);
1718    }
1719
1720    @Override
1721    protected boolean shouldFlushInMemory() {
1722      boolean rval = super.shouldFlushInMemory();
1723      if (rval) {
1724        RUNNER_COUNT.incrementAndGet();
1725        if (LOG.isDebugEnabled()) {
1726          LOG.debug("runner count: " + RUNNER_COUNT.get());
1727        }
1728      }
1729      return rval;
1730    }
1731  }
1732
1733  public static class MyCompactingMemStore extends CompactingMemStore {
1734    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
1735    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
1736    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
1737    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c,
1738        HStore store, RegionServicesForStores regionServices,
1739        MemoryCompactionPolicy compactionPolicy) throws IOException {
1740      super(conf, c, store, regionServices, compactionPolicy);
1741    }
1742
1743    @Override
1744    protected List<KeyValueScanner> createList(int capacity) {
1745      if (START_TEST.get()) {
1746        try {
1747          getScannerLatch.countDown();
1748          snapshotLatch.await();
1749        } catch (InterruptedException e) {
1750          throw new RuntimeException(e);
1751        }
1752      }
1753      return new ArrayList<>(capacity);
1754    }
1755    @Override
1756    protected void pushActiveToPipeline(MutableSegment active) {
1757      if (START_TEST.get()) {
1758        try {
1759          getScannerLatch.await();
1760        } catch (InterruptedException e) {
1761          throw new RuntimeException(e);
1762        }
1763      }
1764
1765      super.pushActiveToPipeline(active);
1766      if (START_TEST.get()) {
1767        snapshotLatch.countDown();
1768      }
1769    }
1770  }
1771
1772  interface MyListHook {
1773    void hook(int currentSize);
1774  }
1775
1776  private static class MyList<T> implements List<T> {
1777    private final List<T> delegatee = new ArrayList<>();
1778    private final MyListHook hookAtAdd;
1779    MyList(final MyListHook hookAtAdd) {
1780      this.hookAtAdd = hookAtAdd;
1781    }
1782    @Override
1783    public int size() {return delegatee.size();}
1784
1785    @Override
1786    public boolean isEmpty() {return delegatee.isEmpty();}
1787
1788    @Override
1789    public boolean contains(Object o) {return delegatee.contains(o);}
1790
1791    @Override
1792    public Iterator<T> iterator() {return delegatee.iterator();}
1793
1794    @Override
1795    public Object[] toArray() {return delegatee.toArray();}
1796
1797    @Override
1798    public <R> R[] toArray(R[] a) {return delegatee.toArray(a);}
1799
1800    @Override
1801    public boolean add(T e) {
1802      hookAtAdd.hook(size());
1803      return delegatee.add(e);
1804    }
1805
1806    @Override
1807    public boolean remove(Object o) {return delegatee.remove(o);}
1808
1809    @Override
1810    public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);}
1811
1812    @Override
1813    public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);}
1814
1815    @Override
1816    public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);}
1817
1818    @Override
1819    public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);}
1820
1821    @Override
1822    public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);}
1823
1824    @Override
1825    public void clear() {delegatee.clear();}
1826
1827    @Override
1828    public T get(int index) {return delegatee.get(index);}
1829
1830    @Override
1831    public T set(int index, T element) {return delegatee.set(index, element);}
1832
1833    @Override
1834    public void add(int index, T element) {delegatee.add(index, element);}
1835
1836    @Override
1837    public T remove(int index) {return delegatee.remove(index);}
1838
1839    @Override
1840    public int indexOf(Object o) {return delegatee.indexOf(o);}
1841
1842    @Override
1843    public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);}
1844
1845    @Override
1846    public ListIterator<T> listIterator() {return delegatee.listIterator();}
1847
1848    @Override
1849    public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);}
1850
1851    @Override
1852    public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
1853  }
1854}