001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026import static org.mockito.ArgumentMatchers.any;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.spy;
029import static org.mockito.Mockito.times;
030import static org.mockito.Mockito.verify;
031import static org.mockito.Mockito.when;
032
033import java.io.IOException;
034import java.lang.ref.SoftReference;
035import java.security.PrivilegedExceptionAction;
036import java.util.ArrayList;
037import java.util.Arrays;
038import java.util.Collection;
039import java.util.Collections;
040import java.util.Iterator;
041import java.util.List;
042import java.util.ListIterator;
043import java.util.NavigableSet;
044import java.util.TreeSet;
045import java.util.concurrent.ConcurrentSkipListSet;
046import java.util.concurrent.CountDownLatch;
047import java.util.concurrent.CyclicBarrier;
048import java.util.concurrent.ExecutorService;
049import java.util.concurrent.Executors;
050import java.util.concurrent.ThreadPoolExecutor;
051import java.util.concurrent.TimeUnit;
052import java.util.concurrent.atomic.AtomicBoolean;
053import java.util.concurrent.atomic.AtomicInteger;
054import java.util.concurrent.atomic.AtomicLong;
055import java.util.concurrent.atomic.AtomicReference;
056import java.util.concurrent.locks.ReentrantReadWriteLock;
057import java.util.function.Consumer;
058import java.util.function.IntBinaryOperator;
059import org.apache.hadoop.conf.Configuration;
060import org.apache.hadoop.fs.FSDataOutputStream;
061import org.apache.hadoop.fs.FileStatus;
062import org.apache.hadoop.fs.FileSystem;
063import org.apache.hadoop.fs.FilterFileSystem;
064import org.apache.hadoop.fs.LocalFileSystem;
065import org.apache.hadoop.fs.Path;
066import org.apache.hadoop.fs.permission.FsPermission;
067import org.apache.hadoop.hbase.Cell;
068import org.apache.hadoop.hbase.CellBuilderFactory;
069import org.apache.hadoop.hbase.CellBuilderType;
070import org.apache.hadoop.hbase.CellComparator;
071import org.apache.hadoop.hbase.CellComparatorImpl;
072import org.apache.hadoop.hbase.CellUtil;
073import org.apache.hadoop.hbase.HBaseClassTestRule;
074import org.apache.hadoop.hbase.HBaseConfiguration;
075import org.apache.hadoop.hbase.HBaseTestingUtility;
076import org.apache.hadoop.hbase.HConstants;
077import org.apache.hadoop.hbase.KeyValue;
078import org.apache.hadoop.hbase.MemoryCompactionPolicy;
079import org.apache.hadoop.hbase.NamespaceDescriptor;
080import org.apache.hadoop.hbase.PrivateCellUtil;
081import org.apache.hadoop.hbase.TableName;
082import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
083import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
084import org.apache.hadoop.hbase.client.Get;
085import org.apache.hadoop.hbase.client.RegionInfo;
086import org.apache.hadoop.hbase.client.RegionInfoBuilder;
087import org.apache.hadoop.hbase.client.Scan;
088import org.apache.hadoop.hbase.client.Scan.ReadType;
089import org.apache.hadoop.hbase.client.TableDescriptor;
090import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
091import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
092import org.apache.hadoop.hbase.filter.Filter;
093import org.apache.hadoop.hbase.filter.FilterBase;
094import org.apache.hadoop.hbase.io.compress.Compression;
095import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
096import org.apache.hadoop.hbase.io.hfile.CacheConfig;
097import org.apache.hadoop.hbase.io.hfile.HFile;
098import org.apache.hadoop.hbase.io.hfile.HFileContext;
099import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
100import org.apache.hadoop.hbase.monitoring.MonitoredTask;
101import org.apache.hadoop.hbase.nio.RefCnt;
102import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
103import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
104import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
105import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
106import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
107import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
108import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
109import org.apache.hadoop.hbase.security.User;
110import org.apache.hadoop.hbase.testclassification.MediumTests;
111import org.apache.hadoop.hbase.testclassification.RegionServerTests;
112import org.apache.hadoop.hbase.util.BloomFilterUtil;
113import org.apache.hadoop.hbase.util.Bytes;
114import org.apache.hadoop.hbase.util.CommonFSUtils;
115import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
116import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
117import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
118import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
119import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
120import org.apache.hadoop.hbase.wal.WALFactory;
121import org.apache.hadoop.util.Progressable;
122import org.junit.After;
123import org.junit.AfterClass;
124import org.junit.Before;
125import org.junit.ClassRule;
126import org.junit.Rule;
127import org.junit.Test;
128import org.junit.experimental.categories.Category;
129import org.junit.rules.TestName;
130import org.mockito.Mockito;
131import org.slf4j.Logger;
132import org.slf4j.LoggerFactory;
133
134import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
135
136/**
137 * Test class for the HStore
138 */
139@Category({ RegionServerTests.class, MediumTests.class })
140public class TestHStore {
141
  // Class-level JUnit rule obtained from HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
  // Gives access to the running test's method name; the tests pass it to init(...) and log it.
  @Rule
  public TestName name = new TestName();

  // Region and store under test. Both are assigned by the init(...) helpers and closed in
  // tearDown().
  HRegion region;
  HStore store;
  // Table and column family names used when building descriptors and KeyValues.
  byte[] table = Bytes.toBytes("table");
  byte[] family = Bytes.toBytes("family");

  // Row keys and column qualifiers shared by the tests.
  byte[] row = Bytes.toBytes("row");
  byte[] row2 = Bytes.toBytes("row2");
  byte[] qf1 = Bytes.toBytes("qf1");
  byte[] qf2 = Bytes.toBytes("qf2");
  byte[] qf3 = Bytes.toBytes("qf3");
  byte[] qf4 = Bytes.toBytes("qf4");
  byte[] qf5 = Bytes.toBytes("qf5");
  byte[] qf6 = Bytes.toBytes("qf6");

  // Qualifiers requested by the get tests; setUp() fills it with qf1, qf3 and qf5.
  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);

  // Cells the get tests expect (filled in setUp()) and the cells actually read back; compared by
  // assertCheck().
  List<Cell> expected = new ArrayList<>();
  List<Cell> result = new ArrayList<>();

  // Seeded from the current time; handed to flushStore(store, id++) and bumped on every flush.
  long id = EnvironmentEdgeManager.currentTime();
  // Get on `row`; setUp() adds one column per entry of `qualifiers`.
  Get get = new Get(row);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Base path for test data. initHRegion appends the method name directly (no separator) to form
  // the per-test base directory.
  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
173
174  @Before
175  public void setUp() throws IOException {
176    qualifiers.clear();
177    qualifiers.add(qf1);
178    qualifiers.add(qf3);
179    qualifiers.add(qf5);
180
181    Iterator<byte[]> iter = qualifiers.iterator();
182    while (iter.hasNext()) {
183      byte[] next = iter.next();
184      expected.add(new KeyValue(row, family, next, 1, (byte[]) null));
185      get.addColumn(family, next);
186    }
187  }
188
189  private void init(String methodName) throws IOException {
190    init(methodName, TEST_UTIL.getConfiguration());
191  }
192
193  private HStore init(String methodName, Configuration conf) throws IOException {
194    // some of the tests write 4 versions and then flush
195    // (with HBASE-4241, lower versions are collected on flush)
196    return init(methodName, conf,
197      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
198  }
199
200  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
201    throws IOException {
202    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
203  }
204
205  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
206    ColumnFamilyDescriptor hcd) throws IOException {
207    return init(methodName, conf, builder, hcd, null);
208  }
209
210  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
211    ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
212    return init(methodName, conf, builder, hcd, hook, false);
213  }
214
215  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
216    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
217    TableDescriptor htd = builder.setColumnFamily(hcd).build();
218    Path basedir = new Path(DIR + methodName);
219    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
220    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
221
222    FileSystem fs = FileSystem.get(conf);
223
224    fs.delete(logdir, true);
225    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
226      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null,
227      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
228    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
229    Configuration walConf = new Configuration(conf);
230    CommonFSUtils.setRootDir(walConf, basedir);
231    WALFactory wals = new WALFactory(walConf, methodName);
232    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
233      htd, null);
234    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
235    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
236    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
237  }
238
239  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
240    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
241    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
242    if (hook == null) {
243      store = new HStore(region, hcd, conf, false);
244    } else {
245      store = new MyStore(region, hcd, conf, hook, switchToPread);
246    }
247    region.stores.put(store.getColumnFamilyDescriptor().getName(), store);
248    return store;
249  }
250
251  /**
252   * Test we do not lose data if we fail a flush and then close. Part of HBase-10466
253   */
254  @Test
255  public void testFlushSizeSizing() throws Exception {
256    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
257    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
258    // Only retry once.
259    conf.setInt("hbase.hstore.flush.retries.number", 1);
260    User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" });
261    // Inject our faulty LocalFileSystem
262    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
263    user.runAs(new PrivilegedExceptionAction<Object>() {
264      @Override
265      public Object run() throws Exception {
266        // Make sure it worked (above is sensitive to caching details in hadoop core)
267        FileSystem fs = FileSystem.get(conf);
268        assertEquals(FaultyFileSystem.class, fs.getClass());
269        FaultyFileSystem ffs = (FaultyFileSystem) fs;
270
271        // Initialize region
272        init(name.getMethodName(), conf);
273
274        MemStoreSize mss = store.memstore.getFlushableSize();
275        assertEquals(0, mss.getDataSize());
276        LOG.info("Adding some data");
277        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
278        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
279        // add the heap size of active (mutable) segment
280        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
281        mss = store.memstore.getFlushableSize();
282        assertEquals(kvSize.getMemStoreSize(), mss);
283        // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
284        try {
285          LOG.info("Flushing");
286          flushStore(store, id++);
287          fail("Didn't bubble up IOE!");
288        } catch (IOException ioe) {
289          assertTrue(ioe.getMessage().contains("Fault injected"));
290        }
291        // due to snapshot, change mutable to immutable segment
292        kvSize.incMemStoreSize(0,
293          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
294        mss = store.memstore.getFlushableSize();
295        assertEquals(kvSize.getMemStoreSize(), mss);
296        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
297        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
298        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
299        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
300        // not yet cleared the snapshot -- the above flush failed.
301        assertEquals(kvSize.getMemStoreSize(), mss);
302        ffs.fault.set(false);
303        flushStore(store, id++);
304        mss = store.memstore.getFlushableSize();
305        // Size should be the foreground kv size.
306        assertEquals(kvSize2.getMemStoreSize(), mss);
307        flushStore(store, id++);
308        mss = store.memstore.getFlushableSize();
309        assertEquals(0, mss.getDataSize());
310        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
311        return null;
312      }
313    });
314  }
315
316  @Test
317  public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException {
318    int numStoreFiles = 5;
319    writeAndRead(BloomType.ROWCOL, numStoreFiles);
320
321    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
322    // hard to know exactly the numbers here, we are just trying to
323    // prove that they are incrementing
324    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
325    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
326  }
327
328  @Test
329  public void testStoreBloomFilterMetricsWithBloomRow() throws IOException {
330    int numStoreFiles = 5;
331    writeAndRead(BloomType.ROWCOL, numStoreFiles);
332
333    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
334    // hard to know exactly the numbers here, we are just trying to
335    // prove that they are incrementing
336    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
337    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
338  }
339
340  @Test
341  public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException {
342    int numStoreFiles = 5;
343    writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles);
344
345    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
346    // hard to know exactly the numbers here, we are just trying to
347    // prove that they are incrementing
348    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
349  }
350
351  @Test
352  public void testStoreBloomFilterMetricsWithBloomNone() throws IOException {
353    int numStoreFiles = 5;
354    writeAndRead(BloomType.NONE, numStoreFiles);
355
356    assertEquals(0, store.getBloomFilterRequestsCount());
357    assertEquals(0, store.getBloomFilterNegativeResultsCount());
358
359    // hard to know exactly the numbers here, we are just trying to
360    // prove that they are incrementing
361    assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles);
362  }
363
364  private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException {
365    Configuration conf = HBaseConfiguration.create();
366    FileSystem fs = FileSystem.get(conf);
367
368    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
369      .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType)
370      .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build();
371    init(name.getMethodName(), conf, hcd);
372
373    for (int i = 1; i <= numStoreFiles; i++) {
374      byte[] row = Bytes.toBytes("row" + i);
375      LOG.info("Adding some data for the store file #" + i);
376      long timeStamp = EnvironmentEdgeManager.currentTime();
377      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
378      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
379      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
380      flush(i);
381    }
382
383    // Verify the total number of store files
384    assertEquals(numStoreFiles, this.store.getStorefiles().size());
385
386    TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
387    columns.add(qf1);
388
389    for (int i = 1; i <= numStoreFiles; i++) {
390      KeyValueScanner scanner =
391        store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0);
392      scanner.peek();
393    }
394  }
395
396  /**
397   * Verify that compression and data block encoding are respected by the createWriter method, used
398   * on store flush.
399   */
400  @Test
401  public void testCreateWriter() throws Exception {
402    Configuration conf = HBaseConfiguration.create();
403    FileSystem fs = FileSystem.get(conf);
404
405    ColumnFamilyDescriptor hcd =
406      ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ)
407        .setDataBlockEncoding(DataBlockEncoding.DIFF).build();
408    init(name.getMethodName(), conf, hcd);
409
410    // Test createWriter
411    StoreFileWriter writer = store.getStoreEngine()
412      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
413        .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
414        .includesTag(false).shouldDropBehind(false));
415    Path path = writer.getPath();
416    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
417    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
418    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
419    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
420    writer.close();
421
422    // Verify that compression and encoding settings are respected
423    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
424    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
425    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
426    reader.close();
427  }
428
429  @Test
430  public void testDeleteExpiredStoreFiles() throws Exception {
431    testDeleteExpiredStoreFiles(0);
432    testDeleteExpiredStoreFiles(1);
433  }
434
435  /**
436   * @param minVersions the MIN_VERSIONS for the column family
437   */
438  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
439    int storeFileNum = 4;
440    int ttl = 4;
441    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
442    EnvironmentEdgeManagerTestHelper.injectEdge(edge);
443
444    Configuration conf = HBaseConfiguration.create();
445    // Enable the expired store file deletion
446    conf.setBoolean("hbase.store.delete.expired.storefile", true);
447    // Set the compaction threshold higher to avoid normal compactions.
448    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
449
450    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
451      .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());
452
453    long storeTtl = this.store.getScanInfo().getTtl();
454    long sleepTime = storeTtl / storeFileNum;
455    long timeStamp;
456    // There are 4 store files and the max time stamp difference among these
457    // store files will be (this.store.ttl / storeFileNum)
458    for (int i = 1; i <= storeFileNum; i++) {
459      LOG.info("Adding some data for the store file #" + i);
460      timeStamp = EnvironmentEdgeManager.currentTime();
461      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
462      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
463      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
464      flush(i);
465      edge.incrementTime(sleepTime);
466    }
467
468    // Verify the total number of store files
469    assertEquals(storeFileNum, this.store.getStorefiles().size());
470
471    // Each call will find one expired store file and delete it before compaction happens.
472    // There will be no compaction due to threshold above. Last file will not be replaced.
473    for (int i = 1; i <= storeFileNum - 1; i++) {
474      // verify the expired store file.
475      assertFalse(this.store.requestCompaction().isPresent());
476      Collection<HStoreFile> sfs = this.store.getStorefiles();
477      // Ensure i files are gone.
478      if (minVersions == 0) {
479        assertEquals(storeFileNum - i, sfs.size());
480        // Ensure only non-expired files remain.
481        for (HStoreFile sf : sfs) {
482          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
483        }
484      } else {
485        assertEquals(storeFileNum, sfs.size());
486      }
487      // Let the next store file expired.
488      edge.incrementTime(sleepTime);
489    }
490    assertFalse(this.store.requestCompaction().isPresent());
491
492    Collection<HStoreFile> sfs = this.store.getStorefiles();
493    // Assert the last expired file is not removed.
494    if (minVersions == 0) {
495      assertEquals(1, sfs.size());
496    }
497    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
498    assertTrue(ts < (edge.currentTime() - storeTtl));
499
500    for (HStoreFile sf : sfs) {
501      sf.closeStoreFile(true);
502    }
503  }
504
505  @Test
506  public void testLowestModificationTime() throws Exception {
507    Configuration conf = HBaseConfiguration.create();
508    FileSystem fs = FileSystem.get(conf);
509    // Initialize region
510    init(name.getMethodName(), conf);
511
512    int storeFileNum = 4;
513    for (int i = 1; i <= storeFileNum; i++) {
514      LOG.info("Adding some data for the store file #" + i);
515      this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null);
516      this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null);
517      this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null);
518      flush(i);
519    }
520    // after flush; check the lowest time stamp
521    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
522    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
523    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
524
525    // after compact; check the lowest time stamp
526    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
527    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
528    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
529    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
530  }
531
532  private static long getLowestTimeStampFromFS(FileSystem fs,
533    final Collection<HStoreFile> candidates) throws IOException {
534    long minTs = Long.MAX_VALUE;
535    if (candidates.isEmpty()) {
536      return minTs;
537    }
538    Path[] p = new Path[candidates.size()];
539    int i = 0;
540    for (HStoreFile sf : candidates) {
541      p[i] = sf.getPath();
542      ++i;
543    }
544
545    FileStatus[] stats = fs.listStatus(p);
546    if (stats == null || stats.length == 0) {
547      return minTs;
548    }
549    for (FileStatus s : stats) {
550      minTs = Math.min(minTs, s.getModificationTime());
551    }
552    return minTs;
553  }
554
555  //////////////////////////////////////////////////////////////////////////////
556  // Get tests
557  //////////////////////////////////////////////////////////////////////////////
558
  // Block size given to the HFileContext of the hand-written empty store file in
  // testEmptyStoreFile.
  private static final int BLOCKSIZE_SMALL = 8192;
560
561  /**
562   * Test for hbase-1686.
563   */
564  @Test
565  public void testEmptyStoreFile() throws IOException {
566    init(this.name.getMethodName());
567    // Write a store file.
568    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
569    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
570    flush(1);
571    // Now put in place an empty store file. Its a little tricky. Have to
572    // do manually with hacked in sequence id.
573    HStoreFile f = this.store.getStorefiles().iterator().next();
574    Path storedir = f.getPath().getParent();
575    long seqid = f.getMaxSequenceId();
576    Configuration c = HBaseConfiguration.create();
577    FileSystem fs = FileSystem.get(c);
578    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
579    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
580      .withOutputDir(storedir).withFileContext(meta).build();
581    w.appendMetadata(seqid + 1, false);
582    w.close();
583    this.store.close();
584    // Reopen it... should pick up two files
585    this.store =
586      new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
587    assertEquals(2, this.store.getStorefilesCount());
588
589    result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers);
590    assertEquals(1, result.size());
591  }
592
593  /**
594   * Getting data from memstore only
595   */
596  @Test
597  public void testGet_FromMemStoreOnly() throws IOException {
598    init(this.name.getMethodName());
599
600    // Put data in memstore
601    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
602    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
603    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
604    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
605    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
606    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
607
608    // Get
609    result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers);
610
611    // Compare
612    assertCheck();
613  }
614
615  @Test
616  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
617    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
618    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
619    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
620  }
621
622  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
623    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
624      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
625    long currentTs = 100;
626    long minTs = currentTs;
627    // the extra cell won't be flushed to disk,
628    // so the min of timerange will be different between memStore and hfile.
629    for (int i = 0; i != (maxVersion + 1); ++i) {
630      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
631      if (i == 1) {
632        minTs = currentTs;
633      }
634    }
635    flushStore(store, id++);
636
637    Collection<HStoreFile> files = store.getStorefiles();
638    assertEquals(1, files.size());
639    HStoreFile f = files.iterator().next();
640    f.initReader();
641    StoreFileReader reader = f.getReader();
642    assertEquals(minTs, reader.timeRange.getMin());
643    assertEquals(currentTs, reader.timeRange.getMax());
644  }
645
646  /**
647   * Getting data from files only
648   */
649  @Test
650  public void testGet_FromFilesOnly() throws IOException {
651    init(this.name.getMethodName());
652
653    // Put data in memstore
654    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
655    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
656    // flush
657    flush(1);
658
659    // Add more data
660    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
661    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
662    // flush
663    flush(2);
664
665    // Add more data
666    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
667    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
668    // flush
669    flush(3);
670
671    // Get
672    result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers);
673    // this.store.get(get, qualifiers, result);
674
675    // Need to sort the result since multiple files
676    Collections.sort(result, CellComparatorImpl.COMPARATOR);
677
678    // Compare
679    assertCheck();
680  }
681
682  /**
683   * Getting data from memstore and files
684   */
685  @Test
686  public void testGet_FromMemStoreAndFiles() throws IOException {
687    init(this.name.getMethodName());
688
689    // Put data in memstore
690    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
691    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
692    // flush
693    flush(1);
694
695    // Add more data
696    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
697    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
698    // flush
699    flush(2);
700
701    // Add more data
702    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
703    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
704
705    // Get
706    result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers);
707
708    // Need to sort the result since multiple files
709    Collections.sort(result, CellComparatorImpl.COMPARATOR);
710
711    // Compare
712    assertCheck();
713  }
714
715  private void flush(int storeFilessize) throws IOException {
716    flushStore(store, id++);
717    assertEquals(storeFilessize, this.store.getStorefiles().size());
718    assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount());
719  }
720
721  private void assertCheck() {
722    assertEquals(expected.size(), result.size());
723    for (int i = 0; i < expected.size(); i++) {
724      assertEquals(expected.get(i), result.get(i));
725    }
726  }
727
728  @After
729  public void tearDown() throws Exception {
730    EnvironmentEdgeManagerTestHelper.reset();
731    if (store != null) {
732      try {
733        store.close();
734      } catch (IOException e) {
735      }
736      store = null;
737    }
738    if (region != null) {
739      region.close();
740      region = null;
741    }
742  }
743
  /**
   * Class-level cleanup: asks the shared {@code TEST_UTIL} to clean up its test directory.
   * @throws IOException propagated from {@code cleanupTestDir()}
   */
  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }
748
749  @Test
750  public void testHandleErrorsInFlush() throws Exception {
751    LOG.info("Setting up a faulty file system that cannot write");
752
753    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
754    User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" });
755    // Inject our faulty LocalFileSystem
756    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
757    user.runAs(new PrivilegedExceptionAction<Object>() {
758      @Override
759      public Object run() throws Exception {
760        // Make sure it worked (above is sensitive to caching details in hadoop core)
761        FileSystem fs = FileSystem.get(conf);
762        assertEquals(FaultyFileSystem.class, fs.getClass());
763
764        // Initialize region
765        init(name.getMethodName(), conf);
766
767        LOG.info("Adding some data");
768        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
769        store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
770        store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
771
772        LOG.info("Before flush, we should have no files");
773
774        Collection<StoreFileInfo> files =
775          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
776        assertEquals(0, files != null ? files.size() : 0);
777
778        // flush
779        try {
780          LOG.info("Flushing");
781          flush(1);
782          fail("Didn't bubble up IOE!");
783        } catch (IOException ioe) {
784          assertTrue(ioe.getMessage().contains("Fault injected"));
785        }
786
787        LOG.info("After failed flush, we should still have no files!");
788        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
789        assertEquals(0, files != null ? files.size() : 0);
790        store.getHRegion().getWAL().close();
791        return null;
792      }
793    });
794    FileSystem.closeAllForUGI(user.getUGI());
795  }
796
797  /**
798   * Faulty file system that will fail if you write past its fault position the FIRST TIME only;
799   * thereafter it will succeed. Used by {@link TestHRegion} too.
800   */
801  static class FaultyFileSystem extends FilterFileSystem {
802    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
803    private long faultPos = 200;
804    AtomicBoolean fault = new AtomicBoolean(true);
805
806    public FaultyFileSystem() {
807      super(new LocalFileSystem());
808      LOG.info("Creating faulty!");
809    }
810
811    @Override
812    public FSDataOutputStream create(Path p) throws IOException {
813      return new FaultyOutputStream(super.create(p), faultPos, fault);
814    }
815
816    @Override
817    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
818      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
819      return new FaultyOutputStream(
820        super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress),
821        faultPos, fault);
822    }
823
824    @Override
825    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
826      short replication, long blockSize, Progressable progress) throws IOException {
827      // Fake it. Call create instead. The default implementation throws an IOE
828      // that this is not supported.
829      return create(f, overwrite, bufferSize, replication, blockSize, progress);
830    }
831  }
832
833  static class FaultyOutputStream extends FSDataOutputStream {
834    volatile long faultPos = Long.MAX_VALUE;
835    private final AtomicBoolean fault;
836
837    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
838      throws IOException {
839      super(out, null);
840      this.faultPos = faultPos;
841      this.fault = fault;
842    }
843
844    @Override
845    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
846      LOG.info("faulty stream write at pos " + getPos());
847      injectFault();
848      super.write(buf, offset, length);
849    }
850
851    private void injectFault() throws IOException {
852      if (this.fault.get() && getPos() >= faultPos) {
853        throw new IOException("Fault injected");
854      }
855    }
856  }
857
858  private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
859    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
860    storeFlushCtx.prepare();
861    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
862    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
863    return storeFlushCtx;
864  }
865
866  /**
867   * Generate a list of KeyValues for testing based on given parameters
868   * @return the rows key-value list
869   */
870  private List<Cell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier,
871    byte[] family) {
872    List<Cell> kvList = new ArrayList<>();
873    for (int i = 1; i <= numRows; i++) {
874      byte[] b = Bytes.toBytes(i);
875      for (long timestamp : timestamps) {
876        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
877      }
878    }
879    return kvList;
880  }
881
882  /**
883   * Test to ensure correctness when using Stores with multiple timestamps
884   */
885  @Test
886  public void testMultipleTimestamps() throws IOException {
887    int numRows = 1;
888    long[] timestamps1 = new long[] { 1, 5, 10, 20 };
889    long[] timestamps2 = new long[] { 30, 80 };
890
891    init(this.name.getMethodName());
892
893    List<Cell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family);
894    for (Cell kv : kvList1) {
895      this.store.add(kv, null);
896    }
897
898    flushStore(store, id++);
899
900    List<Cell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family);
901    for (Cell kv : kvList2) {
902      this.store.add(kv, null);
903    }
904
905    List<Cell> result;
906    Get get = new Get(Bytes.toBytes(1));
907    get.addColumn(family, qf1);
908
909    get.setTimeRange(0, 15);
910    result = HBaseTestingUtility.getFromStoreFile(store, get);
911    assertTrue(result.size() > 0);
912
913    get.setTimeRange(40, 90);
914    result = HBaseTestingUtility.getFromStoreFile(store, get);
915    assertTrue(result.size() > 0);
916
917    get.setTimeRange(10, 45);
918    result = HBaseTestingUtility.getFromStoreFile(store, get);
919    assertTrue(result.size() > 0);
920
921    get.setTimeRange(80, 145);
922    result = HBaseTestingUtility.getFromStoreFile(store, get);
923    assertTrue(result.size() > 0);
924
925    get.setTimeRange(1, 2);
926    result = HBaseTestingUtility.getFromStoreFile(store, get);
927    assertTrue(result.size() > 0);
928
929    get.setTimeRange(90, 200);
930    result = HBaseTestingUtility.getFromStoreFile(store, get);
931    assertTrue(result.size() == 0);
932  }
933
934  /**
935   * Test for HBASE-3492 - Test split on empty colfam (no store files).
936   * @throws IOException When the IO operations fail.
937   */
938  @Test
939  public void testSplitWithEmptyColFam() throws IOException {
940    init(this.name.getMethodName());
941    assertFalse(store.getSplitPoint().isPresent());
942  }
943
944  @Test
945  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
946    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
947    long anyValue = 10;
948
949    // We'll check that it uses correct config and propagates it appropriately by going thru
950    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
951    // a number we pass in is higher than some config value, inside compactionPolicy.
952    Configuration conf = HBaseConfiguration.create();
953    conf.setLong(CONFIG_KEY, anyValue);
954    init(name.getMethodName() + "-xml", conf);
955    assertTrue(store.throttleCompaction(anyValue + 1));
956    assertFalse(store.throttleCompaction(anyValue));
957
958    // HTD overrides XML.
959    --anyValue;
960    init(
961      name.getMethodName() + "-htd", conf, TableDescriptorBuilder
962        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
963      ColumnFamilyDescriptorBuilder.of(family));
964    assertTrue(store.throttleCompaction(anyValue + 1));
965    assertFalse(store.throttleCompaction(anyValue));
966
967    // HCD overrides them both.
968    --anyValue;
969    init(name.getMethodName() + "-hcd", conf,
970      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
971        Long.toString(anyValue)),
972      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
973        .build());
974    assertTrue(store.throttleCompaction(anyValue + 1));
975    assertFalse(store.throttleCompaction(anyValue));
976  }
977
  /**
   * {@link DefaultStoreEngine} subclass that records, in a static field, the compactor held by
   * the engine after the most recent {@code createComponents} call, so a test can compare it
   * with the compactor a store reports.
   */
  public static class DummyStoreEngine extends DefaultStoreEngine {
    // Compactor captured by the latest createComponents() call on any instance; null until then.
    public static DefaultCompactor lastCreatedCompactor = null;

    @Override
    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
      throws IOException {
      super.createComponents(conf, store, comparator);
      // Read after the superclass call; presumably that call is what assigns this.compactor.
      lastCreatedCompactor = this.compactor;
    }
  }
988
989  @Test
990  public void testStoreUsesSearchEngineOverride() throws Exception {
991    Configuration conf = HBaseConfiguration.create();
992    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
993    init(this.name.getMethodName(), conf);
994    assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
995  }
996
997  private void addStoreFile() throws IOException {
998    HStoreFile f = this.store.getStorefiles().iterator().next();
999    Path storedir = f.getPath().getParent();
1000    long seqid = this.store.getMaxSequenceId().orElse(0L);
1001    Configuration c = TEST_UTIL.getConfiguration();
1002    FileSystem fs = FileSystem.get(c);
1003    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
1004    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
1005      .withOutputDir(storedir).withFileContext(fileContext).build();
1006    w.appendMetadata(seqid + 1, false);
1007    w.close();
1008    LOG.info("Added store file:" + w.getPath());
1009  }
1010
1011  private void archiveStoreFile(int index) throws IOException {
1012    Collection<HStoreFile> files = this.store.getStorefiles();
1013    HStoreFile sf = null;
1014    Iterator<HStoreFile> it = files.iterator();
1015    for (int i = 0; i <= index; i++) {
1016      sf = it.next();
1017    }
1018    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(),
1019      Lists.newArrayList(sf));
1020  }
1021
1022  private void closeCompactedFile(int index) throws IOException {
1023    Collection<HStoreFile> files =
1024      this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
1025    if (files.size() > 0) {
1026      HStoreFile sf = null;
1027      Iterator<HStoreFile> it = files.iterator();
1028      for (int i = 0; i <= index; i++) {
1029        sf = it.next();
1030      }
1031      sf.closeStoreFile(true);
1032      store.getStoreEngine().getStoreFileManager()
1033        .removeCompactedFiles(Collections.singletonList(sf));
1034    }
1035  }
1036
  /**
   * Exercises {@code refreshStoreFiles()} while store files are added to and removed from the
   * store directory behind the store's back. Each step asserts the store file count before the
   * refresh (unchanged) and after it (reflecting the files written or removed since the previous
   * refresh).
   */
  @Test
  public void testRefreshStoreFiles() throws Exception {
    init(name.getMethodName());

    assertEquals(0, this.store.getStorefilesCount());

    // Test refreshing store files when no store files are there
    store.refreshStoreFiles();
    assertEquals(0, this.store.getStorefilesCount());

    // add some data, flush
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    flush(1);
    assertEquals(1, this.store.getStorefilesCount());

    // add one more file
    addStoreFile();

    // the extra file is only counted once the store is refreshed
    assertEquals(1, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(2, this.store.getStorefilesCount());

    // add three more files
    addStoreFile();
    addStoreFile();
    addStoreFile();

    assertEquals(2, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(5, this.store.getStorefilesCount());

    // closeCompactedFile is a no-op when there are no compacted files; then remove one store file
    closeCompactedFile(0);
    archiveStoreFile(0);

    assertEquals(5, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(4, this.store.getStorefilesCount());

    // remove three of the remaining four files
    archiveStoreFile(0);
    archiveStoreFile(1);
    archiveStoreFile(2);

    assertEquals(4, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(1, this.store.getStorefilesCount());

    // remove the last file
    archiveStoreFile(0);
    store.refreshStoreFiles();
    assertEquals(0, this.store.getStorefilesCount());
  }
1087
  /**
   * Verifies, on a Mockito spy of the store engine, that {@code replaceStoreFiles} is invoked by
   * the first {@code refreshStoreFiles()} after a file was added, and is not invoked again by a
   * second refresh issued without any further file change.
   */
  @Test
  public void testRefreshStoreFilesNotChanged() throws IOException {
    init(name.getMethodName());

    assertEquals(0, this.store.getStorefilesCount());

    // add some data, flush
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    flush(1);
    // add one more file
    addStoreFile();

    // spy on the store's engine so calls to replaceStoreFiles can be counted
    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());

    // call first time after files changed
    spiedStoreEngine.refreshStoreFiles();
    assertEquals(2, this.store.getStorefilesCount());
    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());

    // call second time
    spiedStoreEngine.refreshStoreFiles();

    // ensure that replaceStoreFiles is not called again, i.e. the invocation count stays at one,
    // when the files have not changed between the two refreshes
    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
  }
1114
1115  private long countMemStoreScanner(StoreScanner scanner) {
1116    if (scanner.currentScanners == null) {
1117      return 0;
1118    }
1119    return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count();
1120  }
1121
1122  @Test
1123  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1124    long seqId = 100;
1125    long timestamp = EnvironmentEdgeManager.currentTime();
1126    Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1127      .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1128    PrivateCellUtil.setSequenceId(cell0, seqId);
1129    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1130
1131    Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1132      .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1133    PrivateCellUtil.setSequenceId(cell1, seqId);
1134    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1135
1136    seqId = 101;
1137    timestamp = EnvironmentEdgeManager.currentTime();
1138    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1139      .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1140    PrivateCellUtil.setSequenceId(cell2, seqId);
1141    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1142  }
1143
  /**
   * Shared scenario: add cells, prepare a flush (taking the snapshot), add more cells, open a
   * scanner, complete the flush while the scanner is open, then scan to the end while checking how
   * many non-file scanners the {@link StoreScanner} holds at each step.
   * @param inputCellsBeforeSnapshot cells added before {@code prepare()} is called
   * @param inputCellsAfterSnapshot  cells added after {@code prepare()} and before the scan
   */
  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
    List<Cell> inputCellsAfterSnapshot) throws IOException {
    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    // Collect every qualifier and the highest sequence id over both inputs; the highest sequence
    // id is used as the scanner's read point below.
    long seqId = Long.MIN_VALUE;
    for (Cell c : inputCellsBeforeSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    for (Cell c : inputCellsAfterSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    // These cells are added after prepare(), i.e. after the snapshot was taken.
    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
      // Complete the flush while the scanner is still open.
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
      // snapshot has no data after flush
      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
      boolean more;
      int cellCount = 0;
      do {
        List<Cell> cells = new ArrayList<>();
        more = s.next(cells);
        cellCount += cells.size();
        // Once next() reports no more data, no memstore scanner is expected to remain.
        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
      } while (more);
      assertEquals(
        "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
        inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
      // the current scanners is cleared
      assertEquals(0, countMemStoreScanner(s));
    }
  }
1185
  /**
   * Convenience overload that creates a Put cell on the test's default {@code row}.
   * @param qualifier  column qualifier of the cell
   * @param ts         cell timestamp
   * @param sequenceId sequence id assigned to the cell
   * @param value      cell value
   * @return the cell built by the five-argument overload
   */
  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
    throws IOException {
    return createCell(row, qualifier, ts, sequenceId, value);
  }
1190
  /**
   * Builds a Put cell in the test's {@code family} using a {@code DEEP_COPY} cell builder and
   * then assigns the given sequence id to it.
   * @param row        row key of the cell
   * @param qualifier  column qualifier of the cell
   * @param ts         cell timestamp
   * @param sequenceId sequence id set on the cell after it is built
   * @param value      cell value
   * @return the newly built cell
   */
  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
    throws IOException {
    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
      .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put).setValue(value).build();
    // The sequence id is not part of the builder chain above; it is applied afterwards.
    PrivateCellUtil.setSequenceId(c, sequenceId);
    return c;
  }
1198
1199  @Test
1200  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1201    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1202    final int expectedSize = 3;
1203    testFlushBeforeCompletingScan(new MyListHook() {
1204      @Override
1205      public void hook(int currentSize) {
1206        if (currentSize == expectedSize - 1) {
1207          try {
1208            flushStore(store, id++);
1209            timeToGoNextRow.set(true);
1210          } catch (IOException e) {
1211            throw new RuntimeException(e);
1212          }
1213        }
1214      }
1215    }, new FilterBase() {
1216      @Override
1217      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1218        return ReturnCode.INCLUDE;
1219      }
1220    }, expectedSize);
1221  }
1222
1223  @Test
1224  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
1225    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1226    final int expectedSize = 2;
1227    testFlushBeforeCompletingScan(new MyListHook() {
1228      @Override
1229      public void hook(int currentSize) {
1230        if (currentSize == expectedSize - 1) {
1231          try {
1232            flushStore(store, id++);
1233            timeToGoNextRow.set(true);
1234          } catch (IOException e) {
1235            throw new RuntimeException(e);
1236          }
1237        }
1238      }
1239    }, new FilterBase() {
1240      @Override
1241      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1242        if (timeToGoNextRow.get()) {
1243          timeToGoNextRow.set(false);
1244          return ReturnCode.NEXT_ROW;
1245        } else {
1246          return ReturnCode.INCLUDE;
1247        }
1248      }
1249    }, expectedSize);
1250  }
1251
1252  @Test
1253  public void testFlushBeforeCompletingScanWithFilterHint()
1254    throws IOException, InterruptedException {
1255    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
1256    final int expectedSize = 2;
1257    testFlushBeforeCompletingScan(new MyListHook() {
1258      @Override
1259      public void hook(int currentSize) {
1260        if (currentSize == expectedSize - 1) {
1261          try {
1262            flushStore(store, id++);
1263            timeToGetHint.set(true);
1264          } catch (IOException e) {
1265            throw new RuntimeException(e);
1266          }
1267        }
1268      }
1269    }, new FilterBase() {
1270      @Override
1271      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1272        if (timeToGetHint.get()) {
1273          timeToGetHint.set(false);
1274          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
1275        } else {
1276          return Filter.ReturnCode.INCLUDE;
1277        }
1278      }
1279
1280      @Override
1281      public Cell getNextCellHint(Cell currentCell) throws IOException {
1282        return currentCell;
1283      }
1284    }, expectedSize);
1285  }
1286
1287  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
1288    throws IOException, InterruptedException {
1289    Configuration conf = HBaseConfiguration.create();
1290    byte[] r0 = Bytes.toBytes("row0");
1291    byte[] r1 = Bytes.toBytes("row1");
1292    byte[] r2 = Bytes.toBytes("row2");
1293    byte[] value0 = Bytes.toBytes("value0");
1294    byte[] value1 = Bytes.toBytes("value1");
1295    byte[] value2 = Bytes.toBytes("value2");
1296    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1297    long ts = EnvironmentEdgeManager.currentTime();
1298    long seqId = 100;
1299    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1300      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1301      new MyStoreHook() {
1302        @Override
1303        public long getSmallestReadPoint(HStore store) {
1304          return seqId + 3;
1305        }
1306      });
1307    // The cells having the value0 won't be flushed to disk because the value of max version is 1
1308    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
1309    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
1310    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
1311    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
1312    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
1313    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
1314    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
1315    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
1316    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
1317    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1318    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1319    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1320    List<Cell> myList = new MyList<>(hook);
1321    Scan scan = new Scan().withStartRow(r1).setFilter(filter);
1322    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
1323      // r1
1324      scanner.next(myList);
1325      assertEquals(expectedSize, myList.size());
1326      for (Cell c : myList) {
1327        byte[] actualValue = CellUtil.cloneValue(c);
1328        assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
1329          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
1330      }
1331      List<Cell> normalList = new ArrayList<>(3);
1332      // r2
1333      scanner.next(normalList);
1334      assertEquals(3, normalList.size());
1335      for (Cell c : normalList) {
1336        byte[] actualValue = CellUtil.cloneValue(c);
1337        assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:"
1338          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2));
1339      }
1340    }
1341  }
1342
  /**
   * Creates a scanner on the test thread while a second thread runs {@code prepare()} on a flush
   * context, with {@code MyCompactingMemStore} configured as the memstore class, and checks that
   * the scanner still returns the three cells that were added beforehand. The phase numbers in
   * the comments below describe the intended interleaving of the two threads; the blocking itself
   * is presumably implemented by {@code MyCompactingMemStore}, which is not shown here.
   */
  @Test
  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
    byte[] value = Bytes.toBytes("value");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // the data the scanner below is expected to return (see the assertions at the end)
    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    // switch on the test behaviour of MyCompactingMemStore; switched off again in the finally
    MyCompactingMemStore.START_TEST.set(true);
    Runnable flush = () -> {
      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
      // recreate the active memstore -- phase (4/5)
      storeFlushCtx.prepare();
    };
    ExecutorService service = Executors.newSingleThreadExecutor();
    service.execute(flush);
    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
    // this is blocked until we recreate the active memstore -- phase (3/5)
    // we get scanner from active memstore but it is empty -- phase (5/5)
    InternalScanner scanner =
      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
    service.shutdown();
    // wait (up to 20 seconds) for the prepare() task to finish before reading from the scanner
    service.awaitTermination(20, TimeUnit.SECONDS);
    try {
      try {
        List<Cell> results = new ArrayList<>();
        scanner.next(results);
        assertEquals(3, results.size());
        for (Cell c : results) {
          byte[] actualValue = CellUtil.cloneValue(c);
          assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
            + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
        }
      } finally {
        scanner.close();
      }
    } finally {
      // always reset the static switch and complete the flush that was prepared above
      MyCompactingMemStore.START_TEST.set(false);
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
    }
  }
1396
1397  @Test
1398  public void testScanWithDoubleFlush() throws IOException {
1399    Configuration conf = HBaseConfiguration.create();
1400    // Initialize region
1401    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1402      @Override
1403      public void getScanners(MyStore store) throws IOException {
1404        final long tmpId = id++;
1405        ExecutorService s = Executors.newSingleThreadExecutor();
1406        s.execute(() -> {
1407          try {
1408            // flush the store before storescanner updates the scanners from store.
1409            // The current data will be flushed into files, and the memstore will
1410            // be clear.
1411            // -- phase (4/4)
1412            flushStore(store, tmpId);
1413          } catch (IOException ex) {
1414            throw new RuntimeException(ex);
1415          }
1416        });
1417        s.shutdown();
1418        try {
1419          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1420          s.awaitTermination(3, TimeUnit.SECONDS);
1421        } catch (InterruptedException ex) {
1422        }
1423      }
1424    });
1425    byte[] oldValue = Bytes.toBytes("oldValue");
1426    byte[] currentValue = Bytes.toBytes("currentValue");
1427    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1428    long ts = EnvironmentEdgeManager.currentTime();
1429    long seqId = 100;
1430    // older data whihc shouldn't be "seen" by client
1431    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1432    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1433    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1434    long snapshotId = id++;
1435    // push older data into snapshot -- phase (1/4)
1436    StoreFlushContext storeFlushCtx =
1437      store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
1438    storeFlushCtx.prepare();
1439
1440    // insert current data into active -- phase (2/4)
1441    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1442    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1443    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1444    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1445    quals.add(qf1);
1446    quals.add(qf2);
1447    quals.add(qf3);
1448    try (InternalScanner scanner =
1449      (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) {
1450      // complete the flush -- phase (3/4)
1451      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1452      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1453
1454      List<Cell> results = new ArrayList<>();
1455      scanner.next(results);
1456      assertEquals(3, results.size());
1457      for (Cell c : results) {
1458        byte[] actualValue = CellUtil.cloneValue(c);
1459        assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:"
1460          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue));
1461      }
1462    }
1463  }
1464
  /**
   * Scans three cells while, through a hooked result list, the store is flushed after the first
   * cell has been collected and new cells (with a higher sequence id than the scanner's read
   * point) are added after the second one. The scan must still return the three original values.
   */
  @Test
  public void testReclaimChunkWhenScaning() throws IOException {
    init("testReclaimChunkWhenScaning");
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    byte[] value = Bytes.toBytes("value");
    // the data the scanner below is expected to return (see the assertions at the end)
    store.add(createCell(qf1, ts, seqId, value), null);
    store.add(createCell(qf2, ts, seqId, value), null);
    store.add(createCell(qf3, ts, seqId, value), null);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    try (InternalScanner scanner =
      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) {
      // the lambda receives the list's current size
      List<Cell> results = new MyList<>(size -> {
        switch (size) {
          // 1) we get the first cell (qf1)
          // 2) flush the data to have StoreScanner update inner scanners
          // 3) the chunk will be reclaimed after updating
          case 1:
            try {
              flushStore(store, id++);
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
            break;
          // 1) we get the second cell (qf2)
          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
          case 2:
            try {
              byte[] newValue = Bytes.toBytes("newValue");
              // newer data (sequence id above the scanner's read point) which must not show up
              // in the scan results
              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
            break;
          default:
            break;
        }
      });
      scanner.next(results);
      assertEquals(3, results.size());
      for (Cell c : results) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
      }
    }
  }
1519
1520  /**
1521   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the
1522   * versionedList. And the first InMemoryFlushRunnable will use the chagned versionedList to remove
1523   * the corresponding segments. In short, there will be some segements which isn't in merge are
1524   * removed.
1525   */
1526  @Test
1527  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1528    int flushSize = 500;
1529    Configuration conf = HBaseConfiguration.create();
1530    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1531    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1532    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1533    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1534    // Set the lower threshold to invoke the "MERGE" policy
1535    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1536    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1537      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1538    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1539    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1540    long ts = EnvironmentEdgeManager.currentTime();
1541    long seqId = 100;
1542    // older data whihc shouldn't be "seen" by client
1543    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1544    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1545    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1546    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1547    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1548    storeFlushCtx.prepare();
1549    // This shouldn't invoke another in-memory flush because the first compactor thread
1550    // hasn't accomplished the in-memory compaction.
1551    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1552    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1553    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1554    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1555    // okay. Let the compaction be completed
1556    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1557    CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore;
1558    while (mem.isMemStoreFlushingInMemory()) {
1559      TimeUnit.SECONDS.sleep(1);
1560    }
1561    // This should invoke another in-memory flush.
1562    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1563    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1564    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1565    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1566    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1567      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1568    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1569    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1570  }
1571
1572  @Test
1573  public void testAge() throws IOException {
1574    long currentTime = EnvironmentEdgeManager.currentTime();
1575    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1576    edge.setValue(currentTime);
1577    EnvironmentEdgeManager.injectEdge(edge);
1578    Configuration conf = TEST_UTIL.getConfiguration();
1579    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1580    initHRegion(name.getMethodName(), conf,
1581      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1582    HStore store = new HStore(region, hcd, conf, false) {
1583
1584      @Override
1585      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1586        CellComparator kvComparator) throws IOException {
1587        List<HStoreFile> storefiles =
1588          Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1589            mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1590        StoreFileManager sfm = mock(StoreFileManager.class);
1591        when(sfm.getStorefiles()).thenReturn(storefiles);
1592        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1593        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1594        return storeEngine;
1595      }
1596    };
1597    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1598    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1599    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1600  }
1601
1602  private HStoreFile mockStoreFile(long createdTime) {
1603    StoreFileInfo info = mock(StoreFileInfo.class);
1604    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1605    HStoreFile sf = mock(HStoreFile.class);
1606    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1607    when(sf.isHFile()).thenReturn(true);
1608    when(sf.getFileInfo()).thenReturn(info);
1609    return sf;
1610  }
1611
1612  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1613    throws IOException {
1614    return (MyStore) init(methodName, conf,
1615      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1616      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1617  }
1618
1619  private static class MyStore extends HStore {
1620    private final MyStoreHook hook;
1621
1622    MyStore(final HRegion region, final ColumnFamilyDescriptor family,
1623      final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1624      super(region, family, confParam, false);
1625      this.hook = hook;
1626    }
1627
1628    @Override
1629    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1630      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1631      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1632      boolean includeMemstoreScanner) throws IOException {
1633      hook.getScanners(this);
1634      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1635        stopRow, false, readPt, includeMemstoreScanner);
1636    }
1637
1638    @Override
1639    public long getSmallestReadPoint() {
1640      return hook.getSmallestReadPoint(this);
1641    }
1642  }
1643
1644  private abstract static class MyStoreHook {
1645
1646    void getScanners(MyStore store) throws IOException {
1647    }
1648
1649    long getSmallestReadPoint(HStore store) {
1650      return store.getHRegion().getSmallestReadPoint();
1651    }
1652  }
1653
1654  @Test
1655  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
1656    Configuration conf = HBaseConfiguration.create();
1657    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1658    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1659    // Set the lower threshold to invoke the "MERGE" policy
1660    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1661    });
1662    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1663    long ts = EnvironmentEdgeManager.currentTime();
1664    long seqID = 1L;
1665    // Add some data to the region and do some flushes
1666    for (int i = 1; i < 10; i++) {
1667      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1668        memStoreSizing);
1669    }
1670    // flush them
1671    flushStore(store, seqID);
1672    for (int i = 11; i < 20; i++) {
1673      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1674        memStoreSizing);
1675    }
1676    // flush them
1677    flushStore(store, seqID);
1678    for (int i = 21; i < 30; i++) {
1679      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1680        memStoreSizing);
1681    }
1682    // flush them
1683    flushStore(store, seqID);
1684
1685    assertEquals(3, store.getStorefilesCount());
1686    Scan scan = new Scan();
1687    scan.addFamily(family);
1688    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1689    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1690    StoreScanner storeScanner =
1691      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1692    // get the current heap
1693    KeyValueHeap heap = storeScanner.heap;
1694    // create more store files
1695    for (int i = 31; i < 40; i++) {
1696      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1697        memStoreSizing);
1698    }
1699    // flush them
1700    flushStore(store, seqID);
1701
1702    for (int i = 41; i < 50; i++) {
1703      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1704        memStoreSizing);
1705    }
1706    // flush them
1707    flushStore(store, seqID);
1708    storefiles2 = store.getStorefiles();
1709    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1710    actualStorefiles1.removeAll(actualStorefiles);
1711    // Do compaction
1712    MyThread thread = new MyThread(storeScanner);
1713    thread.start();
1714    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
1715    thread.join();
1716    KeyValueHeap heap2 = thread.getHeap();
1717    assertFalse(heap.equals(heap2));
1718  }
1719
1720  @Test
1721  public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
1722    Configuration conf = HBaseConfiguration.create();
1723    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1724    // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
1725    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
1726    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1727    });
1728    Scan scan = new Scan();
1729    scan.addFamily(family);
1730    // ReadType on Scan is still DEFAULT only.
1731    assertEquals(ReadType.DEFAULT, scan.getReadType());
1732    StoreScanner storeScanner =
1733      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1734    assertFalse(storeScanner.isScanUsePread());
1735  }
1736
1737  @Test
1738  public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException {
1739    Configuration conf = HBaseConfiguration.create();
1740    conf.set("hbase.systemtables.compacting.memstore.type", "eager");
1741    init(name.getMethodName(), conf,
1742      TableDescriptorBuilder.newBuilder(
1743        TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())),
1744      ColumnFamilyDescriptorBuilder.newBuilder(family)
1745        .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build());
1746    assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString()
1747      .startsWith("eager".toUpperCase()));
1748  }
1749
1750  @Test
1751  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
1752    final TableName tn = TableName.valueOf(name.getMethodName());
1753    init(name.getMethodName());
1754
1755    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
1756
1757    HStoreFile sf1 = mockStoreFileWithLength(1024L);
1758    HStoreFile sf2 = mockStoreFileWithLength(2048L);
1759    HStoreFile sf3 = mockStoreFileWithLength(4096L);
1760    HStoreFile sf4 = mockStoreFileWithLength(8192L);
1761
1762    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
1763      .setEndKey(Bytes.toBytes("b")).build();
1764
1765    // Compacting two files down to one, reducing size
1766    sizeStore.put(regionInfo, 1024L + 4096L);
1767    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3),
1768      Arrays.asList(sf2));
1769
1770    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1771
1772    // The same file length in and out should have no change
1773    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
1774      Arrays.asList(sf2));
1775
1776    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1777
1778    // Increase the total size used
1779    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
1780      Arrays.asList(sf3));
1781
1782    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
1783
1784    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
1785      .setEndKey(Bytes.toBytes("c")).build();
1786    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
1787
1788    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
1789  }
1790
1791  @Test
1792  public void testHFileContextSetWithCFAndTable() throws Exception {
1793    init(this.name.getMethodName());
1794    StoreFileWriter writer = store.getStoreEngine()
1795      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
1796        .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
1797        .includesTag(false).shouldDropBehind(true));
1798    HFileContext hFileContext = writer.getHFileWriter().getFileContext();
1799    assertArrayEquals(family, hFileContext.getColumnFamily());
1800    assertArrayEquals(table, hFileContext.getTableName());
1801  }
1802
1803  // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell
1804  // but its dataSize exceeds inmemoryFlushSize
1805  @Test
1806  public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
1807    throws IOException, InterruptedException {
1808    Configuration conf = HBaseConfiguration.create();
1809
1810    byte[] smallValue = new byte[3];
1811    byte[] largeValue = new byte[9];
1812    final long timestamp = EnvironmentEdgeManager.currentTime();
1813    final long seqId = 100;
1814    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
1815    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
1816    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
1817    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
1818    int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
1819
1820    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
1821    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
1822    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
1823    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
1824
1825    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1826      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1827
1828    MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
1829    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
1830    myCompactingMemStore.smallCellPreUpdateCounter.set(0);
1831    myCompactingMemStore.largeCellPreUpdateCounter.set(0);
1832
1833    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
1834    Thread smallCellThread = new Thread(() -> {
1835      try {
1836        store.add(smallCell, new NonThreadSafeMemStoreSizing());
1837      } catch (Throwable exception) {
1838        exceptionRef.set(exception);
1839      }
1840    });
1841    smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
1842    smallCellThread.start();
1843
1844    String oldThreadName = Thread.currentThread().getName();
1845    try {
1846      /**
1847       * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
1848       * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread
1849       * invokes flushInMemory.
1850       * <p/>
1851       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
1852       * can add cell to currentActive . That is to say when largeCellThread called flushInMemory
1853       * method, CompactingMemStore.active has no cell.
1854       */
1855      Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
1856      store.add(largeCell, new NonThreadSafeMemStoreSizing());
1857      smallCellThread.join();
1858
1859      for (int i = 0; i < 100; i++) {
1860        long currentTimestamp = timestamp + 100 + i;
1861        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
1862        store.add(cell, new NonThreadSafeMemStoreSizing());
1863      }
1864    } finally {
1865      Thread.currentThread().setName(oldThreadName);
1866    }
1867
1868    assertTrue(exceptionRef.get() == null);
1869
1870  }
1871
1872  // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds
1873  // InmemoryFlushSize
1874  @Test(timeout = 60000)
1875  public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception {
1876    Configuration conf = HBaseConfiguration.create();
1877    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
1878
1879    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1880      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1881
1882    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
1883
1884    int size = (int) (myCompactingMemStore.getInmemoryFlushSize());
1885    byte[] value = new byte[size + 1];
1886
1887    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1888    long timestamp = EnvironmentEdgeManager.currentTime();
1889    long seqId = 100;
1890    Cell cell = createCell(qf1, timestamp, seqId, value);
1891    int cellByteSize = MutableSegment.getCellLength(cell);
1892    store.add(cell, memStoreSizing);
1893    assertTrue(memStoreSizing.getCellsCount() == 1);
1894    assertTrue(memStoreSizing.getDataSize() == cellByteSize);
1895    // Waiting the in memory compaction completed, see HBASE-26438
1896    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
1897  }
1898
1899  // This test is for HBASE-26210 also, test write large cell and small cell concurrently when
1900  // InmemoryFlushSize is smaller,equal with and larger than cell size.
1901  @Test
1902  public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
1903    throws IOException, InterruptedException {
1904    doWriteTestLargeCellAndSmallCellConcurrently(
1905      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
1906    doWriteTestLargeCellAndSmallCellConcurrently(
1907      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
1908    doWriteTestLargeCellAndSmallCellConcurrently(
1909      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1);
1910    doWriteTestLargeCellAndSmallCellConcurrently(
1911      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize);
1912    doWriteTestLargeCellAndSmallCellConcurrently(
1913      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1);
1914  }
1915
1916  private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize)
1917    throws IOException, InterruptedException {
1918
1919    Configuration conf = HBaseConfiguration.create();
1920
1921    byte[] smallValue = new byte[3];
1922    byte[] largeValue = new byte[100];
1923    final long timestamp = EnvironmentEdgeManager.currentTime();
1924    final long seqId = 100;
1925    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
1926    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
1927    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
1928    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
1929    int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize);
1930    boolean flushByteSizeLessThanSmallAndLargeCellSize =
1931      flushByteSize < (smallCellByteSize + largeCellByteSize);
1932
1933    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName());
1934    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
1935    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
1936
1937    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1938      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1939
1940    MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore);
1941    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
1942    myCompactingMemStore.disableCompaction();
1943    if (flushByteSizeLessThanSmallAndLargeCellSize) {
1944      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true;
1945    } else {
1946      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false;
1947    }
1948
1949    final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
1950    final AtomicLong totalCellByteSize = new AtomicLong(0);
1951    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
1952    Thread smallCellThread = new Thread(() -> {
1953      try {
1954        for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
1955          long currentTimestamp = timestamp + i;
1956          Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
1957          totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
1958          store.add(cell, memStoreSizing);
1959        }
1960      } catch (Throwable exception) {
1961        exceptionRef.set(exception);
1962
1963      }
1964    });
1965    smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
1966    smallCellThread.start();
1967
1968    String oldThreadName = Thread.currentThread().getName();
1969    try {
1970      /**
1971       * When flushByteSizeLessThanSmallAndLargeCellSize is true:
1972       * </p>
1973       * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then
1974       * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then
1975       * largeCellThread invokes flushInMemory.
1976       * <p/>
1977       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
1978       * can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
1979       * <p/>
1980       * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and
1981       * largeCellThread concurrently write one cell and wait each other, and then write another
1982       * cell etc.
1983       */
1984      Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
1985      for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
1986        long currentTimestamp = timestamp + i;
1987        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
1988        totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
1989        store.add(cell, memStoreSizing);
1990      }
1991      smallCellThread.join();
1992
1993      assertTrue(exceptionRef.get() == null);
1994      assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2));
1995      assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
1996      if (flushByteSizeLessThanSmallAndLargeCellSize) {
1997        assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT);
1998      } else {
1999        assertTrue(
2000          myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1));
2001      }
2002    } finally {
2003      Thread.currentThread().setName(oldThreadName);
2004    }
2005  }
2006
2007  /**
2008   * <pre>
2009    * This test is for HBASE-26384,
2010   * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
2011   * execute concurrently.
2012   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2013   * for both branch-2 and master):
2014   * 1. The {@link CompactingMemStore} size exceeds
2015   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2016   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2017   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2018   * 2. The in memory compact thread starts and then stopping before
2019   *    {@link CompactingMemStore#flattenOneSegment}.
2020   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2021   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2022   *    compact thread continues.
2023   *    Assuming {@link VersionedSegmentsList#version} returned from
2024   *    {@link CompactingMemStore#getImmutableSegments} is v.
2025   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2026   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2027   *    {@link CompactionPipeline#version} is still v.
2028   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2029   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2030   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2031   *    {@link CompactionPipeline} has changed because
2032   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2033   *    removed in fact and still remaining in {@link CompactionPipeline}.
2034   *
2035   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
2036   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2037   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2038   *    v+1.
2039   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2040   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2041   *    failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
2042   *    again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
2043   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
2044   * </pre>
2045   */
2046  @Test
2047  public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
2048    Configuration conf = HBaseConfiguration.create();
2049
2050    byte[] smallValue = new byte[3];
2051    byte[] largeValue = new byte[9];
2052    final long timestamp = EnvironmentEdgeManager.currentTime();
2053    final long seqId = 100;
2054    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2055    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2056    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2057    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2058    int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
2059    int flushByteSize = totalCellByteSize - 2;
2060
2061    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2062    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
2063    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2064    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2065
2066    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2067      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2068
2069    MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
2070    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2071
2072    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2073    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2074
2075    String oldThreadName = Thread.currentThread().getName();
2076    try {
2077      Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
2078      /**
2079       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2080       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2081       * would invoke {@link CompactingMemStore#stopCompaction}.
2082       */
2083      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2084
2085      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2086      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2087
2088      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2089      assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
2090      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2091      assertTrue(segments.getNumOfSegments() == 0);
2092      assertTrue(segments.getNumOfCells() == 0);
2093      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
2094      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2095    } finally {
2096      Thread.currentThread().setName(oldThreadName);
2097    }
2098  }
2099
2100  /**
2101   * <pre>
2102   * This test is for HBASE-26384,
2103   * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()}
2104   * and writeMemStore execute concurrently.
2105   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2106   * for both branch-2 and master):
2107   * 1. The {@link CompactingMemStore} size exceeds
2108   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2109   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2110   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
   * 2. The in memory compact thread starts and then stops before
   *    {@link CompactingMemStore#flattenOneSegment}.
2113   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2114   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2115   *    compact thread continues.
2116   *    Assuming {@link VersionedSegmentsList#version} returned from
2117   *    {@link CompactingMemStore#getImmutableSegments} is v.
   * 4. The snapshot thread stops before {@link CompactingMemStore#swapPipelineWithNull}.
2119   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2120   *    {@link CompactionPipeline#version} is still v.
2121   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2122   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2123   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2124   *    {@link CompactionPipeline} has changed because
2125   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2126   *    removed in fact and still remaining in {@link CompactionPipeline}.
2127   *
2128   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
2129   * and I add step 7-8 to test there is new segment added before retry.
2130   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2131   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2132   *     v+1.
2133   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2134   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2135   *    failed and retry,{@link VersionedSegmentsList#version} returned from
2136   *    {@link CompactingMemStore#getImmutableSegments} is v+1.
2137   * 7. The write thread continues writing to {@link CompactingMemStore} and
2138   *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
2139   *    {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
2140   *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
2141   *    {@link CompactionPipeline#version} is still v+1.
2142   * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2143   *    {@link CompactionPipeline#version} is still v+1,
2144   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
2145   *    remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
2146   *    {@link CompactingMemStore#swapPipelineWithNull}.
2147   * </pre>
2148   */
2149  @Test
2150  public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
2151    Configuration conf = HBaseConfiguration.create();
2152
2153    byte[] smallValue = new byte[3];
2154    byte[] largeValue = new byte[9];
2155    final long timestamp = EnvironmentEdgeManager.currentTime();
2156    final long seqId = 100;
2157    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2158    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2159    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2160    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2161    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2162    int flushByteSize = firstWriteCellByteSize - 2;
2163
2164    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2165    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
2166    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2167    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2168
2169    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2170      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2171
2172    final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
2173    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2174
2175    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2176    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2177
2178    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2179    final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
2180    final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2181    final int writeAgainCellByteSize =
2182      MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2);
2183    final Thread writeAgainThread = new Thread(() -> {
2184      try {
2185        myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
2186
2187        store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
2188        store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
2189
2190        myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
2191      } catch (Throwable exception) {
2192        exceptionRef.set(exception);
2193      }
2194    });
2195    writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
2196    writeAgainThread.start();
2197
2198    String oldThreadName = Thread.currentThread().getName();
2199    try {
2200      Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
2201      /**
2202       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2203       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2204       * would invoke {@link CompactingMemStore#stopCompaction}.
2205       */
2206      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2207      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2208      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2209      writeAgainThread.join();
2210
2211      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2212      assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
2213      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2214      assertTrue(segments.getNumOfSegments() == 1);
2215      assertTrue(
2216        ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
2217      assertTrue(segments.getNumOfCells() == 2);
2218      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
2219      assertTrue(exceptionRef.get() == null);
2220      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2221    } finally {
2222      Thread.currentThread().setName(oldThreadName);
2223    }
2224  }
2225
2226  /**
2227   * <pre>
2228   * This test is for HBASE-26465,
2229   * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute
2230   * concurrently. The threads sequence before HBASE-26465 is:
   * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have been added to
   *  {@link DefaultMemStore}.
   * 2.The flush thread stops before {@link DefaultMemStore#clearSnapshot} in
   *   {@link HStore#updateStorefiles} after it has completed flushing the memStore to an hfile.
   * 3.The scan thread starts and stops after {@link DefaultMemStore#getSnapshotSegments} in
   *   {@link DefaultMemStore#getScanners},here the scan thread gets the
   *   {@link DefaultMemStore#snapshot} which is created by the flush thread.
2238   * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close
2239   *   {@link DefaultMemStore#snapshot},because the reference count of the corresponding
2240   *   {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl}
2241   *   are recycled.
2242   * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a
2243   *   {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the
2244   *   reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in
2245   *   corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may
2246   *   be overwritten by other write threads,which may cause serious problem.
2247   * After HBASE-26465,{@link DefaultMemStore#getScanners} and
2248   * {@link DefaultMemStore#clearSnapshot} could not execute concurrently.
2249   * </pre>
2250   */
2251  @Test
2252  public void testClearSnapshotGetScannerConcurrently() throws Exception {
2253    Configuration conf = HBaseConfiguration.create();
2254
2255    byte[] smallValue = new byte[3];
2256    byte[] largeValue = new byte[9];
2257    final long timestamp = EnvironmentEdgeManager.currentTime();
2258    final long seqId = 100;
2259    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2260    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2261    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2262    quals.add(qf1);
2263    quals.add(qf2);
2264
2265    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
2266    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2267
2268    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2269    MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore);
2270    myDefaultMemStore.store = store;
2271
2272    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2273    store.add(smallCell, memStoreSizing);
2274    store.add(largeCell, memStoreSizing);
2275
2276    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2277    final Thread flushThread = new Thread(() -> {
2278      try {
2279        flushStore(store, id++);
2280      } catch (Throwable exception) {
2281        exceptionRef.set(exception);
2282      }
2283    });
2284    flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
2285    flushThread.start();
2286
2287    String oldThreadName = Thread.currentThread().getName();
2288    StoreScanner storeScanner = null;
2289    try {
2290      Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
2291
2292      /**
2293       * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot}
2294       */
2295      myDefaultMemStore.getScannerCyclicBarrier.await();
2296
2297      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2298      flushThread.join();
2299
2300      if (myDefaultMemStore.shouldWait) {
2301        SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2302        MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2303        assertTrue(memStoreLAB.isClosed());
2304        assertTrue(!memStoreLAB.chunks.isEmpty());
2305        assertTrue(!memStoreLAB.isReclaimed());
2306
2307        Cell cell1 = segmentScanner.next();
2308        CellUtil.equals(smallCell, cell1);
2309        Cell cell2 = segmentScanner.next();
2310        CellUtil.equals(largeCell, cell2);
2311        assertNull(segmentScanner.next());
2312      } else {
2313        List<Cell> results = new ArrayList<>();
2314        storeScanner.next(results);
2315        assertEquals(2, results.size());
2316        CellUtil.equals(smallCell, results.get(0));
2317        CellUtil.equals(largeCell, results.get(1));
2318      }
2319      assertTrue(exceptionRef.get() == null);
2320    } finally {
2321      if (storeScanner != null) {
2322        storeScanner.close();
2323      }
2324      Thread.currentThread().setName(oldThreadName);
2325    }
2326  }
2327
2328  @SuppressWarnings("unchecked")
2329  private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
2330    List<T> resultScanners = new ArrayList<T>();
2331    for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
2332      if (keyValueScannerClass.isInstance(keyValueScanner)) {
2333        resultScanners.add((T) keyValueScanner);
2334      }
2335    }
2336    assertTrue(resultScanners.size() == 1);
2337    return resultScanners.get(0);
2338  }
2339
2340  @Test
2341  public void testOnConfigurationChange() throws IOException {
2342    final int COMMON_MAX_FILES_TO_COMPACT = 10;
2343    final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8;
2344    final int STORE_MAX_FILES_TO_COMPACT = 6;
2345
2346    // Build a table that its maxFileToCompact different from common configuration.
2347    Configuration conf = HBaseConfiguration.create();
2348    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2349      COMMON_MAX_FILES_TO_COMPACT);
2350    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
2351      .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2352        String.valueOf(STORE_MAX_FILES_TO_COMPACT))
2353      .build();
2354    init(this.name.getMethodName(), conf, hcd);
2355
2356    // After updating common configuration, the conf in HStore itself must not be changed.
2357    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2358      NEW_COMMON_MAX_FILES_TO_COMPACT);
2359    this.store.onConfigurationChange(conf);
2360    assertEquals(STORE_MAX_FILES_TO_COMPACT,
2361      store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact());
2362  }
2363
2364  /**
2365   * This test is for HBASE-26476
2366   */
2367  @Test
2368  public void testExtendsDefaultMemStore() throws Exception {
2369    Configuration conf = HBaseConfiguration.create();
2370    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2371
2372    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2373    assertTrue(this.store.memstore.getClass() == DefaultMemStore.class);
2374    tearDown();
2375
2376    conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName());
2377    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2378    assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class);
2379  }
2380
2381  static class CustomDefaultMemStore extends DefaultMemStore {
2382
2383    public CustomDefaultMemStore(Configuration conf, CellComparator c,
2384      RegionServicesForStores regionServices) {
2385      super(conf, c, regionServices);
2386    }
2387
2388  }
2389
2390  /**
2391   * This test is for HBASE-26488
2392   */
2393  @Test
2394  public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
2395
2396    Configuration conf = HBaseConfiguration.create();
2397
2398    byte[] smallValue = new byte[3];
2399    byte[] largeValue = new byte[9];
2400    final long timestamp = EnvironmentEdgeManager.currentTime();
2401    final long seqId = 100;
2402    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2403    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2404    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2405    quals.add(qf1);
2406    quals.add(qf2);
2407
2408    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
2409    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2410    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
2411      MyDefaultStoreFlusher.class.getName());
2412
2413    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2414    MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
2415    assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
2416
2417    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2418    store.add(smallCell, memStoreSizing);
2419    store.add(largeCell, memStoreSizing);
2420    flushStore(store, id++);
2421
2422    MemStoreLABImpl memStoreLAB =
2423      (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
2424    assertTrue(memStoreLAB.isClosed());
2425    assertTrue(memStoreLAB.getRefCntValue() == 0);
2426    assertTrue(memStoreLAB.isReclaimed());
2427    assertTrue(memStoreLAB.chunks.isEmpty());
2428    StoreScanner storeScanner = null;
2429    try {
2430      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2431      assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
2432      assertTrue(store.memstore.size().getCellsCount() == 0);
2433      assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
2434      assertTrue(storeScanner.currentScanners.size() == 1);
2435      assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
2436
2437      List<Cell> results = new ArrayList<>();
2438      storeScanner.next(results);
2439      assertEquals(2, results.size());
2440      CellUtil.equals(smallCell, results.get(0));
2441      CellUtil.equals(largeCell, results.get(1));
2442    } finally {
2443      if (storeScanner != null) {
2444        storeScanner.close();
2445      }
2446    }
2447  }
2448
2449  static class MyDefaultMemStore1 extends DefaultMemStore {
2450
2451    private ImmutableSegment snapshotImmutableSegment;
2452
2453    public MyDefaultMemStore1(Configuration conf, CellComparator c,
2454      RegionServicesForStores regionServices) {
2455      super(conf, c, regionServices);
2456    }
2457
2458    @Override
2459    public MemStoreSnapshot snapshot() {
2460      MemStoreSnapshot result = super.snapshot();
2461      this.snapshotImmutableSegment = snapshot;
2462      return result;
2463    }
2464
2465  }
2466
2467  public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
2468    private static final AtomicInteger failCounter = new AtomicInteger(1);
2469    private static final AtomicInteger counter = new AtomicInteger(0);
2470
2471    public MyDefaultStoreFlusher(Configuration conf, HStore store) {
2472      super(conf, store);
2473    }
2474
2475    @Override
2476    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
2477      MonitoredTask status, ThroughputController throughputController,
2478      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
2479      counter.incrementAndGet();
2480      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
2481        writerCreationTracker);
2482    }
2483
2484    @Override
2485    protected void performFlush(InternalScanner scanner, final CellSink sink,
2486      ThroughputController throughputController) throws IOException {
2487
2488      final int currentCount = counter.get();
2489      CellSink newCellSink = (cell) -> {
2490        if (currentCount <= failCounter.get()) {
2491          throw new IOException("Simulated exception by tests");
2492        }
2493        sink.append(cell);
2494      };
2495      super.performFlush(scanner, newCellSink, throughputController);
2496    }
2497  }
2498
2499  /**
2500   * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB}
2501   */
2502  @Test
2503  public void testImmutableMemStoreLABRefCnt() throws Exception {
2504    Configuration conf = HBaseConfiguration.create();
2505
2506    byte[] smallValue = new byte[3];
2507    byte[] largeValue = new byte[9];
2508    final long timestamp = EnvironmentEdgeManager.currentTime();
2509    final long seqId = 100;
2510    final Cell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
2511    final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
2512    final Cell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue);
2513    final Cell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2514    final Cell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue);
2515    final Cell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue);
2516
2517    int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
2518    int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
2519    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2520    int flushByteSize = firstWriteCellByteSize - 2;
2521
2522    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2523    conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
2524    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2525    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2526    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2527
2528    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2529      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2530
2531    final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
2532    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2533    myCompactingMemStore.allowCompaction.set(false);
2534
2535    NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2536    store.add(smallCell1, memStoreSizing);
2537    store.add(largeCell1, memStoreSizing);
2538    store.add(smallCell2, memStoreSizing);
2539    store.add(largeCell2, memStoreSizing);
2540    store.add(smallCell3, memStoreSizing);
2541    store.add(largeCell3, memStoreSizing);
2542    VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2543    assertTrue(versionedSegmentsList.getNumOfSegments() == 3);
2544    List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
2545    List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size());
2546    for (ImmutableSegment segment : segments) {
2547      memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
2548    }
2549    List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2550    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2551      assertTrue(memStoreLAB.getRefCntValue() == 2);
2552    }
2553
2554    myCompactingMemStore.allowCompaction.set(true);
2555    myCompactingMemStore.flushInMemory();
2556
2557    versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2558    assertTrue(versionedSegmentsList.getNumOfSegments() == 1);
2559    ImmutableMemStoreLAB immutableMemStoreLAB =
2560      (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
2561    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2562      assertTrue(memStoreLAB.getRefCntValue() == 2);
2563    }
2564
2565    List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2566    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2567      assertTrue(memStoreLAB.getRefCntValue() == 2);
2568    }
2569    assertTrue(immutableMemStoreLAB.getRefCntValue() == 2);
2570    for (KeyValueScanner scanner : scanners1) {
2571      scanner.close();
2572    }
2573    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2574      assertTrue(memStoreLAB.getRefCntValue() == 1);
2575    }
2576    for (KeyValueScanner scanner : scanners2) {
2577      scanner.close();
2578    }
2579    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2580      assertTrue(memStoreLAB.getRefCntValue() == 1);
2581    }
2582    assertTrue(immutableMemStoreLAB.getRefCntValue() == 1);
2583    flushStore(store, id++);
2584    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2585      assertTrue(memStoreLAB.getRefCntValue() == 0);
2586    }
2587    assertTrue(immutableMemStoreLAB.getRefCntValue() == 0);
2588    assertTrue(immutableMemStoreLAB.isClosed());
2589    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2590      assertTrue(memStoreLAB.isClosed());
2591      assertTrue(memStoreLAB.isReclaimed());
2592      assertTrue(memStoreLAB.chunks.isEmpty());
2593    }
2594  }
2595
2596  private HStoreFile mockStoreFileWithLength(long length) {
2597    HStoreFile sf = mock(HStoreFile.class);
2598    StoreFileReader sfr = mock(StoreFileReader.class);
2599    when(sf.isHFile()).thenReturn(true);
2600    when(sf.getReader()).thenReturn(sfr);
2601    when(sfr.length()).thenReturn(length);
2602    return sf;
2603  }
2604
2605  private static class MyThread extends Thread {
2606    private StoreScanner scanner;
2607    private KeyValueHeap heap;
2608
2609    public MyThread(StoreScanner scanner) {
2610      this.scanner = scanner;
2611    }
2612
2613    public KeyValueHeap getHeap() {
2614      return this.heap;
2615    }
2616
2617    @Override
2618    public void run() {
2619      scanner.trySwitchToStreamRead();
2620      heap = scanner.heap;
2621    }
2622  }
2623
2624  private static class MyMemStoreCompactor extends MemStoreCompactor {
2625    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2626    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
2627
2628    public MyMemStoreCompactor(CompactingMemStore compactingMemStore,
2629      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
2630      super(compactingMemStore, compactionPolicy);
2631    }
2632
2633    @Override
2634    public boolean start() throws IOException {
2635      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
2636      if (isFirst) {
2637        try {
2638          START_COMPACTOR_LATCH.await();
2639          return super.start();
2640        } catch (InterruptedException ex) {
2641          throw new RuntimeException(ex);
2642        }
2643      }
2644      return super.start();
2645    }
2646  }
2647
2648  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
2649    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2650
2651    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
2652      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2653      throws IOException {
2654      super(conf, c, store, regionServices, compactionPolicy);
2655    }
2656
2657    @Override
2658    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
2659      throws IllegalArgumentIOException {
2660      return new MyMemStoreCompactor(this, compactionPolicy);
2661    }
2662
2663    @Override
2664    protected boolean setInMemoryCompactionFlag() {
2665      boolean rval = super.setInMemoryCompactionFlag();
2666      if (rval) {
2667        RUNNER_COUNT.incrementAndGet();
2668        if (LOG.isDebugEnabled()) {
2669          LOG.debug("runner count: " + RUNNER_COUNT.get());
2670        }
2671      }
2672      return rval;
2673    }
2674  }
2675
2676  public static class MyCompactingMemStore extends CompactingMemStore {
2677    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
2678    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
2679    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
2680
2681    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store,
2682      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2683      throws IOException {
2684      super(conf, c, store, regionServices, compactionPolicy);
2685    }
2686
2687    @Override
2688    protected List<KeyValueScanner> createList(int capacity) {
2689      if (START_TEST.get()) {
2690        try {
2691          getScannerLatch.countDown();
2692          snapshotLatch.await();
2693        } catch (InterruptedException e) {
2694          throw new RuntimeException(e);
2695        }
2696      }
2697      return new ArrayList<>(capacity);
2698    }
2699
2700    @Override
2701    protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
2702      if (START_TEST.get()) {
2703        try {
2704          getScannerLatch.await();
2705        } catch (InterruptedException e) {
2706          throw new RuntimeException(e);
2707        }
2708      }
2709
2710      super.pushActiveToPipeline(active, checkEmpty);
2711      if (START_TEST.get()) {
2712        snapshotLatch.countDown();
2713      }
2714    }
2715  }
2716
2717  interface MyListHook {
2718    void hook(int currentSize);
2719  }
2720
2721  private static class MyList<T> implements List<T> {
2722    private final List<T> delegatee = new ArrayList<>();
2723    private final MyListHook hookAtAdd;
2724
2725    MyList(final MyListHook hookAtAdd) {
2726      this.hookAtAdd = hookAtAdd;
2727    }
2728
2729    @Override
2730    public int size() {
2731      return delegatee.size();
2732    }
2733
2734    @Override
2735    public boolean isEmpty() {
2736      return delegatee.isEmpty();
2737    }
2738
2739    @Override
2740    public boolean contains(Object o) {
2741      return delegatee.contains(o);
2742    }
2743
2744    @Override
2745    public Iterator<T> iterator() {
2746      return delegatee.iterator();
2747    }
2748
2749    @Override
2750    public Object[] toArray() {
2751      return delegatee.toArray();
2752    }
2753
2754    @Override
2755    public <R> R[] toArray(R[] a) {
2756      return delegatee.toArray(a);
2757    }
2758
2759    @Override
2760    public boolean add(T e) {
2761      hookAtAdd.hook(size());
2762      return delegatee.add(e);
2763    }
2764
2765    @Override
2766    public boolean remove(Object o) {
2767      return delegatee.remove(o);
2768    }
2769
2770    @Override
2771    public boolean containsAll(Collection<?> c) {
2772      return delegatee.containsAll(c);
2773    }
2774
2775    @Override
2776    public boolean addAll(Collection<? extends T> c) {
2777      return delegatee.addAll(c);
2778    }
2779
2780    @Override
2781    public boolean addAll(int index, Collection<? extends T> c) {
2782      return delegatee.addAll(index, c);
2783    }
2784
2785    @Override
2786    public boolean removeAll(Collection<?> c) {
2787      return delegatee.removeAll(c);
2788    }
2789
2790    @Override
2791    public boolean retainAll(Collection<?> c) {
2792      return delegatee.retainAll(c);
2793    }
2794
2795    @Override
2796    public void clear() {
2797      delegatee.clear();
2798    }
2799
2800    @Override
2801    public T get(int index) {
2802      return delegatee.get(index);
2803    }
2804
2805    @Override
2806    public T set(int index, T element) {
2807      return delegatee.set(index, element);
2808    }
2809
2810    @Override
2811    public void add(int index, T element) {
2812      delegatee.add(index, element);
2813    }
2814
2815    @Override
2816    public T remove(int index) {
2817      return delegatee.remove(index);
2818    }
2819
2820    @Override
2821    public int indexOf(Object o) {
2822      return delegatee.indexOf(o);
2823    }
2824
2825    @Override
2826    public int lastIndexOf(Object o) {
2827      return delegatee.lastIndexOf(o);
2828    }
2829
2830    @Override
2831    public ListIterator<T> listIterator() {
2832      return delegatee.listIterator();
2833    }
2834
2835    @Override
2836    public ListIterator<T> listIterator(int index) {
2837      return delegatee.listIterator(index);
2838    }
2839
2840    @Override
2841    public List<T> subList(int fromIndex, int toIndex) {
2842      return delegatee.subList(fromIndex, toIndex);
2843    }
2844  }
2845
2846  public static class MyCompactingMemStore2 extends CompactingMemStore {
2847    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
2848    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
2849    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
2850    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
2851    private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
2852    private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
2853
2854    public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
2855      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2856      throws IOException {
2857      super(conf, cellComparator, store, regionServices, compactionPolicy);
2858    }
2859
2860    @Override
2861    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
2862      MemStoreSizing memstoreSizing) {
2863      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
2864        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
2865        if (currentCount <= 1) {
2866          try {
2867            /**
2868             * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
2869             * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
2870             * largeCellThread invokes flushInMemory.
2871             */
2872            preCyclicBarrier.await();
2873          } catch (Throwable e) {
2874            throw new RuntimeException(e);
2875          }
2876        }
2877      }
2878
2879      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
2880      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
2881        try {
2882          preCyclicBarrier.await();
2883        } catch (Throwable e) {
2884          throw new RuntimeException(e);
2885        }
2886      }
2887      return returnValue;
2888    }
2889
2890    @Override
2891    protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
2892      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
2893        try {
2894          /**
2895           * After largeCellThread finished flushInMemory method, smallCellThread can add cell to
2896           * currentActive . That is to say when largeCellThread called flushInMemory method,
2897           * currentActive has no cell.
2898           */
2899          postCyclicBarrier.await();
2900        } catch (Throwable e) {
2901          throw new RuntimeException(e);
2902        }
2903      }
2904      super.doAdd(currentActive, cell, memstoreSizing);
2905    }
2906
2907    @Override
2908    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
2909      super.flushInMemory(currentActiveMutableSegment);
2910      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
2911        if (largeCellPreUpdateCounter.get() <= 1) {
2912          try {
2913            postCyclicBarrier.await();
2914          } catch (Throwable e) {
2915            throw new RuntimeException(e);
2916          }
2917        }
2918      }
2919    }
2920
2921  }
2922
2923  public static class MyCompactingMemStore3 extends CompactingMemStore {
2924    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
2925    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
2926
2927    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
2928    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
2929    private final AtomicInteger flushCounter = new AtomicInteger(0);
2930    private static final int CELL_COUNT = 5;
2931    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;
2932
2933    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
2934      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2935      throws IOException {
2936      super(conf, cellComparator, store, regionServices, compactionPolicy);
2937    }
2938
2939    @Override
2940    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
2941      MemStoreSizing memstoreSizing) {
2942      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
2943        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
2944      }
2945      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
2946        try {
2947          preCyclicBarrier.await();
2948        } catch (Throwable e) {
2949          throw new RuntimeException(e);
2950        }
2951      }
2952
2953      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
2954      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
2955        try {
2956          preCyclicBarrier.await();
2957        } catch (Throwable e) {
2958          throw new RuntimeException(e);
2959        }
2960      }
2961      return returnValue;
2962    }
2963
2964    @Override
2965    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
2966      super.postUpdate(currentActiveMutableSegment);
2967      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
2968        try {
2969          postCyclicBarrier.await();
2970        } catch (Throwable e) {
2971          throw new RuntimeException(e);
2972        }
2973        return;
2974      }
2975
2976      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
2977        try {
2978          postCyclicBarrier.await();
2979        } catch (Throwable e) {
2980          throw new RuntimeException(e);
2981        }
2982      }
2983    }
2984
2985    @Override
2986    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
2987      super.flushInMemory(currentActiveMutableSegment);
2988      flushCounter.incrementAndGet();
2989      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
2990        return;
2991      }
2992
2993      assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME));
2994      try {
2995        postCyclicBarrier.await();
2996      } catch (Throwable e) {
2997        throw new RuntimeException(e);
2998      }
2999
3000    }
3001
3002    void disableCompaction() {
3003      allowCompaction.set(false);
3004    }
3005
3006    void enableCompaction() {
3007      allowCompaction.set(true);
3008    }
3009
3010  }
3011
3012  public static class MyCompactingMemStore4 extends CompactingMemStore {
3013    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3014    /**
3015     * {@link CompactingMemStore#flattenOneSegment} must execute after
3016     * {@link CompactingMemStore#getImmutableSegments}
3017     */
3018    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3019    /**
3020     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3021     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3022     */
3023    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3024    /**
3025     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3026     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3027     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3028     */
3029    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3030    /**
3031     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3032     */
3033    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3034    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3035    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3036    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3037    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3038
3039    public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
3040      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3041      throws IOException {
3042      super(conf, cellComparator, store, regionServices, compactionPolicy);
3043    }
3044
3045    @Override
3046    public VersionedSegmentsList getImmutableSegments() {
3047      VersionedSegmentsList result = super.getImmutableSegments();
3048      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3049        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3050        if (currentCount <= 1) {
3051          try {
3052            flattenOneSegmentPreCyclicBarrier.await();
3053          } catch (Throwable e) {
3054            throw new RuntimeException(e);
3055          }
3056        }
3057      }
3058      return result;
3059    }
3060
3061    @Override
3062    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3063      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3064        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3065        if (currentCount <= 1) {
3066          try {
3067            flattenOneSegmentPostCyclicBarrier.await();
3068          } catch (Throwable e) {
3069            throw new RuntimeException(e);
3070          }
3071        }
3072      }
3073      boolean result = super.swapPipelineWithNull(segments);
3074      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3075        int currentCount = swapPipelineWithNullCounter.get();
3076        if (currentCount <= 1) {
3077          assertTrue(!result);
3078        }
3079        if (currentCount == 2) {
3080          assertTrue(result);
3081        }
3082      }
3083      return result;
3084
3085    }
3086
3087    @Override
3088    public void flattenOneSegment(long requesterVersion, Action action) {
3089      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3090      if (currentCount <= 1) {
3091        try {
3092          /**
3093           * {@link CompactingMemStore#snapshot} could start.
3094           */
3095          snapShotStartCyclicCyclicBarrier.await();
3096          flattenOneSegmentPreCyclicBarrier.await();
3097        } catch (Throwable e) {
3098          throw new RuntimeException(e);
3099        }
3100      }
3101      super.flattenOneSegment(requesterVersion, action);
3102      if (currentCount <= 1) {
3103        try {
3104          flattenOneSegmentPostCyclicBarrier.await();
3105        } catch (Throwable e) {
3106          throw new RuntimeException(e);
3107        }
3108      }
3109    }
3110
3111    @Override
3112    protected boolean setInMemoryCompactionFlag() {
3113      boolean result = super.setInMemoryCompactionFlag();
3114      assertTrue(result);
3115      setInMemoryCompactionFlagCounter.incrementAndGet();
3116      return result;
3117    }
3118
3119    @Override
3120    void inMemoryCompaction() {
3121      try {
3122        super.inMemoryCompaction();
3123      } finally {
3124        try {
3125          inMemoryCompactionEndCyclicBarrier.await();
3126        } catch (Throwable e) {
3127          throw new RuntimeException(e);
3128        }
3129
3130      }
3131    }
3132
3133  }
3134
3135  public static class MyCompactingMemStore5 extends CompactingMemStore {
3136    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3137    private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
3138    /**
3139     * {@link CompactingMemStore#flattenOneSegment} must execute after
3140     * {@link CompactingMemStore#getImmutableSegments}
3141     */
3142    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3143    /**
3144     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3145     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3146     */
3147    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3148    /**
3149     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3150     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3151     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3152     */
3153    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3154    /**
3155     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3156     */
3157    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3158    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3159    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3160    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3161    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3162    /**
3163     * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain
3164     * thread could start.
3165     */
3166    private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
3167    /**
3168     * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the
3169     * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would
3170     * execute,and in memory compact thread would exit,because we expect that in memory compact
3171     * executing only once.
3172     */
3173    private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
3174
3175    public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
3176      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3177      throws IOException {
3178      super(conf, cellComparator, store, regionServices, compactionPolicy);
3179    }
3180
3181    @Override
3182    public VersionedSegmentsList getImmutableSegments() {
3183      VersionedSegmentsList result = super.getImmutableSegments();
3184      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3185        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3186        if (currentCount <= 1) {
3187          try {
3188            flattenOneSegmentPreCyclicBarrier.await();
3189          } catch (Throwable e) {
3190            throw new RuntimeException(e);
3191          }
3192        }
3193
3194      }
3195
3196      return result;
3197    }
3198
3199    @Override
3200    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3201      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3202        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3203        if (currentCount <= 1) {
3204          try {
3205            flattenOneSegmentPostCyclicBarrier.await();
3206          } catch (Throwable e) {
3207            throw new RuntimeException(e);
3208          }
3209        }
3210
3211        if (currentCount == 2) {
3212          try {
3213            /**
3214             * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull},
3215             * writeAgain thread could start.
3216             */
3217            writeMemStoreAgainStartCyclicBarrier.await();
3218            /**
3219             * Only the writeAgain thread completes, retry
3220             * {@link CompactingMemStore#swapPipelineWithNull} would execute.
3221             */
3222            writeMemStoreAgainEndCyclicBarrier.await();
3223          } catch (Throwable e) {
3224            throw new RuntimeException(e);
3225          }
3226        }
3227
3228      }
3229      boolean result = super.swapPipelineWithNull(segments);
3230      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3231        int currentCount = swapPipelineWithNullCounter.get();
3232        if (currentCount <= 1) {
3233          assertTrue(!result);
3234        }
3235        if (currentCount == 2) {
3236          assertTrue(result);
3237        }
3238      }
3239      return result;
3240
3241    }
3242
3243    @Override
3244    public void flattenOneSegment(long requesterVersion, Action action) {
3245      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3246      if (currentCount <= 1) {
3247        try {
3248          /**
3249           * {@link CompactingMemStore#snapshot} could start.
3250           */
3251          snapShotStartCyclicCyclicBarrier.await();
3252          flattenOneSegmentPreCyclicBarrier.await();
3253        } catch (Throwable e) {
3254          throw new RuntimeException(e);
3255        }
3256      }
3257      super.flattenOneSegment(requesterVersion, action);
3258      if (currentCount <= 1) {
3259        try {
3260          flattenOneSegmentPostCyclicBarrier.await();
3261          /**
3262           * Only the writeAgain thread completes, in memory compact thread would exit,because we
3263           * expect that in memory compact executing only once.
3264           */
3265          writeMemStoreAgainEndCyclicBarrier.await();
3266        } catch (Throwable e) {
3267          throw new RuntimeException(e);
3268        }
3269
3270      }
3271    }
3272
3273    @Override
3274    protected boolean setInMemoryCompactionFlag() {
3275      boolean result = super.setInMemoryCompactionFlag();
3276      int count = setInMemoryCompactionFlagCounter.incrementAndGet();
3277      if (count <= 1) {
3278        assertTrue(result);
3279      }
3280      if (count == 2) {
3281        assertTrue(!result);
3282      }
3283      return result;
3284    }
3285
3286    @Override
3287    void inMemoryCompaction() {
3288      try {
3289        super.inMemoryCompaction();
3290      } finally {
3291        try {
3292          inMemoryCompactionEndCyclicBarrier.await();
3293        } catch (Throwable e) {
3294          throw new RuntimeException(e);
3295        }
3296
3297      }
3298    }
3299  }
3300
3301  public static class MyCompactingMemStore6 extends CompactingMemStore {
3302    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3303
3304    public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator,
3305      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3306      throws IOException {
3307      super(conf, cellComparator, store, regionServices, compactionPolicy);
3308    }
3309
3310    @Override
3311    void inMemoryCompaction() {
3312      try {
3313        super.inMemoryCompaction();
3314      } finally {
3315        try {
3316          inMemoryCompactionEndCyclicBarrier.await();
3317        } catch (Throwable e) {
3318          throw new RuntimeException(e);
3319        }
3320
3321      }
3322    }
3323  }
3324
3325  public static class MyDefaultMemStore extends DefaultMemStore {
3326    private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
3327    private static final String FLUSH_THREAD_NAME = "flushMyThread";
3328    /**
3329     * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread
3330     * could start.
3331     */
3332    private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
3333    /**
3334     * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments}
3335     * completed, {@link DefaultMemStore#doClearSnapShot} could continue.
3336     */
3337    private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3338    /**
3339     * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot}
3340     * completed, {@link DefaultMemStore#getScanners} could continue.
3341     */
3342    private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3343    private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
3344    private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
3345    private volatile boolean shouldWait = true;
3346    private volatile HStore store = null;
3347
3348    public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
3349      RegionServicesForStores regionServices) throws IOException {
3350      super(conf, cellComparator, regionServices);
3351    }
3352
3353    @Override
3354    protected List<Segment> getSnapshotSegments() {
3355
3356      List<Segment> result = super.getSnapshotSegments();
3357
3358      if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
3359        int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
3360        if (currentCount == 1) {
3361          if (this.shouldWait) {
3362            try {
3363              /**
3364               * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed,
3365               * {@link DefaultMemStore#doClearSnapShot} could continue.
3366               */
3367              preClearSnapShotCyclicBarrier.await();
3368              /**
3369               * Wait for {@link DefaultMemStore#doClearSnapShot} completed.
3370               */
3371              postClearSnapShotCyclicBarrier.await();
3372
3373            } catch (Throwable e) {
3374              throw new RuntimeException(e);
3375            }
3376          }
3377        }
3378      }
3379      return result;
3380    }
3381
3382    @Override
3383    protected void doClearSnapShot() {
3384      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3385        int currentCount = clearSnapshotCounter.incrementAndGet();
3386        if (currentCount == 1) {
3387          try {
3388            if (
3389              ((ReentrantReadWriteLock) store.getStoreEngine().getLock())
3390                .isWriteLockedByCurrentThread()
3391            ) {
3392              shouldWait = false;
3393            }
3394            /**
3395             * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner
3396             * thread could start.
3397             */
3398            getScannerCyclicBarrier.await();
3399
3400            if (shouldWait) {
3401              /**
3402               * Wait for {@link DefaultMemStore#getSnapshotSegments} completed.
3403               */
3404              preClearSnapShotCyclicBarrier.await();
3405            }
3406          } catch (Throwable e) {
3407            throw new RuntimeException(e);
3408          }
3409        }
3410      }
3411      super.doClearSnapShot();
3412
3413      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3414        int currentCount = clearSnapshotCounter.get();
3415        if (currentCount == 1) {
3416          if (shouldWait) {
3417            try {
3418              /**
3419               * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed,
3420               * {@link DefaultMemStore#getScanners} could continue.
3421               */
3422              postClearSnapShotCyclicBarrier.await();
3423            } catch (Throwable e) {
3424              throw new RuntimeException(e);
3425            }
3426          }
3427        }
3428      }
3429    }
3430  }
3431}