001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026import static org.mockito.ArgumentMatchers.any;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.spy;
029import static org.mockito.Mockito.times;
030import static org.mockito.Mockito.verify;
031import static org.mockito.Mockito.when;
032
033import java.io.IOException;
034import java.lang.ref.SoftReference;
035import java.security.PrivilegedExceptionAction;
036import java.util.ArrayList;
037import java.util.Arrays;
038import java.util.Collection;
039import java.util.Collections;
040import java.util.Iterator;
041import java.util.List;
042import java.util.ListIterator;
043import java.util.NavigableSet;
044import java.util.TreeSet;
045import java.util.concurrent.ConcurrentSkipListSet;
046import java.util.concurrent.CountDownLatch;
047import java.util.concurrent.CyclicBarrier;
048import java.util.concurrent.ExecutorService;
049import java.util.concurrent.Executors;
050import java.util.concurrent.ThreadPoolExecutor;
051import java.util.concurrent.TimeUnit;
052import java.util.concurrent.atomic.AtomicBoolean;
053import java.util.concurrent.atomic.AtomicInteger;
054import java.util.concurrent.atomic.AtomicLong;
055import java.util.concurrent.atomic.AtomicReference;
056import java.util.function.IntBinaryOperator;
057
058import org.apache.hadoop.conf.Configuration;
059import org.apache.hadoop.fs.FSDataOutputStream;
060import org.apache.hadoop.fs.FileStatus;
061import org.apache.hadoop.fs.FileSystem;
062import org.apache.hadoop.fs.FilterFileSystem;
063import org.apache.hadoop.fs.LocalFileSystem;
064import org.apache.hadoop.fs.Path;
065import org.apache.hadoop.fs.permission.FsPermission;
066import org.apache.hadoop.hbase.Cell;
067import org.apache.hadoop.hbase.CellBuilderFactory;
068import org.apache.hadoop.hbase.CellBuilderType;
069import org.apache.hadoop.hbase.CellComparator;
070import org.apache.hadoop.hbase.CellComparatorImpl;
071import org.apache.hadoop.hbase.CellUtil;
072import org.apache.hadoop.hbase.HBaseClassTestRule;
073import org.apache.hadoop.hbase.HBaseConfiguration;
074import org.apache.hadoop.hbase.HBaseTestingUtil;
075import org.apache.hadoop.hbase.HConstants;
076import org.apache.hadoop.hbase.KeyValue;
077import org.apache.hadoop.hbase.MemoryCompactionPolicy;
078import org.apache.hadoop.hbase.PrivateCellUtil;
079import org.apache.hadoop.hbase.TableName;
080import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
081import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
082import org.apache.hadoop.hbase.client.Get;
083import org.apache.hadoop.hbase.client.RegionInfo;
084import org.apache.hadoop.hbase.client.RegionInfoBuilder;
085import org.apache.hadoop.hbase.client.Scan;
086import org.apache.hadoop.hbase.client.Scan.ReadType;
087import org.apache.hadoop.hbase.client.TableDescriptor;
088import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
089import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
090import org.apache.hadoop.hbase.filter.Filter;
091import org.apache.hadoop.hbase.filter.FilterBase;
092import org.apache.hadoop.hbase.io.compress.Compression;
093import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
094import org.apache.hadoop.hbase.io.hfile.CacheConfig;
095import org.apache.hadoop.hbase.io.hfile.HFile;
096import org.apache.hadoop.hbase.io.hfile.HFileContext;
097import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
098import org.apache.hadoop.hbase.monitoring.MonitoredTask;
099import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
100import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
101import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
102import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
103import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
104import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
105import org.apache.hadoop.hbase.security.User;
106import org.apache.hadoop.hbase.testclassification.MediumTests;
107import org.apache.hadoop.hbase.testclassification.RegionServerTests;
108import org.apache.hadoop.hbase.util.Bytes;
109import org.apache.hadoop.hbase.util.CommonFSUtils;
110import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
111import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
112import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
113import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
114import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
115import org.apache.hadoop.hbase.wal.WALFactory;
116import org.apache.hadoop.util.Progressable;
117import org.junit.After;
118import org.junit.AfterClass;
119import org.junit.Before;
120import org.junit.ClassRule;
121import org.junit.Rule;
122import org.junit.Test;
123import org.junit.experimental.categories.Category;
124import org.junit.rules.TestName;
125import org.mockito.Mockito;
126import org.slf4j.Logger;
127import org.slf4j.LoggerFactory;
128
129import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
130
131/**
132 * Test class for the HStore
133 */
134@Category({ RegionServerTests.class, MediumTests.class })
135public class TestHStore {
136
137  @ClassRule
138  public static final HBaseClassTestRule CLASS_RULE =
139      HBaseClassTestRule.forClass(TestHStore.class);
140
141  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
142  @Rule
143  public TestName name = new TestName();
144
145  HRegion region;
146  HStore store;
147  byte [] table = Bytes.toBytes("table");
148  byte [] family = Bytes.toBytes("family");
149
150  byte [] row = Bytes.toBytes("row");
151  byte [] row2 = Bytes.toBytes("row2");
152  byte [] qf1 = Bytes.toBytes("qf1");
153  byte [] qf2 = Bytes.toBytes("qf2");
154  byte [] qf3 = Bytes.toBytes("qf3");
155  byte [] qf4 = Bytes.toBytes("qf4");
156  byte [] qf5 = Bytes.toBytes("qf5");
157  byte [] qf6 = Bytes.toBytes("qf6");
158
159  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);
160
161  List<Cell> expected = new ArrayList<>();
162  List<Cell> result = new ArrayList<>();
163
164  long id = EnvironmentEdgeManager.currentTime();
165  Get get = new Get(row);
166
167  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
168  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
169
170  @Before
171  public void setUp() throws IOException {
172    qualifiers.clear();
173    qualifiers.add(qf1);
174    qualifiers.add(qf3);
175    qualifiers.add(qf5);
176
177    Iterator<byte[]> iter = qualifiers.iterator();
178    while(iter.hasNext()){
179      byte [] next = iter.next();
180      expected.add(new KeyValue(row, family, next, 1, (byte[])null));
181      get.addColumn(family, next);
182    }
183  }
184
  /**
   * Initializes {@link #region} and {@link #store} for the given test method
   * using the shared test-utility configuration.
   */
  private void init(String methodName) throws IOException {
    init(methodName, TEST_UTIL.getConfiguration());
  }
188
  /**
   * Initializes the store with a default column family allowing 4 versions.
   */
  private HStore init(String methodName, Configuration conf) throws IOException {
    // some of the tests write 4 versions and then flush
    // (with HBASE-4241, lower versions are collected on flush)
    return init(methodName, conf,
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
  }
195
  /**
   * Initializes the store with the given column family on a fresh table
   * descriptor for {@link #table}.
   */
  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
      throws IOException {
    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
  }
200
  /**
   * Initializes the store without a {@link MyStoreHook}; a plain HStore is
   * created.
   */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd) throws IOException {
    return init(methodName, conf, builder, hcd, null);
  }
205
  /**
   * Initializes the store with an optional hook, defaulting switchToPread to
   * false.
   */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
    return init(methodName, conf, builder, hcd, hook, false);
  }
210
  /**
   * Builds the backing {@link HRegion} (WAL, region file system, chunk
   * creator) that the store under test will live in. The store itself is
   * created afterwards by the full init(...) overload.
   */
  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
    TableDescriptor htd = builder.setColumnFamily(hcd).build();
    Path basedir = new Path(DIR + methodName);
    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));

    FileSystem fs = FileSystem.get(conf);

    // Start from a clean WAL directory so reruns of the same method do not replay.
    fs.delete(logdir, true);
    // NOTE(review): first argument uses MemStoreLAB's chunk-size constant while
    // the third uses MemStoreLABImpl's -- presumably the same value; confirm.
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0,
      null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    Configuration walConf = new Configuration(conf);
    CommonFSUtils.setRootDir(walConf, basedir);
    WALFactory wals = new WALFactory(walConf, methodName);
    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
        htd, null);
    // Spy the per-region services so the in-memory compaction pool can be stubbed.
    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
  }
234
235  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
236      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
237    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
238    if (hook == null) {
239      store = new HStore(region, hcd, conf, false);
240    } else {
241      store = new MyStore(region, hcd, conf, hook, switchToPread);
242    }
243    region.stores.put(store.getColumnFamilyDescriptor().getName(), store);
244    return store;
245  }
246
247  /**
248   * Test we do not lose data if we fail a flush and then close.
249   * Part of HBase-10466
250   */
251  @Test
252  public void testFlushSizeSizing() throws Exception {
253    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
254    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
255    // Only retry once.
256    conf.setInt("hbase.hstore.flush.retries.number", 1);
257    User user = User.createUserForTesting(conf, this.name.getMethodName(),
258      new String[]{"foo"});
259    // Inject our faulty LocalFileSystem
260    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
261    user.runAs(new PrivilegedExceptionAction<Object>() {
262      @Override
263      public Object run() throws Exception {
264        // Make sure it worked (above is sensitive to caching details in hadoop core)
265        FileSystem fs = FileSystem.get(conf);
266        assertEquals(FaultyFileSystem.class, fs.getClass());
267        FaultyFileSystem ffs = (FaultyFileSystem)fs;
268
269        // Initialize region
270        init(name.getMethodName(), conf);
271
272        MemStoreSize mss = store.memstore.getFlushableSize();
273        assertEquals(0, mss.getDataSize());
274        LOG.info("Adding some data");
275        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
276        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
277        // add the heap size of active (mutable) segment
278        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
279        mss = store.memstore.getFlushableSize();
280        assertEquals(kvSize.getMemStoreSize(), mss);
281        // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
282        try {
283          LOG.info("Flushing");
284          flushStore(store, id++);
285          fail("Didn't bubble up IOE!");
286        } catch (IOException ioe) {
287          assertTrue(ioe.getMessage().contains("Fault injected"));
288        }
289        // due to snapshot, change mutable to immutable segment
290        kvSize.incMemStoreSize(0,
291          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
292        mss = store.memstore.getFlushableSize();
293        assertEquals(kvSize.getMemStoreSize(), mss);
294        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
295        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
296        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
297        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
298        // not yet cleared the snapshot -- the above flush failed.
299        assertEquals(kvSize.getMemStoreSize(), mss);
300        ffs.fault.set(false);
301        flushStore(store, id++);
302        mss = store.memstore.getFlushableSize();
303        // Size should be the foreground kv size.
304        assertEquals(kvSize2.getMemStoreSize(), mss);
305        flushStore(store, id++);
306        mss = store.memstore.getFlushableSize();
307        assertEquals(0, mss.getDataSize());
308        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
309        return null;
310      }
311    });
312  }
313
314  /**
315   * Verify that compression and data block encoding are respected by the
316   * Store.createWriterInTmp() method, used on store flush.
317   */
318  @Test
319  public void testCreateWriter() throws Exception {
320    Configuration conf = HBaseConfiguration.create();
321    FileSystem fs = FileSystem.get(conf);
322
323    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
324        .setCompressionType(Compression.Algorithm.GZ).setDataBlockEncoding(DataBlockEncoding.DIFF)
325        .build();
326    init(name.getMethodName(), conf, hcd);
327
328    // Test createWriterInTmp()
329    StoreFileWriter writer =
330        store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false);
331    Path path = writer.getPath();
332    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
333    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
334    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
335    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
336    writer.close();
337
338    // Verify that compression and encoding settings are respected
339    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
340    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
341    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
342    reader.close();
343  }
344
345  @Test
346  public void testDeleteExpiredStoreFiles() throws Exception {
347    testDeleteExpiredStoreFiles(0);
348    testDeleteExpiredStoreFiles(1);
349  }
350
351  /**
352   * @param minVersions the MIN_VERSIONS for the column family
353   */
354  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
355    int storeFileNum = 4;
356    int ttl = 4;
357    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
358    EnvironmentEdgeManagerTestHelper.injectEdge(edge);
359
360    Configuration conf = HBaseConfiguration.create();
361    // Enable the expired store file deletion
362    conf.setBoolean("hbase.store.delete.expired.storefile", true);
363    // Set the compaction threshold higher to avoid normal compactions.
364    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
365
366    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
367        .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());
368
369    long storeTtl = this.store.getScanInfo().getTtl();
370    long sleepTime = storeTtl / storeFileNum;
371    long timeStamp;
372    // There are 4 store files and the max time stamp difference among these
373    // store files will be (this.store.ttl / storeFileNum)
374    for (int i = 1; i <= storeFileNum; i++) {
375      LOG.info("Adding some data for the store file #" + i);
376      timeStamp = EnvironmentEdgeManager.currentTime();
377      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
378      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
379      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
380      flush(i);
381      edge.incrementTime(sleepTime);
382    }
383
384    // Verify the total number of store files
385    assertEquals(storeFileNum, this.store.getStorefiles().size());
386
387     // Each call will find one expired store file and delete it before compaction happens.
388     // There will be no compaction due to threshold above. Last file will not be replaced.
389    for (int i = 1; i <= storeFileNum - 1; i++) {
390      // verify the expired store file.
391      assertFalse(this.store.requestCompaction().isPresent());
392      Collection<HStoreFile> sfs = this.store.getStorefiles();
393      // Ensure i files are gone.
394      if (minVersions == 0) {
395        assertEquals(storeFileNum - i, sfs.size());
396        // Ensure only non-expired files remain.
397        for (HStoreFile sf : sfs) {
398          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
399        }
400      } else {
401        assertEquals(storeFileNum, sfs.size());
402      }
403      // Let the next store file expired.
404      edge.incrementTime(sleepTime);
405    }
406    assertFalse(this.store.requestCompaction().isPresent());
407
408    Collection<HStoreFile> sfs = this.store.getStorefiles();
409    // Assert the last expired file is not removed.
410    if (minVersions == 0) {
411      assertEquals(1, sfs.size());
412    }
413    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
414    assertTrue(ts < (edge.currentTime() - storeTtl));
415
416    for (HStoreFile sf : sfs) {
417      sf.closeStoreFile(true);
418    }
419  }
420
421  @Test
422  public void testLowestModificationTime() throws Exception {
423    Configuration conf = HBaseConfiguration.create();
424    FileSystem fs = FileSystem.get(conf);
425    // Initialize region
426    init(name.getMethodName(), conf);
427
428    int storeFileNum = 4;
429    for (int i = 1; i <= storeFileNum; i++) {
430      LOG.info("Adding some data for the store file #"+i);
431      this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null);
432      this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null);
433      this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null);
434      flush(i);
435    }
436    // after flush; check the lowest time stamp
437    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
438    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
439    assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
440
441    // after compact; check the lowest time stamp
442    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
443    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
444    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
445    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
446  }
447
448  private static long getLowestTimeStampFromFS(FileSystem fs,
449      final Collection<HStoreFile> candidates) throws IOException {
450    long minTs = Long.MAX_VALUE;
451    if (candidates.isEmpty()) {
452      return minTs;
453    }
454    Path[] p = new Path[candidates.size()];
455    int i = 0;
456    for (HStoreFile sf : candidates) {
457      p[i] = sf.getPath();
458      ++i;
459    }
460
461    FileStatus[] stats = fs.listStatus(p);
462    if (stats == null || stats.length == 0) {
463      return minTs;
464    }
465    for (FileStatus s : stats) {
466      minTs = Math.min(minTs, s.getModificationTime());
467    }
468    return minTs;
469  }
470
471  //////////////////////////////////////////////////////////////////////////////
472  // Get tests
473  //////////////////////////////////////////////////////////////////////////////
474
475  private static final int BLOCKSIZE_SMALL = 8192;
476
477  /**
478   * Test for hbase-1686.
479   */
480  @Test
481  public void testEmptyStoreFile() throws IOException {
482    init(this.name.getMethodName());
483    // Write a store file.
484    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
485    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
486    flush(1);
487    // Now put in place an empty store file.  Its a little tricky.  Have to
488    // do manually with hacked in sequence id.
489    HStoreFile f = this.store.getStorefiles().iterator().next();
490    Path storedir = f.getPath().getParent();
491    long seqid = f.getMaxSequenceId();
492    Configuration c = HBaseConfiguration.create();
493    FileSystem fs = FileSystem.get(c);
494    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
495    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
496        fs)
497            .withOutputDir(storedir)
498            .withFileContext(meta)
499            .build();
500    w.appendMetadata(seqid + 1, false);
501    w.close();
502    this.store.close();
503    // Reopen it... should pick up two files
504    this.store =
505        new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
506    assertEquals(2, this.store.getStorefilesCount());
507
508    result = HBaseTestingUtil.getFromStoreFile(store,
509        get.getRow(),
510        qualifiers);
511    assertEquals(1, result.size());
512  }
513
514  /**
515   * Getting data from memstore only
516   */
517  @Test
518  public void testGet_FromMemStoreOnly() throws IOException {
519    init(this.name.getMethodName());
520
521    //Put data in memstore
522    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
523    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
524    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
525    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
526    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
527    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
528
529    //Get
530    result = HBaseTestingUtil.getFromStoreFile(store,
531        get.getRow(), qualifiers);
532
533    //Compare
534    assertCheck();
535  }
536
537  @Test
538  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
539    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
540    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
541    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
542  }
543
544  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
545    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
546    ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
547    long currentTs = 100;
548    long minTs = currentTs;
549    // the extra cell won't be flushed to disk,
550    // so the min of timerange will be different between memStore and hfile.
551    for (int i = 0; i != (maxVersion + 1); ++i) {
552      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
553      if (i == 1) {
554        minTs = currentTs;
555      }
556    }
557    flushStore(store, id++);
558
559    Collection<HStoreFile> files = store.getStorefiles();
560    assertEquals(1, files.size());
561    HStoreFile f = files.iterator().next();
562    f.initReader();
563    StoreFileReader reader = f.getReader();
564    assertEquals(minTs, reader.timeRange.getMin());
565    assertEquals(currentTs, reader.timeRange.getMax());
566  }
567
568  /**
569   * Getting data from files only
570   */
571  @Test
572  public void testGet_FromFilesOnly() throws IOException {
573    init(this.name.getMethodName());
574
575    //Put data in memstore
576    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
577    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
578    //flush
579    flush(1);
580
581    //Add more data
582    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
583    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
584    //flush
585    flush(2);
586
587    //Add more data
588    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
589    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
590    //flush
591    flush(3);
592
593    //Get
594    result = HBaseTestingUtil.getFromStoreFile(store,
595        get.getRow(),
596        qualifiers);
597    //this.store.get(get, qualifiers, result);
598
599    //Need to sort the result since multiple files
600    Collections.sort(result, CellComparatorImpl.COMPARATOR);
601
602    //Compare
603    assertCheck();
604  }
605
606  /**
607   * Getting data from memstore and files
608   */
609  @Test
610  public void testGet_FromMemStoreAndFiles() throws IOException {
611    init(this.name.getMethodName());
612
613    //Put data in memstore
614    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
615    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
616    //flush
617    flush(1);
618
619    //Add more data
620    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
621    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
622    //flush
623    flush(2);
624
625    //Add more data
626    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
627    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
628
629    //Get
630    result = HBaseTestingUtil.getFromStoreFile(store,
631        get.getRow(), qualifiers);
632
633    //Need to sort the result since multiple files
634    Collections.sort(result, CellComparatorImpl.COMPARATOR);
635
636    //Compare
637    assertCheck();
638  }
639
640  private void flush(int storeFilessize) throws IOException {
641    flushStore(store, id++);
642    assertEquals(storeFilessize, this.store.getStorefiles().size());
643    assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
644  }
645
646  private void assertCheck() {
647    assertEquals(expected.size(), result.size());
648    for(int i=0; i<expected.size(); i++) {
649      assertEquals(expected.get(i), result.get(i));
650    }
651  }
652
653  @After
654  public void tearDown() throws Exception {
655    EnvironmentEdgeManagerTestHelper.reset();
656    if (store != null) {
657      try {
658        store.close();
659      } catch (IOException e) {
660      }
661      store = null;
662    }
663    if (region != null) {
664      region.close();
665      region = null;
666    }
667  }
668
  /** Removes the shared data test directory once all tests in the class have run. */
  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }
673
  /**
   * Verifies that a flush failing on a faulty file system bubbles up its
   * IOException and leaves no partial store files behind.
   */
  @Test
  public void testHandleErrorsInFlush() throws Exception {
    LOG.info("Setting up a faulty file system that cannot write");

    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    User user = User.createUserForTesting(conf,
        "testhandleerrorsinflush", new String[]{"foo"});
    // Inject our faulty LocalFileSystem
    conf.setClass("fs.file.impl", FaultyFileSystem.class,
        FileSystem.class);
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // Make sure it worked (above is sensitive to caching details in hadoop core)
        FileSystem fs = FileSystem.get(conf);
        assertEquals(FaultyFileSystem.class, fs.getClass());

        // Initialize region
        init(name.getMethodName(), conf);

        LOG.info("Adding some data");
        store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
        store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
        store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);

        LOG.info("Before flush, we should have no files");

        Collection<StoreFileInfo> files =
          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
        assertEquals(0, files != null ? files.size() : 0);

        //flush
        try {
          LOG.info("Flushing");
          flush(1);
          fail("Didn't bubble up IOE!");
        } catch (IOException ioe) {
          assertTrue(ioe.getMessage().contains("Fault injected"));
        }

        LOG.info("After failed flush, we should still have no files!");
        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
        assertEquals(0, files != null ? files.size() : 0);
        // Close the WAL here so the region's resources don't leak into other tests.
        store.getHRegion().getWAL().close();
        return null;
      }
    });
    FileSystem.closeAllForUGI(user.getUGI());
  }
723
724  /**
725   * Faulty file system that will fail if you write past its fault position the FIRST TIME
726   * only; thereafter it will succeed.  Used by {@link TestHRegion} too.
727   */
728  static class FaultyFileSystem extends FilterFileSystem {
729    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
730    private long faultPos = 200;
731    AtomicBoolean fault = new AtomicBoolean(true);
732
733    public FaultyFileSystem() {
734      super(new LocalFileSystem());
735      LOG.info("Creating faulty!");
736    }
737
738    @Override
739    public FSDataOutputStream create(Path p) throws IOException {
740      return new FaultyOutputStream(super.create(p), faultPos, fault);
741    }
742
743    @Override
744    public FSDataOutputStream create(Path f, FsPermission permission,
745        boolean overwrite, int bufferSize, short replication, long blockSize,
746        Progressable progress) throws IOException {
747      return new FaultyOutputStream(super.create(f, permission,
748          overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
749    }
750
751    @Override
752    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
753        int bufferSize, short replication, long blockSize, Progressable progress)
754    throws IOException {
755      // Fake it.  Call create instead.  The default implementation throws an IOE
756      // that this is not supported.
757      return create(f, overwrite, bufferSize, replication, blockSize, progress);
758    }
759  }
760
  /**
   * Output stream that throws an IOException from {@link #write(byte[], int, int)} once the
   * stream position reaches {@code faultPos}, for as long as the shared {@code fault} flag
   * is set.
   */
  static class FaultyOutputStream extends FSDataOutputStream {
    // Position at which writes begin to fail; volatile so another thread's update is visible.
    volatile long faultPos = Long.MAX_VALUE;
    // Shared on/off switch for fault injection, owned by the creating FaultyFileSystem.
    private final AtomicBoolean fault;

    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
    throws IOException {
      super(out, null);
      this.faultPos = faultPos;
      this.fault = fault;
    }

    @Override
    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
      LOG.info("faulty stream write at pos " + getPos());
      // Check before delegating so the underlying stream is untouched when we fault.
      injectFault();
      super.write(buf, offset, length);
    }

    // Throws if the fault switch is on and the stream has already reached the fault position.
    private void injectFault() throws IOException {
      if (this.fault.get() && getPos() >= faultPos) {
        throw new IOException("Fault injected");
      }
    }
  }
785
786  private static void flushStore(HStore store, long id) throws IOException {
787    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
788    storeFlushCtx.prepare();
789    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
790    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
791  }
792
793  /**
794   * Generate a list of KeyValues for testing based on given parameters
795   * @return the rows key-value list
796   */
797  private List<Cell> getKeyValueSet(long[] timestamps, int numRows,
798      byte[] qualifier, byte[] family) {
799    List<Cell> kvList = new ArrayList<>();
800    for (int i=1;i<=numRows;i++) {
801      byte[] b = Bytes.toBytes(i);
802      for (long timestamp: timestamps) {
803        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
804      }
805    }
806    return kvList;
807  }
808
809  /**
810   * Test to ensure correctness when using Stores with multiple timestamps
811   */
812  @Test
813  public void testMultipleTimestamps() throws IOException {
814    int numRows = 1;
815    long[] timestamps1 = new long[] {1,5,10,20};
816    long[] timestamps2 = new long[] {30,80};
817
818    init(this.name.getMethodName());
819
820    List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
821    for (Cell kv : kvList1) {
822      this.store.add(kv, null);
823    }
824
825    flushStore(store, id++);
826
827    List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
828    for(Cell kv : kvList2) {
829      this.store.add(kv, null);
830    }
831
832    List<Cell> result;
833    Get get = new Get(Bytes.toBytes(1));
834    get.addColumn(family,qf1);
835
836    get.setTimeRange(0,15);
837    result = HBaseTestingUtil.getFromStoreFile(store, get);
838    assertTrue(result.size()>0);
839
840    get.setTimeRange(40,90);
841    result = HBaseTestingUtil.getFromStoreFile(store, get);
842    assertTrue(result.size()>0);
843
844    get.setTimeRange(10,45);
845    result = HBaseTestingUtil.getFromStoreFile(store, get);
846    assertTrue(result.size()>0);
847
848    get.setTimeRange(80,145);
849    result = HBaseTestingUtil.getFromStoreFile(store, get);
850    assertTrue(result.size()>0);
851
852    get.setTimeRange(1,2);
853    result = HBaseTestingUtil.getFromStoreFile(store, get);
854    assertTrue(result.size()>0);
855
856    get.setTimeRange(90,200);
857    result = HBaseTestingUtil.getFromStoreFile(store, get);
858    assertTrue(result.size()==0);
859  }
860
861  /**
862   * Test for HBASE-3492 - Test split on empty colfam (no store files).
863   *
864   * @throws IOException When the IO operations fail.
865   */
866  @Test
867  public void testSplitWithEmptyColFam() throws IOException {
868    init(this.name.getMethodName());
869    assertFalse(store.getSplitPoint().isPresent());
870  }
871
872  @Test
873  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
874    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
875    long anyValue = 10;
876
877    // We'll check that it uses correct config and propagates it appropriately by going thru
878    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
879    // a number we pass in is higher than some config value, inside compactionPolicy.
880    Configuration conf = HBaseConfiguration.create();
881    conf.setLong(CONFIG_KEY, anyValue);
882    init(name.getMethodName() + "-xml", conf);
883    assertTrue(store.throttleCompaction(anyValue + 1));
884    assertFalse(store.throttleCompaction(anyValue));
885
886    // HTD overrides XML.
887    --anyValue;
888    init(name.getMethodName() + "-htd", conf, TableDescriptorBuilder
889        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
890      ColumnFamilyDescriptorBuilder.of(family));
891    assertTrue(store.throttleCompaction(anyValue + 1));
892    assertFalse(store.throttleCompaction(anyValue));
893
894    // HCD overrides them both.
895    --anyValue;
896    init(name.getMethodName() + "-hcd", conf,
897      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
898        Long.toString(anyValue)),
899      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
900          .build());
901    assertTrue(store.throttleCompaction(anyValue + 1));
902    assertFalse(store.throttleCompaction(anyValue));
903  }
904
  /**
   * DefaultStoreEngine that records the compactor it creates, so tests can check that the
   * store actually instantiated this engine class.
   */
  public static class DummyStoreEngine extends DefaultStoreEngine {
    // Compactor built by the most recent createComponents call, across all instances.
    public static DefaultCompactor lastCreatedCompactor = null;

    @Override
    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
        throws IOException {
      super.createComponents(conf, store, comparator);
      lastCreatedCompactor = this.compactor;
    }
  }
915
916  @Test
917  public void testStoreUsesSearchEngineOverride() throws Exception {
918    Configuration conf = HBaseConfiguration.create();
919    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
920    init(this.name.getMethodName(), conf);
921    assertEquals(DummyStoreEngine.lastCreatedCompactor,
922      this.store.storeEngine.getCompactor());
923  }
924
925  private void addStoreFile() throws IOException {
926    HStoreFile f = this.store.getStorefiles().iterator().next();
927    Path storedir = f.getPath().getParent();
928    long seqid = this.store.getMaxSequenceId().orElse(0L);
929    Configuration c = TEST_UTIL.getConfiguration();
930    FileSystem fs = FileSystem.get(c);
931    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
932    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
933        fs)
934            .withOutputDir(storedir)
935            .withFileContext(fileContext)
936            .build();
937    w.appendMetadata(seqid + 1, false);
938    w.close();
939    LOG.info("Added store file:" + w.getPath());
940  }
941
942  private void archiveStoreFile(int index) throws IOException {
943    Collection<HStoreFile> files = this.store.getStorefiles();
944    HStoreFile sf = null;
945    Iterator<HStoreFile> it = files.iterator();
946    for (int i = 0; i <= index; i++) {
947      sf = it.next();
948    }
949    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
950  }
951
952  private void closeCompactedFile(int index) throws IOException {
953    Collection<HStoreFile> files =
954        this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
955    HStoreFile sf = null;
956    Iterator<HStoreFile> it = files.iterator();
957    for (int i = 0; i <= index; i++) {
958      sf = it.next();
959    }
960    sf.closeStoreFile(true);
961    store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
962  }
963
964  @Test
965  public void testRefreshStoreFiles() throws Exception {
966    init(name.getMethodName());
967
968    assertEquals(0, this.store.getStorefilesCount());
969
970    // Test refreshing store files when no store files are there
971    store.refreshStoreFiles();
972    assertEquals(0, this.store.getStorefilesCount());
973
974    // add some data, flush
975    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
976    flush(1);
977    assertEquals(1, this.store.getStorefilesCount());
978
979    // add one more file
980    addStoreFile();
981
982    assertEquals(1, this.store.getStorefilesCount());
983    store.refreshStoreFiles();
984    assertEquals(2, this.store.getStorefilesCount());
985
986    // add three more files
987    addStoreFile();
988    addStoreFile();
989    addStoreFile();
990
991    assertEquals(2, this.store.getStorefilesCount());
992    store.refreshStoreFiles();
993    assertEquals(5, this.store.getStorefilesCount());
994
995    closeCompactedFile(0);
996    archiveStoreFile(0);
997
998    assertEquals(5, this.store.getStorefilesCount());
999    store.refreshStoreFiles();
1000    assertEquals(4, this.store.getStorefilesCount());
1001
1002    archiveStoreFile(0);
1003    archiveStoreFile(1);
1004    archiveStoreFile(2);
1005
1006    assertEquals(4, this.store.getStorefilesCount());
1007    store.refreshStoreFiles();
1008    assertEquals(1, this.store.getStorefilesCount());
1009
1010    archiveStoreFile(0);
1011    store.refreshStoreFiles();
1012    assertEquals(0, this.store.getStorefilesCount());
1013  }
1014
1015  @Test
1016  public void testRefreshStoreFilesNotChanged() throws IOException {
1017    init(name.getMethodName());
1018
1019    assertEquals(0, this.store.getStorefilesCount());
1020
1021    // add some data, flush
1022    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1023    flush(1);
1024    // add one more file
1025    addStoreFile();
1026
1027    HStore spiedStore = spy(store);
1028
1029    // call first time after files changed
1030    spiedStore.refreshStoreFiles();
1031    assertEquals(2, this.store.getStorefilesCount());
1032    verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
1033
1034    // call second time
1035    spiedStore.refreshStoreFiles();
1036
1037    // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
1038    // refreshed,
1039    verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
1040  }
1041
1042  private long countMemStoreScanner(StoreScanner scanner) {
1043    if (scanner.currentScanners == null) {
1044      return 0;
1045    }
1046    return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count();
1047  }
1048
1049  @Test
1050  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1051    long seqId = 100;
1052    long timestamp = EnvironmentEdgeManager.currentTime();
1053    Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1054        .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put)
1055        .setValue(qf1).build();
1056    PrivateCellUtil.setSequenceId(cell0, seqId);
1057    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1058
1059    Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1060        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
1061        .setValue(qf1).build();
1062    PrivateCellUtil.setSequenceId(cell1, seqId);
1063    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1064
1065    seqId = 101;
1066    timestamp = EnvironmentEdgeManager.currentTime();
1067    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1068        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
1069        .setValue(qf1).build();
1070    PrivateCellUtil.setSequenceId(cell2, seqId);
1071    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1072  }
1073
  /**
   * Adds the first batch of cells, snapshots the memstore, adds the second batch, then opens
   * a scanner and checks the number of memstore scanners before, during, and after a flush --
   * and that every added cell is eventually returned by the scan.
   */
  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
      List<Cell> inputCellsAfterSnapshot) throws IOException {
    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    long seqId = Long.MIN_VALUE;
    // Collect every qualifier and the highest sequence id so the scan can see all cells.
    for (Cell c : inputCellsBeforeSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    for (Cell c : inputCellsAfterSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    // prepare() moves the active segment into the snapshot.
    storeFlushCtx.prepare();
    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
      // snapshot has no data after flush
      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
      boolean more;
      int cellCount = 0;
      do {
        List<Cell> cells = new ArrayList<>();
        more = s.next(cells);
        cellCount += cells.size();
        // While the scan is still in progress only the active-segment scanner may remain.
        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
      } while (more);
      assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
          inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
      // the current scanners is cleared
      assertEquals(0, countMemStoreScanner(s));
    }
  }
1114
  /**
   * Shorthand for {@link #createCell(byte[], byte[], long, long, byte[])} using the default
   * test row.
   */
  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
      throws IOException {
    return createCell(row, qualifier, ts, sequenceId, value);
  }
1119
  /**
   * Builds a deep-copied Put cell for the test family with the given row, qualifier,
   * timestamp and value, then stamps it with the given memstore sequence id.
   */
  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
      throws IOException {
    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
        .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
        .setValue(value).build();
    PrivateCellUtil.setSequenceId(c, sequenceId);
    return c;
  }
1128
1129  @Test
1130  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1131    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1132    final int expectedSize = 3;
1133    testFlushBeforeCompletingScan(new MyListHook() {
1134      @Override
1135      public void hook(int currentSize) {
1136        if (currentSize == expectedSize - 1) {
1137          try {
1138            flushStore(store, id++);
1139            timeToGoNextRow.set(true);
1140          } catch (IOException e) {
1141            throw new RuntimeException(e);
1142          }
1143        }
1144      }
1145    }, new FilterBase() {
1146      @Override
1147      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1148        return ReturnCode.INCLUDE;
1149      }
1150    }, expectedSize);
1151  }
1152
  /**
   * Flush-during-scan test with a row-skipping filter: after the injected flush, the filter
   * answers NEXT_ROW once, so only two of the row's three cells are returned.
   */
  @Test
  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
    // Handshake flag: set by the hook after flushing, consumed once by the filter below.
    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
    final int expectedSize = 2;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        // Flush while the scan is mid-row, then tell the filter to jump to the next row.
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGoNextRow.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        // Skip the remainder of the current row exactly once after the flush.
        if (timeToGoNextRow.get()) {
          timeToGoNextRow.set(false);
          return ReturnCode.NEXT_ROW;
        } else {
          return ReturnCode.INCLUDE;
        }
      }
    }, expectedSize);
  }
1181
  /**
   * Flush-during-scan test with a seek-hint filter: after the injected flush, the filter
   * answers SEEK_NEXT_USING_HINT once (hinting the current cell), so only two cells of the
   * row are returned.
   */
  @Test
  public void testFlushBeforeCompletingScanWithFilterHint() throws IOException,
      InterruptedException {
    // Handshake flag: set by the hook after flushing, consumed once by the filter below.
    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
    final int expectedSize = 2;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        // Flush while the scan is mid-row, then tell the filter to answer with a seek hint.
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGetHint.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        // Request a seek exactly once after the flush; otherwise include the cell.
        if (timeToGetHint.get()) {
          timeToGetHint.set(false);
          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
        } else {
          return Filter.ReturnCode.INCLUDE;
        }
      }
      @Override
      public Cell getNextCellHint(Cell currentCell) throws IOException {
        // Hint the current cell itself so the scan resumes in place after the seek.
        return currentCell;
      }
    }, expectedSize);
  }
1215
  /**
   * Scans row r1 while the given hook injects a flush mid-row, then verifies that r1 still
   * yields {@code expectedSize} cells carrying its newest value, and that the following row
   * r2 is read normally afterwards.
   */
  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
          throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    byte[] r0 = Bytes.toBytes("row0");
    byte[] r1 = Bytes.toBytes("row1");
    byte[] r2 = Bytes.toBytes("row2");
    byte[] value0 = Bytes.toBytes("value0");
    byte[] value1 = Bytes.toBytes("value1");
    byte[] value2 = Bytes.toBytes("value2");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
      new MyStoreHook() {
        @Override
        public long getSmallestReadPoint(HStore store) {
          // Pin the read point at the newest sequence id used below.
          return seqId + 3;
        }
      });
    // The cells having the value0 won't be flushed to disk because the value of max version is 1
    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
    // A second, newer version of row r1; with max versions = 1 only these should be returned.
    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
    // MyList invokes the hook on every add, which is how the mid-row flush gets injected.
    List<Cell> myList = new MyList<>(hook);
    Scan scan = new Scan()
            .withStartRow(r1)
            .setFilter(filter);
    try (InternalScanner scanner = (InternalScanner) store.getScanner(
          scan, null, seqId + 3)){
      // r1
      scanner.next(myList);
      assertEquals(expectedSize, myList.size());
      for (Cell c : myList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value1)
          + ", actual:" + Bytes.toStringBinary(actualValue)
          , Bytes.equals(actualValue, value1));
      }
      List<Cell> normalList = new ArrayList<>(3);
      // r2
      scanner.next(normalList);
      assertEquals(3, normalList.size());
      for (Cell c : normalList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value2)
          + ", actual:" + Bytes.toStringBinary(actualValue)
          , Bytes.equals(actualValue, value2));
      }
    }
  }
1276
  /**
   * Races scanner creation against a concurrent snapshot (flush prepare) on a
   * MyCompactingMemStore and verifies the scanner still returns all three cells.
   * Phase ordering is enforced inside MyCompactingMemStore via START_TEST.
   */
  @Test
  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
    byte[] value = Bytes.toBytes("value");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // Three cells in the active memstore; the scan below must see all of them.
    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    MyCompactingMemStore.START_TEST.set(true);
    Runnable flush = () -> {
      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
      // recreate the active memstore -- phase (4/5)
      storeFlushCtx.prepare();
    };
    ExecutorService service = Executors.newSingleThreadExecutor();
    service.submit(flush);
    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
    // this is blocked until we recreate the active memstore -- phase (3/5)
    // we get scanner from active memstore but it is empty -- phase (5/5)
    InternalScanner scanner = (InternalScanner) store.getScanner(
          new Scan(new Get(row)), quals, seqId + 1);
    service.shutdown();
    // NOTE(review): the boolean result is ignored; a timed-out prepare() would surface as
    // assertion failures below rather than an explicit timeout failure.
    service.awaitTermination(20, TimeUnit.SECONDS);
    try {
      try {
        List<Cell> results = new ArrayList<>();
        scanner.next(results);
        assertEquals(3, results.size());
        for (Cell c : results) {
          byte[] actualValue = CellUtil.cloneValue(c);
          assertTrue("expected:" + Bytes.toStringBinary(value)
            + ", actual:" + Bytes.toStringBinary(actualValue)
            , Bytes.equals(actualValue, value));
        }
      } finally {
        scanner.close();
      }
    } finally {
      // Always unblock the memstore and complete the flush so later tests are unaffected.
      MyCompactingMemStore.START_TEST.set(false);
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
    }
  }
1331
1332  @Test
1333  public void testScanWithDoubleFlush() throws IOException {
1334    Configuration conf = HBaseConfiguration.create();
1335    // Initialize region
1336    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){
1337      @Override
1338      public void getScanners(MyStore store) throws IOException {
1339        final long tmpId = id++;
1340        ExecutorService s = Executors.newSingleThreadExecutor();
1341        s.submit(() -> {
1342          try {
1343            // flush the store before storescanner updates the scanners from store.
1344            // The current data will be flushed into files, and the memstore will
1345            // be clear.
1346            // -- phase (4/4)
1347            flushStore(store, tmpId);
1348          }catch (IOException ex) {
1349            throw new RuntimeException(ex);
1350          }
1351        });
1352        s.shutdown();
1353        try {
1354          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1355          s.awaitTermination(3, TimeUnit.SECONDS);
1356        } catch (InterruptedException ex) {
1357        }
1358      }
1359    });
1360    byte[] oldValue = Bytes.toBytes("oldValue");
1361    byte[] currentValue = Bytes.toBytes("currentValue");
1362    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1363    long ts = EnvironmentEdgeManager.currentTime();
1364    long seqId = 100;
1365    // older data whihc shouldn't be "seen" by client
1366    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1367    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1368    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1369    long snapshotId = id++;
1370    // push older data into snapshot -- phase (1/4)
1371    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker
1372        .DUMMY);
1373    storeFlushCtx.prepare();
1374
1375    // insert current data into active -- phase (2/4)
1376    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1377    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1378    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1379    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1380    quals.add(qf1);
1381    quals.add(qf2);
1382    quals.add(qf3);
1383    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
1384        new Scan(new Get(row)), quals, seqId + 1)) {
1385      // complete the flush -- phase (3/4)
1386      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1387      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1388
1389      List<Cell> results = new ArrayList<>();
1390      scanner.next(results);
1391      assertEquals(3, results.size());
1392      for (Cell c : results) {
1393        byte[] actualValue = CellUtil.cloneValue(c);
1394        assertTrue("expected:" + Bytes.toStringBinary(currentValue)
1395          + ", actual:" + Bytes.toStringBinary(actualValue)
1396          , Bytes.equals(actualValue, currentValue));
1397      }
1398    }
1399  }
1400
1401  @Test
1402  public void testReclaimChunkWhenScaning() throws IOException {
1403    init("testReclaimChunkWhenScaning");
1404    long ts = EnvironmentEdgeManager.currentTime();
1405    long seqId = 100;
1406    byte[] value = Bytes.toBytes("value");
1407    // older data whihc shouldn't be "seen" by client
1408    store.add(createCell(qf1, ts, seqId, value), null);
1409    store.add(createCell(qf2, ts, seqId, value), null);
1410    store.add(createCell(qf3, ts, seqId, value), null);
1411    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1412    quals.add(qf1);
1413    quals.add(qf2);
1414    quals.add(qf3);
1415    try (InternalScanner scanner = (InternalScanner) store.getScanner(
1416        new Scan(new Get(row)), quals, seqId)) {
1417      List<Cell> results = new MyList<>(size -> {
1418        switch (size) {
1419          // 1) we get the first cell (qf1)
1420          // 2) flush the data to have StoreScanner update inner scanners
1421          // 3) the chunk will be reclaimed after updaing
1422          case 1:
1423            try {
1424              flushStore(store, id++);
1425            } catch (IOException e) {
1426              throw new RuntimeException(e);
1427            }
1428            break;
1429          // 1) we get the second cell (qf2)
1430          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
1431          case 2:
1432            try {
1433              byte[] newValue = Bytes.toBytes("newValue");
1434              // older data whihc shouldn't be "seen" by client
1435              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1436              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1437              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1438            } catch (IOException e) {
1439              throw new RuntimeException(e);
1440            }
1441            break;
1442          default:
1443            break;
1444        }
1445      });
1446      scanner.next(results);
1447      assertEquals(3, results.size());
1448      for (Cell c : results) {
1449        byte[] actualValue = CellUtil.cloneValue(c);
1450        assertTrue("expected:" + Bytes.toStringBinary(value)
1451          + ", actual:" + Bytes.toStringBinary(actualValue)
1452          , Bytes.equals(actualValue, value));
1453      }
1454    }
1455  }
1456
1457  /**
1458   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable
1459   * may change the versionedList. And the first InMemoryFlushRunnable will use the chagned
1460   * versionedList to remove the corresponding segments.
1461   * In short, there will be some segements which isn't in merge are removed.
1462   */
1463  @Test
1464  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1465    int flushSize = 500;
1466    Configuration conf = HBaseConfiguration.create();
1467    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1468    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1469    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1470    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1471    // Set the lower threshold to invoke the "MERGE" policy
1472    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1473    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1474        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1475    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1476    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1477    long ts = EnvironmentEdgeManager.currentTime();
1478    long seqId = 100;
1479    // older data whihc shouldn't be "seen" by client
1480    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1481    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1482    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1483    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1484    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1485    storeFlushCtx.prepare();
1486    // This shouldn't invoke another in-memory flush because the first compactor thread
1487    // hasn't accomplished the in-memory compaction.
1488    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1489    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1490    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1491    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1492    //okay. Let the compaction be completed
1493    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1494    CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore;
1495    while (mem.isMemStoreFlushingInMemory()) {
1496      TimeUnit.SECONDS.sleep(1);
1497    }
1498    // This should invoke another in-memory flush.
1499    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1500    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1501    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1502    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1503    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1504      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1505    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1506    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1507  }
1508
  /**
   * Verifies min/max/avg store file age accounting. A {@link ManualEnvironmentEdge} pins "now",
   * and the store engine is mocked to expose four store files created 10/100/1000/10000 time
   * units before now, so the expected ages are exact.
   */
  @Test
  public void testAge() throws IOException {
    long currentTime = EnvironmentEdgeManager.currentTime();
    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
    edge.setValue(currentTime);
    EnvironmentEdgeManager.injectEdge(edge);
    Configuration conf = TEST_UTIL.getConfiguration();
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
    initHRegion(name.getMethodName(), conf,
      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
    HStore store = new HStore(region, hcd, conf, false) {

      // Replace the real store engine with a mock whose StoreFileManager returns the four
      // fixed-age mock store files; age stats are computed from their creation timestamps.
      @Override
      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
          CellComparator kvComparator) throws IOException {
        List<HStoreFile> storefiles =
            Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
              mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
        StoreFileManager sfm = mock(StoreFileManager.class);
        when(sfm.getStorefiles()).thenReturn(storefiles);
        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
        return storeEngine;
      }
    };
    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
  }
1538
1539  private HStoreFile mockStoreFile(long createdTime) {
1540    StoreFileInfo info = mock(StoreFileInfo.class);
1541    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1542    HStoreFile sf = mock(HStoreFile.class);
1543    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1544    when(sf.isHFile()).thenReturn(true);
1545    when(sf.getFileInfo()).thenReturn(info);
1546    return sf;
1547  }
1548
  /**
   * Creates a {@link MyStore} (an {@link HStore} wired with a {@link MyStoreHook}) for the given
   * test method, using a column family that retains up to 5 versions.
   */
  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
      throws IOException {
    return (MyStore) init(methodName, conf,
      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
  }
1555
  /**
   * {@link HStore} subclass that routes selected calls through a {@link MyStoreHook}, letting
   * tests observe (or stall) scanner creation and control the smallest read point.
   */
  private static class MyStore extends HStore {
    private final MyStoreHook hook;

    // NOTE(review): switchToPread is accepted but ignored; super is always called with 'false'
    // — confirm this is intentional.
    MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration
        confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
      super(region, family, confParam, false);
      this.hook = hook;
    }

    // Notifies the hook before delegating. NOTE(review): includeStartRow/includeStopRow are not
    // passed through; they are overridden to hard-coded true/false — confirm intentional.
    @Override
    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
        boolean includeMemstoreScanner) throws IOException {
      hook.getScanners(this);
      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
        stopRow, false, readPt, includeMemstoreScanner);
    }

    // Delegates entirely to the hook so tests can substitute the smallest read point.
    @Override
    public long getSmallestReadPoint() {
      return hook.getSmallestReadPoint(this);
    }
  }
1580
  /**
   * Hook injected into {@link MyStore}. Tests subclass it to observe or block store internals;
   * the defaults are pass-through.
   */
  private abstract static class MyStoreHook {

    // Invoked just before MyStore creates its scanners; default is a no-op.
    void getScanners(MyStore store) throws IOException {
    }

    // Default: use the owning region's smallest read point.
    long getSmallestReadPoint(HStore store) {
      return store.getHRegion().getSmallestReadPoint();
    }
  }
1590
  /**
   * Opens a scanner over three store files, then replaces those files (as the compacted-files
   * discharger would) while a concurrent thread drives the scanner's pread-to-stream switch; the
   * scanner must end up with a fresh {@link KeyValueHeap}.
   */
  @Test
  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
    // A pread max of 0 bytes makes the scanner eligible to switch to stream reads immediately.
    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqID = 1L;
    // Add some data to the region and do some flushes
    for (int i = 1; i < 10; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);
    for (int i = 11; i < 20; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);
    for (int i = 21; i < 30; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);

    assertEquals(3, store.getStorefilesCount());
    Scan scan = new Scan();
    scan.addFamily(family);
    // Remember the initial set of store files; the scanner below is created over them.
    Collection<HStoreFile> storefiles2 = store.getStorefiles();
    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
    StoreScanner storeScanner =
        (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
    // get the current heap
    KeyValueHeap heap = storeScanner.heap;
    // create more store files
    for (int i = 31; i < 40; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);

    for (int i = 41; i < 50; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);
    // Compute just the newly created files so the replacement swaps old files for new ones.
    storefiles2 = store.getStorefiles();
    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
    actualStorefiles1.removeAll(actualStorefiles);
    // Do compaction (replace the original files) while MyThread drives the scanner switch.
    MyThread thread = new MyThread(storeScanner);
    thread.start();
    store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
    thread.join();
    // The switch must have rebuilt the scanner's heap.
    KeyValueHeap heap2 = thread.getHeap();
    assertFalse(heap.equals(heap2));
  }
1655
1656  @Test
1657  public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
1658    Configuration conf = HBaseConfiguration.create();
1659    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1660    // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
1661    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
1662    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
1663    Scan scan = new Scan();
1664    scan.addFamily(family);
1665    // ReadType on Scan is still DEFAULT only.
1666    assertEquals(ReadType.DEFAULT, scan.getReadType());
1667    StoreScanner storeScanner = (StoreScanner) store.getScanner(scan,
1668        scan.getFamilyMap().get(family), Long.MAX_VALUE);
1669    assertFalse(storeScanner.isScanUsePread());
1670  }
1671
  /**
   * Verifies that updateSpaceQuotaAfterFileReplacement adjusts the region size store by the delta
   * between removed and added store files, and initializes the size for a region it has not seen
   * before (null "removed" list).
   */
  @Test
  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
    final TableName tn = TableName.valueOf(name.getMethodName());
    init(name.getMethodName());

    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();

    // Mock files whose only relevant property is their length.
    HStoreFile sf1 = mockStoreFileWithLength(1024L);
    HStoreFile sf2 = mockStoreFileWithLength(2048L);
    HStoreFile sf3 = mockStoreFileWithLength(4096L);
    HStoreFile sf4 = mockStoreFileWithLength(8192L);

    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
        .setEndKey(Bytes.toBytes("b")).build();

    // Compacting two files down to one, reducing size
    sizeStore.put(regionInfo, 1024L + 4096L);
    store.updateSpaceQuotaAfterFileReplacement(
        sizeStore, regionInfo, Arrays.asList(sf1, sf3), Arrays.asList(sf2));

    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());

    // The same file length in and out should have no change
    store.updateSpaceQuotaAfterFileReplacement(
        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf2));

    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());

    // Increase the total size used
    store.updateSpaceQuotaAfterFileReplacement(
        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf3));

    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());

    // A region not yet tracked (null compacted list) gets its size initialized from the new file.
    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
        .setEndKey(Bytes.toBytes("c")).build();
    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));

    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
  }
1712
1713  @Test
1714  public void testHFileContextSetWithCFAndTable() throws Exception {
1715    init(this.name.getMethodName());
1716    StoreFileWriter writer = store.createWriterInTmp(10000L,
1717        Compression.Algorithm.NONE, false, true, false, true);
1718    HFileContext hFileContext = writer.getHFileWriter().getFileContext();
1719    assertArrayEquals(family, hFileContext.getColumnFamily());
1720    assertArrayEquals(table, hFileContext.getTableName());
1721  }
1722
1723  // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell
1724  // but its dataSize exceeds inmemoryFlushSize
1725  @Test
1726  public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
1727      throws IOException, InterruptedException {
1728    Configuration conf = HBaseConfiguration.create();
1729
1730    byte[] smallValue = new byte[3];
1731    byte[] largeValue = new byte[9];
1732    final long timestamp = EnvironmentEdgeManager.currentTime();
1733    final long seqId = 100;
1734    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
1735    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
1736    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
1737    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
1738    int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
1739
1740    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
1741    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
1742    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
1743    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
1744
1745    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1746        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1747
1748    MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
1749    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
1750    myCompactingMemStore.smallCellPreUpdateCounter.set(0);
1751    myCompactingMemStore.largeCellPreUpdateCounter.set(0);
1752
1753    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
1754    Thread smallCellThread = new Thread(() -> {
1755      try {
1756        store.add(smallCell, new NonThreadSafeMemStoreSizing());
1757      } catch (Throwable exception) {
1758        exceptionRef.set(exception);
1759      }
1760    });
1761    smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
1762    smallCellThread.start();
1763
1764    String oldThreadName = Thread.currentThread().getName();
1765    try {
1766      /**
1767       * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
1768       * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread
1769       * invokes flushInMemory.
1770       * <p/>
1771       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
1772       * can add cell to currentActive . That is to say when largeCellThread called flushInMemory
1773       * method, CompactingMemStore.active has no cell.
1774       */
1775      Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
1776      store.add(largeCell, new NonThreadSafeMemStoreSizing());
1777      smallCellThread.join();
1778
1779      for (int i = 0; i < 100; i++) {
1780        long currentTimestamp = timestamp + 100 + i;
1781        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
1782        store.add(cell, new NonThreadSafeMemStoreSizing());
1783      }
1784    } finally {
1785      Thread.currentThread().setName(oldThreadName);
1786    }
1787
1788    assertTrue(exceptionRef.get() == null);
1789
1790  }
1791
1792  // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds
1793  // InmemoryFlushSize
1794  @Test(timeout = 60000)
1795  public void testCompactingMemStoreCellExceedInmemoryFlushSize()
1796      throws Exception {
1797    Configuration conf = HBaseConfiguration.create();
1798    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
1799
1800    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1801        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1802
1803    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
1804
1805    int size = (int) (myCompactingMemStore.getInmemoryFlushSize());
1806    byte[] value = new byte[size + 1];
1807
1808    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1809    long timestamp = EnvironmentEdgeManager.currentTime();
1810    long seqId = 100;
1811    Cell cell = createCell(qf1, timestamp, seqId, value);
1812    int cellByteSize = MutableSegment.getCellLength(cell);
1813    store.add(cell, memStoreSizing);
1814    assertTrue(memStoreSizing.getCellsCount() == 1);
1815    assertTrue(memStoreSizing.getDataSize() == cellByteSize);
1816    // Waiting the in memory compaction completed, see HBASE-26438
1817    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
1818  }
1819
1820  // This test is for HBASE-26210 also, test write large cell and small cell concurrently when
1821  // InmemoryFlushSize is smaller,equal with and larger than cell size.
1822  @Test
1823  public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
1824      throws IOException, InterruptedException {
1825    doWriteTestLargeCellAndSmallCellConcurrently(
1826      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
1827    doWriteTestLargeCellAndSmallCellConcurrently(
1828      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
1829    doWriteTestLargeCellAndSmallCellConcurrently(
1830      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1);
1831    doWriteTestLargeCellAndSmallCellConcurrently(
1832      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize);
1833    doWriteTestLargeCellAndSmallCellConcurrently(
1834      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1);
1835  }
1836
1837  private void doWriteTestLargeCellAndSmallCellConcurrently(
1838      IntBinaryOperator getFlushByteSize)
1839      throws IOException, InterruptedException {
1840
1841    Configuration conf = HBaseConfiguration.create();
1842
1843    byte[] smallValue = new byte[3];
1844    byte[] largeValue = new byte[100];
1845    final long timestamp = EnvironmentEdgeManager.currentTime();
1846    final long seqId = 100;
1847    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
1848    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
1849    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
1850    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
1851    int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize);
1852    boolean flushByteSizeLessThanSmallAndLargeCellSize =
1853        flushByteSize < (smallCellByteSize + largeCellByteSize);
1854
1855    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName());
1856    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
1857    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
1858
1859
1860    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1861        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1862
1863    MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore);
1864    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
1865    myCompactingMemStore.disableCompaction();
1866    if (flushByteSizeLessThanSmallAndLargeCellSize) {
1867      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true;
1868    } else {
1869      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false;
1870    }
1871
1872
1873    final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
1874    final AtomicLong totalCellByteSize = new AtomicLong(0);
1875    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
1876    Thread smallCellThread = new Thread(() -> {
1877      try {
1878        for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
1879          long currentTimestamp = timestamp + i;
1880          Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
1881          totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
1882          store.add(cell, memStoreSizing);
1883        }
1884      } catch (Throwable exception) {
1885        exceptionRef.set(exception);
1886
1887      }
1888    });
1889    smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
1890    smallCellThread.start();
1891
1892    String oldThreadName = Thread.currentThread().getName();
1893    try {
1894      /**
1895       * When flushByteSizeLessThanSmallAndLargeCellSize is true:
1896       * </p>
1897       * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then
1898       * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then
1899       * largeCellThread invokes flushInMemory.
1900       * <p/>
1901       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
1902       * can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
1903       * <p/>
1904       * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and
1905       * largeCellThread concurrently write one cell and wait each other, and then write another
1906       * cell etc.
1907       */
1908      Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
1909      for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
1910        long currentTimestamp = timestamp + i;
1911        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
1912        totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
1913        store.add(cell, memStoreSizing);
1914      }
1915      smallCellThread.join();
1916
1917      assertTrue(exceptionRef.get() == null);
1918      assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2));
1919      assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
1920      if (flushByteSizeLessThanSmallAndLargeCellSize) {
1921        assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT);
1922      } else {
1923        assertTrue(
1924          myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1));
1925      }
1926    } finally {
1927      Thread.currentThread().setName(oldThreadName);
1928    }
1929  }
1930
1931  /**
1932   * <pre>
1933   * This test is for HBASE-26384,
1934   * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
1935   * execute concurrently.
   * The thread sequence before HBASE-26384 was as follows (the bug only exists for branch-2,
   * and UTs were added for both branch-2 and master):
1938   * 1. The {@link CompactingMemStore} size exceeds
   *    {@link CompactingMemStore#getInmemoryFlushSize()}, the write thread adds a new
   *    {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline}, and starts an
   *    in-memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
1942   * 2. The in memory compact thread starts and then stopping before
1943   *    {@link CompactingMemStore#flattenOneSegment}.
1944   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
1945   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
1946   *    compact thread continues.
1947   *    Assuming {@link VersionedSegmentsList#version} returned from
1948   *    {@link CompactingMemStore#getImmutableSegments} is v.
1949   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
1950   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
1951   *    {@link CompactionPipeline#version} is still v.
1952   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
1953   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
1954   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
1955   *    {@link CompactionPipeline} has changed because
1956   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
1957   *    removed in fact and still remaining in {@link CompactionPipeline}.
1958   *
1959   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
1960   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
1961   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
1962   *    v+1.
1963   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
1964   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
1965   *    failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
1966   *    again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
1967   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
1968   * </pre>
1969   */
  @Test
  public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
    Configuration conf = HBaseConfiguration.create();

    byte[] smallValue = new byte[3];
    byte[] largeValue = new byte[9];
    final long timestamp = EnvironmentEdgeManager.currentTime();
    final long seqId = 100;
    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
    int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
    // Both cells together overflow the in-memory flush size, triggering in-memory compaction.
    int flushByteSize = totalCellByteSize - 2;

    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));

    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());

    MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);

    store.add(smallCell, new NonThreadSafeMemStoreSizing());
    store.add(largeCell, new NonThreadSafeMemStoreSizing());

    String oldThreadName = Thread.currentThread().getName();
    try {
      Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
      /**
       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
       * would invoke {@link CompactingMemStore#stopCompaction}.
       */
      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();

      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();

      // The snapshot must contain both cells and the pipeline must be fully drained.
      assertTrue(memStoreSnapshot.getCellsCount() == 2);
      assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
      assertTrue(segments.getNumOfSegments() == 0);
      assertTrue(segments.getNumOfCells() == 0);
      // Per the HBASE-26384 fix described above: the version bump from flattenOneSegment makes
      // the first swapPipelineWithNull fail, so the swap counter ends at 2.
      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
    } finally {
      Thread.currentThread().setName(oldThreadName);
    }
  }
2023
2024  /**
2025   * <pre>
2026   * This test is for HBASE-26384,
   * test that {@link CompactingMemStore#flattenOneSegment}, {@link CompactingMemStore#snapshot()}
   * and memstore writes execute concurrently.
2029   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2030   * for both branch-2 and master):
2031   * 1. The {@link CompactingMemStore} size exceeds
2032   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2033   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2034   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2035   * 2. The in memory compact thread starts and then stopping before
2036   *    {@link CompactingMemStore#flattenOneSegment}.
2037   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2038   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2039   *    compact thread continues.
2040   *    Assuming {@link VersionedSegmentsList#version} returned from
2041   *    {@link CompactingMemStore#getImmutableSegments} is v.
2042   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2043   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2044   *    {@link CompactionPipeline#version} is still v.
2045   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2046   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2047   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2048   *    {@link CompactionPipeline} has changed because
2049   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2050   *    removed in fact and still remaining in {@link CompactionPipeline}.
2051   *
2052   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
2053   * and I add step 7-8 to test there is new segment added before retry.
2054   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2055   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2056   *     v+1.
2057   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2058   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2059   *    failed and retry,{@link VersionedSegmentsList#version} returned from
2060   *    {@link CompactingMemStore#getImmutableSegments} is v+1.
2061   * 7. The write thread continues writing to {@link CompactingMemStore} and
2062   *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
2063   *    {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
2064   *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
2065   *    {@link CompactionPipeline#version} is still v+1.
2066   * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2067   *    {@link CompactionPipeline#version} is still v+1,
2068   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
2069   *    remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
2070   *    {@link CompactingMemStore#swapPipelineWithNull}.
2071   * </pre>
2072   */
2073  @Test
2074  public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
2075    Configuration conf = HBaseConfiguration.create();
2076
2077    byte[] smallValue = new byte[3];
2078    byte[] largeValue = new byte[9];
2079    final long timestamp = EnvironmentEdgeManager.currentTime();
2080    final long seqId = 100;
2081    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2082    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2083    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2084    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2085    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2086    int flushByteSize = firstWriteCellByteSize - 2;
2087
2088    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2089    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
2090    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2091    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2092
2093    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2094        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2095
2096    final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
2097    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2098
2099    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2100    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2101
2102    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2103    final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
2104    final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2105    final int writeAgainCellByteSize = MutableSegment.getCellLength(writeAgainCell1)
2106        + MutableSegment.getCellLength(writeAgainCell2);
2107    final Thread writeAgainThread = new Thread(() -> {
2108      try {
2109        myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
2110
2111        store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
2112        store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
2113
2114        myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
2115      } catch (Throwable exception) {
2116        exceptionRef.set(exception);
2117      }
2118    });
2119    writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
2120    writeAgainThread.start();
2121
2122    String oldThreadName = Thread.currentThread().getName();
2123    try {
2124      Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
2125      /**
2126       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2127       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2128       * would invoke {@link CompactingMemStore#stopCompaction}.
2129       */
2130      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2131      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2132      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2133      writeAgainThread.join();
2134
2135      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2136      assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
2137      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2138      assertTrue(segments.getNumOfSegments() == 1);
2139      assertTrue(
2140        ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
2141      assertTrue(segments.getNumOfCells() == 2);
2142      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
2143      assertTrue(exceptionRef.get() == null);
2144      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2145    } finally {
2146      Thread.currentThread().setName(oldThreadName);
2147    }
2148  }
2149
2150  /**
2151   * <pre>
2152   * This test is for HBASE-26465,
2153   * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute
   * concurrently. The thread sequence before HBASE-26465 is:
   * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have been added
   *   to {@link DefaultMemStore}.
   * 2.The flush thread stops before {@link DefaultMemStore#clearSnapshot} in
   *   {@link HStore#updateStorefiles} after completing the flush of the memStore to an hfile.
   * 3.The scan thread starts and stops after {@link DefaultMemStore#getSnapshotSegments} in
   *   {@link DefaultMemStore#getScanners}; here the scan thread gets the
   *   {@link DefaultMemStore#snapshot} which is created by the flush thread.
   * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and closes
   *   {@link DefaultMemStore#snapshot}; because the reference count of the corresponding
   *   {@link MemStoreLABImpl} is 0, the {@link Chunk}s in the corresponding {@link MemStoreLABImpl}
   *   are recycled.
   * 5.The scan thread continues {@link DefaultMemStore#getScanners}, creates a
   *   {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increases the
   *   reference count of the corresponding {@link MemStoreLABImpl}, but the {@link Chunk}s in the
   *   corresponding {@link MemStoreLABImpl} were already recycled by step 4, and these
   *   {@link Chunk}s may be overwritten by other write threads, which may cause serious problems.
   * After HBASE-26465, {@link DefaultMemStore#getScanners} and
   * {@link DefaultMemStore#clearSnapshot} can no longer execute concurrently.
2173   * </pre>
2174   */
2175  @Test
2176  public void testClearSnapshotGetScannerConcurrently() throws Exception {
2177    Configuration conf = HBaseConfiguration.create();
2178
2179    byte[] smallValue = new byte[3];
2180    byte[] largeValue = new byte[9];
2181    final long timestamp = EnvironmentEdgeManager.currentTime();
2182    final long seqId = 100;
2183    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2184    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2185    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2186    quals.add(qf1);
2187    quals.add(qf2);
2188
2189    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
2190    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2191
2192    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2193    MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore);
2194    myDefaultMemStore.store = store;
2195
2196    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2197    store.add(smallCell, memStoreSizing);
2198    store.add(largeCell, memStoreSizing);
2199
2200    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2201    final Thread flushThread = new Thread(() -> {
2202      try {
2203        flushStore(store, id++);
2204      } catch (Throwable exception) {
2205        exceptionRef.set(exception);
2206      }
2207    });
2208    flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
2209    flushThread.start();
2210
2211    String oldThreadName = Thread.currentThread().getName();
2212    StoreScanner storeScanner = null;
2213    try {
2214      Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
2215
2216      /**
2217       * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot}
2218       */
2219      myDefaultMemStore.getScannerCyclicBarrier.await();
2220
2221      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2222      flushThread.join();
2223
2224      if (myDefaultMemStore.shouldWait) {
2225        SegmentScanner segmentScanner = getSegmentScanner(storeScanner);
2226        MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2227        assertTrue(memStoreLAB.isClosed());
2228        assertTrue(!memStoreLAB.chunks.isEmpty());
2229        assertTrue(!memStoreLAB.isReclaimed());
2230
2231        Cell cell1 = segmentScanner.next();
2232        CellUtil.equals(smallCell, cell1);
2233        Cell cell2 = segmentScanner.next();
2234        CellUtil.equals(largeCell, cell2);
2235        assertNull(segmentScanner.next());
2236      } else {
2237        List<Cell> results = new ArrayList<>();
2238        storeScanner.next(results);
2239        assertEquals(2, results.size());
2240        CellUtil.equals(smallCell, results.get(0));
2241        CellUtil.equals(largeCell, results.get(1));
2242      }
2243      assertTrue(exceptionRef.get() == null);
2244    } finally {
2245      if (storeScanner != null) {
2246        storeScanner.close();
2247      }
2248      Thread.currentThread().setName(oldThreadName);
2249    }
2250  }
2251
2252  private SegmentScanner getSegmentScanner(StoreScanner storeScanner) {
2253    List<SegmentScanner> segmentScanners = new ArrayList<SegmentScanner>();
2254    for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
2255      if (keyValueScanner instanceof SegmentScanner) {
2256        segmentScanners.add((SegmentScanner) keyValueScanner);
2257      }
2258    }
2259
2260    assertTrue(segmentScanners.size() == 1);
2261    return segmentScanners.get(0);
2262  }
2263
2264  @Test 
2265  public void testOnConfigurationChange() throws IOException {
2266    final int COMMON_MAX_FILES_TO_COMPACT = 10;
2267    final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8;
2268    final int STORE_MAX_FILES_TO_COMPACT = 6;
2269
2270    //Build a table that its maxFileToCompact different from common configuration.
2271    Configuration conf = HBaseConfiguration.create();
2272    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2273      COMMON_MAX_FILES_TO_COMPACT);
2274    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
2275      .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2276        String.valueOf(STORE_MAX_FILES_TO_COMPACT)).build();
2277    init(this.name.getMethodName(), conf, hcd);
2278
2279    //After updating common configuration, the conf in HStore itself must not be changed.
2280    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2281      NEW_COMMON_MAX_FILES_TO_COMPACT);
2282    this.store.onConfigurationChange(conf);
2283    assertEquals(STORE_MAX_FILES_TO_COMPACT,
2284      store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact());
2285  }
2286
2287  /**
2288   * This test is for HBASE-26476
2289   */
2290  @Test
2291  public void testExtendsDefaultMemStore() throws Exception {
2292    Configuration conf = HBaseConfiguration.create();
2293    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2294
2295    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2296    assertTrue(this.store.memstore.getClass() == DefaultMemStore.class);
2297    tearDown();
2298
2299    conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName());
2300    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2301    assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class);
2302  }
2303
  /**
   * Minimal {@link DefaultMemStore} subclass used by {@link #testExtendsDefaultMemStore()} to
   * verify that {@link HStore} can instantiate user-provided memstore subclasses via the
   * {@link HStore#MEMSTORE_CLASS_NAME} configuration (HBASE-26476). Adds no behavior of its own.
   */
  static class CustomDefaultMemStore extends DefaultMemStore {

    public CustomDefaultMemStore(Configuration conf, CellComparator c,
        RegionServicesForStores regionServices) {
      super(conf, c, regionServices);
    }

  }
2312
2313  private HStoreFile mockStoreFileWithLength(long length) {
2314    HStoreFile sf = mock(HStoreFile.class);
2315    StoreFileReader sfr = mock(StoreFileReader.class);
2316    when(sf.isHFile()).thenReturn(true);
2317    when(sf.getReader()).thenReturn(sfr);
2318    when(sfr.length()).thenReturn(length);
2319    return sf;
2320  }
2321
2322  private static class MyThread extends Thread {
2323    private StoreScanner scanner;
2324    private KeyValueHeap heap;
2325
2326    public MyThread(StoreScanner scanner) {
2327      this.scanner = scanner;
2328    }
2329
2330    public KeyValueHeap getHeap() {
2331      return this.heap;
2332    }
2333
2334    @Override
2335    public void run() {
2336      scanner.trySwitchToStreamRead();
2337      heap = scanner.heap;
2338    }
2339  }
2340
2341  private static class MyMemStoreCompactor extends MemStoreCompactor {
2342    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2343    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
2344    public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy
2345        compactionPolicy) throws IllegalArgumentIOException {
2346      super(compactingMemStore, compactionPolicy);
2347    }
2348
2349    @Override
2350    public boolean start() throws IOException {
2351      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
2352      if (isFirst) {
2353        try {
2354          START_COMPACTOR_LATCH.await();
2355          return super.start();
2356        } catch (InterruptedException ex) {
2357          throw new RuntimeException(ex);
2358        }
2359      }
2360      return super.start();
2361    }
2362  }
2363
2364  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
2365    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2366    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
2367        HStore store, RegionServicesForStores regionServices,
2368        MemoryCompactionPolicy compactionPolicy) throws IOException {
2369      super(conf, c, store, regionServices, compactionPolicy);
2370    }
2371
2372    @Override
2373    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
2374        throws IllegalArgumentIOException {
2375      return new MyMemStoreCompactor(this, compactionPolicy);
2376    }
2377
2378    @Override
2379    protected boolean setInMemoryCompactionFlag() {
2380      boolean rval = super.setInMemoryCompactionFlag();
2381      if (rval) {
2382        RUNNER_COUNT.incrementAndGet();
2383        if (LOG.isDebugEnabled()) {
2384          LOG.debug("runner count: " + RUNNER_COUNT.get());
2385        }
2386      }
2387      return rval;
2388    }
2389  }
2390
  /**
   * CompactingMemStore that interleaves a scanner-creating thread with a snapshotting thread once
   * {@code START_TEST} is flipped on: {@link #createList(int)} (reached while building scanners)
   * signals the other thread and then blocks until {@link #pushActiveToPipeline} has completed,
   * while {@link #pushActiveToPipeline} first waits for the scanner thread to reach
   * {@link #createList(int)}.
   */
  public static class MyCompactingMemStore extends CompactingMemStore {
    // Flipped on by the driving test to activate the latch choreography below.
    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
    // Counted down when the scanner thread reaches createList.
    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
    // Counted down when the other thread finishes pushActiveToPipeline.
    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c,
        HStore store, RegionServicesForStores regionServices,
        MemoryCompactionPolicy compactionPolicy) throws IOException {
      super(conf, c, store, regionServices, compactionPolicy);
    }

    @Override
    protected List<KeyValueScanner> createList(int capacity) {
      if (START_TEST.get()) {
        try {
          // Announce that scanner creation has started, then wait until the active segment
          // has been pushed to the pipeline by the other thread.
          getScannerLatch.countDown();
          snapshotLatch.await();
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
      return new ArrayList<>(capacity);
    }
    @Override
    protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
      if (START_TEST.get()) {
        try {
          // Hold the pipeline push until the scanner thread is inside createList.
          getScannerLatch.await();
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }

      super.pushActiveToPipeline(active, checkEmpty);
      if (START_TEST.get()) {
        // Release the scanner thread now that the push has completed.
        snapshotLatch.countDown();
      }
    }
  }
2429
  /** Callback invoked by {@link MyList#add(Object)} with the list size before the element is added. */
  interface MyListHook {
    void hook(int currentSize);
  }
2433
2434  private static class MyList<T> implements List<T> {
2435    private final List<T> delegatee = new ArrayList<>();
2436    private final MyListHook hookAtAdd;
2437    MyList(final MyListHook hookAtAdd) {
2438      this.hookAtAdd = hookAtAdd;
2439    }
2440    @Override
2441    public int size() {return delegatee.size();}
2442
2443    @Override
2444    public boolean isEmpty() {return delegatee.isEmpty();}
2445
2446    @Override
2447    public boolean contains(Object o) {return delegatee.contains(o);}
2448
2449    @Override
2450    public Iterator<T> iterator() {return delegatee.iterator();}
2451
2452    @Override
2453    public Object[] toArray() {return delegatee.toArray();}
2454
2455    @Override
2456    public <R> R[] toArray(R[] a) {return delegatee.toArray(a);}
2457
2458    @Override
2459    public boolean add(T e) {
2460      hookAtAdd.hook(size());
2461      return delegatee.add(e);
2462    }
2463
2464    @Override
2465    public boolean remove(Object o) {return delegatee.remove(o);}
2466
2467    @Override
2468    public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);}
2469
2470    @Override
2471    public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);}
2472
2473    @Override
2474    public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);}
2475
2476    @Override
2477    public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);}
2478
2479    @Override
2480    public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);}
2481
2482    @Override
2483    public void clear() {delegatee.clear();}
2484
2485    @Override
2486    public T get(int index) {return delegatee.get(index);}
2487
2488    @Override
2489    public T set(int index, T element) {return delegatee.set(index, element);}
2490
2491    @Override
2492    public void add(int index, T element) {delegatee.add(index, element);}
2493
2494    @Override
2495    public T remove(int index) {return delegatee.remove(index);}
2496
2497    @Override
2498    public int indexOf(Object o) {return delegatee.indexOf(o);}
2499
2500    @Override
2501    public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);}
2502
2503    @Override
2504    public ListIterator<T> listIterator() {return delegatee.listIterator();}
2505
2506    @Override
2507    public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);}
2508
2509    @Override
2510    public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
2511  }
2512
  /**
   * CompactingMemStore that coordinates a "smallCellThread" and a "largeCellThread" by thread
   * name: both must enter {@link #checkAndAddToActiveSize} before largeCellThread runs
   * {@link #flushInMemory}, and smallCellThread's {@link #doAdd} is deferred until that flush has
   * finished — so the active segment is provably empty when the in-memory flush happens.
   */
  public static class MyCompactingMemStore2 extends CompactingMemStore {
    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
    // Released once both threads have entered checkAndAddToActiveSize.
    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
    // Released once largeCellThread has finished flushInMemory.
    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
    // Limits the barrier dance to largeCellThread's first pass through checkAndAddToActiveSize.
    private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
    // NOTE(review): not read anywhere in this class as far as visible here.
    private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);

    public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
        HStore store, RegionServicesForStores regionServices,
        MemoryCompactionPolicy compactionPolicy) throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    // largeCellThread waits (first pass only) before the size check; smallCellThread releases it
    // after its own size check, enforcing the small-before-large entry order.
    @Override
    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
        MemStoreSizing memstoreSizing) {
      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
        if (currentCount <= 1) {
          try {
            /**
             * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
             * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
             * largeCellThread invokes flushInMemory.
             */
            preCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }

      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        try {
          preCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      return returnValue;
    }

    // smallCellThread blocks here until largeCellThread has finished flushInMemory (see below).
    @Override
    protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        try {
          /**
           * After largeCellThread finished flushInMemory method, smallCellThread can add cell to
           * currentActive . That is to say when largeCellThread called flushInMemory method,
           * currentActive has no cell.
           */
          postCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      super.doAdd(currentActive, cell, memstoreSizing);
    }

    // After largeCellThread's first flush, release smallCellThread's pending doAdd.
    @Override
    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
      super.flushInMemory(currentActiveMutableSegment);
      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
        if (largeCellPreUpdateCounter.get() <= 1) {
          try {
            postCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }
    }

  }
2589
  /**
   * CompactingMemStore that orders a "smallCellThread" and a "largeCellThread" through
   * {@link #checkAndAddToActiveSize}/{@link #postUpdate}/{@link #flushInMemory} barriers, and
   * counts in-memory flushes in {@code flushCounter}. When
   * {@code flushByteSizeLessThanSmallAndLargeCellSize} is false the pre-barriers are bypassed and
   * only {@link #postUpdate} synchronizes. The driving test (the enclosing class) can reach the
   * private fields of this nested class directly.
   */
  public static class MyCompactingMemStore3 extends CompactingMemStore {
    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";

    // Released once both threads have entered checkAndAddToActiveSize.
    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
    // Released after postUpdate/flushInMemory, pairing smallCellThread with largeCellThread.
    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
    // Number of in-memory flushes performed; read by the driving test.
    private final AtomicInteger flushCounter = new AtomicInteger(0);
    // NOTE(review): not used within this class as far as visible here — presumably read by the
    // driving test through nested-class private access.
    private static final int CELL_COUNT = 5;
    // Selects which of the two choreographies above is active; toggled by the driving test.
    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;

    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
        HStore store, RegionServicesForStores regionServices,
        MemoryCompactionPolicy compactionPolicy) throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    // largeCellThread waits before the size check; smallCellThread releases it after its own
    // check, enforcing small-before-large entry order (only in the "less than" mode).
    @Override
    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
        MemStoreSizing memstoreSizing) {
      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
      }
      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
        try {
          preCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }

      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        try {
          preCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      return returnValue;
    }

    // In the "not less than" mode every postUpdate synchronizes; otherwise only smallCellThread
    // pairs with largeCellThread's flushInMemory below.
    @Override
    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
      super.postUpdate(currentActiveMutableSegment);
      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
        try {
          postCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
        return;
      }

      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        try {
          postCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
    }

    // Counts every in-memory flush; in the "less than" mode only largeCellThread may flush, and
    // it then releases smallCellThread waiting in postUpdate.
    @Override
    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
      super.flushInMemory(currentActiveMutableSegment);
      flushCounter.incrementAndGet();
      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
        return;
      }

      assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME));
      try {
        postCyclicBarrier.await();
      } catch (Throwable e) {
        throw new RuntimeException(e);
      }

    }

    // Test hooks to toggle whether in-memory compaction is allowed to run.
    void disableCompaction() {
      allowCompaction.set(false);
    }

    void enableCompaction() {
      allowCompaction.set(true);
    }

  }
2678
  /**
   * CompactingMemStore that races a snapshot thread (named {@code takeSnapShotThread}) against the
   * in-memory compaction thread around {@link CompactingMemStore#flattenOneSegment}: the snapshot
   * thread's first {@link #getImmutableSegments} happens before the flatten, its first
   * {@link #swapPipelineWithNull} happens after, so the first swap must fail (pipeline version
   * changed) and the retry must succeed. Counters are read by the driving test.
   */
  public static class MyCompactingMemStore4 extends CompactingMemStore {
    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
    /**
     * {@link CompactingMemStore#flattenOneSegment} must execute after
     * {@link CompactingMemStore#getImmutableSegments}
     */
    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
     */
    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
     * snapshot thread starts {@link CompactingMemStore#snapshot},because
     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
     */
    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
    /**
     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
     */
    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
    // Pass counters for the snapshot thread; the "<= 1" guards below limit the barrier dance to
    // each method's first invocation.
    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);

    public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
        HStore store, RegionServicesForStores regionServices,
        MemoryCompactionPolicy compactionPolicy) throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    // On the snapshot thread's first call, let the compaction thread proceed into the flatten.
    @Override
    public VersionedSegmentsList getImmutableSegments() {
      VersionedSegmentsList result = super.getImmutableSegments();
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
        if (currentCount <= 1) {
          try {
            flattenOneSegmentPreCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }
      return result;
    }

    // First swap waits until the flatten has completed and therefore must fail (stale version);
    // the second swap must succeed. Both expectations are asserted here.
    @Override
    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
        if (currentCount <= 1) {
          try {
            flattenOneSegmentPostCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }
      boolean result = super.swapPipelineWithNull(segments);
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = swapPipelineWithNullCounter.get();
        if (currentCount <= 1) {
          assertTrue(!result);
        }
        if (currentCount == 2) {
          assertTrue(result);
        }
      }
      return result;

    }

    // First flatten: release the snapshot thread to start, wait for its getImmutableSegments,
    // flatten, then release its pending swapPipelineWithNull.
    @Override
    public void flattenOneSegment(long requesterVersion, Action action) {
      int currentCount = flattenOneSegmentCounter.incrementAndGet();
      if (currentCount <= 1) {
        try {
          /**
           * {@link CompactingMemStore#snapshot} could start.
           */
          snapShotStartCyclicCyclicBarrier.await();
          flattenOneSegmentPreCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      super.flattenOneSegment(requesterVersion, action);
      if (currentCount <= 1) {
        try {
          flattenOneSegmentPostCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
    }

    // Every attempt to set the flag is expected to succeed in this scenario; count them.
    @Override
    protected boolean setInMemoryCompactionFlag() {
      boolean result = super.setInMemoryCompactionFlag();
      assertTrue(result);
      setInMemoryCompactionFlagCounter.incrementAndGet();
      return result;
    }

    // Signal the test when the in-memory compaction run has fully finished, even on failure.
    @Override
    void inMemoryCompaction() {
      try {
        super.inMemoryCompaction();
      } finally {
        try {
          inMemoryCompactionEndCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }

      }
    }

  }
2801
2802  public static class MyCompactingMemStore5 extends CompactingMemStore {
2803    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
2804    private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
2805    /**
2806     * {@link CompactingMemStore#flattenOneSegment} must execute after
2807     * {@link CompactingMemStore#getImmutableSegments}
2808     */
2809    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
2810    /**
2811     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
2812     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
2813     */
2814    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
2815    /**
2816     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
2817     * snapshot thread starts {@link CompactingMemStore#snapshot},because
2818     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
2819     */
2820    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
2821    /**
2822     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
2823     */
2824    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
2825    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
2826    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
2827    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
2828    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
2829    /**
2830     * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain
2831     * thread could start.
2832     */
2833    private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
2834    /**
2835     * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the
2836     * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would
2837     * execute,and in memory compact thread would exit,because we expect that in memory compact
2838     * executing only once.
2839     */
2840    private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
2841
2842    public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
2843        HStore store, RegionServicesForStores regionServices,
2844        MemoryCompactionPolicy compactionPolicy) throws IOException {
2845      super(conf, cellComparator, store, regionServices, compactionPolicy);
2846    }
2847
    // On the snapshot thread's first call only, release the in-memory compaction thread into
    // flattenOneSegment (the segments list was read before the flatten, so the later swap will
    // see a stale pipeline version).
    @Override
    public VersionedSegmentsList getImmutableSegments() {
      VersionedSegmentsList result = super.getImmutableSegments();
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
        if (currentCount <= 1) {
          try {
            flattenOneSegmentPreCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }

      }

      return result;
    }
2865
2866    @Override
2867    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
2868      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
2869        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
2870        if (currentCount <= 1) {
2871          try {
2872            flattenOneSegmentPostCyclicBarrier.await();
2873          } catch (Throwable e) {
2874            throw new RuntimeException(e);
2875          }
2876        }
2877
2878        if (currentCount == 2) {
2879          try {
2880            /**
2881             * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull},
2882             * writeAgain thread could start.
2883             */
2884            writeMemStoreAgainStartCyclicBarrier.await();
2885            /**
2886             * Only the writeAgain thread completes, retry
2887             * {@link CompactingMemStore#swapPipelineWithNull} would execute.
2888             */
2889            writeMemStoreAgainEndCyclicBarrier.await();
2890          } catch (Throwable e) {
2891            throw new RuntimeException(e);
2892          }
2893        }
2894
2895      }
2896      boolean result = super.swapPipelineWithNull(segments);
2897      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
2898        int currentCount = swapPipelineWithNullCounter.get();
2899        if (currentCount <= 1) {
2900          assertTrue(!result);
2901        }
2902        if (currentCount == 2) {
2903          assertTrue(result);
2904        }
2905      }
2906      return result;
2907
2908    }
2909
2910    @Override
2911    public void flattenOneSegment(long requesterVersion, Action action) {
2912      int currentCount = flattenOneSegmentCounter.incrementAndGet();
2913      if (currentCount <= 1) {
2914        try {
2915          /**
2916           * {@link CompactingMemStore#snapshot} could start.
2917           */
2918          snapShotStartCyclicCyclicBarrier.await();
2919          flattenOneSegmentPreCyclicBarrier.await();
2920        } catch (Throwable e) {
2921          throw new RuntimeException(e);
2922        }
2923      }
2924      super.flattenOneSegment(requesterVersion, action);
2925      if (currentCount <= 1) {
2926        try {
2927          flattenOneSegmentPostCyclicBarrier.await();
2928          /**
2929           * Only the writeAgain thread completes, in memory compact thread would exit,because we
2930           * expect that in memory compact executing only once.
2931           */
2932          writeMemStoreAgainEndCyclicBarrier.await();
2933        } catch (Throwable e) {
2934          throw new RuntimeException(e);
2935        }
2936
2937      }
2938    }
2939
2940    @Override
2941    protected boolean setInMemoryCompactionFlag() {
2942      boolean result = super.setInMemoryCompactionFlag();
2943      int count = setInMemoryCompactionFlagCounter.incrementAndGet();
2944      if (count <= 1) {
2945        assertTrue(result);
2946      }
2947      if (count == 2) {
2948        assertTrue(!result);
2949      }
2950      return result;
2951    }
2952
    /**
     * Runs the real in-memory compaction and — whether it completed or threw — trips
     * {@link #inMemoryCompactionEndCyclicBarrier}, presumably releasing a test thread waiting for
     * the compaction to stop (confirm against the test body).
     */
    @Override
    void inMemoryCompaction() {
      try {
        super.inMemoryCompaction();
      } finally {
        try {
          inMemoryCompactionEndCyclicBarrier.await();
        } catch (Throwable e) {
          // Surface barrier breakage / interruption as a test failure.
          throw new RuntimeException(e);
        }

      }
    }
2966  }
2967
  /**
   * {@link CompactingMemStore} whose {@link #inMemoryCompaction()} trips
   * {@link #inMemoryCompactionEndCyclicBarrier} on completion, so the test can wait for the
   * background {@link CompactingMemStore.InMemoryCompactionRunnable} to stop.
   */
  public static class MyCompactingMemStore6 extends CompactingMemStore {
    // Two parties: the compaction thread here and (presumably) the waiting test thread.
    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);

    /** Pure pass-through to the {@link CompactingMemStore} constructor. */
    public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator,
        HStore store, RegionServicesForStores regionServices,
        MemoryCompactionPolicy compactionPolicy) throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    /**
     * Runs the real in-memory compaction and — success or failure — trips the end barrier so any
     * waiting party is released.
     */
    @Override
    void inMemoryCompaction() {
      try {
        super.inMemoryCompaction();
      } finally {
        try {
          inMemoryCompactionEndCyclicBarrier.await();
        } catch (Throwable e) {
          // Surface barrier breakage / interruption as a test failure.
          throw new RuntimeException(e);
        }

      }
    }
  }
2991
  /**
   * {@link DefaultMemStore} instrumented to interleave a flush thread ("flushMyThread") with a
   * scanner-opening thread ("getScannerMyThread") around
   * {@link DefaultMemStore#doClearSnapShot}, so the test can provoke the race between clearing
   * the snapshot and reading the snapshot segments.
   */
  public static class MyDefaultMemStore extends DefaultMemStore {
    private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
    private static final String FLUSH_THREAD_NAME = "flushMyThread";
    /**
     * Only when the flush thread enters {@link DefaultMemStore#doClearSnapShot} may the
     * getScanner thread start.
     */
    private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
    /**
     * Used by the getScanner thread to notify the flush thread that
     * {@link DefaultMemStore#getSnapshotSegments} completed, so
     * {@link DefaultMemStore#doClearSnapShot} could continue.
     */
    private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
    /**
     * Used by the flush thread to notify the getScanner thread that
     * {@link DefaultMemStore#doClearSnapShot} completed, so
     * {@link DefaultMemStore#getScanners} could continue.
     */
    private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
    // Number of getSnapshotSegments calls observed on the getScanner thread.
    private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
    // Number of doClearSnapShot calls observed on the flush thread.
    private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
    // Cleared when the flush thread already holds the store's write lock; in that case the
    // barrier handshake is skipped (the getScanner thread could not interleave anyway).
    private volatile boolean shouldWait = true;
    // Injected by the test; NOTE(review): assumed to be set before the flush runs — confirm the
    // caller assigns it right after construction.
    private volatile HStore store = null;

    public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
        RegionServicesForStores regionServices)
        throws IOException {
      super(conf, cellComparator, regionServices);
    }

    /**
     * On the getScanner thread's first call: after reading the snapshot segments, notify the
     * flush thread (pre barrier) and block until {@link DefaultMemStore#doClearSnapShot} has
     * completed (post barrier), so the segments are read concurrently with the clear.
     */
    @Override
    protected List<Segment> getSnapshotSegments() {

      List<Segment> result = super.getSnapshotSegments();

      if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
        int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
        if (currentCount == 1) {
          if (this.shouldWait) {
            try {
              /**
               * Notify the flush thread that {@link DefaultMemStore#getSnapshotSegments}
               * completed, so {@link DefaultMemStore#doClearSnapShot} could continue.
               */
              preClearSnapShotCyclicBarrier.await();
              /**
               * Wait for {@link DefaultMemStore#doClearSnapShot} to complete.
               */
              postClearSnapShotCyclicBarrier.await();

            } catch (Throwable e) {
              throw new RuntimeException(e);
            }
          }
        }
      }
      return result;
    }


    /**
     * On the flush thread's first call: release the getScanner thread, wait for it to finish
     * {@link DefaultMemStore#getSnapshotSegments} (unless the write lock makes interleaving
     * impossible), clear the snapshot, then release the getScanner thread again.
     */
    @Override
    protected void doClearSnapShot() {
      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
        int currentCount = clearSnapshotCounter.incrementAndGet();
        if (currentCount == 1) {
          try {
            // If the flush thread already holds the store's write lock, the getScanner thread
            // cannot proceed concurrently, so skip the handshake to avoid deadlocking on the
            // barriers.
            if (store.lock.isWriteLockedByCurrentThread()) {
              shouldWait = false;
            }
            /**
             * Only when the flush thread enters {@link DefaultMemStore#doClearSnapShot} may the
             * getScanner thread start.
             */
            getScannerCyclicBarrier.await();

            if (shouldWait) {
              /**
               * Wait for {@link DefaultMemStore#getSnapshotSegments} to complete.
               */
              preClearSnapShotCyclicBarrier.await();
            }
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }
      super.doClearSnapShot();

      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
        int currentCount = clearSnapshotCounter.get();
        if (currentCount == 1) {
          if (shouldWait) {
            try {
              /**
               * Notify the getScanner thread that {@link DefaultMemStore#doClearSnapShot}
               * completed, so {@link DefaultMemStore#getScanners} could continue.
               */
              postClearSnapShotCyclicBarrier.await();
            } catch (Throwable e) {
              throw new RuntimeException(e);
            }
          }
        }
      }
    }


  }
3099}