001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027import static org.mockito.ArgumentMatchers.any;
028import static org.mockito.Mockito.mock;
029import static org.mockito.Mockito.spy;
030import static org.mockito.Mockito.times;
031import static org.mockito.Mockito.verify;
032import static org.mockito.Mockito.when;
033
034import java.io.FileNotFoundException;
035import java.io.IOException;
036import java.lang.ref.SoftReference;
037import java.security.PrivilegedExceptionAction;
038import java.util.ArrayList;
039import java.util.Arrays;
040import java.util.Collection;
041import java.util.Collections;
042import java.util.Iterator;
043import java.util.List;
044import java.util.ListIterator;
045import java.util.NavigableSet;
046import java.util.Optional;
047import java.util.TreeSet;
048import java.util.concurrent.BrokenBarrierException;
049import java.util.concurrent.ConcurrentSkipListSet;
050import java.util.concurrent.CountDownLatch;
051import java.util.concurrent.CyclicBarrier;
052import java.util.concurrent.ExecutorService;
053import java.util.concurrent.Executors;
054import java.util.concurrent.Future;
055import java.util.concurrent.ThreadPoolExecutor;
056import java.util.concurrent.TimeUnit;
057import java.util.concurrent.atomic.AtomicBoolean;
058import java.util.concurrent.atomic.AtomicInteger;
059import java.util.concurrent.atomic.AtomicLong;
060import java.util.concurrent.atomic.AtomicReference;
061import java.util.concurrent.locks.ReentrantReadWriteLock;
062import java.util.function.Consumer;
063import java.util.function.IntBinaryOperator;
064import org.apache.hadoop.conf.Configuration;
065import org.apache.hadoop.fs.FSDataOutputStream;
066import org.apache.hadoop.fs.FileStatus;
067import org.apache.hadoop.fs.FileSystem;
068import org.apache.hadoop.fs.FilterFileSystem;
069import org.apache.hadoop.fs.LocalFileSystem;
070import org.apache.hadoop.fs.Path;
071import org.apache.hadoop.fs.permission.FsPermission;
072import org.apache.hadoop.hbase.Cell;
073import org.apache.hadoop.hbase.CellBuilderFactory;
074import org.apache.hadoop.hbase.CellBuilderType;
075import org.apache.hadoop.hbase.CellComparator;
076import org.apache.hadoop.hbase.CellComparatorImpl;
077import org.apache.hadoop.hbase.CellUtil;
078import org.apache.hadoop.hbase.ExtendedCell;
079import org.apache.hadoop.hbase.HBaseClassTestRule;
080import org.apache.hadoop.hbase.HBaseConfiguration;
081import org.apache.hadoop.hbase.HBaseTestingUtil;
082import org.apache.hadoop.hbase.HConstants;
083import org.apache.hadoop.hbase.KeyValue;
084import org.apache.hadoop.hbase.MemoryCompactionPolicy;
085import org.apache.hadoop.hbase.PrivateCellUtil;
086import org.apache.hadoop.hbase.TableName;
087import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
088import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
089import org.apache.hadoop.hbase.client.Get;
090import org.apache.hadoop.hbase.client.RegionInfo;
091import org.apache.hadoop.hbase.client.RegionInfoBuilder;
092import org.apache.hadoop.hbase.client.Scan;
093import org.apache.hadoop.hbase.client.Scan.ReadType;
094import org.apache.hadoop.hbase.client.TableDescriptor;
095import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
096import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
097import org.apache.hadoop.hbase.filter.Filter;
098import org.apache.hadoop.hbase.filter.FilterBase;
099import org.apache.hadoop.hbase.io.compress.Compression;
100import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
101import org.apache.hadoop.hbase.io.hfile.CacheConfig;
102import org.apache.hadoop.hbase.io.hfile.HFile;
103import org.apache.hadoop.hbase.io.hfile.HFileContext;
104import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
105import org.apache.hadoop.hbase.monitoring.MonitoredTask;
106import org.apache.hadoop.hbase.nio.RefCnt;
107import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
108import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
109import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
110import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
111import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
112import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
113import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy;
114import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
115import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
116import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
117import org.apache.hadoop.hbase.security.User;
118import org.apache.hadoop.hbase.testclassification.MediumTests;
119import org.apache.hadoop.hbase.testclassification.RegionServerTests;
120import org.apache.hadoop.hbase.util.BloomFilterUtil;
121import org.apache.hadoop.hbase.util.Bytes;
122import org.apache.hadoop.hbase.util.CommonFSUtils;
123import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
124import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
125import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
126import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
127import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
128import org.apache.hadoop.hbase.wal.WALFactory;
129import org.apache.hadoop.util.Progressable;
130import org.junit.After;
131import org.junit.AfterClass;
132import org.junit.Before;
133import org.junit.ClassRule;
134import org.junit.Rule;
135import org.junit.Test;
136import org.junit.experimental.categories.Category;
137import org.junit.rules.TestName;
138import org.mockito.Mockito;
139import org.slf4j.Logger;
140import org.slf4j.LoggerFactory;
141
142import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
143
144/**
145 * Test class for the HStore
146 */
147@Category({ RegionServerTests.class, MediumTests.class })
148public class TestHStore {
149
150  @ClassRule
151  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class);
152
153  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
154  @Rule
155  public TestName name = new TestName();
156
157  HRegion region;
158  HStore store;
159  byte[] table = Bytes.toBytes("table");
160  byte[] family = Bytes.toBytes("family");
161
162  byte[] row = Bytes.toBytes("row");
163  byte[] row2 = Bytes.toBytes("row2");
164  byte[] qf1 = Bytes.toBytes("qf1");
165  byte[] qf2 = Bytes.toBytes("qf2");
166  byte[] qf3 = Bytes.toBytes("qf3");
167  byte[] qf4 = Bytes.toBytes("qf4");
168  byte[] qf5 = Bytes.toBytes("qf5");
169  byte[] qf6 = Bytes.toBytes("qf6");
170
171  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);
172
173  List<Cell> expected = new ArrayList<>();
174  List<Cell> result = new ArrayList<>();
175
176  long id = EnvironmentEdgeManager.currentTime();
177  Get get = new Get(row);
178
179  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
180  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
181
182  @Before
183  public void setUp() throws IOException {
184    qualifiers.clear();
185    qualifiers.add(qf1);
186    qualifiers.add(qf3);
187    qualifiers.add(qf5);
188
189    Iterator<byte[]> iter = qualifiers.iterator();
190    while (iter.hasNext()) {
191      byte[] next = iter.next();
192      expected.add(new KeyValue(row, family, next, 1, (byte[]) null));
193      get.addColumn(family, next);
194    }
195  }
196
197  private void init(String methodName) throws IOException {
198    init(methodName, TEST_UTIL.getConfiguration());
199  }
200
201  private HStore init(String methodName, Configuration conf) throws IOException {
202    // some of the tests write 4 versions and then flush
203    // (with HBASE-4241, lower versions are collected on flush)
204    return init(methodName, conf,
205      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
206  }
207
208  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
209    throws IOException {
210    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
211  }
212
213  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
214    ColumnFamilyDescriptor hcd) throws IOException {
215    return init(methodName, conf, builder, hcd, null);
216  }
217
218  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
219    ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
220    return init(methodName, conf, builder, hcd, hook, false);
221  }
222
223  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
224    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
225    TableDescriptor htd = builder.setColumnFamily(hcd).build();
226    Path basedir = new Path(DIR + methodName);
227    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
228    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
229
230    FileSystem fs = FileSystem.get(conf);
231
232    fs.delete(logdir, true);
233    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
234      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null,
235      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
236    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
237    Configuration walConf = new Configuration(conf);
238    CommonFSUtils.setRootDir(walConf, basedir);
239    WALFactory wals = new WALFactory(walConf, methodName);
240    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
241      htd, null);
242    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
243    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
244    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
245  }
246
247  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
248    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
249    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
250    if (hook == null) {
251      store = new HStore(region, hcd, conf, false);
252    } else {
253      store = new MyStore(region, hcd, conf, hook, switchToPread);
254    }
255    region.stores.put(store.getColumnFamilyDescriptor().getName(), store);
256    return store;
257  }
258
259  /**
260   * Test we do not lose data if we fail a flush and then close. Part of HBase-10466
261   */
262  @Test
263  public void testFlushSizeSizing() throws Exception {
264    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
265    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
266    // Only retry once.
267    conf.setInt("hbase.hstore.flush.retries.number", 1);
268    User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" });
269    // Inject our faulty LocalFileSystem
270    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
271    user.runAs(new PrivilegedExceptionAction<Object>() {
272      @Override
273      public Object run() throws Exception {
274        // Make sure it worked (above is sensitive to caching details in hadoop core)
275        FileSystem fs = FileSystem.get(conf);
276        assertEquals(FaultyFileSystem.class, fs.getClass());
277        FaultyFileSystem ffs = (FaultyFileSystem) fs;
278
279        // Initialize region
280        init(name.getMethodName(), conf);
281
282        MemStoreSize mss = store.memstore.getFlushableSize();
283        assertEquals(0, mss.getDataSize());
284        LOG.info("Adding some data");
285        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
286        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
287        // add the heap size of active (mutable) segment
288        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
289        mss = store.memstore.getFlushableSize();
290        assertEquals(kvSize.getMemStoreSize(), mss);
291        // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
292        try {
293          LOG.info("Flushing");
294          flushStore(store, id++);
295          fail("Didn't bubble up IOE!");
296        } catch (IOException ioe) {
297          assertTrue(ioe.getMessage().contains("Fault injected"));
298        }
299        // due to snapshot, change mutable to immutable segment
300        kvSize.incMemStoreSize(0,
301          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
302        mss = store.memstore.getFlushableSize();
303        assertEquals(kvSize.getMemStoreSize(), mss);
304        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
305        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
306        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
307        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
308        // not yet cleared the snapshot -- the above flush failed.
309        assertEquals(kvSize.getMemStoreSize(), mss);
310        ffs.fault.set(false);
311        flushStore(store, id++);
312        mss = store.memstore.getFlushableSize();
313        // Size should be the foreground kv size.
314        assertEquals(kvSize2.getMemStoreSize(), mss);
315        flushStore(store, id++);
316        mss = store.memstore.getFlushableSize();
317        assertEquals(0, mss.getDataSize());
318        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
319        return null;
320      }
321    });
322  }
323
324  @Test
325  public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException {
326    int numStoreFiles = 5;
327    writeAndRead(BloomType.ROWCOL, numStoreFiles);
328
329    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
330    // hard to know exactly the numbers here, we are just trying to
331    // prove that they are incrementing
332    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
333    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
334  }
335
336  @Test
337  public void testStoreBloomFilterMetricsWithBloomRow() throws IOException {
338    int numStoreFiles = 5;
339    writeAndRead(BloomType.ROWCOL, numStoreFiles);
340
341    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
342    // hard to know exactly the numbers here, we are just trying to
343    // prove that they are incrementing
344    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
345    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
346  }
347
348  @Test
349  public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException {
350    int numStoreFiles = 5;
351    writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles);
352
353    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
354    // hard to know exactly the numbers here, we are just trying to
355    // prove that they are incrementing
356    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
357  }
358
359  @Test
360  public void testStoreBloomFilterMetricsWithBloomNone() throws IOException {
361    int numStoreFiles = 5;
362    writeAndRead(BloomType.NONE, numStoreFiles);
363
364    assertEquals(0, store.getBloomFilterRequestsCount());
365    assertEquals(0, store.getBloomFilterNegativeResultsCount());
366
367    // hard to know exactly the numbers here, we are just trying to
368    // prove that they are incrementing
369    assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles);
370  }
371
372  private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException {
373    Configuration conf = HBaseConfiguration.create();
374    FileSystem fs = FileSystem.get(conf);
375
376    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
377      .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType)
378      .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build();
379    init(name.getMethodName(), conf, hcd);
380
381    for (int i = 1; i <= numStoreFiles; i++) {
382      byte[] row = Bytes.toBytes("row" + i);
383      LOG.info("Adding some data for the store file #" + i);
384      long timeStamp = EnvironmentEdgeManager.currentTime();
385      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
386      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
387      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
388      flush(i);
389    }
390
391    // Verify the total number of store files
392    assertEquals(numStoreFiles, this.store.getStorefiles().size());
393
394    TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
395    columns.add(qf1);
396
397    for (int i = 1; i <= numStoreFiles; i++) {
398      KeyValueScanner scanner =
399        store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0);
400      scanner.peek();
401    }
402  }
403
404  /**
405   * Verify that compression and data block encoding are respected by the createWriter method, used
406   * on store flush.
407   */
408  @Test
409  public void testCreateWriter() throws Exception {
410    Configuration conf = HBaseConfiguration.create();
411    FileSystem fs = FileSystem.get(conf);
412
413    ColumnFamilyDescriptor hcd =
414      ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ)
415        .setDataBlockEncoding(DataBlockEncoding.DIFF).build();
416    init(name.getMethodName(), conf, hcd);
417
418    // Test createWriter
419    StoreFileWriter writer = store.getStoreEngine()
420      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
421        .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
422        .includesTag(false).shouldDropBehind(false));
423    Path path = writer.getPath();
424    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
425    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
426    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
427    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
428    writer.close();
429
430    // Verify that compression and encoding settings are respected
431    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
432    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
433    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
434    reader.close();
435  }
436
437  @Test
438  public void testDeleteExpiredStoreFiles() throws Exception {
439    testDeleteExpiredStoreFiles(0);
440    testDeleteExpiredStoreFiles(1);
441  }
442
443  /**
444   * @param minVersions the MIN_VERSIONS for the column family
445   */
446  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
447    int storeFileNum = 4;
448    int ttl = 4;
449    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
450    EnvironmentEdgeManagerTestHelper.injectEdge(edge);
451
452    Configuration conf = HBaseConfiguration.create();
453    // Enable the expired store file deletion
454    conf.setBoolean("hbase.store.delete.expired.storefile", true);
455    // Set the compaction threshold higher to avoid normal compactions.
456    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
457
458    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
459      .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());
460
461    long storeTtl = this.store.getScanInfo().getTtl();
462    long sleepTime = storeTtl / storeFileNum;
463    long timeStamp;
464    // There are 4 store files and the max time stamp difference among these
465    // store files will be (this.store.ttl / storeFileNum)
466    for (int i = 1; i <= storeFileNum; i++) {
467      LOG.info("Adding some data for the store file #" + i);
468      timeStamp = EnvironmentEdgeManager.currentTime();
469      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
470      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
471      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
472      flush(i);
473      edge.incrementTime(sleepTime);
474    }
475
476    // Verify the total number of store files
477    assertEquals(storeFileNum, this.store.getStorefiles().size());
478
479    // Each call will find one expired store file and delete it before compaction happens.
480    // There will be no compaction due to threshold above. Last file will not be replaced.
481    for (int i = 1; i <= storeFileNum - 1; i++) {
482      // verify the expired store file.
483      assertFalse(this.store.requestCompaction().isPresent());
484      Collection<HStoreFile> sfs = this.store.getStorefiles();
485      // Ensure i files are gone.
486      if (minVersions == 0) {
487        assertEquals(storeFileNum - i, sfs.size());
488        // Ensure only non-expired files remain.
489        for (HStoreFile sf : sfs) {
490          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
491        }
492      } else {
493        assertEquals(storeFileNum, sfs.size());
494      }
495      // Let the next store file expired.
496      edge.incrementTime(sleepTime);
497    }
498    assertFalse(this.store.requestCompaction().isPresent());
499
500    Collection<HStoreFile> sfs = this.store.getStorefiles();
501    // Assert the last expired file is not removed.
502    if (minVersions == 0) {
503      assertEquals(1, sfs.size());
504    }
505    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
506    assertTrue(ts < (edge.currentTime() - storeTtl));
507
508    for (HStoreFile sf : sfs) {
509      sf.closeStoreFile(true);
510    }
511  }
512
513  @Test
514  public void testLowestModificationTime() throws Exception {
515    Configuration conf = HBaseConfiguration.create();
516    FileSystem fs = FileSystem.get(conf);
517    // Initialize region
518    init(name.getMethodName(), conf);
519
520    int storeFileNum = 4;
521    for (int i = 1; i <= storeFileNum; i++) {
522      LOG.info("Adding some data for the store file #" + i);
523      this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null);
524      this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null);
525      this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null);
526      flush(i);
527    }
528    // after flush; check the lowest time stamp
529    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
530    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
531    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
532
533    // after compact; check the lowest time stamp
534    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
535    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
536    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
537    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
538  }
539
540  private static long getLowestTimeStampFromFS(FileSystem fs,
541    final Collection<HStoreFile> candidates) throws IOException {
542    long minTs = Long.MAX_VALUE;
543    if (candidates.isEmpty()) {
544      return minTs;
545    }
546    Path[] p = new Path[candidates.size()];
547    int i = 0;
548    for (HStoreFile sf : candidates) {
549      p[i] = sf.getPath();
550      ++i;
551    }
552
553    FileStatus[] stats = fs.listStatus(p);
554    if (stats == null || stats.length == 0) {
555      return minTs;
556    }
557    for (FileStatus s : stats) {
558      minTs = Math.min(minTs, s.getModificationTime());
559    }
560    return minTs;
561  }
562
563  //////////////////////////////////////////////////////////////////////////////
564  // Get tests
565  //////////////////////////////////////////////////////////////////////////////
566
567  private static final int BLOCKSIZE_SMALL = 8192;
568
569  /**
570   * Test for hbase-1686.
571   */
572  @Test
573  public void testEmptyStoreFile() throws IOException {
574    init(this.name.getMethodName());
575    // Write a store file.
576    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
577    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
578    flush(1);
579    // Now put in place an empty store file. Its a little tricky. Have to
580    // do manually with hacked in sequence id.
581    HStoreFile f = this.store.getStorefiles().iterator().next();
582    Path storedir = f.getPath().getParent();
583    long seqid = f.getMaxSequenceId();
584    Configuration c = HBaseConfiguration.create();
585    FileSystem fs = FileSystem.get(c);
586    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
587    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
588      .withOutputDir(storedir).withFileContext(meta).build();
589    w.appendMetadata(seqid + 1, false);
590    w.close();
591    this.store.close();
592    // Reopen it... should pick up two files
593    this.store =
594      new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
595    assertEquals(2, this.store.getStorefilesCount());
596
597    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
598    assertEquals(1, result.size());
599  }
600
601  /**
602   * Getting data from memstore only
603   */
604  @Test
605  public void testGet_FromMemStoreOnly() throws IOException {
606    init(this.name.getMethodName());
607
608    // Put data in memstore
609    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
610    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
611    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
612    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
613    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
614    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
615
616    // Get
617    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
618
619    // Compare
620    assertCheck();
621  }
622
623  @Test
624  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
625    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
626    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
627    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
628  }
629
630  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
631    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
632      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
633    long currentTs = 100;
634    long minTs = currentTs;
635    // the extra cell won't be flushed to disk,
636    // so the min of timerange will be different between memStore and hfile.
637    for (int i = 0; i != (maxVersion + 1); ++i) {
638      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
639      if (i == 1) {
640        minTs = currentTs;
641      }
642    }
643    flushStore(store, id++);
644
645    Collection<HStoreFile> files = store.getStorefiles();
646    assertEquals(1, files.size());
647    HStoreFile f = files.iterator().next();
648    f.initReader();
649    StoreFileReader reader = f.getReader();
650    assertEquals(minTs, reader.timeRange.getMin());
651    assertEquals(currentTs, reader.timeRange.getMax());
652  }
653
654  /**
655   * Getting data from files only
656   */
657  @Test
658  public void testGet_FromFilesOnly() throws IOException {
659    init(this.name.getMethodName());
660
661    // Put data in memstore
662    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
663    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
664    // flush
665    flush(1);
666
667    // Add more data
668    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
669    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
670    // flush
671    flush(2);
672
673    // Add more data
674    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
675    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
676    // flush
677    flush(3);
678
679    // Get
680    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
681    // this.store.get(get, qualifiers, result);
682
683    // Need to sort the result since multiple files
684    Collections.sort(result, CellComparatorImpl.COMPARATOR);
685
686    // Compare
687    assertCheck();
688  }
689
690  /**
691   * Getting data from memstore and files
692   */
693  @Test
694  public void testGet_FromMemStoreAndFiles() throws IOException {
695    init(this.name.getMethodName());
696
697    // Put data in memstore
698    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
699    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
700    // flush
701    flush(1);
702
703    // Add more data
704    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
705    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
706    // flush
707    flush(2);
708
709    // Add more data
710    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
711    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
712
713    // Get
714    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
715
716    // Need to sort the result since multiple files
717    Collections.sort(result, CellComparatorImpl.COMPARATOR);
718
719    // Compare
720    assertCheck();
721  }
722
723  private void flush(int storeFilessize) throws IOException {
724    flushStore(store, id++);
725    assertEquals(storeFilessize, this.store.getStorefiles().size());
726    assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount());
727  }
728
729  private void assertCheck() {
730    assertEquals(expected.size(), result.size());
731    for (int i = 0; i < expected.size(); i++) {
732      assertEquals(expected.get(i), result.get(i));
733    }
734  }
735
736  @After
737  public void tearDown() throws Exception {
738    EnvironmentEdgeManagerTestHelper.reset();
739    if (store != null) {
740      try {
741        store.close();
742      } catch (IOException e) {
743      }
744      store = null;
745    }
746    if (region != null) {
747      region.close();
748      region = null;
749    }
750  }
751
752  @AfterClass
753  public static void tearDownAfterClass() throws IOException {
754    TEST_UTIL.cleanupTestDir();
755  }
756
757  @Test
758  public void testHandleErrorsInFlush() throws Exception {
759    LOG.info("Setting up a faulty file system that cannot write");
760
761    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
762    User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" });
763    // Inject our faulty LocalFileSystem
764    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
765    user.runAs(new PrivilegedExceptionAction<Object>() {
766      @Override
767      public Object run() throws Exception {
768        // Make sure it worked (above is sensitive to caching details in hadoop core)
769        FileSystem fs = FileSystem.get(conf);
770        assertEquals(FaultyFileSystem.class, fs.getClass());
771
772        // Initialize region
773        init(name.getMethodName(), conf);
774
775        LOG.info("Adding some data");
776        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
777        store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
778        store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
779
780        LOG.info("Before flush, we should have no files");
781
782        Collection<StoreFileInfo> files =
783          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
784        assertEquals(0, files != null ? files.size() : 0);
785
786        // flush
787        try {
788          LOG.info("Flushing");
789          flush(1);
790          fail("Didn't bubble up IOE!");
791        } catch (IOException ioe) {
792          assertTrue(ioe.getMessage().contains("Fault injected"));
793        }
794
795        LOG.info("After failed flush, we should still have no files!");
796        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
797        assertEquals(0, files != null ? files.size() : 0);
798        store.getHRegion().getWAL().close();
799        return null;
800      }
801    });
802    FileSystem.closeAllForUGI(user.getUGI());
803  }
804
805  /**
806   * Faulty file system that will fail if you write past its fault position the FIRST TIME only;
807   * thereafter it will succeed. Used by {@link TestHRegion} too.
808   */
809  static class FaultyFileSystem extends FilterFileSystem {
810    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
811    private long faultPos = 200;
812    AtomicBoolean fault = new AtomicBoolean(true);
813
814    public FaultyFileSystem() {
815      super(new LocalFileSystem());
816      LOG.info("Creating faulty!");
817    }
818
819    @Override
820    public FSDataOutputStream create(Path p) throws IOException {
821      return new FaultyOutputStream(super.create(p), faultPos, fault);
822    }
823
824    @Override
825    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
826      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
827      return new FaultyOutputStream(
828        super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress),
829        faultPos, fault);
830    }
831
832    @Override
833    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
834      short replication, long blockSize, Progressable progress) throws IOException {
835      // Fake it. Call create instead. The default implementation throws an IOE
836      // that this is not supported.
837      return create(f, overwrite, bufferSize, replication, blockSize, progress);
838    }
839  }
840
841  static class FaultyOutputStream extends FSDataOutputStream {
842    volatile long faultPos = Long.MAX_VALUE;
843    private final AtomicBoolean fault;
844
845    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
846      throws IOException {
847      super(out, null);
848      this.faultPos = faultPos;
849      this.fault = fault;
850    }
851
852    @Override
853    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
854      LOG.info("faulty stream write at pos " + getPos());
855      injectFault();
856      super.write(buf, offset, length);
857    }
858
859    private void injectFault() throws IOException {
860      if (this.fault.get() && getPos() >= faultPos) {
861        throw new IOException("Fault injected");
862      }
863    }
864  }
865
866  private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
867    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
868    storeFlushCtx.prepare();
869    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
870    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
871    return storeFlushCtx;
872  }
873
874  /**
875   * Generate a list of KeyValues for testing based on given parameters
876   * @return the rows key-value list
877   */
878  private List<Cell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier,
879    byte[] family) {
880    List<Cell> kvList = new ArrayList<>();
881    for (int i = 1; i <= numRows; i++) {
882      byte[] b = Bytes.toBytes(i);
883      for (long timestamp : timestamps) {
884        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
885      }
886    }
887    return kvList;
888  }
889
890  /**
891   * Test to ensure correctness when using Stores with multiple timestamps
892   */
893  @Test
894  public void testMultipleTimestamps() throws IOException {
895    int numRows = 1;
896    long[] timestamps1 = new long[] { 1, 5, 10, 20 };
897    long[] timestamps2 = new long[] { 30, 80 };
898
899    init(this.name.getMethodName());
900
901    List<Cell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family);
902    for (Cell kv : kvList1) {
903      this.store.add(kv, null);
904    }
905
906    flushStore(store, id++);
907
908    List<Cell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family);
909    for (Cell kv : kvList2) {
910      this.store.add(kv, null);
911    }
912
913    List<Cell> result;
914    Get get = new Get(Bytes.toBytes(1));
915    get.addColumn(family, qf1);
916
917    get.setTimeRange(0, 15);
918    result = HBaseTestingUtil.getFromStoreFile(store, get);
919    assertTrue(result.size() > 0);
920
921    get.setTimeRange(40, 90);
922    result = HBaseTestingUtil.getFromStoreFile(store, get);
923    assertTrue(result.size() > 0);
924
925    get.setTimeRange(10, 45);
926    result = HBaseTestingUtil.getFromStoreFile(store, get);
927    assertTrue(result.size() > 0);
928
929    get.setTimeRange(80, 145);
930    result = HBaseTestingUtil.getFromStoreFile(store, get);
931    assertTrue(result.size() > 0);
932
933    get.setTimeRange(1, 2);
934    result = HBaseTestingUtil.getFromStoreFile(store, get);
935    assertTrue(result.size() > 0);
936
937    get.setTimeRange(90, 200);
938    result = HBaseTestingUtil.getFromStoreFile(store, get);
939    assertTrue(result.size() == 0);
940  }
941
942  /**
943   * Test for HBASE-3492 - Test split on empty colfam (no store files).
944   * @throws IOException When the IO operations fail.
945   */
946  @Test
947  public void testSplitWithEmptyColFam() throws IOException {
948    init(this.name.getMethodName());
949    assertFalse(store.getSplitPoint().isPresent());
950  }
951
952  @Test
953  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
954    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
955    long anyValue = 10;
956
957    // We'll check that it uses correct config and propagates it appropriately by going thru
958    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
959    // a number we pass in is higher than some config value, inside compactionPolicy.
960    Configuration conf = HBaseConfiguration.create();
961    conf.setLong(CONFIG_KEY, anyValue);
962    init(name.getMethodName() + "-xml", conf);
963    assertTrue(store.throttleCompaction(anyValue + 1));
964    assertFalse(store.throttleCompaction(anyValue));
965
966    // HTD overrides XML.
967    --anyValue;
968    init(
969      name.getMethodName() + "-htd", conf, TableDescriptorBuilder
970        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
971      ColumnFamilyDescriptorBuilder.of(family));
972    assertTrue(store.throttleCompaction(anyValue + 1));
973    assertFalse(store.throttleCompaction(anyValue));
974
975    // HCD overrides them both.
976    --anyValue;
977    init(name.getMethodName() + "-hcd", conf,
978      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
979        Long.toString(anyValue)),
980      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
981        .build());
982    assertTrue(store.throttleCompaction(anyValue + 1));
983    assertFalse(store.throttleCompaction(anyValue));
984  }
985
986  public static class DummyStoreEngine extends DefaultStoreEngine {
987    public static DefaultCompactor lastCreatedCompactor = null;
988
989    @Override
990    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
991      throws IOException {
992      super.createComponents(conf, store, comparator);
993      lastCreatedCompactor = this.compactor;
994    }
995  }
996
997  @Test
998  public void testStoreUsesSearchEngineOverride() throws Exception {
999    Configuration conf = HBaseConfiguration.create();
1000    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
1001    init(this.name.getMethodName(), conf);
1002    assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
1003  }
1004
1005  private void addStoreFile() throws IOException {
1006    HStoreFile f = this.store.getStorefiles().iterator().next();
1007    Path storedir = f.getPath().getParent();
1008    long seqid = this.store.getMaxSequenceId().orElse(0L);
1009    Configuration c = TEST_UTIL.getConfiguration();
1010    FileSystem fs = FileSystem.get(c);
1011    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
1012    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
1013      .withOutputDir(storedir).withFileContext(fileContext).build();
1014    w.appendMetadata(seqid + 1, false);
1015    w.close();
1016    LOG.info("Added store file:" + w.getPath());
1017  }
1018
1019  private void archiveStoreFile(int index) throws IOException {
1020    Collection<HStoreFile> files = this.store.getStorefiles();
1021    HStoreFile sf = null;
1022    Iterator<HStoreFile> it = files.iterator();
1023    for (int i = 0; i <= index; i++) {
1024      sf = it.next();
1025    }
1026    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(),
1027      Lists.newArrayList(sf));
1028  }
1029
1030  private void closeCompactedFile(int index) throws IOException {
1031    Collection<HStoreFile> files =
1032      this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
1033    if (files.size() > 0) {
1034      HStoreFile sf = null;
1035      Iterator<HStoreFile> it = files.iterator();
1036      for (int i = 0; i <= index; i++) {
1037        sf = it.next();
1038      }
1039      sf.closeStoreFile(true);
1040      store.getStoreEngine().getStoreFileManager()
1041        .removeCompactedFiles(Collections.singletonList(sf));
1042    }
1043  }
1044
1045  @Test
1046  public void testRefreshStoreFiles() throws Exception {
1047    init(name.getMethodName());
1048
1049    assertEquals(0, this.store.getStorefilesCount());
1050
1051    // Test refreshing store files when no store files are there
1052    store.refreshStoreFiles();
1053    assertEquals(0, this.store.getStorefilesCount());
1054
1055    // add some data, flush
1056    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1057    flush(1);
1058    assertEquals(1, this.store.getStorefilesCount());
1059
1060    // add one more file
1061    addStoreFile();
1062
1063    assertEquals(1, this.store.getStorefilesCount());
1064    store.refreshStoreFiles();
1065    assertEquals(2, this.store.getStorefilesCount());
1066
1067    // add three more files
1068    addStoreFile();
1069    addStoreFile();
1070    addStoreFile();
1071
1072    assertEquals(2, this.store.getStorefilesCount());
1073    store.refreshStoreFiles();
1074    assertEquals(5, this.store.getStorefilesCount());
1075
1076    closeCompactedFile(0);
1077    archiveStoreFile(0);
1078
1079    assertEquals(5, this.store.getStorefilesCount());
1080    store.refreshStoreFiles();
1081    assertEquals(4, this.store.getStorefilesCount());
1082
1083    archiveStoreFile(0);
1084    archiveStoreFile(1);
1085    archiveStoreFile(2);
1086
1087    assertEquals(4, this.store.getStorefilesCount());
1088    store.refreshStoreFiles();
1089    assertEquals(1, this.store.getStorefilesCount());
1090
1091    archiveStoreFile(0);
1092    store.refreshStoreFiles();
1093    assertEquals(0, this.store.getStorefilesCount());
1094  }
1095
1096  @Test
1097  public void testRefreshStoreFilesNotChanged() throws IOException {
1098    init(name.getMethodName());
1099
1100    assertEquals(0, this.store.getStorefilesCount());
1101
1102    // add some data, flush
1103    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1104    flush(1);
1105    // add one more file
1106    addStoreFile();
1107
1108    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());
1109
1110    // call first time after files changed
1111    spiedStoreEngine.refreshStoreFiles();
1112    assertEquals(2, this.store.getStorefilesCount());
1113    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
1114
1115    // call second time
1116    spiedStoreEngine.refreshStoreFiles();
1117
1118    // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
1119    // refreshed,
1120    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
1121  }
1122
1123  @Test
1124  public void testScanWithCompactionAfterFlush() throws Exception {
1125    TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1126      EverythingPolicy.class.getName());
1127    init(name.getMethodName());
1128
1129    assertEquals(0, this.store.getStorefilesCount());
1130
1131    KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
1132    // add some data, flush
1133    this.store.add(kv, null);
1134    flush(1);
1135    kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
1136    // add some data, flush
1137    this.store.add(kv, null);
1138    flush(2);
1139    kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
1140    // add some data, flush
1141    this.store.add(kv, null);
1142    flush(3);
1143
1144    ExecutorService service = Executors.newFixedThreadPool(2);
1145
1146    Scan scan = new Scan(new Get(row));
1147    Future<KeyValueScanner> scanFuture = service.submit(() -> {
1148      try {
1149        LOG.info(">>>> creating scanner");
1150        return this.store.createScanner(scan,
1151          new ScanInfo(HBaseConfiguration.create(),
1152            ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
1153            Long.MAX_VALUE, 0, CellComparator.getInstance()),
1154          scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
1155      } catch (IOException e) {
1156        e.printStackTrace();
1157        return null;
1158      }
1159    });
1160    Future compactFuture = service.submit(() -> {
1161      try {
1162        LOG.info(">>>>>> starting compaction");
1163        Optional<CompactionContext> opCompaction = this.store.requestCompaction();
1164        assertTrue(opCompaction.isPresent());
1165        store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
1166        LOG.info(">>>>>> Compaction is finished");
1167        this.store.closeAndArchiveCompactedFiles();
1168        LOG.info(">>>>>> Compacted files deleted");
1169      } catch (IOException e) {
1170        e.printStackTrace();
1171      }
1172    });
1173
1174    KeyValueScanner kvs = scanFuture.get();
1175    compactFuture.get();
1176    ((StoreScanner) kvs).currentScanners.forEach(s -> {
1177      if (s instanceof StoreFileScanner) {
1178        assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
1179      }
1180    });
1181    kvs.seek(kv);
1182    service.shutdownNow();
1183  }
1184
1185  private long countMemStoreScanner(StoreScanner scanner) {
1186    if (scanner.currentScanners == null) {
1187      return 0;
1188    }
1189    return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count();
1190  }
1191
1192  @Test
1193  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1194    long seqId = 100;
1195    long timestamp = EnvironmentEdgeManager.currentTime();
1196    Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1197      .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1198    PrivateCellUtil.setSequenceId(cell0, seqId);
1199    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1200
1201    Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1202      .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1203    PrivateCellUtil.setSequenceId(cell1, seqId);
1204    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1205
1206    seqId = 101;
1207    timestamp = EnvironmentEdgeManager.currentTime();
1208    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1209      .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1210    PrivateCellUtil.setSequenceId(cell2, seqId);
1211    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1212  }
1213
1214  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
1215    List<Cell> inputCellsAfterSnapshot) throws IOException {
1216    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
1217    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1218    long seqId = Long.MIN_VALUE;
1219    for (Cell c : inputCellsBeforeSnapshot) {
1220      quals.add(CellUtil.cloneQualifier(c));
1221      seqId = Math.max(seqId, c.getSequenceId());
1222    }
1223    for (Cell c : inputCellsAfterSnapshot) {
1224      quals.add(CellUtil.cloneQualifier(c));
1225      seqId = Math.max(seqId, c.getSequenceId());
1226    }
1227    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
1228    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1229    storeFlushCtx.prepare();
1230    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
1231    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
1232    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
1233      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
1234      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
1235      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1236      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1237      // snapshot has no data after flush
1238      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
1239      boolean more;
1240      int cellCount = 0;
1241      do {
1242        List<Cell> cells = new ArrayList<>();
1243        more = s.next(cells);
1244        cellCount += cells.size();
1245        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
1246      } while (more);
1247      assertEquals(
1248        "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
1249          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
1250        inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
1251      // the current scanners is cleared
1252      assertEquals(0, countMemStoreScanner(s));
1253    }
1254  }
1255
1256  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
1257    throws IOException {
1258    return createCell(row, qualifier, ts, sequenceId, value);
1259  }
1260
1261  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
1262    throws IOException {
1263    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1264      .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put).setValue(value).build();
1265    PrivateCellUtil.setSequenceId(c, sequenceId);
1266    return c;
1267  }
1268
1269  @Test
1270  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1271    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1272    final int expectedSize = 3;
1273    testFlushBeforeCompletingScan(new MyListHook() {
1274      @Override
1275      public void hook(int currentSize) {
1276        if (currentSize == expectedSize - 1) {
1277          try {
1278            flushStore(store, id++);
1279            timeToGoNextRow.set(true);
1280          } catch (IOException e) {
1281            throw new RuntimeException(e);
1282          }
1283        }
1284      }
1285    }, new FilterBase() {
1286      @Override
1287      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1288        return ReturnCode.INCLUDE;
1289      }
1290    }, expectedSize);
1291  }
1292
1293  @Test
1294  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
1295    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1296    final int expectedSize = 2;
1297    testFlushBeforeCompletingScan(new MyListHook() {
1298      @Override
1299      public void hook(int currentSize) {
1300        if (currentSize == expectedSize - 1) {
1301          try {
1302            flushStore(store, id++);
1303            timeToGoNextRow.set(true);
1304          } catch (IOException e) {
1305            throw new RuntimeException(e);
1306          }
1307        }
1308      }
1309    }, new FilterBase() {
1310      @Override
1311      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1312        if (timeToGoNextRow.get()) {
1313          timeToGoNextRow.set(false);
1314          return ReturnCode.NEXT_ROW;
1315        } else {
1316          return ReturnCode.INCLUDE;
1317        }
1318      }
1319    }, expectedSize);
1320  }
1321
1322  @Test
1323  public void testFlushBeforeCompletingScanWithFilterHint()
1324    throws IOException, InterruptedException {
1325    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
1326    final int expectedSize = 2;
1327    testFlushBeforeCompletingScan(new MyListHook() {
1328      @Override
1329      public void hook(int currentSize) {
1330        if (currentSize == expectedSize - 1) {
1331          try {
1332            flushStore(store, id++);
1333            timeToGetHint.set(true);
1334          } catch (IOException e) {
1335            throw new RuntimeException(e);
1336          }
1337        }
1338      }
1339    }, new FilterBase() {
1340      @Override
1341      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1342        if (timeToGetHint.get()) {
1343          timeToGetHint.set(false);
1344          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
1345        } else {
1346          return Filter.ReturnCode.INCLUDE;
1347        }
1348      }
1349
1350      @Override
1351      public Cell getNextCellHint(Cell currentCell) throws IOException {
1352        return currentCell;
1353      }
1354    }, expectedSize);
1355  }
1356
1357  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
1358    throws IOException, InterruptedException {
1359    Configuration conf = HBaseConfiguration.create();
1360    byte[] r0 = Bytes.toBytes("row0");
1361    byte[] r1 = Bytes.toBytes("row1");
1362    byte[] r2 = Bytes.toBytes("row2");
1363    byte[] value0 = Bytes.toBytes("value0");
1364    byte[] value1 = Bytes.toBytes("value1");
1365    byte[] value2 = Bytes.toBytes("value2");
1366    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1367    long ts = EnvironmentEdgeManager.currentTime();
1368    long seqId = 100;
1369    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1370      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1371      new MyStoreHook() {
1372        @Override
1373        public long getSmallestReadPoint(HStore store) {
1374          return seqId + 3;
1375        }
1376      });
1377    // The cells having the value0 won't be flushed to disk because the value of max version is 1
1378    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
1379    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
1380    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
1381    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
1382    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
1383    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
1384    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
1385    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
1386    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
1387    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1388    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1389    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1390    List<Cell> myList = new MyList<>(hook);
1391    Scan scan = new Scan().withStartRow(r1).setFilter(filter);
1392    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
1393      // r1
1394      scanner.next(myList);
1395      assertEquals(expectedSize, myList.size());
1396      for (Cell c : myList) {
1397        byte[] actualValue = CellUtil.cloneValue(c);
1398        assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
1399          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
1400      }
1401      List<Cell> normalList = new ArrayList<>(3);
1402      // r2
1403      scanner.next(normalList);
1404      assertEquals(3, normalList.size());
1405      for (Cell c : normalList) {
1406        byte[] actualValue = CellUtil.cloneValue(c);
1407        assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:"
1408          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2));
1409      }
1410    }
1411  }
1412
1413  @Test
1414  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
1415    Configuration conf = HBaseConfiguration.create();
1416    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
1417    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1418      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1419    byte[] value = Bytes.toBytes("value");
1420    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1421    long ts = EnvironmentEdgeManager.currentTime();
1422    long seqId = 100;
1423    // older data whihc shouldn't be "seen" by client
1424    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1425    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1426    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1427    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1428    quals.add(qf1);
1429    quals.add(qf2);
1430    quals.add(qf3);
1431    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1432    MyCompactingMemStore.START_TEST.set(true);
1433    Runnable flush = () -> {
1434      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
1435      // recreate the active memstore -- phase (4/5)
1436      storeFlushCtx.prepare();
1437    };
1438    ExecutorService service = Executors.newSingleThreadExecutor();
1439    service.execute(flush);
1440    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
1441    // this is blocked until we recreate the active memstore -- phase (3/5)
1442    // we get scanner from active memstore but it is empty -- phase (5/5)
1443    InternalScanner scanner =
1444      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
1445    service.shutdown();
1446    service.awaitTermination(20, TimeUnit.SECONDS);
1447    try {
1448      try {
1449        List<Cell> results = new ArrayList<>();
1450        scanner.next(results);
1451        assertEquals(3, results.size());
1452        for (Cell c : results) {
1453          byte[] actualValue = CellUtil.cloneValue(c);
1454          assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
1455            + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
1456        }
1457      } finally {
1458        scanner.close();
1459      }
1460    } finally {
1461      MyCompactingMemStore.START_TEST.set(false);
1462      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1463      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1464    }
1465  }
1466
1467  @Test
1468  public void testScanWithDoubleFlush() throws IOException {
1469    Configuration conf = HBaseConfiguration.create();
1470    // Initialize region
1471    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1472      @Override
1473      public void getScanners(MyStore store) throws IOException {
1474        final long tmpId = id++;
1475        ExecutorService s = Executors.newSingleThreadExecutor();
1476        s.execute(() -> {
1477          try {
1478            // flush the store before storescanner updates the scanners from store.
1479            // The current data will be flushed into files, and the memstore will
1480            // be clear.
1481            // -- phase (4/4)
1482            flushStore(store, tmpId);
1483          } catch (IOException ex) {
1484            throw new RuntimeException(ex);
1485          }
1486        });
1487        s.shutdown();
1488        try {
1489          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1490          s.awaitTermination(3, TimeUnit.SECONDS);
1491        } catch (InterruptedException ex) {
1492        }
1493      }
1494    });
1495    byte[] oldValue = Bytes.toBytes("oldValue");
1496    byte[] currentValue = Bytes.toBytes("currentValue");
1497    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1498    long ts = EnvironmentEdgeManager.currentTime();
1499    long seqId = 100;
1500    // older data whihc shouldn't be "seen" by client
1501    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1502    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1503    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1504    long snapshotId = id++;
1505    // push older data into snapshot -- phase (1/4)
1506    StoreFlushContext storeFlushCtx =
1507      store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
1508    storeFlushCtx.prepare();
1509
1510    // insert current data into active -- phase (2/4)
1511    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1512    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1513    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1514    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1515    quals.add(qf1);
1516    quals.add(qf2);
1517    quals.add(qf3);
1518    try (InternalScanner scanner =
1519      (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) {
1520      // complete the flush -- phase (3/4)
1521      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1522      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1523
1524      List<Cell> results = new ArrayList<>();
1525      scanner.next(results);
1526      assertEquals(3, results.size());
1527      for (Cell c : results) {
1528        byte[] actualValue = CellUtil.cloneValue(c);
1529        assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:"
1530          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue));
1531      }
1532    }
1533  }
1534
1535  /**
1536   * This test is for HBASE-27519, when the {@link StoreScanner} is scanning,the Flush and the
1537   * Compaction execute concurrently and theCcompaction compact and archive the flushed
1538   * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}.Before
1539   * HBASE-27519,{@link StoreScanner.updateReaders} would throw {@link FileNotFoundException}.
1540   */
1541  @Test
1542  public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
1543    Configuration conf = HBaseConfiguration.create();
1544    conf.setBoolean(WALFactory.WAL_ENABLED, false);
1545    conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
1546    byte[] r0 = Bytes.toBytes("row0");
1547    byte[] r1 = Bytes.toBytes("row1");
1548    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
1549    final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
1550    // Initialize region
1551    final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1552      @Override
1553      public void getScanners(MyStore store) throws IOException {
1554        try {
1555          // Here this method is called by StoreScanner.updateReaders which is invoked by the
1556          // following TestHStore.flushStore
1557          if (shouldWaitRef.get()) {
1558            // wait the following compaction Task start
1559            cyclicBarrier.await();
1560            // wait the following HStore.closeAndArchiveCompactedFiles end.
1561            cyclicBarrier.await();
1562          }
1563        } catch (BrokenBarrierException | InterruptedException e) {
1564          throw new RuntimeException(e);
1565        }
1566      }
1567    });
1568
1569    final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
1570    Runnable compactionTask = () -> {
1571      try {
1572        // Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for
1573        // entering the MyStore.getScanners, compactionTask could start.
1574        cyclicBarrier.await();
1575        region.compactStore(family, new NoLimitThroughputController());
1576        myStore.closeAndArchiveCompactedFiles();
1577        // Notify StoreScanner.updateReaders could enter MyStore.getScanners.
1578        cyclicBarrier.await();
1579      } catch (Throwable e) {
1580        compactionExceptionRef.set(e);
1581      }
1582    };
1583
1584    long ts = EnvironmentEdgeManager.currentTime();
1585    long seqId = 100;
1586    byte[] value = Bytes.toBytes("value");
1587    // older data whihc shouldn't be "seen" by client
1588    myStore.add(createCell(r0, qf1, ts, seqId, value), null);
1589    flushStore(myStore, id++);
1590    myStore.add(createCell(r0, qf2, ts, seqId, value), null);
1591    flushStore(myStore, id++);
1592    myStore.add(createCell(r0, qf3, ts, seqId, value), null);
1593    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1594    quals.add(qf1);
1595    quals.add(qf2);
1596    quals.add(qf3);
1597
1598    myStore.add(createCell(r1, qf1, ts, seqId, value), null);
1599    myStore.add(createCell(r1, qf2, ts, seqId, value), null);
1600    myStore.add(createCell(r1, qf3, ts, seqId, value), null);
1601
1602    Thread.currentThread()
1603      .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
1604    Scan scan = new Scan();
1605    scan.withStartRow(r0, true);
1606    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
1607      List<Cell> results = new MyList<>(size -> {
1608        switch (size) {
1609          case 1:
1610            shouldWaitRef.set(true);
1611            Thread thread = new Thread(compactionTask);
1612            thread.setName("MyCompacting Thread.");
1613            thread.start();
1614            try {
1615              flushStore(myStore, id++);
1616              thread.join();
1617            } catch (IOException | InterruptedException e) {
1618              throw new RuntimeException(e);
1619            }
1620            shouldWaitRef.set(false);
1621            break;
1622          default:
1623            break;
1624        }
1625      });
1626      // Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile
1627      // which used by StoreScanner.updateReaders is deleted by compactionTask.
1628      scanner.next(results);
1629      // The results is r0 row cells.
1630      assertEquals(3, results.size());
1631      assertTrue(compactionExceptionRef.get() == null);
1632    }
1633  }
1634
1635  @Test
1636  public void testReclaimChunkWhenScaning() throws IOException {
1637    init("testReclaimChunkWhenScaning");
1638    long ts = EnvironmentEdgeManager.currentTime();
1639    long seqId = 100;
1640    byte[] value = Bytes.toBytes("value");
1641    // older data whihc shouldn't be "seen" by client
1642    store.add(createCell(qf1, ts, seqId, value), null);
1643    store.add(createCell(qf2, ts, seqId, value), null);
1644    store.add(createCell(qf3, ts, seqId, value), null);
1645    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1646    quals.add(qf1);
1647    quals.add(qf2);
1648    quals.add(qf3);
1649    try (InternalScanner scanner =
1650      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) {
1651      List<Cell> results = new MyList<>(size -> {
1652        switch (size) {
1653          // 1) we get the first cell (qf1)
1654          // 2) flush the data to have StoreScanner update inner scanners
1655          // 3) the chunk will be reclaimed after updaing
1656          case 1:
1657            try {
1658              flushStore(store, id++);
1659            } catch (IOException e) {
1660              throw new RuntimeException(e);
1661            }
1662            break;
1663          // 1) we get the second cell (qf2)
1664          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
1665          case 2:
1666            try {
1667              byte[] newValue = Bytes.toBytes("newValue");
1668              // older data whihc shouldn't be "seen" by client
1669              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1670              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1671              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1672            } catch (IOException e) {
1673              throw new RuntimeException(e);
1674            }
1675            break;
1676          default:
1677            break;
1678        }
1679      });
1680      scanner.next(results);
1681      assertEquals(3, results.size());
1682      for (Cell c : results) {
1683        byte[] actualValue = CellUtil.cloneValue(c);
1684        assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
1685          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
1686      }
1687    }
1688  }
1689
1690  /**
1691   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the
1692   * versionedList. And the first InMemoryFlushRunnable will use the chagned versionedList to remove
1693   * the corresponding segments. In short, there will be some segements which isn't in merge are
1694   * removed.
1695   */
1696  @Test
1697  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1698    int flushSize = 500;
1699    Configuration conf = HBaseConfiguration.create();
1700    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1701    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1702    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1703    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1704    // Set the lower threshold to invoke the "MERGE" policy
1705    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1706    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1707      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1708    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1709    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1710    long ts = EnvironmentEdgeManager.currentTime();
1711    long seqId = 100;
1712    // older data whihc shouldn't be "seen" by client
1713    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1714    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1715    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1716    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1717    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1718    storeFlushCtx.prepare();
1719    // This shouldn't invoke another in-memory flush because the first compactor thread
1720    // hasn't accomplished the in-memory compaction.
1721    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1722    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1723    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1724    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1725    // okay. Let the compaction be completed
1726    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1727    CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore;
1728    while (mem.isMemStoreFlushingInMemory()) {
1729      TimeUnit.SECONDS.sleep(1);
1730    }
1731    // This should invoke another in-memory flush.
1732    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1733    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1734    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1735    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1736    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1737      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1738    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1739    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1740  }
1741
1742  @Test
1743  public void testAge() throws IOException {
1744    long currentTime = EnvironmentEdgeManager.currentTime();
1745    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1746    edge.setValue(currentTime);
1747    EnvironmentEdgeManager.injectEdge(edge);
1748    Configuration conf = TEST_UTIL.getConfiguration();
1749    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1750    initHRegion(name.getMethodName(), conf,
1751      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1752    HStore store = new HStore(region, hcd, conf, false) {
1753
1754      @Override
1755      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1756        CellComparator kvComparator) throws IOException {
1757        List<HStoreFile> storefiles =
1758          Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1759            mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1760        StoreFileManager sfm = mock(StoreFileManager.class);
1761        when(sfm.getStorefiles()).thenReturn(storefiles);
1762        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1763        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1764        return storeEngine;
1765      }
1766    };
1767    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1768    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1769    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1770  }
1771
1772  private HStoreFile mockStoreFile(long createdTime) {
1773    StoreFileInfo info = mock(StoreFileInfo.class);
1774    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1775    HStoreFile sf = mock(HStoreFile.class);
1776    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1777    when(sf.isHFile()).thenReturn(true);
1778    when(sf.getFileInfo()).thenReturn(info);
1779    return sf;
1780  }
1781
1782  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1783    throws IOException {
1784    return (MyStore) init(methodName, conf,
1785      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1786      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1787  }
1788
1789  private static class MyStore extends HStore {
1790    private final MyStoreHook hook;
1791
1792    MyStore(final HRegion region, final ColumnFamilyDescriptor family,
1793      final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1794      super(region, family, confParam, false);
1795      this.hook = hook;
1796    }
1797
1798    @Override
1799    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1800      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1801      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1802      boolean includeMemstoreScanner) throws IOException {
1803      hook.getScanners(this);
1804      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1805        stopRow, false, readPt, includeMemstoreScanner);
1806    }
1807
1808    @Override
1809    public long getSmallestReadPoint() {
1810      return hook.getSmallestReadPoint(this);
1811    }
1812  }
1813
1814  private abstract static class MyStoreHook {
1815
1816    void getScanners(MyStore store) throws IOException {
1817    }
1818
1819    long getSmallestReadPoint(HStore store) {
1820      return store.getHRegion().getSmallestReadPoint();
1821    }
1822  }
1823
1824  @Test
1825  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
1826    Configuration conf = HBaseConfiguration.create();
1827    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1828    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1829    // Set the lower threshold to invoke the "MERGE" policy
1830    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1831    });
1832    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1833    long ts = EnvironmentEdgeManager.currentTime();
1834    long seqID = 1L;
1835    // Add some data to the region and do some flushes
1836    for (int i = 1; i < 10; i++) {
1837      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1838        memStoreSizing);
1839    }
1840    // flush them
1841    flushStore(store, seqID);
1842    for (int i = 11; i < 20; i++) {
1843      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1844        memStoreSizing);
1845    }
1846    // flush them
1847    flushStore(store, seqID);
1848    for (int i = 21; i < 30; i++) {
1849      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1850        memStoreSizing);
1851    }
1852    // flush them
1853    flushStore(store, seqID);
1854
1855    assertEquals(3, store.getStorefilesCount());
1856    Scan scan = new Scan();
1857    scan.addFamily(family);
1858    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1859    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1860    StoreScanner storeScanner =
1861      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1862    // get the current heap
1863    KeyValueHeap heap = storeScanner.heap;
1864    // create more store files
1865    for (int i = 31; i < 40; i++) {
1866      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1867        memStoreSizing);
1868    }
1869    // flush them
1870    flushStore(store, seqID);
1871
1872    for (int i = 41; i < 50; i++) {
1873      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1874        memStoreSizing);
1875    }
1876    // flush them
1877    flushStore(store, seqID);
1878    storefiles2 = store.getStorefiles();
1879    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1880    actualStorefiles1.removeAll(actualStorefiles);
1881    // Do compaction
1882    MyThread thread = new MyThread(storeScanner);
1883    thread.start();
1884    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
1885    thread.join();
1886    KeyValueHeap heap2 = thread.getHeap();
1887    assertFalse(heap.equals(heap2));
1888  }
1889
1890  @Test
1891  public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
1892    Configuration conf = HBaseConfiguration.create();
1893    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1894    // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
1895    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
1896    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1897    });
1898    Scan scan = new Scan();
1899    scan.addFamily(family);
1900    // ReadType on Scan is still DEFAULT only.
1901    assertEquals(ReadType.DEFAULT, scan.getReadType());
1902    StoreScanner storeScanner =
1903      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1904    assertFalse(storeScanner.isScanUsePread());
1905  }
1906
1907  @Test
1908  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
1909    final TableName tn = TableName.valueOf(name.getMethodName());
1910    init(name.getMethodName());
1911
1912    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
1913
1914    HStoreFile sf1 = mockStoreFileWithLength(1024L);
1915    HStoreFile sf2 = mockStoreFileWithLength(2048L);
1916    HStoreFile sf3 = mockStoreFileWithLength(4096L);
1917    HStoreFile sf4 = mockStoreFileWithLength(8192L);
1918
1919    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
1920      .setEndKey(Bytes.toBytes("b")).build();
1921
1922    // Compacting two files down to one, reducing size
1923    sizeStore.put(regionInfo, 1024L + 4096L);
1924    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3),
1925      Arrays.asList(sf2));
1926
1927    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1928
1929    // The same file length in and out should have no change
1930    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
1931      Arrays.asList(sf2));
1932
1933    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1934
1935    // Increase the total size used
1936    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
1937      Arrays.asList(sf3));
1938
1939    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
1940
1941    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
1942      .setEndKey(Bytes.toBytes("c")).build();
1943    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
1944
1945    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
1946  }
1947
1948  @Test
1949  public void testHFileContextSetWithCFAndTable() throws Exception {
1950    init(this.name.getMethodName());
1951    StoreFileWriter writer = store.getStoreEngine()
1952      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
1953        .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
1954        .includesTag(false).shouldDropBehind(true));
1955    HFileContext hFileContext = writer.getHFileWriter().getFileContext();
1956    assertArrayEquals(family, hFileContext.getColumnFamily());
1957    assertArrayEquals(table, hFileContext.getTableName());
1958  }
1959
1960  // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell
1961  // but its dataSize exceeds inmemoryFlushSize
1962  @Test
1963  public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
1964    throws IOException, InterruptedException {
1965    Configuration conf = HBaseConfiguration.create();
1966
1967    byte[] smallValue = new byte[3];
1968    byte[] largeValue = new byte[9];
1969    final long timestamp = EnvironmentEdgeManager.currentTime();
1970    final long seqId = 100;
1971    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
1972    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
1973    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
1974    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
1975    int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
1976
1977    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
1978    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
1979    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
1980    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
1981
1982    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1983      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1984
1985    MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
1986    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
1987    myCompactingMemStore.smallCellPreUpdateCounter.set(0);
1988    myCompactingMemStore.largeCellPreUpdateCounter.set(0);
1989
1990    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
1991    Thread smallCellThread = new Thread(() -> {
1992      try {
1993        store.add(smallCell, new NonThreadSafeMemStoreSizing());
1994      } catch (Throwable exception) {
1995        exceptionRef.set(exception);
1996      }
1997    });
1998    smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
1999    smallCellThread.start();
2000
2001    String oldThreadName = Thread.currentThread().getName();
2002    try {
2003      /**
2004       * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
2005       * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread
2006       * invokes flushInMemory.
2007       * <p/>
2008       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
2009       * can add cell to currentActive . That is to say when largeCellThread called flushInMemory
2010       * method, CompactingMemStore.active has no cell.
2011       */
2012      Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
2013      store.add(largeCell, new NonThreadSafeMemStoreSizing());
2014      smallCellThread.join();
2015
2016      for (int i = 0; i < 100; i++) {
2017        long currentTimestamp = timestamp + 100 + i;
2018        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
2019        store.add(cell, new NonThreadSafeMemStoreSizing());
2020      }
2021    } finally {
2022      Thread.currentThread().setName(oldThreadName);
2023    }
2024
2025    assertTrue(exceptionRef.get() == null);
2026
2027  }
2028
2029  // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds
2030  // InmemoryFlushSize
2031  @Test(timeout = 60000)
2032  public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception {
2033    Configuration conf = HBaseConfiguration.create();
2034    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
2035
2036    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2037      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2038
2039    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
2040
2041    int size = (int) (myCompactingMemStore.getInmemoryFlushSize());
2042    byte[] value = new byte[size + 1];
2043
2044    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2045    long timestamp = EnvironmentEdgeManager.currentTime();
2046    long seqId = 100;
2047    Cell cell = createCell(qf1, timestamp, seqId, value);
2048    int cellByteSize = MutableSegment.getCellLength(cell);
2049    store.add(cell, memStoreSizing);
2050    assertTrue(memStoreSizing.getCellsCount() == 1);
2051    assertTrue(memStoreSizing.getDataSize() == cellByteSize);
2052    // Waiting the in memory compaction completed, see HBASE-26438
2053    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2054  }
2055
2056  /**
2057   * This test is for HBASE-27464, before this JIRA,when init {@link CellChunkImmutableSegment} for
2058   * 'COMPACT' action, we not force copy to current MSLab. When cell size bigger than
2059   * {@link MemStoreLABImpl#maxAlloc}, cell will stay in previous chunk which will recycle after
2060   * segment replace, and we may read wrong data when these chunk reused by others.
2061   */
2062  @Test
2063  public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception {
2064    Configuration conf = HBaseConfiguration.create();
2065    int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT);
2066
2067    // Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}.
2068    byte[] cellValue = new byte[maxAllocByteSize + 1];
2069    final long timestamp = EnvironmentEdgeManager.currentTime();
2070    final long seqId = 100;
2071    final byte[] rowKey1 = Bytes.toBytes("rowKey1");
2072    final Cell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue);
2073    final byte[] rowKey2 = Bytes.toBytes("rowKey2");
2074    final Cell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue);
2075    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2076    quals.add(qf1);
2077
2078    int cellByteSize = MutableSegment.getCellLength(originalCell1);
2079    int inMemoryFlushByteSize = cellByteSize - 1;
2080
2081    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2082    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
2083    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2084    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200));
2085    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2086
2087    // Use {@link MemoryCompactionPolicy#EAGER} for always compacting.
2088    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2089      .setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build());
2090
2091    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
2092    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize);
2093
2094    // Data chunk Pool is disabled.
2095    assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0);
2096
2097    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2098
2099    // First compact
2100    store.add(originalCell1, memStoreSizing);
2101    // Waiting for the first in-memory compaction finished
2102    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2103
2104    StoreScanner storeScanner =
2105      (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
2106    SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2107    Cell resultCell1 = segmentScanner.next();
2108    assertTrue(CellUtil.equals(resultCell1, originalCell1));
2109    int cell1ChunkId = ((ExtendedCell) resultCell1).getChunkId();
2110    assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK);
2111    assertNull(segmentScanner.next());
2112    segmentScanner.close();
2113    storeScanner.close();
2114    Segment segment = segmentScanner.segment;
2115    assertTrue(segment instanceof CellChunkImmutableSegment);
2116    MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2117    assertTrue(!memStoreLAB1.isClosed());
2118    assertTrue(!memStoreLAB1.chunks.isEmpty());
2119    assertTrue(!memStoreLAB1.isReclaimed());
2120
2121    // Second compact
2122    store.add(originalCell2, memStoreSizing);
2123    // Waiting for the second in-memory compaction finished
2124    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2125
2126    // Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell
2127    // must be associated with chunk.. We were looking for a cell at index 0.
2128    // The cause for this exception is because the data chunk Pool is disabled,when the data chunks
2129    // are recycled after the second in-memory compaction finished,the
2130    // {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk
2131    // pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in
2132    // {@link CellChunkMap#getCell} we could not get the data chunk by chunkId.
2133    storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
2134    segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2135    Cell newResultCell1 = segmentScanner.next();
2136    assertTrue(newResultCell1 != resultCell1);
2137    assertTrue(CellUtil.equals(newResultCell1, originalCell1));
2138
2139    Cell resultCell2 = segmentScanner.next();
2140    assertTrue(CellUtil.equals(resultCell2, originalCell2));
2141    assertNull(segmentScanner.next());
2142    segmentScanner.close();
2143    storeScanner.close();
2144
2145    segment = segmentScanner.segment;
2146    assertTrue(segment instanceof CellChunkImmutableSegment);
2147    MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2148    assertTrue(!memStoreLAB2.isClosed());
2149    assertTrue(!memStoreLAB2.chunks.isEmpty());
2150    assertTrue(!memStoreLAB2.isReclaimed());
2151    assertTrue(memStoreLAB1.isClosed());
2152    assertTrue(memStoreLAB1.chunks.isEmpty());
2153    assertTrue(memStoreLAB1.isReclaimed());
2154  }
2155
2156  // This test is for HBASE-26210 also, test write large cell and small cell concurrently when
2157  // InmemoryFlushSize is smaller,equal with and larger than cell size.
2158  @Test
2159  public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
2160    throws IOException, InterruptedException {
2161    doWriteTestLargeCellAndSmallCellConcurrently(
2162      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
2163    doWriteTestLargeCellAndSmallCellConcurrently(
2164      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
2165    doWriteTestLargeCellAndSmallCellConcurrently(
2166      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1);
2167    doWriteTestLargeCellAndSmallCellConcurrently(
2168      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize);
2169    doWriteTestLargeCellAndSmallCellConcurrently(
2170      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1);
2171  }
2172
2173  private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize)
2174    throws IOException, InterruptedException {
2175
2176    Configuration conf = HBaseConfiguration.create();
2177
2178    byte[] smallValue = new byte[3];
2179    byte[] largeValue = new byte[100];
2180    final long timestamp = EnvironmentEdgeManager.currentTime();
2181    final long seqId = 100;
2182    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2183    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2184    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2185    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2186    int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize);
2187    boolean flushByteSizeLessThanSmallAndLargeCellSize =
2188      flushByteSize < (smallCellByteSize + largeCellByteSize);
2189
2190    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName());
2191    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2192    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2193
2194    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2195      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2196
2197    MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore);
2198    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2199    myCompactingMemStore.disableCompaction();
2200    if (flushByteSizeLessThanSmallAndLargeCellSize) {
2201      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true;
2202    } else {
2203      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false;
2204    }
2205
2206    final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
2207    final AtomicLong totalCellByteSize = new AtomicLong(0);
2208    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2209    Thread smallCellThread = new Thread(() -> {
2210      try {
2211        for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
2212          long currentTimestamp = timestamp + i;
2213          Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
2214          totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
2215          store.add(cell, memStoreSizing);
2216        }
2217      } catch (Throwable exception) {
2218        exceptionRef.set(exception);
2219
2220      }
2221    });
2222    smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
2223    smallCellThread.start();
2224
2225    String oldThreadName = Thread.currentThread().getName();
2226    try {
2227      /**
2228       * When flushByteSizeLessThanSmallAndLargeCellSize is true:
2229       * </p>
2230       * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then
2231       * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then
2232       * largeCellThread invokes flushInMemory.
2233       * <p/>
2234       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
2235       * can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
2236       * <p/>
2237       * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and
2238       * largeCellThread concurrently write one cell and wait each other, and then write another
2239       * cell etc.
2240       */
2241      Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
2242      for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
2243        long currentTimestamp = timestamp + i;
2244        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
2245        totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
2246        store.add(cell, memStoreSizing);
2247      }
2248      smallCellThread.join();
2249
2250      assertTrue(exceptionRef.get() == null);
2251      assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2));
2252      assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
2253      if (flushByteSizeLessThanSmallAndLargeCellSize) {
2254        assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT);
2255      } else {
2256        assertTrue(
2257          myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1));
2258      }
2259    } finally {
2260      Thread.currentThread().setName(oldThreadName);
2261    }
2262  }
2263
2264  /**
2265   * <pre>
2266   * This test is for HBASE-26384,
2267   * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
2268   * execute concurrently.
2269   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2270   * for both branch-2 and master):
2271   * 1. The {@link CompactingMemStore} size exceeds
2272   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2273   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2274   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2275   * 2. The in memory compact thread starts and then stopping before
2276   *    {@link CompactingMemStore#flattenOneSegment}.
2277   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2278   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2279   *    compact thread continues.
2280   *    Assuming {@link VersionedSegmentsList#version} returned from
2281   *    {@link CompactingMemStore#getImmutableSegments} is v.
2282   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2283   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2284   *    {@link CompactionPipeline#version} is still v.
2285   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2286   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2287   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2288   *    {@link CompactionPipeline} has changed because
2289   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2290   *    removed in fact and still remaining in {@link CompactionPipeline}.
2291   *
2292   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
2293   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2294   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2295   *    v+1.
2296   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2297   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2298   *    failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
2299   *    again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
2300   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
2301   * </pre>
2302   */
2303  @Test
2304  public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
2305    Configuration conf = HBaseConfiguration.create();
2306
2307    byte[] smallValue = new byte[3];
2308    byte[] largeValue = new byte[9];
2309    final long timestamp = EnvironmentEdgeManager.currentTime();
2310    final long seqId = 100;
2311    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2312    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2313    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2314    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2315    int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
2316    int flushByteSize = totalCellByteSize - 2;
2317
2318    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2319    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
2320    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2321    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2322
2323    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2324      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2325
2326    MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
2327    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2328
2329    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2330    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2331
2332    String oldThreadName = Thread.currentThread().getName();
2333    try {
2334      Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
2335      /**
2336       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2337       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2338       * would invoke {@link CompactingMemStore#stopCompaction}.
2339       */
2340      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2341
2342      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2343      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2344
2345      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2346      assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
2347      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2348      assertTrue(segments.getNumOfSegments() == 0);
2349      assertTrue(segments.getNumOfCells() == 0);
2350      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
2351      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2352    } finally {
2353      Thread.currentThread().setName(oldThreadName);
2354    }
2355  }
2356
2357  /**
2358   * <pre>
2359   * This test is for HBASE-26384,
2360   * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()}
2361   * and writeMemStore execute concurrently.
2362   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2363   * for both branch-2 and master):
2364   * 1. The {@link CompactingMemStore} size exceeds
2365   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2366   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2367   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2368   * 2. The in memory compact thread starts and then stopping before
2369   *    {@link CompactingMemStore#flattenOneSegment}.
2370   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2371   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2372   *    compact thread continues.
2373   *    Assuming {@link VersionedSegmentsList#version} returned from
2374   *    {@link CompactingMemStore#getImmutableSegments} is v.
2375   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2376   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2377   *    {@link CompactionPipeline#version} is still v.
2378   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2379   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2380   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2381   *    {@link CompactionPipeline} has changed because
2382   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2383   *    removed in fact and still remaining in {@link CompactionPipeline}.
2384   *
2385   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
2386   * and I add step 7-8 to test there is new segment added before retry.
2387   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2388   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2389   *     v+1.
2390   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2391   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2392   *    failed and retry,{@link VersionedSegmentsList#version} returned from
2393   *    {@link CompactingMemStore#getImmutableSegments} is v+1.
2394   * 7. The write thread continues writing to {@link CompactingMemStore} and
2395   *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
2396   *    {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
2397   *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
2398   *    {@link CompactionPipeline#version} is still v+1.
2399   * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2400   *    {@link CompactionPipeline#version} is still v+1,
2401   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
2402   *    remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
2403   *    {@link CompactingMemStore#swapPipelineWithNull}.
2404   * </pre>
2405   */
2406  @Test
2407  public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
2408    Configuration conf = HBaseConfiguration.create();
2409
2410    byte[] smallValue = new byte[3];
2411    byte[] largeValue = new byte[9];
2412    final long timestamp = EnvironmentEdgeManager.currentTime();
2413    final long seqId = 100;
2414    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2415    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2416    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2417    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2418    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2419    int flushByteSize = firstWriteCellByteSize - 2;
2420
2421    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2422    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
2423    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2424    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2425
2426    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2427      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2428
2429    final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
2430    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2431
2432    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2433    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2434
2435    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2436    final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
2437    final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2438    final int writeAgainCellByteSize =
2439      MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2);
2440    final Thread writeAgainThread = new Thread(() -> {
2441      try {
2442        myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
2443
2444        store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
2445        store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
2446
2447        myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
2448      } catch (Throwable exception) {
2449        exceptionRef.set(exception);
2450      }
2451    });
2452    writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
2453    writeAgainThread.start();
2454
2455    String oldThreadName = Thread.currentThread().getName();
2456    try {
2457      Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
2458      /**
2459       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2460       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2461       * would invoke {@link CompactingMemStore#stopCompaction}.
2462       */
2463      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2464      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2465      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2466      writeAgainThread.join();
2467
2468      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2469      assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
2470      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2471      assertTrue(segments.getNumOfSegments() == 1);
2472      assertTrue(
2473        ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
2474      assertTrue(segments.getNumOfCells() == 2);
2475      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
2476      assertTrue(exceptionRef.get() == null);
2477      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2478    } finally {
2479      Thread.currentThread().setName(oldThreadName);
2480    }
2481  }
2482
2483  /**
2484   * <pre>
2485   * This test is for HBASE-26465,
2486   * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute
2487   * concurrently. The threads sequence before HBASE-26465 is:
2488   * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have be added to
2489   *  {@link DefaultMemStore}.
2490   * 2.The flush thread stopping before {@link DefaultMemStore#clearSnapshot} in
2491   *   {@link HStore#updateStorefiles} after completed flushing memStore to hfile.
2492   * 3.The scan thread starts and stopping after {@link DefaultMemStore#getSnapshotSegments} in
2493   *   {@link DefaultMemStore#getScanners},here the scan thread gets the
2494   *   {@link DefaultMemStore#snapshot} which is created by the flush thread.
2495   * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close
2496   *   {@link DefaultMemStore#snapshot},because the reference count of the corresponding
2497   *   {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl}
2498   *   are recycled.
2499   * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a
2500   *   {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the
2501   *   reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in
2502   *   corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may
2503   *   be overwritten by other write threads,which may cause serious problem.
2504   * After HBASE-26465,{@link DefaultMemStore#getScanners} and
2505   * {@link DefaultMemStore#clearSnapshot} could not execute concurrently.
2506   * </pre>
2507   */
2508  @Test
2509  public void testClearSnapshotGetScannerConcurrently() throws Exception {
2510    Configuration conf = HBaseConfiguration.create();
2511
2512    byte[] smallValue = new byte[3];
2513    byte[] largeValue = new byte[9];
2514    final long timestamp = EnvironmentEdgeManager.currentTime();
2515    final long seqId = 100;
2516    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2517    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2518    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2519    quals.add(qf1);
2520    quals.add(qf2);
2521
2522    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
2523    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2524
2525    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2526    MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore);
2527    myDefaultMemStore.store = store;
2528
2529    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2530    store.add(smallCell, memStoreSizing);
2531    store.add(largeCell, memStoreSizing);
2532
2533    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2534    final Thread flushThread = new Thread(() -> {
2535      try {
2536        flushStore(store, id++);
2537      } catch (Throwable exception) {
2538        exceptionRef.set(exception);
2539      }
2540    });
2541    flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
2542    flushThread.start();
2543
2544    String oldThreadName = Thread.currentThread().getName();
2545    StoreScanner storeScanner = null;
2546    try {
2547      Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
2548
2549      /**
2550       * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot}
2551       */
2552      myDefaultMemStore.getScannerCyclicBarrier.await();
2553
2554      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2555      flushThread.join();
2556
2557      if (myDefaultMemStore.shouldWait) {
2558        SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2559        MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2560        assertTrue(memStoreLAB.isClosed());
2561        assertTrue(!memStoreLAB.chunks.isEmpty());
2562        assertTrue(!memStoreLAB.isReclaimed());
2563
2564        Cell cell1 = segmentScanner.next();
2565        CellUtil.equals(smallCell, cell1);
2566        Cell cell2 = segmentScanner.next();
2567        CellUtil.equals(largeCell, cell2);
2568        assertNull(segmentScanner.next());
2569      } else {
2570        List<Cell> results = new ArrayList<>();
2571        storeScanner.next(results);
2572        assertEquals(2, results.size());
2573        CellUtil.equals(smallCell, results.get(0));
2574        CellUtil.equals(largeCell, results.get(1));
2575      }
2576      assertTrue(exceptionRef.get() == null);
2577    } finally {
2578      if (storeScanner != null) {
2579        storeScanner.close();
2580      }
2581      Thread.currentThread().setName(oldThreadName);
2582    }
2583  }
2584
2585  @SuppressWarnings("unchecked")
2586  private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
2587    List<T> resultScanners = new ArrayList<T>();
2588    for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
2589      if (keyValueScannerClass.isInstance(keyValueScanner)) {
2590        resultScanners.add((T) keyValueScanner);
2591      }
2592    }
2593    assertTrue(resultScanners.size() == 1);
2594    return resultScanners.get(0);
2595  }
2596
2597  @Test
2598  public void testOnConfigurationChange() throws IOException {
2599    final int COMMON_MAX_FILES_TO_COMPACT = 10;
2600    final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8;
2601    final int STORE_MAX_FILES_TO_COMPACT = 6;
2602
2603    // Build a table that its maxFileToCompact different from common configuration.
2604    Configuration conf = HBaseConfiguration.create();
2605    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2606      COMMON_MAX_FILES_TO_COMPACT);
2607    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
2608      .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2609        String.valueOf(STORE_MAX_FILES_TO_COMPACT))
2610      .build();
2611    init(this.name.getMethodName(), conf, hcd);
2612
2613    // After updating common configuration, the conf in HStore itself must not be changed.
2614    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2615      NEW_COMMON_MAX_FILES_TO_COMPACT);
2616    this.store.onConfigurationChange(conf);
2617    assertEquals(STORE_MAX_FILES_TO_COMPACT,
2618      store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact());
2619  }
2620
2621  /**
2622   * This test is for HBASE-26476
2623   */
2624  @Test
2625  public void testExtendsDefaultMemStore() throws Exception {
2626    Configuration conf = HBaseConfiguration.create();
2627    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2628
2629    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2630    assertTrue(this.store.memstore.getClass() == DefaultMemStore.class);
2631    tearDown();
2632
2633    conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName());
2634    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2635    assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class);
2636  }
2637
2638  static class CustomDefaultMemStore extends DefaultMemStore {
2639
2640    public CustomDefaultMemStore(Configuration conf, CellComparator c,
2641      RegionServicesForStores regionServices) {
2642      super(conf, c, regionServices);
2643    }
2644
2645  }
2646
2647  /**
2648   * This test is for HBASE-26488
2649   */
2650  @Test
2651  public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
2652
2653    Configuration conf = HBaseConfiguration.create();
2654
2655    byte[] smallValue = new byte[3];
2656    byte[] largeValue = new byte[9];
2657    final long timestamp = EnvironmentEdgeManager.currentTime();
2658    final long seqId = 100;
2659    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2660    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2661    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2662    quals.add(qf1);
2663    quals.add(qf2);
2664
2665    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
2666    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2667    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
2668      MyDefaultStoreFlusher.class.getName());
2669
2670    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2671    MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
2672    assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
2673
2674    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2675    store.add(smallCell, memStoreSizing);
2676    store.add(largeCell, memStoreSizing);
2677    flushStore(store, id++);
2678
2679    MemStoreLABImpl memStoreLAB =
2680      (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
2681    assertTrue(memStoreLAB.isClosed());
2682    assertTrue(memStoreLAB.getRefCntValue() == 0);
2683    assertTrue(memStoreLAB.isReclaimed());
2684    assertTrue(memStoreLAB.chunks.isEmpty());
2685    StoreScanner storeScanner = null;
2686    try {
2687      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2688      assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
2689      assertTrue(store.memstore.size().getCellsCount() == 0);
2690      assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
2691      assertTrue(storeScanner.currentScanners.size() == 1);
2692      assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
2693
2694      List<Cell> results = new ArrayList<>();
2695      storeScanner.next(results);
2696      assertEquals(2, results.size());
2697      CellUtil.equals(smallCell, results.get(0));
2698      CellUtil.equals(largeCell, results.get(1));
2699    } finally {
2700      if (storeScanner != null) {
2701        storeScanner.close();
2702      }
2703    }
2704  }
2705
2706  static class MyDefaultMemStore1 extends DefaultMemStore {
2707
2708    private ImmutableSegment snapshotImmutableSegment;
2709
2710    public MyDefaultMemStore1(Configuration conf, CellComparator c,
2711      RegionServicesForStores regionServices) {
2712      super(conf, c, regionServices);
2713    }
2714
2715    @Override
2716    public MemStoreSnapshot snapshot() {
2717      MemStoreSnapshot result = super.snapshot();
2718      this.snapshotImmutableSegment = snapshot;
2719      return result;
2720    }
2721
2722  }
2723
2724  public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
2725    private static final AtomicInteger failCounter = new AtomicInteger(1);
2726    private static final AtomicInteger counter = new AtomicInteger(0);
2727
2728    public MyDefaultStoreFlusher(Configuration conf, HStore store) {
2729      super(conf, store);
2730    }
2731
2732    @Override
2733    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
2734      MonitoredTask status, ThroughputController throughputController,
2735      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
2736      counter.incrementAndGet();
2737      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
2738        writerCreationTracker);
2739    }
2740
2741    @Override
2742    protected void performFlush(InternalScanner scanner, final CellSink sink,
2743      ThroughputController throughputController) throws IOException {
2744
2745      final int currentCount = counter.get();
2746      CellSink newCellSink = (cell) -> {
2747        if (currentCount <= failCounter.get()) {
2748          throw new IOException("Simulated exception by tests");
2749        }
2750        sink.append(cell);
2751      };
2752      super.performFlush(scanner, newCellSink, throughputController);
2753    }
2754  }
2755
2756  /**
2757   * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB}
2758   */
2759  @Test
2760  public void testImmutableMemStoreLABRefCnt() throws Exception {
2761    Configuration conf = HBaseConfiguration.create();
2762
2763    byte[] smallValue = new byte[3];
2764    byte[] largeValue = new byte[9];
2765    final long timestamp = EnvironmentEdgeManager.currentTime();
2766    final long seqId = 100;
2767    final Cell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
2768    final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
2769    final Cell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue);
2770    final Cell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2771    final Cell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue);
2772    final Cell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue);
2773
2774    int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
2775    int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
2776    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2777    int flushByteSize = firstWriteCellByteSize - 2;
2778
2779    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2780    conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
2781    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2782    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2783    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2784
2785    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2786      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2787
2788    final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
2789    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2790    myCompactingMemStore.allowCompaction.set(false);
2791
2792    NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2793    store.add(smallCell1, memStoreSizing);
2794    store.add(largeCell1, memStoreSizing);
2795    store.add(smallCell2, memStoreSizing);
2796    store.add(largeCell2, memStoreSizing);
2797    store.add(smallCell3, memStoreSizing);
2798    store.add(largeCell3, memStoreSizing);
2799    VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2800    assertTrue(versionedSegmentsList.getNumOfSegments() == 3);
2801    List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
2802    List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size());
2803    for (ImmutableSegment segment : segments) {
2804      memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
2805    }
2806    List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2807    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2808      assertTrue(memStoreLAB.getRefCntValue() == 2);
2809    }
2810
2811    myCompactingMemStore.allowCompaction.set(true);
2812    myCompactingMemStore.flushInMemory();
2813
2814    versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2815    assertTrue(versionedSegmentsList.getNumOfSegments() == 1);
2816    ImmutableMemStoreLAB immutableMemStoreLAB =
2817      (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
2818    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2819      assertTrue(memStoreLAB.getRefCntValue() == 2);
2820    }
2821
2822    List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2823    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2824      assertTrue(memStoreLAB.getRefCntValue() == 2);
2825    }
2826    assertTrue(immutableMemStoreLAB.getRefCntValue() == 2);
2827    for (KeyValueScanner scanner : scanners1) {
2828      scanner.close();
2829    }
2830    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2831      assertTrue(memStoreLAB.getRefCntValue() == 1);
2832    }
2833    for (KeyValueScanner scanner : scanners2) {
2834      scanner.close();
2835    }
2836    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2837      assertTrue(memStoreLAB.getRefCntValue() == 1);
2838    }
2839    assertTrue(immutableMemStoreLAB.getRefCntValue() == 1);
2840    flushStore(store, id++);
2841    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2842      assertTrue(memStoreLAB.getRefCntValue() == 0);
2843    }
2844    assertTrue(immutableMemStoreLAB.getRefCntValue() == 0);
2845    assertTrue(immutableMemStoreLAB.isClosed());
2846    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2847      assertTrue(memStoreLAB.isClosed());
2848      assertTrue(memStoreLAB.isReclaimed());
2849      assertTrue(memStoreLAB.chunks.isEmpty());
2850    }
2851  }
2852
2853  private HStoreFile mockStoreFileWithLength(long length) {
2854    HStoreFile sf = mock(HStoreFile.class);
2855    StoreFileReader sfr = mock(StoreFileReader.class);
2856    when(sf.isHFile()).thenReturn(true);
2857    when(sf.getReader()).thenReturn(sfr);
2858    when(sfr.length()).thenReturn(length);
2859    return sf;
2860  }
2861
2862  private static class MyThread extends Thread {
2863    private StoreScanner scanner;
2864    private KeyValueHeap heap;
2865
2866    public MyThread(StoreScanner scanner) {
2867      this.scanner = scanner;
2868    }
2869
2870    public KeyValueHeap getHeap() {
2871      return this.heap;
2872    }
2873
2874    @Override
2875    public void run() {
2876      scanner.trySwitchToStreamRead();
2877      heap = scanner.heap;
2878    }
2879  }
2880
2881  private static class MyMemStoreCompactor extends MemStoreCompactor {
2882    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2883    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
2884
2885    public MyMemStoreCompactor(CompactingMemStore compactingMemStore,
2886      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
2887      super(compactingMemStore, compactionPolicy);
2888    }
2889
2890    @Override
2891    public boolean start() throws IOException {
2892      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
2893      if (isFirst) {
2894        try {
2895          START_COMPACTOR_LATCH.await();
2896          return super.start();
2897        } catch (InterruptedException ex) {
2898          throw new RuntimeException(ex);
2899        }
2900      }
2901      return super.start();
2902    }
2903  }
2904
2905  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
2906    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2907
2908    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
2909      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2910      throws IOException {
2911      super(conf, c, store, regionServices, compactionPolicy);
2912    }
2913
2914    @Override
2915    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
2916      throws IllegalArgumentIOException {
2917      return new MyMemStoreCompactor(this, compactionPolicy);
2918    }
2919
2920    @Override
2921    protected boolean setInMemoryCompactionFlag() {
2922      boolean rval = super.setInMemoryCompactionFlag();
2923      if (rval) {
2924        RUNNER_COUNT.incrementAndGet();
2925        if (LOG.isDebugEnabled()) {
2926          LOG.debug("runner count: " + RUNNER_COUNT.get());
2927        }
2928      }
2929      return rval;
2930    }
2931  }
2932
2933  public static class MyCompactingMemStore extends CompactingMemStore {
2934    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
2935    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
2936    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
2937
2938    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store,
2939      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2940      throws IOException {
2941      super(conf, c, store, regionServices, compactionPolicy);
2942    }
2943
2944    @Override
2945    protected List<KeyValueScanner> createList(int capacity) {
2946      if (START_TEST.get()) {
2947        try {
2948          getScannerLatch.countDown();
2949          snapshotLatch.await();
2950        } catch (InterruptedException e) {
2951          throw new RuntimeException(e);
2952        }
2953      }
2954      return new ArrayList<>(capacity);
2955    }
2956
2957    @Override
2958    protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
2959      if (START_TEST.get()) {
2960        try {
2961          getScannerLatch.await();
2962        } catch (InterruptedException e) {
2963          throw new RuntimeException(e);
2964        }
2965      }
2966
2967      super.pushActiveToPipeline(active, checkEmpty);
2968      if (START_TEST.get()) {
2969        snapshotLatch.countDown();
2970      }
2971    }
2972  }
2973
2974  interface MyListHook {
2975    void hook(int currentSize);
2976  }
2977
2978  private static class MyList<T> implements List<T> {
2979    private final List<T> delegatee = new ArrayList<>();
2980    private final MyListHook hookAtAdd;
2981
2982    MyList(final MyListHook hookAtAdd) {
2983      this.hookAtAdd = hookAtAdd;
2984    }
2985
2986    @Override
2987    public int size() {
2988      return delegatee.size();
2989    }
2990
2991    @Override
2992    public boolean isEmpty() {
2993      return delegatee.isEmpty();
2994    }
2995
2996    @Override
2997    public boolean contains(Object o) {
2998      return delegatee.contains(o);
2999    }
3000
3001    @Override
3002    public Iterator<T> iterator() {
3003      return delegatee.iterator();
3004    }
3005
3006    @Override
3007    public Object[] toArray() {
3008      return delegatee.toArray();
3009    }
3010
3011    @Override
3012    public <R> R[] toArray(R[] a) {
3013      return delegatee.toArray(a);
3014    }
3015
3016    @Override
3017    public boolean add(T e) {
3018      hookAtAdd.hook(size());
3019      return delegatee.add(e);
3020    }
3021
3022    @Override
3023    public boolean remove(Object o) {
3024      return delegatee.remove(o);
3025    }
3026
3027    @Override
3028    public boolean containsAll(Collection<?> c) {
3029      return delegatee.containsAll(c);
3030    }
3031
3032    @Override
3033    public boolean addAll(Collection<? extends T> c) {
3034      return delegatee.addAll(c);
3035    }
3036
3037    @Override
3038    public boolean addAll(int index, Collection<? extends T> c) {
3039      return delegatee.addAll(index, c);
3040    }
3041
3042    @Override
3043    public boolean removeAll(Collection<?> c) {
3044      return delegatee.removeAll(c);
3045    }
3046
3047    @Override
3048    public boolean retainAll(Collection<?> c) {
3049      return delegatee.retainAll(c);
3050    }
3051
3052    @Override
3053    public void clear() {
3054      delegatee.clear();
3055    }
3056
3057    @Override
3058    public T get(int index) {
3059      return delegatee.get(index);
3060    }
3061
3062    @Override
3063    public T set(int index, T element) {
3064      return delegatee.set(index, element);
3065    }
3066
3067    @Override
3068    public void add(int index, T element) {
3069      delegatee.add(index, element);
3070    }
3071
3072    @Override
3073    public T remove(int index) {
3074      return delegatee.remove(index);
3075    }
3076
3077    @Override
3078    public int indexOf(Object o) {
3079      return delegatee.indexOf(o);
3080    }
3081
3082    @Override
3083    public int lastIndexOf(Object o) {
3084      return delegatee.lastIndexOf(o);
3085    }
3086
3087    @Override
3088    public ListIterator<T> listIterator() {
3089      return delegatee.listIterator();
3090    }
3091
3092    @Override
3093    public ListIterator<T> listIterator(int index) {
3094      return delegatee.listIterator(index);
3095    }
3096
3097    @Override
3098    public List<T> subList(int fromIndex, int toIndex) {
3099      return delegatee.subList(fromIndex, toIndex);
3100    }
3101  }
3102
3103  public static class MyCompactingMemStore2 extends CompactingMemStore {
3104    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
3105    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
3106    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
3107    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
3108    private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
3109    private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
3110
3111    public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
3112      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3113      throws IOException {
3114      super(conf, cellComparator, store, regionServices, compactionPolicy);
3115    }
3116
3117    @Override
3118    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
3119      MemStoreSizing memstoreSizing) {
3120      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3121        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
3122        if (currentCount <= 1) {
3123          try {
3124            /**
3125             * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
3126             * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
3127             * largeCellThread invokes flushInMemory.
3128             */
3129            preCyclicBarrier.await();
3130          } catch (Throwable e) {
3131            throw new RuntimeException(e);
3132          }
3133        }
3134      }
3135
3136      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3137      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3138        try {
3139          preCyclicBarrier.await();
3140        } catch (Throwable e) {
3141          throw new RuntimeException(e);
3142        }
3143      }
3144      return returnValue;
3145    }
3146
3147    @Override
3148    protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
3149      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3150        try {
3151          /**
3152           * After largeCellThread finished flushInMemory method, smallCellThread can add cell to
3153           * currentActive . That is to say when largeCellThread called flushInMemory method,
3154           * currentActive has no cell.
3155           */
3156          postCyclicBarrier.await();
3157        } catch (Throwable e) {
3158          throw new RuntimeException(e);
3159        }
3160      }
3161      super.doAdd(currentActive, cell, memstoreSizing);
3162    }
3163
3164    @Override
3165    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
3166      super.flushInMemory(currentActiveMutableSegment);
3167      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3168        if (largeCellPreUpdateCounter.get() <= 1) {
3169          try {
3170            postCyclicBarrier.await();
3171          } catch (Throwable e) {
3172            throw new RuntimeException(e);
3173          }
3174        }
3175      }
3176    }
3177
3178  }
3179
3180  public static class MyCompactingMemStore3 extends CompactingMemStore {
3181    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
3182    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
3183
3184    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
3185    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
3186    private final AtomicInteger flushCounter = new AtomicInteger(0);
3187    private static final int CELL_COUNT = 5;
3188    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;
3189
3190    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
3191      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3192      throws IOException {
3193      super(conf, cellComparator, store, regionServices, compactionPolicy);
3194    }
3195
3196    @Override
3197    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
3198      MemStoreSizing memstoreSizing) {
3199      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3200        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3201      }
3202      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3203        try {
3204          preCyclicBarrier.await();
3205        } catch (Throwable e) {
3206          throw new RuntimeException(e);
3207        }
3208      }
3209
3210      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3211      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3212        try {
3213          preCyclicBarrier.await();
3214        } catch (Throwable e) {
3215          throw new RuntimeException(e);
3216        }
3217      }
3218      return returnValue;
3219    }
3220
3221    @Override
3222    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
3223      super.postUpdate(currentActiveMutableSegment);
3224      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3225        try {
3226          postCyclicBarrier.await();
3227        } catch (Throwable e) {
3228          throw new RuntimeException(e);
3229        }
3230        return;
3231      }
3232
3233      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3234        try {
3235          postCyclicBarrier.await();
3236        } catch (Throwable e) {
3237          throw new RuntimeException(e);
3238        }
3239      }
3240    }
3241
3242    @Override
3243    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
3244      super.flushInMemory(currentActiveMutableSegment);
3245      flushCounter.incrementAndGet();
3246      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3247        return;
3248      }
3249
3250      assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME));
3251      try {
3252        postCyclicBarrier.await();
3253      } catch (Throwable e) {
3254        throw new RuntimeException(e);
3255      }
3256
3257    }
3258
3259    void disableCompaction() {
3260      allowCompaction.set(false);
3261    }
3262
3263    void enableCompaction() {
3264      allowCompaction.set(true);
3265    }
3266
3267  }
3268
3269  public static class MyCompactingMemStore4 extends CompactingMemStore {
3270    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3271    /**
3272     * {@link CompactingMemStore#flattenOneSegment} must execute after
3273     * {@link CompactingMemStore#getImmutableSegments}
3274     */
3275    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3276    /**
3277     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3278     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3279     */
3280    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3281    /**
3282     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3283     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3284     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3285     */
3286    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3287    /**
3288     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3289     */
3290    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3291    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3292    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3293    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3294    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3295
3296    public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
3297      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3298      throws IOException {
3299      super(conf, cellComparator, store, regionServices, compactionPolicy);
3300    }
3301
3302    @Override
3303    public VersionedSegmentsList getImmutableSegments() {
3304      VersionedSegmentsList result = super.getImmutableSegments();
3305      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3306        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3307        if (currentCount <= 1) {
3308          try {
3309            flattenOneSegmentPreCyclicBarrier.await();
3310          } catch (Throwable e) {
3311            throw new RuntimeException(e);
3312          }
3313        }
3314      }
3315      return result;
3316    }
3317
3318    @Override
3319    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3320      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3321        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3322        if (currentCount <= 1) {
3323          try {
3324            flattenOneSegmentPostCyclicBarrier.await();
3325          } catch (Throwable e) {
3326            throw new RuntimeException(e);
3327          }
3328        }
3329      }
3330      boolean result = super.swapPipelineWithNull(segments);
3331      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3332        int currentCount = swapPipelineWithNullCounter.get();
3333        if (currentCount <= 1) {
3334          assertTrue(!result);
3335        }
3336        if (currentCount == 2) {
3337          assertTrue(result);
3338        }
3339      }
3340      return result;
3341
3342    }
3343
3344    @Override
3345    public void flattenOneSegment(long requesterVersion, Action action) {
3346      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3347      if (currentCount <= 1) {
3348        try {
3349          /**
3350           * {@link CompactingMemStore#snapshot} could start.
3351           */
3352          snapShotStartCyclicCyclicBarrier.await();
3353          flattenOneSegmentPreCyclicBarrier.await();
3354        } catch (Throwable e) {
3355          throw new RuntimeException(e);
3356        }
3357      }
3358      super.flattenOneSegment(requesterVersion, action);
3359      if (currentCount <= 1) {
3360        try {
3361          flattenOneSegmentPostCyclicBarrier.await();
3362        } catch (Throwable e) {
3363          throw new RuntimeException(e);
3364        }
3365      }
3366    }
3367
3368    @Override
3369    protected boolean setInMemoryCompactionFlag() {
3370      boolean result = super.setInMemoryCompactionFlag();
3371      assertTrue(result);
3372      setInMemoryCompactionFlagCounter.incrementAndGet();
3373      return result;
3374    }
3375
3376    @Override
3377    void inMemoryCompaction() {
3378      try {
3379        super.inMemoryCompaction();
3380      } finally {
3381        try {
3382          inMemoryCompactionEndCyclicBarrier.await();
3383        } catch (Throwable e) {
3384          throw new RuntimeException(e);
3385        }
3386
3387      }
3388    }
3389
3390  }
3391
3392  public static class MyCompactingMemStore5 extends CompactingMemStore {
3393    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3394    private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
3395    /**
3396     * {@link CompactingMemStore#flattenOneSegment} must execute after
3397     * {@link CompactingMemStore#getImmutableSegments}
3398     */
3399    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3400    /**
3401     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3402     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3403     */
3404    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3405    /**
3406     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3407     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3408     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3409     */
3410    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3411    /**
3412     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3413     */
3414    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3415    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3416    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3417    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3418    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3419    /**
3420     * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain
3421     * thread could start.
3422     */
3423    private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
3424    /**
3425     * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the
3426     * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would
3427     * execute,and in memory compact thread would exit,because we expect that in memory compact
3428     * executing only once.
3429     */
3430    private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
3431
3432    public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
3433      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3434      throws IOException {
3435      super(conf, cellComparator, store, regionServices, compactionPolicy);
3436    }
3437
3438    @Override
3439    public VersionedSegmentsList getImmutableSegments() {
3440      VersionedSegmentsList result = super.getImmutableSegments();
3441      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3442        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3443        if (currentCount <= 1) {
3444          try {
3445            flattenOneSegmentPreCyclicBarrier.await();
3446          } catch (Throwable e) {
3447            throw new RuntimeException(e);
3448          }
3449        }
3450
3451      }
3452
3453      return result;
3454    }
3455
3456    @Override
3457    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3458      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3459        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3460        if (currentCount <= 1) {
3461          try {
3462            flattenOneSegmentPostCyclicBarrier.await();
3463          } catch (Throwable e) {
3464            throw new RuntimeException(e);
3465          }
3466        }
3467
3468        if (currentCount == 2) {
3469          try {
3470            /**
3471             * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull},
3472             * writeAgain thread could start.
3473             */
3474            writeMemStoreAgainStartCyclicBarrier.await();
3475            /**
3476             * Only the writeAgain thread completes, retry
3477             * {@link CompactingMemStore#swapPipelineWithNull} would execute.
3478             */
3479            writeMemStoreAgainEndCyclicBarrier.await();
3480          } catch (Throwable e) {
3481            throw new RuntimeException(e);
3482          }
3483        }
3484
3485      }
3486      boolean result = super.swapPipelineWithNull(segments);
3487      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3488        int currentCount = swapPipelineWithNullCounter.get();
3489        if (currentCount <= 1) {
3490          assertTrue(!result);
3491        }
3492        if (currentCount == 2) {
3493          assertTrue(result);
3494        }
3495      }
3496      return result;
3497
3498    }
3499
3500    @Override
3501    public void flattenOneSegment(long requesterVersion, Action action) {
3502      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3503      if (currentCount <= 1) {
3504        try {
3505          /**
3506           * {@link CompactingMemStore#snapshot} could start.
3507           */
3508          snapShotStartCyclicCyclicBarrier.await();
3509          flattenOneSegmentPreCyclicBarrier.await();
3510        } catch (Throwable e) {
3511          throw new RuntimeException(e);
3512        }
3513      }
3514      super.flattenOneSegment(requesterVersion, action);
3515      if (currentCount <= 1) {
3516        try {
3517          flattenOneSegmentPostCyclicBarrier.await();
3518          /**
3519           * Only the writeAgain thread completes, in memory compact thread would exit,because we
3520           * expect that in memory compact executing only once.
3521           */
3522          writeMemStoreAgainEndCyclicBarrier.await();
3523        } catch (Throwable e) {
3524          throw new RuntimeException(e);
3525        }
3526
3527      }
3528    }
3529
3530    @Override
3531    protected boolean setInMemoryCompactionFlag() {
3532      boolean result = super.setInMemoryCompactionFlag();
3533      int count = setInMemoryCompactionFlagCounter.incrementAndGet();
3534      if (count <= 1) {
3535        assertTrue(result);
3536      }
3537      if (count == 2) {
3538        assertTrue(!result);
3539      }
3540      return result;
3541    }
3542
3543    @Override
3544    void inMemoryCompaction() {
3545      try {
3546        super.inMemoryCompaction();
3547      } finally {
3548        try {
3549          inMemoryCompactionEndCyclicBarrier.await();
3550        } catch (Throwable e) {
3551          throw new RuntimeException(e);
3552        }
3553
3554      }
3555    }
3556  }
3557
3558  public static class MyCompactingMemStore6 extends CompactingMemStore {
3559    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3560
3561    public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator,
3562      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3563      throws IOException {
3564      super(conf, cellComparator, store, regionServices, compactionPolicy);
3565    }
3566
3567    @Override
3568    void inMemoryCompaction() {
3569      try {
3570        super.inMemoryCompaction();
3571      } finally {
3572        try {
3573          inMemoryCompactionEndCyclicBarrier.await();
3574        } catch (Throwable e) {
3575          throw new RuntimeException(e);
3576        }
3577
3578      }
3579    }
3580  }
3581
3582  public static class MyDefaultMemStore extends DefaultMemStore {
3583    private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
3584    private static final String FLUSH_THREAD_NAME = "flushMyThread";
3585    /**
3586     * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread
3587     * could start.
3588     */
3589    private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
3590    /**
3591     * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments}
3592     * completed, {@link DefaultMemStore#doClearSnapShot} could continue.
3593     */
3594    private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3595    /**
3596     * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot}
3597     * completed, {@link DefaultMemStore#getScanners} could continue.
3598     */
3599    private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3600    private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
3601    private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
3602    private volatile boolean shouldWait = true;
3603    private volatile HStore store = null;
3604
3605    public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
3606      RegionServicesForStores regionServices) throws IOException {
3607      super(conf, cellComparator, regionServices);
3608    }
3609
3610    @Override
3611    protected List<Segment> getSnapshotSegments() {
3612
3613      List<Segment> result = super.getSnapshotSegments();
3614
3615      if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
3616        int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
3617        if (currentCount == 1) {
3618          if (this.shouldWait) {
3619            try {
3620              /**
3621               * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed,
3622               * {@link DefaultMemStore#doClearSnapShot} could continue.
3623               */
3624              preClearSnapShotCyclicBarrier.await();
3625              /**
3626               * Wait for {@link DefaultMemStore#doClearSnapShot} completed.
3627               */
3628              postClearSnapShotCyclicBarrier.await();
3629
3630            } catch (Throwable e) {
3631              throw new RuntimeException(e);
3632            }
3633          }
3634        }
3635      }
3636      return result;
3637    }
3638
3639    @Override
3640    protected void doClearSnapShot() {
3641      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3642        int currentCount = clearSnapshotCounter.incrementAndGet();
3643        if (currentCount == 1) {
3644          try {
3645            if (
3646              ((ReentrantReadWriteLock) store.getStoreEngine().getLock())
3647                .isWriteLockedByCurrentThread()
3648            ) {
3649              shouldWait = false;
3650            }
3651            /**
3652             * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner
3653             * thread could start.
3654             */
3655            getScannerCyclicBarrier.await();
3656
3657            if (shouldWait) {
3658              /**
3659               * Wait for {@link DefaultMemStore#getSnapshotSegments} completed.
3660               */
3661              preClearSnapShotCyclicBarrier.await();
3662            }
3663          } catch (Throwable e) {
3664            throw new RuntimeException(e);
3665          }
3666        }
3667      }
3668      super.doClearSnapShot();
3669
3670      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3671        int currentCount = clearSnapshotCounter.get();
3672        if (currentCount == 1) {
3673          if (shouldWait) {
3674            try {
3675              /**
3676               * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed,
3677               * {@link DefaultMemStore#getScanners} could continue.
3678               */
3679              postClearSnapShotCyclicBarrier.await();
3680            } catch (Throwable e) {
3681              throw new RuntimeException(e);
3682            }
3683          }
3684        }
3685      }
3686    }
3687  }
3688}