001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY;
021import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_ON_READ_KEY;
022import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_READ;
023import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_WRITE;
024import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE;
025import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
026import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
027import static org.junit.Assert.assertArrayEquals;
028import static org.junit.Assert.assertEquals;
029import static org.junit.Assert.assertFalse;
030import static org.junit.Assert.assertNull;
031import static org.junit.Assert.assertTrue;
032import static org.junit.Assert.fail;
033import static org.mockito.ArgumentMatchers.any;
034import static org.mockito.Mockito.mock;
035import static org.mockito.Mockito.spy;
036import static org.mockito.Mockito.times;
037import static org.mockito.Mockito.verify;
038import static org.mockito.Mockito.when;
039
040import java.io.FileNotFoundException;
041import java.io.IOException;
042import java.lang.ref.SoftReference;
043import java.security.PrivilegedExceptionAction;
044import java.util.ArrayList;
045import java.util.Arrays;
046import java.util.Collection;
047import java.util.Collections;
048import java.util.Iterator;
049import java.util.List;
050import java.util.ListIterator;
051import java.util.NavigableSet;
052import java.util.Optional;
053import java.util.TreeSet;
054import java.util.concurrent.BrokenBarrierException;
055import java.util.concurrent.ConcurrentSkipListSet;
056import java.util.concurrent.CountDownLatch;
057import java.util.concurrent.CyclicBarrier;
058import java.util.concurrent.ExecutorService;
059import java.util.concurrent.Executors;
060import java.util.concurrent.Future;
061import java.util.concurrent.ThreadPoolExecutor;
062import java.util.concurrent.TimeUnit;
063import java.util.concurrent.atomic.AtomicBoolean;
064import java.util.concurrent.atomic.AtomicInteger;
065import java.util.concurrent.atomic.AtomicLong;
066import java.util.concurrent.atomic.AtomicReference;
067import java.util.concurrent.locks.ReentrantReadWriteLock;
068import java.util.function.Consumer;
069import java.util.function.IntBinaryOperator;
070import org.apache.hadoop.conf.Configuration;
071import org.apache.hadoop.fs.FSDataOutputStream;
072import org.apache.hadoop.fs.FileStatus;
073import org.apache.hadoop.fs.FileSystem;
074import org.apache.hadoop.fs.FilterFileSystem;
075import org.apache.hadoop.fs.LocalFileSystem;
076import org.apache.hadoop.fs.Path;
077import org.apache.hadoop.fs.permission.FsPermission;
078import org.apache.hadoop.hbase.Cell;
079import org.apache.hadoop.hbase.CellBuilderType;
080import org.apache.hadoop.hbase.CellComparator;
081import org.apache.hadoop.hbase.CellComparatorImpl;
082import org.apache.hadoop.hbase.CellUtil;
083import org.apache.hadoop.hbase.ExtendedCell;
084import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
085import org.apache.hadoop.hbase.HBaseClassTestRule;
086import org.apache.hadoop.hbase.HBaseConfiguration;
087import org.apache.hadoop.hbase.HBaseTestingUtil;
088import org.apache.hadoop.hbase.HConstants;
089import org.apache.hadoop.hbase.KeyValue;
090import org.apache.hadoop.hbase.MemoryCompactionPolicy;
091import org.apache.hadoop.hbase.PrivateCellUtil;
092import org.apache.hadoop.hbase.TableName;
093import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
094import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
095import org.apache.hadoop.hbase.client.Get;
096import org.apache.hadoop.hbase.client.RegionInfo;
097import org.apache.hadoop.hbase.client.RegionInfoBuilder;
098import org.apache.hadoop.hbase.client.Scan;
099import org.apache.hadoop.hbase.client.Scan.ReadType;
100import org.apache.hadoop.hbase.client.TableDescriptor;
101import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
102import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
103import org.apache.hadoop.hbase.filter.Filter;
104import org.apache.hadoop.hbase.filter.FilterBase;
105import org.apache.hadoop.hbase.io.compress.Compression;
106import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
107import org.apache.hadoop.hbase.io.hfile.CacheConfig;
108import org.apache.hadoop.hbase.io.hfile.HFile;
109import org.apache.hadoop.hbase.io.hfile.HFileContext;
110import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
111import org.apache.hadoop.hbase.monitoring.MonitoredTask;
112import org.apache.hadoop.hbase.nio.RefCnt;
113import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
114import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
115import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
116import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
117import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
118import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
119import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy;
120import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
121import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
122import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
123import org.apache.hadoop.hbase.security.User;
124import org.apache.hadoop.hbase.testclassification.MediumTests;
125import org.apache.hadoop.hbase.testclassification.RegionServerTests;
126import org.apache.hadoop.hbase.util.BloomFilterUtil;
127import org.apache.hadoop.hbase.util.Bytes;
128import org.apache.hadoop.hbase.util.CommonFSUtils;
129import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
130import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
131import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
132import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
133import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
134import org.apache.hadoop.hbase.wal.WALFactory;
135import org.apache.hadoop.util.Progressable;
136import org.junit.After;
137import org.junit.AfterClass;
138import org.junit.Before;
139import org.junit.ClassRule;
140import org.junit.Rule;
141import org.junit.Test;
142import org.junit.experimental.categories.Category;
143import org.junit.rules.TestName;
144import org.mockito.Mockito;
145import org.slf4j.Logger;
146import org.slf4j.LoggerFactory;
147
148import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
149
150/**
151 * Test class for the HStore
152 */
153@Category({ RegionServerTests.class, MediumTests.class })
154public class TestHStore {
155
156  @ClassRule
157  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class);
158
159  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
160  @Rule
161  public TestName name = new TestName();
162
163  HRegion region;
164  HStore store;
165  byte[] table = Bytes.toBytes("table");
166  byte[] family = Bytes.toBytes("family");
167
168  byte[] row = Bytes.toBytes("row");
169  byte[] row2 = Bytes.toBytes("row2");
170  byte[] qf1 = Bytes.toBytes("qf1");
171  byte[] qf2 = Bytes.toBytes("qf2");
172  byte[] qf3 = Bytes.toBytes("qf3");
173  byte[] qf4 = Bytes.toBytes("qf4");
174  byte[] qf5 = Bytes.toBytes("qf5");
175  byte[] qf6 = Bytes.toBytes("qf6");
176
177  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);
178
179  List<Cell> expected = new ArrayList<>();
180  List<Cell> result = new ArrayList<>();
181
182  long id = EnvironmentEdgeManager.currentTime();
183  Get get = new Get(row);
184
185  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
186  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
187
188  @Before
189  public void setUp() throws IOException {
190    qualifiers.clear();
191    qualifiers.add(qf1);
192    qualifiers.add(qf3);
193    qualifiers.add(qf5);
194
195    Iterator<byte[]> iter = qualifiers.iterator();
196    while (iter.hasNext()) {
197      byte[] next = iter.next();
198      expected.add(new KeyValue(row, family, next, 1, (byte[]) null));
199      get.addColumn(family, next);
200    }
201  }
202
203  private void init(String methodName) throws IOException {
204    init(methodName, TEST_UTIL.getConfiguration());
205  }
206
207  private HStore init(String methodName, Configuration conf) throws IOException {
208    // some of the tests write 4 versions and then flush
209    // (with HBASE-4241, lower versions are collected on flush)
210    return init(methodName, conf,
211      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
212  }
213
214  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
215    throws IOException {
216    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
217  }
218
219  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
220    ColumnFamilyDescriptor hcd) throws IOException {
221    return init(methodName, conf, builder, hcd, null);
222  }
223
224  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
225    ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
226    return init(methodName, conf, builder, hcd, hook, false);
227  }
228
229  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
230    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
231    TableDescriptor htd = builder.setColumnFamily(hcd).build();
232    Path basedir = new Path(DIR + methodName);
233    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
234    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
235
236    FileSystem fs = FileSystem.get(conf);
237
238    fs.delete(logdir, true);
239    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
240      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null,
241      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
242    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
243    Configuration walConf = new Configuration(conf);
244    CommonFSUtils.setRootDir(walConf, basedir);
245    WALFactory wals = new WALFactory(walConf, methodName);
246    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
247      htd, null);
248    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
249    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
250    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
251  }
252
253  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
254    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
255    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
256    if (hook == null) {
257      store = new HStore(region, hcd, conf, false);
258    } else {
259      store = new MyStore(region, hcd, conf, hook, switchToPread);
260    }
261    region.stores.put(store.getColumnFamilyDescriptor().getName(), store);
262    return store;
263  }
264
265  /**
266   * Test we do not lose data if we fail a flush and then close. Part of HBase-10466
267   */
268  @Test
269  public void testFlushSizeSizing() throws Exception {
270    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
271    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
272    // Only retry once.
273    conf.setInt("hbase.hstore.flush.retries.number", 1);
274    User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" });
275    // Inject our faulty LocalFileSystem
276    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
277    user.runAs(new PrivilegedExceptionAction<Object>() {
278      @Override
279      public Object run() throws Exception {
280        // Make sure it worked (above is sensitive to caching details in hadoop core)
281        FileSystem fs = FileSystem.get(conf);
282        assertEquals(FaultyFileSystem.class, fs.getClass());
283        FaultyFileSystem ffs = (FaultyFileSystem) fs;
284
285        // Initialize region
286        init(name.getMethodName(), conf);
287
288        MemStoreSize mss = store.memstore.getFlushableSize();
289        assertEquals(0, mss.getDataSize());
290        LOG.info("Adding some data");
291        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
292        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
293        // add the heap size of active (mutable) segment
294        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
295        mss = store.memstore.getFlushableSize();
296        assertEquals(kvSize.getMemStoreSize(), mss);
297        // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
298        try {
299          LOG.info("Flushing");
300          flushStore(store, id++);
301          fail("Didn't bubble up IOE!");
302        } catch (IOException ioe) {
303          assertTrue(ioe.getMessage().contains("Fault injected"));
304        }
305        // due to snapshot, change mutable to immutable segment
306        kvSize.incMemStoreSize(0,
307          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
308        mss = store.memstore.getFlushableSize();
309        assertEquals(kvSize.getMemStoreSize(), mss);
310        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
311        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
312        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
313        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
314        // not yet cleared the snapshot -- the above flush failed.
315        assertEquals(kvSize.getMemStoreSize(), mss);
316        ffs.fault.set(false);
317        flushStore(store, id++);
318        mss = store.memstore.getFlushableSize();
319        // Size should be the foreground kv size.
320        assertEquals(kvSize2.getMemStoreSize(), mss);
321        flushStore(store, id++);
322        mss = store.memstore.getFlushableSize();
323        assertEquals(0, mss.getDataSize());
324        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
325        return null;
326      }
327    });
328  }
329
330  @Test
331  public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException {
332    int numStoreFiles = 5;
333    writeAndRead(BloomType.ROWCOL, numStoreFiles);
334
335    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
336    // hard to know exactly the numbers here, we are just trying to
337    // prove that they are incrementing
338    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
339    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
340  }
341
342  @Test
343  public void testStoreBloomFilterMetricsWithBloomRow() throws IOException {
344    int numStoreFiles = 5;
345    writeAndRead(BloomType.ROWCOL, numStoreFiles);
346
347    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
348    // hard to know exactly the numbers here, we are just trying to
349    // prove that they are incrementing
350    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
351    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
352  }
353
354  @Test
355  public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException {
356    int numStoreFiles = 5;
357    writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles);
358
359    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
360    // hard to know exactly the numbers here, we are just trying to
361    // prove that they are incrementing
362    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
363  }
364
365  @Test
366  public void testStoreBloomFilterMetricsWithBloomNone() throws IOException {
367    int numStoreFiles = 5;
368    writeAndRead(BloomType.NONE, numStoreFiles);
369
370    assertEquals(0, store.getBloomFilterRequestsCount());
371    assertEquals(0, store.getBloomFilterNegativeResultsCount());
372
373    // hard to know exactly the numbers here, we are just trying to
374    // prove that they are incrementing
375    assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles);
376  }
377
378  private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException {
379    Configuration conf = HBaseConfiguration.create();
380    FileSystem fs = FileSystem.get(conf);
381
382    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
383      .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType)
384      .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build();
385    init(name.getMethodName(), conf, hcd);
386
387    for (int i = 1; i <= numStoreFiles; i++) {
388      byte[] row = Bytes.toBytes("row" + i);
389      LOG.info("Adding some data for the store file #" + i);
390      long timeStamp = EnvironmentEdgeManager.currentTime();
391      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
392      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
393      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
394      flush(i);
395    }
396
397    // Verify the total number of store files
398    assertEquals(numStoreFiles, this.store.getStorefiles().size());
399
400    TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
401    columns.add(qf1);
402
403    for (int i = 1; i <= numStoreFiles; i++) {
404      KeyValueScanner scanner =
405        store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0);
406      scanner.peek();
407    }
408  }
409
410  /**
411   * Verify that compression and data block encoding are respected by the createWriter method, used
412   * on store flush.
413   */
414  @Test
415  public void testCreateWriter() throws Exception {
416    Configuration conf = HBaseConfiguration.create();
417    FileSystem fs = FileSystem.get(conf);
418
419    ColumnFamilyDescriptor hcd =
420      ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ)
421        .setDataBlockEncoding(DataBlockEncoding.DIFF).build();
422    init(name.getMethodName(), conf, hcd);
423
424    // Test createWriter
425    StoreFileWriter writer = store.getStoreEngine()
426      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
427        .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
428        .includesTag(false).shouldDropBehind(false));
429    Path path = writer.getPath();
430    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
431    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
432    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
433    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
434    writer.close();
435
436    // Verify that compression and encoding settings are respected
437    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
438    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
439    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
440    reader.close();
441  }
442
443  @Test
444  public void testDeleteExpiredStoreFiles() throws Exception {
445    testDeleteExpiredStoreFiles(0);
446    testDeleteExpiredStoreFiles(1);
447  }
448
449  /**
450   * @param minVersions the MIN_VERSIONS for the column family
451   */
452  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
453    int storeFileNum = 4;
454    int ttl = 4;
455    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
456    EnvironmentEdgeManagerTestHelper.injectEdge(edge);
457
458    Configuration conf = HBaseConfiguration.create();
459    // Enable the expired store file deletion
460    conf.setBoolean("hbase.store.delete.expired.storefile", true);
461    // Set the compaction threshold higher to avoid normal compactions.
462    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
463
464    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
465      .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());
466
467    long storeTtl = this.store.getScanInfo().getTtl();
468    long sleepTime = storeTtl / storeFileNum;
469    long timeStamp;
470    // There are 4 store files and the max time stamp difference among these
471    // store files will be (this.store.ttl / storeFileNum)
472    for (int i = 1; i <= storeFileNum; i++) {
473      LOG.info("Adding some data for the store file #" + i);
474      timeStamp = EnvironmentEdgeManager.currentTime();
475      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
476      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
477      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
478      flush(i);
479      edge.incrementTime(sleepTime);
480    }
481
482    // Verify the total number of store files
483    assertEquals(storeFileNum, this.store.getStorefiles().size());
484
485    // Each call will find one expired store file and delete it before compaction happens.
486    // There will be no compaction due to threshold above. Last file will not be replaced.
487    for (int i = 1; i <= storeFileNum - 1; i++) {
488      // verify the expired store file.
489      assertFalse(this.store.requestCompaction().isPresent());
490      Collection<HStoreFile> sfs = this.store.getStorefiles();
491      // Ensure i files are gone.
492      if (minVersions == 0) {
493        assertEquals(storeFileNum - i, sfs.size());
494        // Ensure only non-expired files remain.
495        for (HStoreFile sf : sfs) {
496          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
497        }
498      } else {
499        assertEquals(storeFileNum, sfs.size());
500      }
501      // Let the next store file expired.
502      edge.incrementTime(sleepTime);
503    }
504    assertFalse(this.store.requestCompaction().isPresent());
505
506    Collection<HStoreFile> sfs = this.store.getStorefiles();
507    // Assert the last expired file is not removed.
508    if (minVersions == 0) {
509      assertEquals(1, sfs.size());
510    }
511    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
512    assertTrue(ts < (edge.currentTime() - storeTtl));
513
514    for (HStoreFile sf : sfs) {
515      sf.closeStoreFile(true);
516    }
517  }
518
519  @Test
520  public void testLowestModificationTime() throws Exception {
521    Configuration conf = HBaseConfiguration.create();
522    FileSystem fs = FileSystem.get(conf);
523    // Initialize region
524    init(name.getMethodName(), conf);
525
526    int storeFileNum = 4;
527    for (int i = 1; i <= storeFileNum; i++) {
528      LOG.info("Adding some data for the store file #" + i);
529      this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null);
530      this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null);
531      this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null);
532      flush(i);
533    }
534    // after flush; check the lowest time stamp
535    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
536    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
537    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
538
539    // after compact; check the lowest time stamp
540    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
541    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
542    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
543    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
544  }
545
546  private static long getLowestTimeStampFromFS(FileSystem fs,
547    final Collection<HStoreFile> candidates) throws IOException {
548    long minTs = Long.MAX_VALUE;
549    if (candidates.isEmpty()) {
550      return minTs;
551    }
552    Path[] p = new Path[candidates.size()];
553    int i = 0;
554    for (HStoreFile sf : candidates) {
555      p[i] = sf.getPath();
556      ++i;
557    }
558
559    FileStatus[] stats = fs.listStatus(p);
560    if (stats == null || stats.length == 0) {
561      return minTs;
562    }
563    for (FileStatus s : stats) {
564      minTs = Math.min(minTs, s.getModificationTime());
565    }
566    return minTs;
567  }
568
569  //////////////////////////////////////////////////////////////////////////////
570  // Get tests
571  //////////////////////////////////////////////////////////////////////////////
572
573  private static final int BLOCKSIZE_SMALL = 8192;
574
575  /**
576   * Test for hbase-1686.
577   */
578  @Test
579  public void testEmptyStoreFile() throws IOException {
580    init(this.name.getMethodName());
581    // Write a store file.
582    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
583    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
584    flush(1);
585    // Now put in place an empty store file. Its a little tricky. Have to
586    // do manually with hacked in sequence id.
587    HStoreFile f = this.store.getStorefiles().iterator().next();
588    Path storedir = f.getPath().getParent();
589    long seqid = f.getMaxSequenceId();
590    Configuration c = HBaseConfiguration.create();
591    FileSystem fs = FileSystem.get(c);
592    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
593    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
594      .withOutputDir(storedir).withFileContext(meta).build();
595    w.appendMetadata(seqid + 1, false);
596    w.close();
597    this.store.close();
598    // Reopen it... should pick up two files
599    this.store =
600      new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
601    assertEquals(2, this.store.getStorefilesCount());
602
603    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
604    assertEquals(1, result.size());
605  }
606
607  /**
608   * Getting data from memstore only
609   */
610  @Test
611  public void testGet_FromMemStoreOnly() throws IOException {
612    init(this.name.getMethodName());
613
614    // Put data in memstore
615    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
616    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
617    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
618    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
619    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
620    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
621
622    // Get
623    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
624
625    // Compare
626    assertCheck();
627  }
628
629  @Test
630  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
631    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
632    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
633    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
634  }
635
636  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
637    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
638      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
639    long currentTs = 100;
640    long minTs = currentTs;
641    // the extra cell won't be flushed to disk,
642    // so the min of timerange will be different between memStore and hfile.
643    for (int i = 0; i != (maxVersion + 1); ++i) {
644      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
645      if (i == 1) {
646        minTs = currentTs;
647      }
648    }
649    flushStore(store, id++);
650
651    Collection<HStoreFile> files = store.getStorefiles();
652    assertEquals(1, files.size());
653    HStoreFile f = files.iterator().next();
654    f.initReader();
655    StoreFileReader reader = f.getReader();
656    assertEquals(minTs, reader.timeRange.getMin());
657    assertEquals(currentTs, reader.timeRange.getMax());
658  }
659
660  /**
661   * Getting data from files only
662   */
663  @Test
664  public void testGet_FromFilesOnly() throws IOException {
665    init(this.name.getMethodName());
666
667    // Put data in memstore
668    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
669    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
670    // flush
671    flush(1);
672
673    // Add more data
674    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
675    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
676    // flush
677    flush(2);
678
679    // Add more data
680    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
681    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
682    // flush
683    flush(3);
684
685    // Get
686    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
687    // this.store.get(get, qualifiers, result);
688
689    // Need to sort the result since multiple files
690    Collections.sort(result, CellComparatorImpl.COMPARATOR);
691
692    // Compare
693    assertCheck();
694  }
695
696  /**
697   * Getting data from memstore and files
698   */
699  @Test
700  public void testGet_FromMemStoreAndFiles() throws IOException {
701    init(this.name.getMethodName());
702
703    // Put data in memstore
704    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
705    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
706    // flush
707    flush(1);
708
709    // Add more data
710    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
711    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
712    // flush
713    flush(2);
714
715    // Add more data
716    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
717    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
718
719    // Get
720    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
721
722    // Need to sort the result since multiple files
723    Collections.sort(result, CellComparatorImpl.COMPARATOR);
724
725    // Compare
726    assertCheck();
727  }
728
729  private void flush(int storeFilessize) throws IOException {
730    flushStore(store, id++);
731    assertEquals(storeFilessize, this.store.getStorefiles().size());
732    assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount());
733  }
734
735  private void assertCheck() {
736    assertEquals(expected.size(), result.size());
737    for (int i = 0; i < expected.size(); i++) {
738      assertEquals(expected.get(i), result.get(i));
739    }
740  }
741
742  @After
743  public void tearDown() throws Exception {
744    EnvironmentEdgeManagerTestHelper.reset();
745    if (store != null) {
746      try {
747        store.close();
748      } catch (IOException e) {
749      }
750      store = null;
751    }
752    if (region != null) {
753      region.close();
754      region = null;
755    }
756  }
757
758  @AfterClass
759  public static void tearDownAfterClass() throws IOException {
760    TEST_UTIL.cleanupTestDir();
761  }
762
763  @Test
764  public void testHandleErrorsInFlush() throws Exception {
765    LOG.info("Setting up a faulty file system that cannot write");
766
767    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
768    User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" });
769    // Inject our faulty LocalFileSystem
770    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
771    user.runAs(new PrivilegedExceptionAction<Object>() {
772      @Override
773      public Object run() throws Exception {
774        // Make sure it worked (above is sensitive to caching details in hadoop core)
775        FileSystem fs = FileSystem.get(conf);
776        assertEquals(FaultyFileSystem.class, fs.getClass());
777
778        // Initialize region
779        init(name.getMethodName(), conf);
780
781        LOG.info("Adding some data");
782        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
783        store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
784        store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
785
786        LOG.info("Before flush, we should have no files");
787
788        Collection<StoreFileInfo> files =
789          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
790        assertEquals(0, files != null ? files.size() : 0);
791
792        // flush
793        try {
794          LOG.info("Flushing");
795          flush(1);
796          fail("Didn't bubble up IOE!");
797        } catch (IOException ioe) {
798          assertTrue(ioe.getMessage().contains("Fault injected"));
799        }
800
801        LOG.info("After failed flush, we should still have no files!");
802        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
803        assertEquals(0, files != null ? files.size() : 0);
804        store.getHRegion().getWAL().close();
805        return null;
806      }
807    });
808    FileSystem.closeAllForUGI(user.getUGI());
809  }
810
811  /**
812   * Faulty file system that will fail if you write past its fault position the FIRST TIME only;
813   * thereafter it will succeed. Used by {@link TestHRegion} too.
814   */
815  static class FaultyFileSystem extends FilterFileSystem {
816    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
817    private long faultPos = 200;
818    AtomicBoolean fault = new AtomicBoolean(true);
819
820    public FaultyFileSystem() {
821      super(new LocalFileSystem());
822      LOG.info("Creating faulty!");
823    }
824
825    @Override
826    public FSDataOutputStream create(Path p) throws IOException {
827      return new FaultyOutputStream(super.create(p), faultPos, fault);
828    }
829
830    @Override
831    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
832      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
833      return new FaultyOutputStream(
834        super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress),
835        faultPos, fault);
836    }
837
838    @Override
839    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
840      short replication, long blockSize, Progressable progress) throws IOException {
841      // Fake it. Call create instead. The default implementation throws an IOE
842      // that this is not supported.
843      return create(f, overwrite, bufferSize, replication, blockSize, progress);
844    }
845  }
846
847  static class FaultyOutputStream extends FSDataOutputStream {
848    volatile long faultPos = Long.MAX_VALUE;
849    private final AtomicBoolean fault;
850
851    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
852      throws IOException {
853      super(out, null);
854      this.faultPos = faultPos;
855      this.fault = fault;
856    }
857
858    @Override
859    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
860      LOG.info("faulty stream write at pos " + getPos());
861      injectFault();
862      super.write(buf, offset, length);
863    }
864
865    private void injectFault() throws IOException {
866      if (this.fault.get() && getPos() >= faultPos) {
867        throw new IOException("Fault injected");
868      }
869    }
870  }
871
872  private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
873    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
874    storeFlushCtx.prepare();
875    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
876    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
877    return storeFlushCtx;
878  }
879
880  /**
881   * Generate a list of KeyValues for testing based on given parameters
882   * @return the rows key-value list
883   */
884  private List<ExtendedCell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier,
885    byte[] family) {
886    List<ExtendedCell> kvList = new ArrayList<>();
887    for (int i = 1; i <= numRows; i++) {
888      byte[] b = Bytes.toBytes(i);
889      for (long timestamp : timestamps) {
890        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
891      }
892    }
893    return kvList;
894  }
895
896  /**
897   * Test to ensure correctness when using Stores with multiple timestamps
898   */
899  @Test
900  public void testMultipleTimestamps() throws IOException {
901    int numRows = 1;
902    long[] timestamps1 = new long[] { 1, 5, 10, 20 };
903    long[] timestamps2 = new long[] { 30, 80 };
904
905    init(this.name.getMethodName());
906
907    List<ExtendedCell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family);
908    for (ExtendedCell kv : kvList1) {
909      this.store.add(kv, null);
910    }
911
912    flushStore(store, id++);
913
914    List<ExtendedCell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family);
915    for (ExtendedCell kv : kvList2) {
916      this.store.add(kv, null);
917    }
918
919    List<Cell> result;
920    Get get = new Get(Bytes.toBytes(1));
921    get.addColumn(family, qf1);
922
923    get.setTimeRange(0, 15);
924    result = HBaseTestingUtil.getFromStoreFile(store, get);
925    assertTrue(result.size() > 0);
926
927    get.setTimeRange(40, 90);
928    result = HBaseTestingUtil.getFromStoreFile(store, get);
929    assertTrue(result.size() > 0);
930
931    get.setTimeRange(10, 45);
932    result = HBaseTestingUtil.getFromStoreFile(store, get);
933    assertTrue(result.size() > 0);
934
935    get.setTimeRange(80, 145);
936    result = HBaseTestingUtil.getFromStoreFile(store, get);
937    assertTrue(result.size() > 0);
938
939    get.setTimeRange(1, 2);
940    result = HBaseTestingUtil.getFromStoreFile(store, get);
941    assertTrue(result.size() > 0);
942
943    get.setTimeRange(90, 200);
944    result = HBaseTestingUtil.getFromStoreFile(store, get);
945    assertTrue(result.size() == 0);
946  }
947
948  /**
949   * Test for HBASE-3492 - Test split on empty colfam (no store files).
950   * @throws IOException When the IO operations fail.
951   */
952  @Test
953  public void testSplitWithEmptyColFam() throws IOException {
954    init(this.name.getMethodName());
955    assertFalse(store.getSplitPoint().isPresent());
956  }
957
958  @Test
959  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
960    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
961    long anyValue = 10;
962
963    // We'll check that it uses correct config and propagates it appropriately by going thru
964    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
965    // a number we pass in is higher than some config value, inside compactionPolicy.
966    Configuration conf = HBaseConfiguration.create();
967    conf.setLong(CONFIG_KEY, anyValue);
968    init(name.getMethodName() + "-xml", conf);
969    assertTrue(store.throttleCompaction(anyValue + 1));
970    assertFalse(store.throttleCompaction(anyValue));
971
972    // HTD overrides XML.
973    --anyValue;
974    init(
975      name.getMethodName() + "-htd", conf, TableDescriptorBuilder
976        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
977      ColumnFamilyDescriptorBuilder.of(family));
978    assertTrue(store.throttleCompaction(anyValue + 1));
979    assertFalse(store.throttleCompaction(anyValue));
980
981    // HCD overrides them both.
982    --anyValue;
983    init(name.getMethodName() + "-hcd", conf,
984      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
985        Long.toString(anyValue)),
986      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
987        .build());
988    assertTrue(store.throttleCompaction(anyValue + 1));
989    assertFalse(store.throttleCompaction(anyValue));
990  }
991
992  public static class DummyStoreEngine extends DefaultStoreEngine {
993    public static DefaultCompactor lastCreatedCompactor = null;
994
995    @Override
996    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
997      throws IOException {
998      super.createComponents(conf, store, comparator);
999      lastCreatedCompactor = this.compactor;
1000    }
1001  }
1002
1003  @Test
1004  public void testStoreUsesSearchEngineOverride() throws Exception {
1005    Configuration conf = HBaseConfiguration.create();
1006    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
1007    init(this.name.getMethodName(), conf);
1008    assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
1009  }
1010
1011  private void addStoreFile() throws IOException {
1012    HStoreFile f = this.store.getStorefiles().iterator().next();
1013    Path storedir = f.getPath().getParent();
1014    long seqid = this.store.getMaxSequenceId().orElse(0L);
1015    Configuration c = TEST_UTIL.getConfiguration();
1016    FileSystem fs = FileSystem.get(c);
1017    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
1018    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
1019      .withOutputDir(storedir).withFileContext(fileContext).build();
1020    w.appendMetadata(seqid + 1, false);
1021    w.close();
1022    LOG.info("Added store file:" + w.getPath());
1023  }
1024
1025  private void archiveStoreFile(int index) throws IOException {
1026    Collection<HStoreFile> files = this.store.getStorefiles();
1027    HStoreFile sf = null;
1028    Iterator<HStoreFile> it = files.iterator();
1029    for (int i = 0; i <= index; i++) {
1030      sf = it.next();
1031    }
1032    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(),
1033      Lists.newArrayList(sf));
1034  }
1035
1036  private void closeCompactedFile(int index) throws IOException {
1037    Collection<HStoreFile> files =
1038      this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
1039    if (files.size() > 0) {
1040      HStoreFile sf = null;
1041      Iterator<HStoreFile> it = files.iterator();
1042      for (int i = 0; i <= index; i++) {
1043        sf = it.next();
1044      }
1045      sf.closeStoreFile(true);
1046      store.getStoreEngine().getStoreFileManager()
1047        .removeCompactedFiles(Collections.singletonList(sf));
1048    }
1049  }
1050
1051  @Test
1052  public void testRefreshStoreFiles() throws Exception {
1053    init(name.getMethodName());
1054
1055    assertEquals(0, this.store.getStorefilesCount());
1056
1057    // Test refreshing store files when no store files are there
1058    store.refreshStoreFiles();
1059    assertEquals(0, this.store.getStorefilesCount());
1060
1061    // add some data, flush
1062    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1063    flush(1);
1064    assertEquals(1, this.store.getStorefilesCount());
1065
1066    // add one more file
1067    addStoreFile();
1068
1069    assertEquals(1, this.store.getStorefilesCount());
1070    store.refreshStoreFiles();
1071    assertEquals(2, this.store.getStorefilesCount());
1072
1073    // add three more files
1074    addStoreFile();
1075    addStoreFile();
1076    addStoreFile();
1077
1078    assertEquals(2, this.store.getStorefilesCount());
1079    store.refreshStoreFiles();
1080    assertEquals(5, this.store.getStorefilesCount());
1081
1082    closeCompactedFile(0);
1083    archiveStoreFile(0);
1084
1085    assertEquals(5, this.store.getStorefilesCount());
1086    store.refreshStoreFiles();
1087    assertEquals(4, this.store.getStorefilesCount());
1088
1089    archiveStoreFile(0);
1090    archiveStoreFile(1);
1091    archiveStoreFile(2);
1092
1093    assertEquals(4, this.store.getStorefilesCount());
1094    store.refreshStoreFiles();
1095    assertEquals(1, this.store.getStorefilesCount());
1096
1097    archiveStoreFile(0);
1098    store.refreshStoreFiles();
1099    assertEquals(0, this.store.getStorefilesCount());
1100  }
1101
1102  @Test
1103  public void testRefreshStoreFilesNotChanged() throws IOException {
1104    init(name.getMethodName());
1105
1106    assertEquals(0, this.store.getStorefilesCount());
1107
1108    // add some data, flush
1109    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1110    flush(1);
1111    // add one more file
1112    addStoreFile();
1113
1114    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());
1115
1116    // call first time after files changed
1117    spiedStoreEngine.refreshStoreFiles();
1118    assertEquals(2, this.store.getStorefilesCount());
1119    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
1120
1121    // call second time
1122    spiedStoreEngine.refreshStoreFiles();
1123
1124    // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
1125    // refreshed,
1126    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
1127  }
1128
1129  @Test
1130  public void testScanWithCompactionAfterFlush() throws Exception {
1131    TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1132      EverythingPolicy.class.getName());
1133    init(name.getMethodName());
1134
1135    assertEquals(0, this.store.getStorefilesCount());
1136
1137    KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
1138    // add some data, flush
1139    this.store.add(kv, null);
1140    flush(1);
1141    kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
1142    // add some data, flush
1143    this.store.add(kv, null);
1144    flush(2);
1145    kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
1146    // add some data, flush
1147    this.store.add(kv, null);
1148    flush(3);
1149
1150    ExecutorService service = Executors.newFixedThreadPool(2);
1151
1152    Scan scan = new Scan(new Get(row));
1153    Future<KeyValueScanner> scanFuture = service.submit(() -> {
1154      try {
1155        LOG.info(">>>> creating scanner");
1156        return this.store.createScanner(scan,
1157          new ScanInfo(HBaseConfiguration.create(),
1158            ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
1159            Long.MAX_VALUE, 0, CellComparator.getInstance()),
1160          scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
1161      } catch (IOException e) {
1162        e.printStackTrace();
1163        return null;
1164      }
1165    });
1166    Future compactFuture = service.submit(() -> {
1167      try {
1168        LOG.info(">>>>>> starting compaction");
1169        Optional<CompactionContext> opCompaction = this.store.requestCompaction();
1170        assertTrue(opCompaction.isPresent());
1171        store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
1172        LOG.info(">>>>>> Compaction is finished");
1173        this.store.closeAndArchiveCompactedFiles();
1174        LOG.info(">>>>>> Compacted files deleted");
1175      } catch (IOException e) {
1176        e.printStackTrace();
1177      }
1178    });
1179
1180    KeyValueScanner kvs = scanFuture.get();
1181    compactFuture.get();
1182    ((StoreScanner) kvs).currentScanners.forEach(s -> {
1183      if (s instanceof StoreFileScanner) {
1184        assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
1185      }
1186    });
1187    kvs.seek(kv);
1188    service.shutdownNow();
1189  }
1190
1191  private long countMemStoreScanner(StoreScanner scanner) {
1192    if (scanner.currentScanners == null) {
1193      return 0;
1194    }
1195    return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count();
1196  }
1197
1198  @Test
1199  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1200    long seqId = 100;
1201    long timestamp = EnvironmentEdgeManager.currentTime();
1202    ExtendedCell cell0 =
1203      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1204        .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1205    PrivateCellUtil.setSequenceId(cell0, seqId);
1206    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1207
1208    ExtendedCell cell1 =
1209      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1210        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1211    PrivateCellUtil.setSequenceId(cell1, seqId);
1212    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1213
1214    seqId = 101;
1215    timestamp = EnvironmentEdgeManager.currentTime();
1216    ExtendedCell cell2 =
1217      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1218        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1219    PrivateCellUtil.setSequenceId(cell2, seqId);
1220    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1221  }
1222
1223  private void testNumberOfMemStoreScannersAfterFlush(List<ExtendedCell> inputCellsBeforeSnapshot,
1224    List<ExtendedCell> inputCellsAfterSnapshot) throws IOException {
1225    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
1226    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1227    long seqId = Long.MIN_VALUE;
1228    for (ExtendedCell c : inputCellsBeforeSnapshot) {
1229      quals.add(CellUtil.cloneQualifier(c));
1230      seqId = Math.max(seqId, c.getSequenceId());
1231    }
1232    for (ExtendedCell c : inputCellsAfterSnapshot) {
1233      quals.add(CellUtil.cloneQualifier(c));
1234      seqId = Math.max(seqId, c.getSequenceId());
1235    }
1236    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
1237    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1238    storeFlushCtx.prepare();
1239    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
1240    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
1241    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
1242      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
1243      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
1244      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1245      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1246      // snapshot has no data after flush
1247      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
1248      boolean more;
1249      int cellCount = 0;
1250      do {
1251        List<Cell> cells = new ArrayList<>();
1252        more = s.next(cells);
1253        cellCount += cells.size();
1254        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
1255      } while (more);
1256      assertEquals(
1257        "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
1258          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
1259        inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
1260      // the current scanners is cleared
1261      assertEquals(0, countMemStoreScanner(s));
1262    }
1263  }
1264
1265  private ExtendedCell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
1266    throws IOException {
1267    return createCell(row, qualifier, ts, sequenceId, value);
1268  }
1269
1270  private ExtendedCell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId,
1271    byte[] value) throws IOException {
1272    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row)
1273      .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
1274      .setValue(value).setSequenceId(sequenceId).build();
1275  }
1276
1277  @Test
1278  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1279    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1280    final int expectedSize = 3;
1281    testFlushBeforeCompletingScan(new MyListHook() {
1282      @Override
1283      public void hook(int currentSize) {
1284        if (currentSize == expectedSize - 1) {
1285          try {
1286            flushStore(store, id++);
1287            timeToGoNextRow.set(true);
1288          } catch (IOException e) {
1289            throw new RuntimeException(e);
1290          }
1291        }
1292      }
1293    }, new FilterBase() {
1294      @Override
1295      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1296        return ReturnCode.INCLUDE;
1297      }
1298    }, expectedSize);
1299  }
1300
1301  @Test
1302  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
1303    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1304    final int expectedSize = 2;
1305    testFlushBeforeCompletingScan(new MyListHook() {
1306      @Override
1307      public void hook(int currentSize) {
1308        if (currentSize == expectedSize - 1) {
1309          try {
1310            flushStore(store, id++);
1311            timeToGoNextRow.set(true);
1312          } catch (IOException e) {
1313            throw new RuntimeException(e);
1314          }
1315        }
1316      }
1317    }, new FilterBase() {
1318      @Override
1319      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1320        if (timeToGoNextRow.get()) {
1321          timeToGoNextRow.set(false);
1322          return ReturnCode.NEXT_ROW;
1323        } else {
1324          return ReturnCode.INCLUDE;
1325        }
1326      }
1327    }, expectedSize);
1328  }
1329
1330  @Test
1331  public void testFlushBeforeCompletingScanWithFilterHint()
1332    throws IOException, InterruptedException {
1333    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
1334    final int expectedSize = 2;
1335    testFlushBeforeCompletingScan(new MyListHook() {
1336      @Override
1337      public void hook(int currentSize) {
1338        if (currentSize == expectedSize - 1) {
1339          try {
1340            flushStore(store, id++);
1341            timeToGetHint.set(true);
1342          } catch (IOException e) {
1343            throw new RuntimeException(e);
1344          }
1345        }
1346      }
1347    }, new FilterBase() {
1348      @Override
1349      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1350        if (timeToGetHint.get()) {
1351          timeToGetHint.set(false);
1352          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
1353        } else {
1354          return Filter.ReturnCode.INCLUDE;
1355        }
1356      }
1357
1358      @Override
1359      public Cell getNextCellHint(Cell currentCell) throws IOException {
1360        return currentCell;
1361      }
1362    }, expectedSize);
1363  }
1364
1365  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
1366    throws IOException, InterruptedException {
1367    Configuration conf = HBaseConfiguration.create();
1368    byte[] r0 = Bytes.toBytes("row0");
1369    byte[] r1 = Bytes.toBytes("row1");
1370    byte[] r2 = Bytes.toBytes("row2");
1371    byte[] value0 = Bytes.toBytes("value0");
1372    byte[] value1 = Bytes.toBytes("value1");
1373    byte[] value2 = Bytes.toBytes("value2");
1374    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1375    long ts = EnvironmentEdgeManager.currentTime();
1376    long seqId = 100;
1377    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1378      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1379      new MyStoreHook() {
1380        @Override
1381        public long getSmallestReadPoint(HStore store) {
1382          return seqId + 3;
1383        }
1384      });
1385    // The cells having the value0 won't be flushed to disk because the value of max version is 1
1386    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
1387    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
1388    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
1389    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
1390    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
1391    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
1392    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
1393    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
1394    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
1395    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1396    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1397    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1398    List<Cell> myList = new MyList<>(hook);
1399    Scan scan = new Scan().withStartRow(r1).setFilter(filter);
1400    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
1401      // r1
1402      scanner.next(myList);
1403      assertEquals(expectedSize, myList.size());
1404      for (Cell c : myList) {
1405        byte[] actualValue = CellUtil.cloneValue(c);
1406        assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
1407          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
1408      }
1409      List<Cell> normalList = new ArrayList<>(3);
1410      // r2
1411      scanner.next(normalList);
1412      assertEquals(3, normalList.size());
1413      for (Cell c : normalList) {
1414        byte[] actualValue = CellUtil.cloneValue(c);
1415        assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:"
1416          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2));
1417      }
1418    }
1419  }
1420
1421  @Test
1422  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
1423    Configuration conf = HBaseConfiguration.create();
1424    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
1425    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1426      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1427    byte[] value = Bytes.toBytes("value");
1428    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1429    long ts = EnvironmentEdgeManager.currentTime();
1430    long seqId = 100;
1431    // older data whihc shouldn't be "seen" by client
1432    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1433    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1434    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1435    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1436    quals.add(qf1);
1437    quals.add(qf2);
1438    quals.add(qf3);
1439    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1440    MyCompactingMemStore.START_TEST.set(true);
1441    Runnable flush = () -> {
1442      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
1443      // recreate the active memstore -- phase (4/5)
1444      storeFlushCtx.prepare();
1445    };
1446    ExecutorService service = Executors.newSingleThreadExecutor();
1447    service.execute(flush);
1448    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
1449    // this is blocked until we recreate the active memstore -- phase (3/5)
1450    // we get scanner from active memstore but it is empty -- phase (5/5)
1451    InternalScanner scanner =
1452      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
1453    service.shutdown();
1454    service.awaitTermination(20, TimeUnit.SECONDS);
1455    try {
1456      try {
1457        List<Cell> results = new ArrayList<>();
1458        scanner.next(results);
1459        assertEquals(3, results.size());
1460        for (Cell c : results) {
1461          byte[] actualValue = CellUtil.cloneValue(c);
1462          assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
1463            + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
1464        }
1465      } finally {
1466        scanner.close();
1467      }
1468    } finally {
1469      MyCompactingMemStore.START_TEST.set(false);
1470      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1471      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1472    }
1473  }
1474
1475  @Test
1476  public void testScanWithDoubleFlush() throws IOException {
1477    Configuration conf = HBaseConfiguration.create();
1478    // Initialize region
1479    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1480      @Override
1481      public void getScanners(MyStore store) throws IOException {
1482        final long tmpId = id++;
1483        ExecutorService s = Executors.newSingleThreadExecutor();
1484        s.execute(() -> {
1485          try {
1486            // flush the store before storescanner updates the scanners from store.
1487            // The current data will be flushed into files, and the memstore will
1488            // be clear.
1489            // -- phase (4/4)
1490            flushStore(store, tmpId);
1491          } catch (IOException ex) {
1492            throw new RuntimeException(ex);
1493          }
1494        });
1495        s.shutdown();
1496        try {
1497          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1498          s.awaitTermination(3, TimeUnit.SECONDS);
1499        } catch (InterruptedException ex) {
1500        }
1501      }
1502    });
1503    byte[] oldValue = Bytes.toBytes("oldValue");
1504    byte[] currentValue = Bytes.toBytes("currentValue");
1505    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1506    long ts = EnvironmentEdgeManager.currentTime();
1507    long seqId = 100;
1508    // older data whihc shouldn't be "seen" by client
1509    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1510    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1511    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1512    long snapshotId = id++;
1513    // push older data into snapshot -- phase (1/4)
1514    StoreFlushContext storeFlushCtx =
1515      store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
1516    storeFlushCtx.prepare();
1517
1518    // insert current data into active -- phase (2/4)
1519    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1520    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1521    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1522    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1523    quals.add(qf1);
1524    quals.add(qf2);
1525    quals.add(qf3);
1526    try (InternalScanner scanner =
1527      (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) {
1528      // complete the flush -- phase (3/4)
1529      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1530      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1531
1532      List<Cell> results = new ArrayList<>();
1533      scanner.next(results);
1534      assertEquals(3, results.size());
1535      for (Cell c : results) {
1536        byte[] actualValue = CellUtil.cloneValue(c);
1537        assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:"
1538          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue));
1539      }
1540    }
1541  }
1542
1543  /**
1544   * This test is for HBASE-27519, when the {@link StoreScanner} is scanning,the Flush and the
1545   * Compaction execute concurrently and theCcompaction compact and archive the flushed
1546   * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}.Before
1547   * HBASE-27519,{@link StoreScanner.updateReaders} would throw {@link FileNotFoundException}.
1548   */
1549  @Test
1550  public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
1551    Configuration conf = HBaseConfiguration.create();
1552    conf.setBoolean(WALFactory.WAL_ENABLED, false);
1553    conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
1554    byte[] r0 = Bytes.toBytes("row0");
1555    byte[] r1 = Bytes.toBytes("row1");
1556    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
1557    final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
1558    // Initialize region
1559    final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1560      @Override
1561      public void getScanners(MyStore store) throws IOException {
1562        try {
1563          // Here this method is called by StoreScanner.updateReaders which is invoked by the
1564          // following TestHStore.flushStore
1565          if (shouldWaitRef.get()) {
1566            // wait the following compaction Task start
1567            cyclicBarrier.await();
1568            // wait the following HStore.closeAndArchiveCompactedFiles end.
1569            cyclicBarrier.await();
1570          }
1571        } catch (BrokenBarrierException | InterruptedException e) {
1572          throw new RuntimeException(e);
1573        }
1574      }
1575    });
1576
1577    final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
1578    Runnable compactionTask = () -> {
1579      try {
1580        // Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for
1581        // entering the MyStore.getScanners, compactionTask could start.
1582        cyclicBarrier.await();
1583        region.compactStore(family, new NoLimitThroughputController());
1584        myStore.closeAndArchiveCompactedFiles();
1585        // Notify StoreScanner.updateReaders could enter MyStore.getScanners.
1586        cyclicBarrier.await();
1587      } catch (Throwable e) {
1588        compactionExceptionRef.set(e);
1589      }
1590    };
1591
1592    long ts = EnvironmentEdgeManager.currentTime();
1593    long seqId = 100;
1594    byte[] value = Bytes.toBytes("value");
1595    // older data whihc shouldn't be "seen" by client
1596    myStore.add(createCell(r0, qf1, ts, seqId, value), null);
1597    flushStore(myStore, id++);
1598    myStore.add(createCell(r0, qf2, ts, seqId, value), null);
1599    flushStore(myStore, id++);
1600    myStore.add(createCell(r0, qf3, ts, seqId, value), null);
1601    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1602    quals.add(qf1);
1603    quals.add(qf2);
1604    quals.add(qf3);
1605
1606    myStore.add(createCell(r1, qf1, ts, seqId, value), null);
1607    myStore.add(createCell(r1, qf2, ts, seqId, value), null);
1608    myStore.add(createCell(r1, qf3, ts, seqId, value), null);
1609
1610    Thread.currentThread()
1611      .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
1612    Scan scan = new Scan();
1613    scan.withStartRow(r0, true);
1614    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
1615      List<Cell> results = new MyList<>(size -> {
1616        switch (size) {
1617          case 1:
1618            shouldWaitRef.set(true);
1619            Thread thread = new Thread(compactionTask);
1620            thread.setName("MyCompacting Thread.");
1621            thread.start();
1622            try {
1623              flushStore(myStore, id++);
1624              thread.join();
1625            } catch (IOException | InterruptedException e) {
1626              throw new RuntimeException(e);
1627            }
1628            shouldWaitRef.set(false);
1629            break;
1630          default:
1631            break;
1632        }
1633      });
1634      // Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile
1635      // which used by StoreScanner.updateReaders is deleted by compactionTask.
1636      scanner.next(results);
1637      // The results is r0 row cells.
1638      assertEquals(3, results.size());
1639      assertTrue(compactionExceptionRef.get() == null);
1640    }
1641  }
1642
1643  @Test
1644  public void testReclaimChunkWhenScaning() throws IOException {
1645    init("testReclaimChunkWhenScaning");
1646    long ts = EnvironmentEdgeManager.currentTime();
1647    long seqId = 100;
1648    byte[] value = Bytes.toBytes("value");
1649    // older data whihc shouldn't be "seen" by client
1650    store.add(createCell(qf1, ts, seqId, value), null);
1651    store.add(createCell(qf2, ts, seqId, value), null);
1652    store.add(createCell(qf3, ts, seqId, value), null);
1653    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1654    quals.add(qf1);
1655    quals.add(qf2);
1656    quals.add(qf3);
1657    try (InternalScanner scanner =
1658      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) {
1659      List<Cell> results = new MyList<>(size -> {
1660        switch (size) {
1661          // 1) we get the first cell (qf1)
1662          // 2) flush the data to have StoreScanner update inner scanners
1663          // 3) the chunk will be reclaimed after updaing
1664          case 1:
1665            try {
1666              flushStore(store, id++);
1667            } catch (IOException e) {
1668              throw new RuntimeException(e);
1669            }
1670            break;
1671          // 1) we get the second cell (qf2)
1672          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
1673          case 2:
1674            try {
1675              byte[] newValue = Bytes.toBytes("newValue");
1676              // older data whihc shouldn't be "seen" by client
1677              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1678              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1679              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1680            } catch (IOException e) {
1681              throw new RuntimeException(e);
1682            }
1683            break;
1684          default:
1685            break;
1686        }
1687      });
1688      scanner.next(results);
1689      assertEquals(3, results.size());
1690      for (Cell c : results) {
1691        byte[] actualValue = CellUtil.cloneValue(c);
1692        assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
1693          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
1694      }
1695    }
1696  }
1697
1698  /**
1699   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the
1700   * versionedList. And the first InMemoryFlushRunnable will use the chagned versionedList to remove
1701   * the corresponding segments. In short, there will be some segements which isn't in merge are
1702   * removed.
1703   */
1704  @Test
1705  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1706    int flushSize = 500;
1707    Configuration conf = HBaseConfiguration.create();
1708    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1709    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1710    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1711    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1712    // Set the lower threshold to invoke the "MERGE" policy
1713    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1714    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1715      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1716    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1717    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1718    long ts = EnvironmentEdgeManager.currentTime();
1719    long seqId = 100;
1720    // older data whihc shouldn't be "seen" by client
1721    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1722    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1723    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1724    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1725    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1726    storeFlushCtx.prepare();
1727    // This shouldn't invoke another in-memory flush because the first compactor thread
1728    // hasn't accomplished the in-memory compaction.
1729    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1730    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1731    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1732    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1733    // okay. Let the compaction be completed
1734    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1735    CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore;
1736    while (mem.isMemStoreFlushingInMemory()) {
1737      TimeUnit.SECONDS.sleep(1);
1738    }
1739    // This should invoke another in-memory flush.
1740    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1741    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1742    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1743    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1744    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1745      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1746    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1747    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1748  }
1749
1750  @Test
1751  public void testAge() throws IOException {
1752    long currentTime = EnvironmentEdgeManager.currentTime();
1753    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1754    edge.setValue(currentTime);
1755    EnvironmentEdgeManager.injectEdge(edge);
1756    Configuration conf = TEST_UTIL.getConfiguration();
1757    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1758    initHRegion(name.getMethodName(), conf,
1759      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1760    HStore store = new HStore(region, hcd, conf, false) {
1761
1762      @Override
1763      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1764        CellComparator kvComparator) throws IOException {
1765        List<HStoreFile> storefiles =
1766          Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1767            mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1768        StoreFileManager sfm = mock(StoreFileManager.class);
1769        when(sfm.getStoreFiles()).thenReturn(storefiles);
1770        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1771        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1772        return storeEngine;
1773      }
1774    };
1775    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1776    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1777    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1778  }
1779
1780  private HStoreFile mockStoreFile(long createdTime) {
1781    StoreFileInfo info = mock(StoreFileInfo.class);
1782    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1783    HStoreFile sf = mock(HStoreFile.class);
1784    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1785    when(sf.isHFile()).thenReturn(true);
1786    when(sf.getFileInfo()).thenReturn(info);
1787    return sf;
1788  }
1789
1790  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1791    throws IOException {
1792    return (MyStore) init(methodName, conf,
1793      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1794      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1795  }
1796
1797  private static class MyStore extends HStore {
1798    private final MyStoreHook hook;
1799
1800    MyStore(final HRegion region, final ColumnFamilyDescriptor family,
1801      final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1802      super(region, family, confParam, false);
1803      this.hook = hook;
1804    }
1805
1806    @Override
1807    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1808      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1809      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1810      boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException {
1811      hook.getScanners(this);
1812      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1813        stopRow, false, readPt, includeMemstoreScanner, onlyLatestVersion);
1814    }
1815
1816    @Override
1817    public long getSmallestReadPoint() {
1818      return hook.getSmallestReadPoint(this);
1819    }
1820  }
1821
1822  private abstract static class MyStoreHook {
1823
1824    void getScanners(MyStore store) throws IOException {
1825    }
1826
1827    long getSmallestReadPoint(HStore store) {
1828      return store.getHRegion().getSmallestReadPoint();
1829    }
1830  }
1831
1832  @Test
1833  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
1834    Configuration conf = HBaseConfiguration.create();
1835    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1836    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1837    // Set the lower threshold to invoke the "MERGE" policy
1838    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1839    });
1840    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1841    long ts = EnvironmentEdgeManager.currentTime();
1842    long seqID = 1L;
1843    // Add some data to the region and do some flushes
1844    for (int i = 1; i < 10; i++) {
1845      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1846        memStoreSizing);
1847    }
1848    // flush them
1849    flushStore(store, seqID);
1850    for (int i = 11; i < 20; i++) {
1851      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1852        memStoreSizing);
1853    }
1854    // flush them
1855    flushStore(store, seqID);
1856    for (int i = 21; i < 30; i++) {
1857      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1858        memStoreSizing);
1859    }
1860    // flush them
1861    flushStore(store, seqID);
1862
1863    assertEquals(3, store.getStorefilesCount());
1864    Scan scan = new Scan();
1865    scan.addFamily(family);
1866    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1867    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1868    StoreScanner storeScanner =
1869      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1870    // get the current heap
1871    KeyValueHeap heap = storeScanner.heap;
1872    // create more store files
1873    for (int i = 31; i < 40; i++) {
1874      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1875        memStoreSizing);
1876    }
1877    // flush them
1878    flushStore(store, seqID);
1879
1880    for (int i = 41; i < 50; i++) {
1881      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1882        memStoreSizing);
1883    }
1884    // flush them
1885    flushStore(store, seqID);
1886    storefiles2 = store.getStorefiles();
1887    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1888    actualStorefiles1.removeAll(actualStorefiles);
1889    // Do compaction
1890    MyThread thread = new MyThread(storeScanner);
1891    thread.start();
1892    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
1893    thread.join();
1894    KeyValueHeap heap2 = thread.getHeap();
1895    assertFalse(heap.equals(heap2));
1896  }
1897
1898  @Test
1899  public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
1900    Configuration conf = HBaseConfiguration.create();
1901    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1902    // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
1903    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
1904    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1905    });
1906    Scan scan = new Scan();
1907    scan.addFamily(family);
1908    // ReadType on Scan is still DEFAULT only.
1909    assertEquals(ReadType.DEFAULT, scan.getReadType());
1910    StoreScanner storeScanner =
1911      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1912    assertFalse(storeScanner.isScanUsePread());
1913  }
1914
1915  @Test
1916  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
1917    final TableName tn = TableName.valueOf(name.getMethodName());
1918    init(name.getMethodName());
1919
1920    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
1921
1922    HStoreFile sf1 = mockStoreFileWithLength(1024L);
1923    HStoreFile sf2 = mockStoreFileWithLength(2048L);
1924    HStoreFile sf3 = mockStoreFileWithLength(4096L);
1925    HStoreFile sf4 = mockStoreFileWithLength(8192L);
1926
1927    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
1928      .setEndKey(Bytes.toBytes("b")).build();
1929
1930    // Compacting two files down to one, reducing size
1931    sizeStore.put(regionInfo, 1024L + 4096L);
1932    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3),
1933      Arrays.asList(sf2));
1934
1935    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1936
1937    // The same file length in and out should have no change
1938    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
1939      Arrays.asList(sf2));
1940
1941    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1942
1943    // Increase the total size used
1944    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
1945      Arrays.asList(sf3));
1946
1947    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
1948
1949    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
1950      .setEndKey(Bytes.toBytes("c")).build();
1951    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
1952
1953    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
1954  }
1955
1956  @Test
1957  public void testHFileContextSetWithCFAndTable() throws Exception {
1958    init(this.name.getMethodName());
1959    StoreFileWriter writer = store.getStoreEngine()
1960      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
1961        .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
1962        .includesTag(false).shouldDropBehind(true));
1963    HFileContext hFileContext = writer.getLiveFileWriter().getFileContext();
1964    assertArrayEquals(family, hFileContext.getColumnFamily());
1965    assertArrayEquals(table, hFileContext.getTableName());
1966  }
1967
1968  // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell
1969  // but its dataSize exceeds inmemoryFlushSize
1970  @Test
1971  public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
1972    throws IOException, InterruptedException {
1973    Configuration conf = HBaseConfiguration.create();
1974
1975    byte[] smallValue = new byte[3];
1976    byte[] largeValue = new byte[9];
1977    final long timestamp = EnvironmentEdgeManager.currentTime();
1978    final long seqId = 100;
1979    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
1980    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
1981    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
1982    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
1983    int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
1984
1985    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
1986    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
1987    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
1988    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
1989
1990    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1991      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1992
1993    MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
1994    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
1995    myCompactingMemStore.smallCellPreUpdateCounter.set(0);
1996    myCompactingMemStore.largeCellPreUpdateCounter.set(0);
1997
1998    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
1999    Thread smallCellThread = new Thread(() -> {
2000      try {
2001        store.add(smallCell, new NonThreadSafeMemStoreSizing());
2002      } catch (Throwable exception) {
2003        exceptionRef.set(exception);
2004      }
2005    });
2006    smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
2007    smallCellThread.start();
2008
2009    String oldThreadName = Thread.currentThread().getName();
2010    try {
2011      /**
2012       * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
2013       * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread
2014       * invokes flushInMemory.
2015       * <p/>
2016       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
2017       * can add cell to currentActive . That is to say when largeCellThread called flushInMemory
2018       * method, CompactingMemStore.active has no cell.
2019       */
2020      Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
2021      store.add(largeCell, new NonThreadSafeMemStoreSizing());
2022      smallCellThread.join();
2023
2024      for (int i = 0; i < 100; i++) {
2025        long currentTimestamp = timestamp + 100 + i;
2026        ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
2027        store.add(cell, new NonThreadSafeMemStoreSizing());
2028      }
2029    } finally {
2030      Thread.currentThread().setName(oldThreadName);
2031    }
2032
2033    assertTrue(exceptionRef.get() == null);
2034
2035  }
2036
2037  // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds
2038  // InmemoryFlushSize
2039  @Test(timeout = 60000)
2040  public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception {
2041    Configuration conf = HBaseConfiguration.create();
2042    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
2043
2044    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2045      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2046
2047    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
2048
2049    int size = (int) (myCompactingMemStore.getInmemoryFlushSize());
2050    byte[] value = new byte[size + 1];
2051
2052    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2053    long timestamp = EnvironmentEdgeManager.currentTime();
2054    long seqId = 100;
2055    ExtendedCell cell = createCell(qf1, timestamp, seqId, value);
2056    int cellByteSize = MutableSegment.getCellLength(cell);
2057    store.add(cell, memStoreSizing);
2058    assertTrue(memStoreSizing.getCellsCount() == 1);
2059    assertTrue(memStoreSizing.getDataSize() == cellByteSize);
2060    // Waiting the in memory compaction completed, see HBASE-26438
2061    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2062  }
2063
2064  /**
2065   * This test is for HBASE-27464, before this JIRA,when init {@link CellChunkImmutableSegment} for
2066   * 'COMPACT' action, we not force copy to current MSLab. When cell size bigger than
2067   * {@link MemStoreLABImpl#maxAlloc}, cell will stay in previous chunk which will recycle after
2068   * segment replace, and we may read wrong data when these chunk reused by others.
2069   */
2070  @Test
2071  public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception {
2072    Configuration conf = HBaseConfiguration.create();
2073    int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT);
2074
2075    // Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}.
2076    byte[] cellValue = new byte[maxAllocByteSize + 1];
2077    final long timestamp = EnvironmentEdgeManager.currentTime();
2078    final long seqId = 100;
2079    final byte[] rowKey1 = Bytes.toBytes("rowKey1");
2080    final ExtendedCell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue);
2081    final byte[] rowKey2 = Bytes.toBytes("rowKey2");
2082    final ExtendedCell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue);
2083    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2084    quals.add(qf1);
2085
2086    int cellByteSize = MutableSegment.getCellLength(originalCell1);
2087    int inMemoryFlushByteSize = cellByteSize - 1;
2088
2089    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2090    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
2091    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2092    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200));
2093    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2094
2095    // Use {@link MemoryCompactionPolicy#EAGER} for always compacting.
2096    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2097      .setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build());
2098
2099    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
2100    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize);
2101
2102    // Data chunk Pool is disabled.
2103    assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0);
2104
2105    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2106
2107    // First compact
2108    store.add(originalCell1, memStoreSizing);
2109    // Waiting for the first in-memory compaction finished
2110    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2111
2112    StoreScanner storeScanner =
2113      (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
2114    SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2115    ExtendedCell resultCell1 = segmentScanner.next();
2116    assertTrue(PrivateCellUtil.equals(resultCell1, originalCell1));
2117    int cell1ChunkId = resultCell1.getChunkId();
2118    assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK);
2119    assertNull(segmentScanner.next());
2120    segmentScanner.close();
2121    storeScanner.close();
2122    Segment segment = segmentScanner.segment;
2123    assertTrue(segment instanceof CellChunkImmutableSegment);
2124    MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2125    assertTrue(!memStoreLAB1.isClosed());
2126    assertTrue(!memStoreLAB1.chunks.isEmpty());
2127    assertTrue(!memStoreLAB1.isReclaimed());
2128
2129    // Second compact
2130    store.add(originalCell2, memStoreSizing);
2131    // Waiting for the second in-memory compaction finished
2132    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2133
2134    // Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell
2135    // must be associated with chunk.. We were looking for a cell at index 0.
2136    // The cause for this exception is because the data chunk Pool is disabled,when the data chunks
2137    // are recycled after the second in-memory compaction finished,the
2138    // {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk
2139    // pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in
2140    // {@link CellChunkMap#getCell} we could not get the data chunk by chunkId.
2141    storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
2142    segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2143    ExtendedCell newResultCell1 = segmentScanner.next();
2144    assertTrue(newResultCell1 != resultCell1);
2145    assertTrue(PrivateCellUtil.equals(newResultCell1, originalCell1));
2146
2147    ExtendedCell resultCell2 = segmentScanner.next();
2148    assertTrue(PrivateCellUtil.equals(resultCell2, originalCell2));
2149    assertNull(segmentScanner.next());
2150    segmentScanner.close();
2151    storeScanner.close();
2152
2153    segment = segmentScanner.segment;
2154    assertTrue(segment instanceof CellChunkImmutableSegment);
2155    MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2156    assertTrue(!memStoreLAB2.isClosed());
2157    assertTrue(!memStoreLAB2.chunks.isEmpty());
2158    assertTrue(!memStoreLAB2.isReclaimed());
2159    assertTrue(memStoreLAB1.isClosed());
2160    assertTrue(memStoreLAB1.chunks.isEmpty());
2161    assertTrue(memStoreLAB1.isReclaimed());
2162  }
2163
2164  // This test is for HBASE-26210 also, test write large cell and small cell concurrently when
2165  // InmemoryFlushSize is smaller,equal with and larger than cell size.
2166  @Test
2167  public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
2168    throws IOException, InterruptedException {
2169    doWriteTestLargeCellAndSmallCellConcurrently(
2170      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
2171    doWriteTestLargeCellAndSmallCellConcurrently(
2172      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
2173    doWriteTestLargeCellAndSmallCellConcurrently(
2174      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1);
2175    doWriteTestLargeCellAndSmallCellConcurrently(
2176      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize);
2177    doWriteTestLargeCellAndSmallCellConcurrently(
2178      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1);
2179  }
2180
2181  private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize)
2182    throws IOException, InterruptedException {
2183
2184    Configuration conf = HBaseConfiguration.create();
2185
2186    byte[] smallValue = new byte[3];
2187    byte[] largeValue = new byte[100];
2188    final long timestamp = EnvironmentEdgeManager.currentTime();
2189    final long seqId = 100;
2190    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2191    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2192    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2193    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2194    int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize);
2195    boolean flushByteSizeLessThanSmallAndLargeCellSize =
2196      flushByteSize < (smallCellByteSize + largeCellByteSize);
2197
2198    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName());
2199    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2200    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2201
2202    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2203      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2204
2205    MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore);
2206    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2207    myCompactingMemStore.disableCompaction();
2208    if (flushByteSizeLessThanSmallAndLargeCellSize) {
2209      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true;
2210    } else {
2211      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false;
2212    }
2213
2214    final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
2215    final AtomicLong totalCellByteSize = new AtomicLong(0);
2216    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2217    Thread smallCellThread = new Thread(() -> {
2218      try {
2219        for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
2220          long currentTimestamp = timestamp + i;
2221          ExtendedCell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
2222          totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
2223          store.add(cell, memStoreSizing);
2224        }
2225      } catch (Throwable exception) {
2226        exceptionRef.set(exception);
2227
2228      }
2229    });
2230    smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
2231    smallCellThread.start();
2232
2233    String oldThreadName = Thread.currentThread().getName();
2234    try {
2235      /**
2236       * When flushByteSizeLessThanSmallAndLargeCellSize is true:
2237       * </p>
2238       * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then
2239       * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then
2240       * largeCellThread invokes flushInMemory.
2241       * <p/>
2242       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
2243       * can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
2244       * <p/>
2245       * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and
2246       * largeCellThread concurrently write one cell and wait each other, and then write another
2247       * cell etc.
2248       */
2249      Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
2250      for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
2251        long currentTimestamp = timestamp + i;
2252        ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
2253        totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
2254        store.add(cell, memStoreSizing);
2255      }
2256      smallCellThread.join();
2257
2258      assertTrue(exceptionRef.get() == null);
2259      assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2));
2260      assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
2261      if (flushByteSizeLessThanSmallAndLargeCellSize) {
2262        assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT);
2263      } else {
2264        assertTrue(
2265          myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1));
2266      }
2267    } finally {
2268      Thread.currentThread().setName(oldThreadName);
2269    }
2270  }
2271
2272  /**
2273   * <pre>
2274   * This test is for HBASE-26384,
2275   * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
2276   * execute concurrently.
2277   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2278   * for both branch-2 and master):
2279   * 1. The {@link CompactingMemStore} size exceeds
2280   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2281   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2282   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2283   * 2. The in memory compact thread starts and then stopping before
2284   *    {@link CompactingMemStore#flattenOneSegment}.
2285   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2286   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2287   *    compact thread continues.
2288   *    Assuming {@link VersionedSegmentsList#version} returned from
2289   *    {@link CompactingMemStore#getImmutableSegments} is v.
2290   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2291   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2292   *    {@link CompactionPipeline#version} is still v.
2293   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2294   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2295   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2296   *    {@link CompactionPipeline} has changed because
2297   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2298   *    removed in fact and still remaining in {@link CompactionPipeline}.
2299   *
2300   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
2301   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2302   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2303   *    v+1.
2304   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2305   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2306   *    failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
2307   *    again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
2308   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
2309   * </pre>
2310   */
2311  @Test
2312  public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
2313    Configuration conf = HBaseConfiguration.create();
2314
2315    byte[] smallValue = new byte[3];
2316    byte[] largeValue = new byte[9];
2317    final long timestamp = EnvironmentEdgeManager.currentTime();
2318    final long seqId = 100;
2319    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2320    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2321    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2322    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2323    int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
2324    int flushByteSize = totalCellByteSize - 2;
2325
2326    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2327    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
2328    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2329    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2330
2331    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2332      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2333
2334    MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
2335    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2336
2337    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2338    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2339
2340    String oldThreadName = Thread.currentThread().getName();
2341    try {
2342      Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
2343      /**
2344       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2345       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2346       * would invoke {@link CompactingMemStore#stopCompaction}.
2347       */
2348      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2349
2350      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2351      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2352
2353      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2354      assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
2355      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2356      assertTrue(segments.getNumOfSegments() == 0);
2357      assertTrue(segments.getNumOfCells() == 0);
2358      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
2359      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2360    } finally {
2361      Thread.currentThread().setName(oldThreadName);
2362    }
2363  }
2364
2365  /**
2366   * <pre>
2367   * This test is for HBASE-26384,
2368   * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()}
2369   * and writeMemStore execute concurrently.
2370   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2371   * for both branch-2 and master):
2372   * 1. The {@link CompactingMemStore} size exceeds
2373   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2374   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2375   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2376   * 2. The in memory compact thread starts and then stopping before
2377   *    {@link CompactingMemStore#flattenOneSegment}.
2378   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2379   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2380   *    compact thread continues.
2381   *    Assuming {@link VersionedSegmentsList#version} returned from
2382   *    {@link CompactingMemStore#getImmutableSegments} is v.
2383   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2384   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2385   *    {@link CompactionPipeline#version} is still v.
2386   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2387   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2388   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2389   *    {@link CompactionPipeline} has changed because
2390   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2391   *    removed in fact and still remaining in {@link CompactionPipeline}.
2392   *
2393   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
2394   * and I add step 7-8 to test there is new segment added before retry.
2395   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2396   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2397   *     v+1.
2398   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2399   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2400   *    failed and retry,{@link VersionedSegmentsList#version} returned from
2401   *    {@link CompactingMemStore#getImmutableSegments} is v+1.
2402   * 7. The write thread continues writing to {@link CompactingMemStore} and
2403   *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
2404   *    {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
2405   *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
2406   *    {@link CompactionPipeline#version} is still v+1.
2407   * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2408   *    {@link CompactionPipeline#version} is still v+1,
2409   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
2410   *    remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
2411   *    {@link CompactingMemStore#swapPipelineWithNull}.
2412   * </pre>
2413   */
2414  @Test
2415  public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
2416    Configuration conf = HBaseConfiguration.create();
2417
2418    byte[] smallValue = new byte[3];
2419    byte[] largeValue = new byte[9];
2420    final long timestamp = EnvironmentEdgeManager.currentTime();
2421    final long seqId = 100;
2422    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2423    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2424    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2425    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2426    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2427    int flushByteSize = firstWriteCellByteSize - 2;
2428
2429    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2430    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
2431    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2432    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2433
2434    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2435      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2436
2437    final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
2438    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2439
2440    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2441    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2442
2443    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2444    final ExtendedCell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
2445    final ExtendedCell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2446    final int writeAgainCellByteSize =
2447      MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2);
2448    final Thread writeAgainThread = new Thread(() -> {
2449      try {
2450        myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
2451
2452        store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
2453        store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
2454
2455        myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
2456      } catch (Throwable exception) {
2457        exceptionRef.set(exception);
2458      }
2459    });
2460    writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
2461    writeAgainThread.start();
2462
2463    String oldThreadName = Thread.currentThread().getName();
2464    try {
2465      Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
2466      /**
2467       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2468       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2469       * would invoke {@link CompactingMemStore#stopCompaction}.
2470       */
2471      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2472      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2473      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2474      writeAgainThread.join();
2475
2476      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2477      assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
2478      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2479      assertTrue(segments.getNumOfSegments() == 1);
2480      assertTrue(
2481        ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
2482      assertTrue(segments.getNumOfCells() == 2);
2483      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
2484      assertTrue(exceptionRef.get() == null);
2485      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2486    } finally {
2487      Thread.currentThread().setName(oldThreadName);
2488    }
2489  }
2490
2491  /**
2492   * <pre>
2493   * This test is for HBASE-26465,
2494   * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute
2495   * concurrently. The threads sequence before HBASE-26465 is:
2496   * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have be added to
2497   *  {@link DefaultMemStore}.
2498   * 2.The flush thread stopping before {@link DefaultMemStore#clearSnapshot} in
2499   *   {@link HStore#updateStorefiles} after completed flushing memStore to hfile.
2500   * 3.The scan thread starts and stopping after {@link DefaultMemStore#getSnapshotSegments} in
2501   *   {@link DefaultMemStore#getScanners},here the scan thread gets the
2502   *   {@link DefaultMemStore#snapshot} which is created by the flush thread.
2503   * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close
2504   *   {@link DefaultMemStore#snapshot},because the reference count of the corresponding
2505   *   {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl}
2506   *   are recycled.
2507   * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a
2508   *   {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the
2509   *   reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in
2510   *   corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may
2511   *   be overwritten by other write threads,which may cause serious problem.
2512   * After HBASE-26465,{@link DefaultMemStore#getScanners} and
2513   * {@link DefaultMemStore#clearSnapshot} could not execute concurrently.
2514   * </pre>
2515   */
2516  @Test
2517  public void testClearSnapshotGetScannerConcurrently() throws Exception {
2518    Configuration conf = HBaseConfiguration.create();
2519
2520    byte[] smallValue = new byte[3];
2521    byte[] largeValue = new byte[9];
2522    final long timestamp = EnvironmentEdgeManager.currentTime();
2523    final long seqId = 100;
2524    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2525    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2526    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2527    quals.add(qf1);
2528    quals.add(qf2);
2529
2530    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
2531    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2532
2533    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2534    MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore);
2535    myDefaultMemStore.store = store;
2536
2537    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2538    store.add(smallCell, memStoreSizing);
2539    store.add(largeCell, memStoreSizing);
2540
2541    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2542    final Thread flushThread = new Thread(() -> {
2543      try {
2544        flushStore(store, id++);
2545      } catch (Throwable exception) {
2546        exceptionRef.set(exception);
2547      }
2548    });
2549    flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
2550    flushThread.start();
2551
2552    String oldThreadName = Thread.currentThread().getName();
2553    StoreScanner storeScanner = null;
2554    try {
2555      Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
2556
2557      /**
2558       * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot}
2559       */
2560      myDefaultMemStore.getScannerCyclicBarrier.await();
2561
2562      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2563      flushThread.join();
2564
2565      if (myDefaultMemStore.shouldWait) {
2566        SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2567        MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2568        assertTrue(memStoreLAB.isClosed());
2569        assertTrue(!memStoreLAB.chunks.isEmpty());
2570        assertTrue(!memStoreLAB.isReclaimed());
2571
2572        ExtendedCell cell1 = segmentScanner.next();
2573        PrivateCellUtil.equals(smallCell, cell1);
2574        ExtendedCell cell2 = segmentScanner.next();
2575        PrivateCellUtil.equals(largeCell, cell2);
2576        assertNull(segmentScanner.next());
2577      } else {
2578        List<ExtendedCell> results = new ArrayList<>();
2579        storeScanner.next((List) results);
2580        assertEquals(2, results.size());
2581        PrivateCellUtil.equals(smallCell, results.get(0));
2582        PrivateCellUtil.equals(largeCell, results.get(1));
2583      }
2584      assertTrue(exceptionRef.get() == null);
2585    } finally {
2586      if (storeScanner != null) {
2587        storeScanner.close();
2588      }
2589      Thread.currentThread().setName(oldThreadName);
2590    }
2591  }
2592
2593  @SuppressWarnings("unchecked")
2594  private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
2595    List<T> resultScanners = new ArrayList<T>();
2596    for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
2597      if (keyValueScannerClass.isInstance(keyValueScanner)) {
2598        resultScanners.add((T) keyValueScanner);
2599      }
2600    }
2601    assertTrue(resultScanners.size() == 1);
2602    return resultScanners.get(0);
2603  }
2604
2605  @Test
2606  public void testOnConfigurationChange() throws IOException {
2607    final int COMMON_MAX_FILES_TO_COMPACT = 10;
2608    final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8;
2609    final int STORE_MAX_FILES_TO_COMPACT = 6;
2610
2611    // Build a table that its maxFileToCompact different from common configuration.
2612    Configuration conf = HBaseConfiguration.create();
2613    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2614      COMMON_MAX_FILES_TO_COMPACT);
2615    conf.setBoolean(CACHE_DATA_ON_READ_KEY, false);
2616    conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true);
2617    conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
2618    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
2619      .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2620        String.valueOf(STORE_MAX_FILES_TO_COMPACT))
2621      .build();
2622    init(this.name.getMethodName(), conf, hcd);
2623
2624    // After updating common configuration, the conf in HStore itself must not be changed.
2625    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2626      NEW_COMMON_MAX_FILES_TO_COMPACT);
2627    this.store.onConfigurationChange(conf);
2628
2629    assertEquals(STORE_MAX_FILES_TO_COMPACT,
2630      store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact());
2631
2632    assertEquals(conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ), false);
2633    assertEquals(conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), true);
2634    assertEquals(conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), true);
2635
2636    // reset to default values
2637    conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ);
2638    conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE);
2639    conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE);
2640    this.store.onConfigurationChange(conf);
2641  }
2642
2643  /**
2644   * This test is for HBASE-26476
2645   */
2646  @Test
2647  public void testExtendsDefaultMemStore() throws Exception {
2648    Configuration conf = HBaseConfiguration.create();
2649    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2650
2651    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2652    assertTrue(this.store.memstore.getClass() == DefaultMemStore.class);
2653    tearDown();
2654
2655    conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName());
2656    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2657    assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class);
2658  }
2659
2660  static class CustomDefaultMemStore extends DefaultMemStore {
2661
2662    public CustomDefaultMemStore(Configuration conf, CellComparator c,
2663      RegionServicesForStores regionServices) {
2664      super(conf, c, regionServices);
2665    }
2666
2667  }
2668
2669  /**
2670   * This test is for HBASE-26488
2671   */
2672  @Test
2673  public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
2674    Configuration conf = HBaseConfiguration.create();
2675
2676    byte[] smallValue = new byte[3];
2677    byte[] largeValue = new byte[9];
2678    final long timestamp = EnvironmentEdgeManager.currentTime();
2679    final long seqId = 100;
2680    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2681    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2682    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2683    quals.add(qf1);
2684    quals.add(qf2);
2685
2686    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
2687    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2688    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
2689      MyDefaultStoreFlusher.class.getName());
2690
2691    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2692    MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
2693    assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
2694
2695    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2696    store.add(smallCell, memStoreSizing);
2697    store.add(largeCell, memStoreSizing);
2698    flushStore(store, id++);
2699
2700    MemStoreLABImpl memStoreLAB =
2701      (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
2702    assertTrue(memStoreLAB.isClosed());
2703    assertTrue(memStoreLAB.getRefCntValue() == 0);
2704    assertTrue(memStoreLAB.isReclaimed());
2705    assertTrue(memStoreLAB.chunks.isEmpty());
2706    StoreScanner storeScanner = null;
2707    try {
2708      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2709      assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
2710      assertTrue(store.memstore.size().getCellsCount() == 0);
2711      assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
2712      assertTrue(storeScanner.currentScanners.size() == 1);
2713      assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
2714
2715      List<ExtendedCell> results = new ArrayList<>();
2716      storeScanner.next((List) results);
2717      assertEquals(2, results.size());
2718      PrivateCellUtil.equals(smallCell, results.get(0));
2719      PrivateCellUtil.equals(largeCell, results.get(1));
2720    } finally {
2721      if (storeScanner != null) {
2722        storeScanner.close();
2723      }
2724    }
2725  }
2726
2727  static class MyDefaultMemStore1 extends DefaultMemStore {
2728
2729    private ImmutableSegment snapshotImmutableSegment;
2730
2731    public MyDefaultMemStore1(Configuration conf, CellComparator c,
2732      RegionServicesForStores regionServices) {
2733      super(conf, c, regionServices);
2734    }
2735
2736    @Override
2737    public MemStoreSnapshot snapshot() {
2738      MemStoreSnapshot result = super.snapshot();
2739      this.snapshotImmutableSegment = snapshot;
2740      return result;
2741    }
2742
2743  }
2744
2745  public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
2746    private static final AtomicInteger failCounter = new AtomicInteger(1);
2747    private static final AtomicInteger counter = new AtomicInteger(0);
2748
2749    public MyDefaultStoreFlusher(Configuration conf, HStore store) {
2750      super(conf, store);
2751    }
2752
2753    @Override
2754    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
2755      MonitoredTask status, ThroughputController throughputController,
2756      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
2757      counter.incrementAndGet();
2758      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
2759        writerCreationTracker);
2760    }
2761
2762    @Override
2763    protected void performFlush(InternalScanner scanner, final CellSink sink,
2764      ThroughputController throughputController) throws IOException {
2765
2766      final int currentCount = counter.get();
2767      CellSink newCellSink = (cell) -> {
2768        if (currentCount <= failCounter.get()) {
2769          throw new IOException("Simulated exception by tests");
2770        }
2771        sink.append(cell);
2772      };
2773      super.performFlush(scanner, newCellSink, throughputController);
2774    }
2775  }
2776
2777  /**
2778   * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB}
2779   */
2780  @Test
2781  public void testImmutableMemStoreLABRefCnt() throws Exception {
2782    Configuration conf = HBaseConfiguration.create();
2783
2784    byte[] smallValue = new byte[3];
2785    byte[] largeValue = new byte[9];
2786    final long timestamp = EnvironmentEdgeManager.currentTime();
2787    final long seqId = 100;
2788    final ExtendedCell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
2789    final ExtendedCell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
2790    final ExtendedCell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue);
2791    final ExtendedCell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2792    final ExtendedCell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue);
2793    final ExtendedCell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue);
2794
2795    int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
2796    int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
2797    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2798    int flushByteSize = firstWriteCellByteSize - 2;
2799
2800    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2801    conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
2802    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2803    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2804    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2805
2806    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2807      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2808
2809    final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
2810    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2811    myCompactingMemStore.allowCompaction.set(false);
2812
2813    NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2814    store.add(smallCell1, memStoreSizing);
2815    store.add(largeCell1, memStoreSizing);
2816    store.add(smallCell2, memStoreSizing);
2817    store.add(largeCell2, memStoreSizing);
2818    store.add(smallCell3, memStoreSizing);
2819    store.add(largeCell3, memStoreSizing);
2820    VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2821    assertTrue(versionedSegmentsList.getNumOfSegments() == 3);
2822    List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
2823    List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size());
2824    for (ImmutableSegment segment : segments) {
2825      memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
2826    }
2827    List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2828    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2829      assertTrue(memStoreLAB.getRefCntValue() == 2);
2830    }
2831
2832    myCompactingMemStore.allowCompaction.set(true);
2833    myCompactingMemStore.flushInMemory();
2834
2835    versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2836    assertTrue(versionedSegmentsList.getNumOfSegments() == 1);
2837    ImmutableMemStoreLAB immutableMemStoreLAB =
2838      (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
2839    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2840      assertTrue(memStoreLAB.getRefCntValue() == 2);
2841    }
2842
2843    List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2844    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2845      assertTrue(memStoreLAB.getRefCntValue() == 2);
2846    }
2847    assertTrue(immutableMemStoreLAB.getRefCntValue() == 2);
2848    for (KeyValueScanner scanner : scanners1) {
2849      scanner.close();
2850    }
2851    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2852      assertTrue(memStoreLAB.getRefCntValue() == 1);
2853    }
2854    for (KeyValueScanner scanner : scanners2) {
2855      scanner.close();
2856    }
2857    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2858      assertTrue(memStoreLAB.getRefCntValue() == 1);
2859    }
2860    assertTrue(immutableMemStoreLAB.getRefCntValue() == 1);
2861    flushStore(store, id++);
2862    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2863      assertTrue(memStoreLAB.getRefCntValue() == 0);
2864    }
2865    assertTrue(immutableMemStoreLAB.getRefCntValue() == 0);
2866    assertTrue(immutableMemStoreLAB.isClosed());
2867    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2868      assertTrue(memStoreLAB.isClosed());
2869      assertTrue(memStoreLAB.isReclaimed());
2870      assertTrue(memStoreLAB.chunks.isEmpty());
2871    }
2872  }
2873
2874  private HStoreFile mockStoreFileWithLength(long length) {
2875    HStoreFile sf = mock(HStoreFile.class);
2876    StoreFileReader sfr = mock(StoreFileReader.class);
2877    when(sf.isHFile()).thenReturn(true);
2878    when(sf.getReader()).thenReturn(sfr);
2879    when(sfr.length()).thenReturn(length);
2880    return sf;
2881  }
2882
2883  private static class MyThread extends Thread {
2884    private StoreScanner scanner;
2885    private KeyValueHeap heap;
2886
2887    public MyThread(StoreScanner scanner) {
2888      this.scanner = scanner;
2889    }
2890
2891    public KeyValueHeap getHeap() {
2892      return this.heap;
2893    }
2894
2895    @Override
2896    public void run() {
2897      scanner.trySwitchToStreamRead();
2898      heap = scanner.heap;
2899    }
2900  }
2901
2902  private static class MyMemStoreCompactor extends MemStoreCompactor {
2903    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2904    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
2905
2906    public MyMemStoreCompactor(CompactingMemStore compactingMemStore,
2907      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
2908      super(compactingMemStore, compactionPolicy);
2909    }
2910
2911    @Override
2912    public boolean start() throws IOException {
2913      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
2914      if (isFirst) {
2915        try {
2916          START_COMPACTOR_LATCH.await();
2917          return super.start();
2918        } catch (InterruptedException ex) {
2919          throw new RuntimeException(ex);
2920        }
2921      }
2922      return super.start();
2923    }
2924  }
2925
2926  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
2927    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2928
2929    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
2930      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2931      throws IOException {
2932      super(conf, c, store, regionServices, compactionPolicy);
2933    }
2934
2935    @Override
2936    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
2937      throws IllegalArgumentIOException {
2938      return new MyMemStoreCompactor(this, compactionPolicy);
2939    }
2940
2941    @Override
2942    protected boolean setInMemoryCompactionFlag() {
2943      boolean rval = super.setInMemoryCompactionFlag();
2944      if (rval) {
2945        RUNNER_COUNT.incrementAndGet();
2946        if (LOG.isDebugEnabled()) {
2947          LOG.debug("runner count: " + RUNNER_COUNT.get());
2948        }
2949      }
2950      return rval;
2951    }
2952  }
2953
2954  public static class MyCompactingMemStore extends CompactingMemStore {
2955    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
2956    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
2957    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
2958
2959    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store,
2960      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2961      throws IOException {
2962      super(conf, c, store, regionServices, compactionPolicy);
2963    }
2964
2965    @Override
2966    protected List<KeyValueScanner> createList(int capacity) {
2967      if (START_TEST.get()) {
2968        try {
2969          getScannerLatch.countDown();
2970          snapshotLatch.await();
2971        } catch (InterruptedException e) {
2972          throw new RuntimeException(e);
2973        }
2974      }
2975      return new ArrayList<>(capacity);
2976    }
2977
2978    @Override
2979    protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
2980      if (START_TEST.get()) {
2981        try {
2982          getScannerLatch.await();
2983        } catch (InterruptedException e) {
2984          throw new RuntimeException(e);
2985        }
2986      }
2987
2988      super.pushActiveToPipeline(active, checkEmpty);
2989      if (START_TEST.get()) {
2990        snapshotLatch.countDown();
2991      }
2992    }
2993  }
2994
2995  interface MyListHook {
2996    void hook(int currentSize);
2997  }
2998
2999  private static class MyList<T> implements List<T> {
3000    private final List<T> delegatee = new ArrayList<>();
3001    private final MyListHook hookAtAdd;
3002
3003    MyList(final MyListHook hookAtAdd) {
3004      this.hookAtAdd = hookAtAdd;
3005    }
3006
3007    @Override
3008    public int size() {
3009      return delegatee.size();
3010    }
3011
3012    @Override
3013    public boolean isEmpty() {
3014      return delegatee.isEmpty();
3015    }
3016
3017    @Override
3018    public boolean contains(Object o) {
3019      return delegatee.contains(o);
3020    }
3021
3022    @Override
3023    public Iterator<T> iterator() {
3024      return delegatee.iterator();
3025    }
3026
3027    @Override
3028    public Object[] toArray() {
3029      return delegatee.toArray();
3030    }
3031
3032    @Override
3033    public <R> R[] toArray(R[] a) {
3034      return delegatee.toArray(a);
3035    }
3036
3037    @Override
3038    public boolean add(T e) {
3039      hookAtAdd.hook(size());
3040      return delegatee.add(e);
3041    }
3042
3043    @Override
3044    public boolean remove(Object o) {
3045      return delegatee.remove(o);
3046    }
3047
3048    @Override
3049    public boolean containsAll(Collection<?> c) {
3050      return delegatee.containsAll(c);
3051    }
3052
3053    @Override
3054    public boolean addAll(Collection<? extends T> c) {
3055      return delegatee.addAll(c);
3056    }
3057
3058    @Override
3059    public boolean addAll(int index, Collection<? extends T> c) {
3060      return delegatee.addAll(index, c);
3061    }
3062
3063    @Override
3064    public boolean removeAll(Collection<?> c) {
3065      return delegatee.removeAll(c);
3066    }
3067
3068    @Override
3069    public boolean retainAll(Collection<?> c) {
3070      return delegatee.retainAll(c);
3071    }
3072
3073    @Override
3074    public void clear() {
3075      delegatee.clear();
3076    }
3077
3078    @Override
3079    public T get(int index) {
3080      return delegatee.get(index);
3081    }
3082
3083    @Override
3084    public T set(int index, T element) {
3085      return delegatee.set(index, element);
3086    }
3087
3088    @Override
3089    public void add(int index, T element) {
3090      delegatee.add(index, element);
3091    }
3092
3093    @Override
3094    public T remove(int index) {
3095      return delegatee.remove(index);
3096    }
3097
3098    @Override
3099    public int indexOf(Object o) {
3100      return delegatee.indexOf(o);
3101    }
3102
3103    @Override
3104    public int lastIndexOf(Object o) {
3105      return delegatee.lastIndexOf(o);
3106    }
3107
3108    @Override
3109    public ListIterator<T> listIterator() {
3110      return delegatee.listIterator();
3111    }
3112
3113    @Override
3114    public ListIterator<T> listIterator(int index) {
3115      return delegatee.listIterator(index);
3116    }
3117
3118    @Override
3119    public List<T> subList(int fromIndex, int toIndex) {
3120      return delegatee.subList(fromIndex, toIndex);
3121    }
3122  }
3123
3124  public static class MyCompactingMemStore2 extends CompactingMemStore {
3125    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
3126    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
3127    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
3128    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
3129    private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
3130    private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
3131
3132    public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
3133      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3134      throws IOException {
3135      super(conf, cellComparator, store, regionServices, compactionPolicy);
3136    }
3137
3138    @Override
3139    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
3140      MemStoreSizing memstoreSizing) {
3141      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3142        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
3143        if (currentCount <= 1) {
3144          try {
3145            /**
3146             * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
3147             * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
3148             * largeCellThread invokes flushInMemory.
3149             */
3150            preCyclicBarrier.await();
3151          } catch (Throwable e) {
3152            throw new RuntimeException(e);
3153          }
3154        }
3155      }
3156
3157      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3158      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3159        try {
3160          preCyclicBarrier.await();
3161        } catch (Throwable e) {
3162          throw new RuntimeException(e);
3163        }
3164      }
3165      return returnValue;
3166    }
3167
3168    @Override
3169    protected void doAdd(MutableSegment currentActive, ExtendedCell cell,
3170      MemStoreSizing memstoreSizing) {
3171      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3172        try {
3173          /**
3174           * After largeCellThread finished flushInMemory method, smallCellThread can add cell to
3175           * currentActive . That is to say when largeCellThread called flushInMemory method,
3176           * currentActive has no cell.
3177           */
3178          postCyclicBarrier.await();
3179        } catch (Throwable e) {
3180          throw new RuntimeException(e);
3181        }
3182      }
3183      super.doAdd(currentActive, cell, memstoreSizing);
3184    }
3185
3186    @Override
3187    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
3188      super.flushInMemory(currentActiveMutableSegment);
3189      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3190        if (largeCellPreUpdateCounter.get() <= 1) {
3191          try {
3192            postCyclicBarrier.await();
3193          } catch (Throwable e) {
3194            throw new RuntimeException(e);
3195          }
3196        }
3197      }
3198    }
3199
3200  }
3201
3202  public static class MyCompactingMemStore3 extends CompactingMemStore {
3203    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
3204    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
3205
3206    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
3207    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
3208    private final AtomicInteger flushCounter = new AtomicInteger(0);
3209    private static final int CELL_COUNT = 5;
3210    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;
3211
3212    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
3213      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3214      throws IOException {
3215      super(conf, cellComparator, store, regionServices, compactionPolicy);
3216    }
3217
3218    @Override
3219    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
3220      MemStoreSizing memstoreSizing) {
3221      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3222        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3223      }
3224      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3225        try {
3226          preCyclicBarrier.await();
3227        } catch (Throwable e) {
3228          throw new RuntimeException(e);
3229        }
3230      }
3231
3232      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3233      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3234        try {
3235          preCyclicBarrier.await();
3236        } catch (Throwable e) {
3237          throw new RuntimeException(e);
3238        }
3239      }
3240      return returnValue;
3241    }
3242
3243    @Override
3244    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
3245      super.postUpdate(currentActiveMutableSegment);
3246      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3247        try {
3248          postCyclicBarrier.await();
3249        } catch (Throwable e) {
3250          throw new RuntimeException(e);
3251        }
3252        return;
3253      }
3254
3255      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3256        try {
3257          postCyclicBarrier.await();
3258        } catch (Throwable e) {
3259          throw new RuntimeException(e);
3260        }
3261      }
3262    }
3263
3264    @Override
3265    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
3266      super.flushInMemory(currentActiveMutableSegment);
3267      flushCounter.incrementAndGet();
3268      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3269        return;
3270      }
3271
3272      assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME));
3273      try {
3274        postCyclicBarrier.await();
3275      } catch (Throwable e) {
3276        throw new RuntimeException(e);
3277      }
3278
3279    }
3280
3281    void disableCompaction() {
3282      allowCompaction.set(false);
3283    }
3284
3285    void enableCompaction() {
3286      allowCompaction.set(true);
3287    }
3288
3289  }
3290
3291  public static class MyCompactingMemStore4 extends CompactingMemStore {
3292    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3293    /**
3294     * {@link CompactingMemStore#flattenOneSegment} must execute after
3295     * {@link CompactingMemStore#getImmutableSegments}
3296     */
3297    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3298    /**
3299     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3300     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3301     */
3302    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3303    /**
3304     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3305     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3306     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3307     */
3308    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3309    /**
3310     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3311     */
3312    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3313    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3314    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3315    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3316    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3317
3318    public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
3319      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3320      throws IOException {
3321      super(conf, cellComparator, store, regionServices, compactionPolicy);
3322    }
3323
3324    @Override
3325    public VersionedSegmentsList getImmutableSegments() {
3326      VersionedSegmentsList result = super.getImmutableSegments();
3327      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3328        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3329        if (currentCount <= 1) {
3330          try {
3331            flattenOneSegmentPreCyclicBarrier.await();
3332          } catch (Throwable e) {
3333            throw new RuntimeException(e);
3334          }
3335        }
3336      }
3337      return result;
3338    }
3339
3340    @Override
3341    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3342      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3343        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3344        if (currentCount <= 1) {
3345          try {
3346            flattenOneSegmentPostCyclicBarrier.await();
3347          } catch (Throwable e) {
3348            throw new RuntimeException(e);
3349          }
3350        }
3351      }
3352      boolean result = super.swapPipelineWithNull(segments);
3353      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3354        int currentCount = swapPipelineWithNullCounter.get();
3355        if (currentCount <= 1) {
3356          assertTrue(!result);
3357        }
3358        if (currentCount == 2) {
3359          assertTrue(result);
3360        }
3361      }
3362      return result;
3363
3364    }
3365
3366    @Override
3367    public void flattenOneSegment(long requesterVersion, Action action) {
3368      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3369      if (currentCount <= 1) {
3370        try {
3371          /**
3372           * {@link CompactingMemStore#snapshot} could start.
3373           */
3374          snapShotStartCyclicCyclicBarrier.await();
3375          flattenOneSegmentPreCyclicBarrier.await();
3376        } catch (Throwable e) {
3377          throw new RuntimeException(e);
3378        }
3379      }
3380      super.flattenOneSegment(requesterVersion, action);
3381      if (currentCount <= 1) {
3382        try {
3383          flattenOneSegmentPostCyclicBarrier.await();
3384        } catch (Throwable e) {
3385          throw new RuntimeException(e);
3386        }
3387      }
3388    }
3389
3390    @Override
3391    protected boolean setInMemoryCompactionFlag() {
3392      boolean result = super.setInMemoryCompactionFlag();
3393      assertTrue(result);
3394      setInMemoryCompactionFlagCounter.incrementAndGet();
3395      return result;
3396    }
3397
3398    @Override
3399    void inMemoryCompaction() {
3400      try {
3401        super.inMemoryCompaction();
3402      } finally {
3403        try {
3404          inMemoryCompactionEndCyclicBarrier.await();
3405        } catch (Throwable e) {
3406          throw new RuntimeException(e);
3407        }
3408
3409      }
3410    }
3411
3412  }
3413
3414  public static class MyCompactingMemStore5 extends CompactingMemStore {
3415    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3416    private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
3417    /**
3418     * {@link CompactingMemStore#flattenOneSegment} must execute after
3419     * {@link CompactingMemStore#getImmutableSegments}
3420     */
3421    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3422    /**
3423     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3424     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3425     */
3426    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3427    /**
3428     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3429     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3430     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3431     */
3432    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3433    /**
3434     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3435     */
3436    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3437    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3438    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3439    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3440    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3441    /**
3442     * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain
3443     * thread could start.
3444     */
3445    private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
3446    /**
3447     * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the
3448     * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would
3449     * execute,and in memory compact thread would exit,because we expect that in memory compact
3450     * executing only once.
3451     */
3452    private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
3453
3454    public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
3455      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3456      throws IOException {
3457      super(conf, cellComparator, store, regionServices, compactionPolicy);
3458    }
3459
3460    @Override
3461    public VersionedSegmentsList getImmutableSegments() {
3462      VersionedSegmentsList result = super.getImmutableSegments();
3463      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3464        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3465        if (currentCount <= 1) {
3466          try {
3467            flattenOneSegmentPreCyclicBarrier.await();
3468          } catch (Throwable e) {
3469            throw new RuntimeException(e);
3470          }
3471        }
3472
3473      }
3474
3475      return result;
3476    }
3477
3478    @Override
3479    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3480      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3481        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3482        if (currentCount <= 1) {
3483          try {
3484            flattenOneSegmentPostCyclicBarrier.await();
3485          } catch (Throwable e) {
3486            throw new RuntimeException(e);
3487          }
3488        }
3489
3490        if (currentCount == 2) {
3491          try {
3492            /**
3493             * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull},
3494             * writeAgain thread could start.
3495             */
3496            writeMemStoreAgainStartCyclicBarrier.await();
3497            /**
3498             * Only the writeAgain thread completes, retry
3499             * {@link CompactingMemStore#swapPipelineWithNull} would execute.
3500             */
3501            writeMemStoreAgainEndCyclicBarrier.await();
3502          } catch (Throwable e) {
3503            throw new RuntimeException(e);
3504          }
3505        }
3506
3507      }
3508      boolean result = super.swapPipelineWithNull(segments);
3509      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3510        int currentCount = swapPipelineWithNullCounter.get();
3511        if (currentCount <= 1) {
3512          assertTrue(!result);
3513        }
3514        if (currentCount == 2) {
3515          assertTrue(result);
3516        }
3517      }
3518      return result;
3519
3520    }
3521
3522    @Override
3523    public void flattenOneSegment(long requesterVersion, Action action) {
3524      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3525      if (currentCount <= 1) {
3526        try {
3527          /**
3528           * {@link CompactingMemStore#snapshot} could start.
3529           */
3530          snapShotStartCyclicCyclicBarrier.await();
3531          flattenOneSegmentPreCyclicBarrier.await();
3532        } catch (Throwable e) {
3533          throw new RuntimeException(e);
3534        }
3535      }
3536      super.flattenOneSegment(requesterVersion, action);
3537      if (currentCount <= 1) {
3538        try {
3539          flattenOneSegmentPostCyclicBarrier.await();
3540          /**
3541           * Only the writeAgain thread completes, in memory compact thread would exit,because we
3542           * expect that in memory compact executing only once.
3543           */
3544          writeMemStoreAgainEndCyclicBarrier.await();
3545        } catch (Throwable e) {
3546          throw new RuntimeException(e);
3547        }
3548
3549      }
3550    }
3551
3552    @Override
3553    protected boolean setInMemoryCompactionFlag() {
3554      boolean result = super.setInMemoryCompactionFlag();
3555      int count = setInMemoryCompactionFlagCounter.incrementAndGet();
3556      if (count <= 1) {
3557        assertTrue(result);
3558      }
3559      if (count == 2) {
3560        assertTrue(!result);
3561      }
3562      return result;
3563    }
3564
3565    @Override
3566    void inMemoryCompaction() {
3567      try {
3568        super.inMemoryCompaction();
3569      } finally {
3570        try {
3571          inMemoryCompactionEndCyclicBarrier.await();
3572        } catch (Throwable e) {
3573          throw new RuntimeException(e);
3574        }
3575
3576      }
3577    }
3578  }
3579
3580  public static class MyCompactingMemStore6 extends CompactingMemStore {
3581    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3582
3583    public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator,
3584      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3585      throws IOException {
3586      super(conf, cellComparator, store, regionServices, compactionPolicy);
3587    }
3588
3589    @Override
3590    void inMemoryCompaction() {
3591      try {
3592        super.inMemoryCompaction();
3593      } finally {
3594        try {
3595          inMemoryCompactionEndCyclicBarrier.await();
3596        } catch (Throwable e) {
3597          throw new RuntimeException(e);
3598        }
3599
3600      }
3601    }
3602  }
3603
3604  public static class MyDefaultMemStore extends DefaultMemStore {
3605    private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
3606    private static final String FLUSH_THREAD_NAME = "flushMyThread";
3607    /**
3608     * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread
3609     * could start.
3610     */
3611    private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
3612    /**
3613     * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments}
3614     * completed, {@link DefaultMemStore#doClearSnapShot} could continue.
3615     */
3616    private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3617    /**
3618     * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot}
3619     * completed, {@link DefaultMemStore#getScanners} could continue.
3620     */
3621    private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3622    private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
3623    private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
3624    private volatile boolean shouldWait = true;
3625    private volatile HStore store = null;
3626
3627    public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
3628      RegionServicesForStores regionServices) throws IOException {
3629      super(conf, cellComparator, regionServices);
3630    }
3631
3632    @Override
3633    protected List<Segment> getSnapshotSegments() {
3634
3635      List<Segment> result = super.getSnapshotSegments();
3636
3637      if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
3638        int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
3639        if (currentCount == 1) {
3640          if (this.shouldWait) {
3641            try {
3642              /**
3643               * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed,
3644               * {@link DefaultMemStore#doClearSnapShot} could continue.
3645               */
3646              preClearSnapShotCyclicBarrier.await();
3647              /**
3648               * Wait for {@link DefaultMemStore#doClearSnapShot} completed.
3649               */
3650              postClearSnapShotCyclicBarrier.await();
3651
3652            } catch (Throwable e) {
3653              throw new RuntimeException(e);
3654            }
3655          }
3656        }
3657      }
3658      return result;
3659    }
3660
3661    @Override
3662    protected void doClearSnapShot() {
3663      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3664        int currentCount = clearSnapshotCounter.incrementAndGet();
3665        if (currentCount == 1) {
3666          try {
3667            if (
3668              ((ReentrantReadWriteLock) store.getStoreEngine().getLock())
3669                .isWriteLockedByCurrentThread()
3670            ) {
3671              shouldWait = false;
3672            }
3673            /**
3674             * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner
3675             * thread could start.
3676             */
3677            getScannerCyclicBarrier.await();
3678
3679            if (shouldWait) {
3680              /**
3681               * Wait for {@link DefaultMemStore#getSnapshotSegments} completed.
3682               */
3683              preClearSnapShotCyclicBarrier.await();
3684            }
3685          } catch (Throwable e) {
3686            throw new RuntimeException(e);
3687          }
3688        }
3689      }
3690      super.doClearSnapShot();
3691
3692      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3693        int currentCount = clearSnapshotCounter.get();
3694        if (currentCount == 1) {
3695          if (shouldWait) {
3696            try {
3697              /**
3698               * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed,
3699               * {@link DefaultMemStore#getScanners} could continue.
3700               */
3701              postClearSnapShotCyclicBarrier.await();
3702            } catch (Throwable e) {
3703              throw new RuntimeException(e);
3704            }
3705          }
3706        }
3707      }
3708    }
3709  }
3710}