001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY;
021import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_ON_READ_KEY;
022import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_READ;
023import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_WRITE;
024import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE;
025import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
026import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
027import static org.junit.Assert.assertArrayEquals;
028import static org.junit.Assert.assertEquals;
029import static org.junit.Assert.assertFalse;
030import static org.junit.Assert.assertNull;
031import static org.junit.Assert.assertTrue;
032import static org.junit.Assert.fail;
033import static org.mockito.ArgumentMatchers.any;
034import static org.mockito.Mockito.mock;
035import static org.mockito.Mockito.spy;
036import static org.mockito.Mockito.times;
037import static org.mockito.Mockito.verify;
038import static org.mockito.Mockito.when;
039
040import java.io.FileNotFoundException;
041import java.io.IOException;
042import java.lang.ref.SoftReference;
043import java.security.PrivilegedExceptionAction;
044import java.util.ArrayList;
045import java.util.Arrays;
046import java.util.Collection;
047import java.util.Collections;
048import java.util.Iterator;
049import java.util.List;
050import java.util.ListIterator;
051import java.util.NavigableSet;
052import java.util.Optional;
053import java.util.TreeSet;
054import java.util.concurrent.BrokenBarrierException;
055import java.util.concurrent.ConcurrentSkipListSet;
056import java.util.concurrent.CountDownLatch;
057import java.util.concurrent.CyclicBarrier;
058import java.util.concurrent.ExecutorService;
059import java.util.concurrent.Executors;
060import java.util.concurrent.Future;
061import java.util.concurrent.ThreadPoolExecutor;
062import java.util.concurrent.TimeUnit;
063import java.util.concurrent.atomic.AtomicBoolean;
064import java.util.concurrent.atomic.AtomicInteger;
065import java.util.concurrent.atomic.AtomicLong;
066import java.util.concurrent.atomic.AtomicReference;
067import java.util.concurrent.locks.ReentrantReadWriteLock;
068import java.util.function.Consumer;
069import java.util.function.IntBinaryOperator;
070import java.util.function.IntConsumer;
071import org.apache.hadoop.conf.Configuration;
072import org.apache.hadoop.fs.FSDataOutputStream;
073import org.apache.hadoop.fs.FileStatus;
074import org.apache.hadoop.fs.FileSystem;
075import org.apache.hadoop.fs.FilterFileSystem;
076import org.apache.hadoop.fs.LocalFileSystem;
077import org.apache.hadoop.fs.Path;
078import org.apache.hadoop.fs.permission.FsPermission;
079import org.apache.hadoop.hbase.Cell;
080import org.apache.hadoop.hbase.CellBuilderType;
081import org.apache.hadoop.hbase.CellComparator;
082import org.apache.hadoop.hbase.CellComparatorImpl;
083import org.apache.hadoop.hbase.CellUtil;
084import org.apache.hadoop.hbase.ExtendedCell;
085import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
086import org.apache.hadoop.hbase.HBaseClassTestRule;
087import org.apache.hadoop.hbase.HBaseConfiguration;
088import org.apache.hadoop.hbase.HBaseTestingUtil;
089import org.apache.hadoop.hbase.HConstants;
090import org.apache.hadoop.hbase.KeyValue;
091import org.apache.hadoop.hbase.MemoryCompactionPolicy;
092import org.apache.hadoop.hbase.NamespaceDescriptor;
093import org.apache.hadoop.hbase.PrivateCellUtil;
094import org.apache.hadoop.hbase.TableName;
095import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
096import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
097import org.apache.hadoop.hbase.client.Get;
098import org.apache.hadoop.hbase.client.RegionInfo;
099import org.apache.hadoop.hbase.client.RegionInfoBuilder;
100import org.apache.hadoop.hbase.client.Scan;
101import org.apache.hadoop.hbase.client.Scan.ReadType;
102import org.apache.hadoop.hbase.client.TableDescriptor;
103import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
104import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
105import org.apache.hadoop.hbase.filter.Filter;
106import org.apache.hadoop.hbase.filter.FilterBase;
107import org.apache.hadoop.hbase.io.compress.Compression;
108import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
109import org.apache.hadoop.hbase.io.hfile.CacheConfig;
110import org.apache.hadoop.hbase.io.hfile.HFile;
111import org.apache.hadoop.hbase.io.hfile.HFileContext;
112import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
113import org.apache.hadoop.hbase.monitoring.MonitoredTask;
114import org.apache.hadoop.hbase.nio.RefCnt;
115import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
116import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
117import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
118import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
119import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
120import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
121import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy;
122import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
123import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
124import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
125import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
126import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
127import org.apache.hadoop.hbase.security.User;
128import org.apache.hadoop.hbase.testclassification.MediumTests;
129import org.apache.hadoop.hbase.testclassification.RegionServerTests;
130import org.apache.hadoop.hbase.util.BloomFilterUtil;
131import org.apache.hadoop.hbase.util.Bytes;
132import org.apache.hadoop.hbase.util.CommonFSUtils;
133import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
134import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
135import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
136import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
137import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
138import org.apache.hadoop.hbase.wal.WALFactory;
139import org.apache.hadoop.util.Progressable;
140import org.junit.After;
141import org.junit.AfterClass;
142import org.junit.Before;
143import org.junit.ClassRule;
144import org.junit.Rule;
145import org.junit.Test;
146import org.junit.experimental.categories.Category;
147import org.junit.rules.TestName;
148import org.mockito.Mockito;
149import org.slf4j.Logger;
150import org.slf4j.LoggerFactory;
151
152import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
153
154/**
155 * Test class for the HStore
156 */
157@Category({ RegionServerTests.class, MediumTests.class })
158public class TestHStore {
159
160  @ClassRule
161  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class);
162
163  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
164  @Rule
165  public TestName name = new TestName();
166
167  HRegion region;
168  HStore store;
169  byte[] table = Bytes.toBytes("table");
170  byte[] family = Bytes.toBytes("family");
171
172  byte[] row = Bytes.toBytes("row");
173  byte[] row2 = Bytes.toBytes("row2");
174  byte[] qf1 = Bytes.toBytes("qf1");
175  byte[] qf2 = Bytes.toBytes("qf2");
176  byte[] qf3 = Bytes.toBytes("qf3");
177  byte[] qf4 = Bytes.toBytes("qf4");
178  byte[] qf5 = Bytes.toBytes("qf5");
179  byte[] qf6 = Bytes.toBytes("qf6");
180
181  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);
182
183  List<Cell> expected = new ArrayList<>();
184  List<Cell> result = new ArrayList<>();
185
186  long id = EnvironmentEdgeManager.currentTime();
187  Get get = new Get(row);
188
189  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
190  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
191
192  @Before
193  public void setUp() throws IOException {
194    qualifiers.clear();
195    qualifiers.add(qf1);
196    qualifiers.add(qf3);
197    qualifiers.add(qf5);
198
199    Iterator<byte[]> iter = qualifiers.iterator();
200    while (iter.hasNext()) {
201      byte[] next = iter.next();
202      expected.add(new KeyValue(row, family, next, 1, (byte[]) null));
203      get.addColumn(family, next);
204    }
205  }
206
207  private void init(String methodName) throws IOException {
208    init(methodName, TEST_UTIL.getConfiguration());
209  }
210
211  private HStore init(String methodName, Configuration conf) throws IOException {
212    // some of the tests write 4 versions and then flush
213    // (with HBASE-4241, lower versions are collected on flush)
214    return init(methodName, conf,
215      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
216  }
217
218  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
219    throws IOException {
220    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
221  }
222
223  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
224    ColumnFamilyDescriptor hcd) throws IOException {
225    return init(methodName, conf, builder, hcd, null);
226  }
227
228  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
229    ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
230    return init(methodName, conf, builder, hcd, hook, false);
231  }
232
233  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
234    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
235    TableDescriptor htd = builder.setColumnFamily(hcd).build();
236    Path basedir = new Path(DIR + methodName);
237    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
238    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
239
240    FileSystem fs = FileSystem.get(conf);
241
242    fs.delete(logdir, true);
243    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
244      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null,
245      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
246    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
247    Configuration walConf = new Configuration(conf);
248    CommonFSUtils.setRootDir(walConf, basedir);
249    WALFactory wals = new WALFactory(walConf, methodName);
250    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
251      htd, null);
252    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
253    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
254    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
255  }
256
257  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
258    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
259    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
260    if (hook == null) {
261      store = new HStore(region, hcd, conf, false);
262    } else {
263      store = new MyStore(region, hcd, conf, hook, switchToPread);
264    }
265    region.stores.put(store.getColumnFamilyDescriptor().getName(), store);
266    return store;
267  }
268
269  /**
270   * Test we do not lose data if we fail a flush and then close. Part of HBase-10466
271   */
272  @Test
273  public void testFlushSizeSizing() throws Exception {
274    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
275    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
276    // Only retry once.
277    conf.setInt("hbase.hstore.flush.retries.number", 1);
278    User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" });
279    // Inject our faulty LocalFileSystem
280    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
281    user.runAs(new PrivilegedExceptionAction<Object>() {
282      @Override
283      public Object run() throws Exception {
284        // Make sure it worked (above is sensitive to caching details in hadoop core)
285        FileSystem fs = FileSystem.get(conf);
286        assertEquals(FaultyFileSystem.class, fs.getClass());
287        FaultyFileSystem ffs = (FaultyFileSystem) fs;
288
289        // Initialize region
290        init(name.getMethodName(), conf);
291
292        MemStoreSize mss = store.memstore.getFlushableSize();
293        assertEquals(0, mss.getDataSize());
294        LOG.info("Adding some data");
295        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
296        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
297        // add the heap size of active (mutable) segment
298        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
299        mss = store.memstore.getFlushableSize();
300        assertEquals(kvSize.getMemStoreSize(), mss);
301        // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
302        try {
303          LOG.info("Flushing");
304          flushStore(store, id++);
305          fail("Didn't bubble up IOE!");
306        } catch (IOException ioe) {
307          assertTrue(ioe.getMessage().contains("Fault injected"));
308        }
309        // due to snapshot, change mutable to immutable segment
310        kvSize.incMemStoreSize(0,
311          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
312        mss = store.memstore.getFlushableSize();
313        assertEquals(kvSize.getMemStoreSize(), mss);
314        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
315        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
316        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
317        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
318        // not yet cleared the snapshot -- the above flush failed.
319        assertEquals(kvSize.getMemStoreSize(), mss);
320        ffs.fault.set(false);
321        flushStore(store, id++);
322        mss = store.memstore.getFlushableSize();
323        // Size should be the foreground kv size.
324        assertEquals(kvSize2.getMemStoreSize(), mss);
325        flushStore(store, id++);
326        mss = store.memstore.getFlushableSize();
327        assertEquals(0, mss.getDataSize());
328        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
329        return null;
330      }
331    });
332  }
333
334  @Test
335  public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException {
336    int numStoreFiles = 5;
337    writeAndRead(BloomType.ROWCOL, numStoreFiles);
338
339    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
340    // hard to know exactly the numbers here, we are just trying to
341    // prove that they are incrementing
342    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
343    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
344  }
345
346  @Test
347  public void testStoreBloomFilterMetricsWithBloomRow() throws IOException {
348    int numStoreFiles = 5;
349    writeAndRead(BloomType.ROWCOL, numStoreFiles);
350
351    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
352    // hard to know exactly the numbers here, we are just trying to
353    // prove that they are incrementing
354    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
355    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
356  }
357
358  @Test
359  public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException {
360    int numStoreFiles = 5;
361    writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles);
362
363    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
364    // hard to know exactly the numbers here, we are just trying to
365    // prove that they are incrementing
366    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
367  }
368
369  @Test
370  public void testStoreBloomFilterMetricsWithBloomNone() throws IOException {
371    int numStoreFiles = 5;
372    writeAndRead(BloomType.NONE, numStoreFiles);
373
374    assertEquals(0, store.getBloomFilterRequestsCount());
375    assertEquals(0, store.getBloomFilterNegativeResultsCount());
376
377    // hard to know exactly the numbers here, we are just trying to
378    // prove that they are incrementing
379    assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles);
380  }
381
382  private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException {
383    Configuration conf = HBaseConfiguration.create();
384    FileSystem fs = FileSystem.get(conf);
385
386    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
387      .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType)
388      .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build();
389    init(name.getMethodName(), conf, hcd);
390
391    for (int i = 1; i <= numStoreFiles; i++) {
392      byte[] row = Bytes.toBytes("row" + i);
393      LOG.info("Adding some data for the store file #" + i);
394      long timeStamp = EnvironmentEdgeManager.currentTime();
395      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
396      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
397      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
398      flush(i);
399    }
400
401    // Verify the total number of store files
402    assertEquals(numStoreFiles, this.store.getStorefiles().size());
403
404    TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
405    columns.add(qf1);
406
407    for (int i = 1; i <= numStoreFiles; i++) {
408      KeyValueScanner scanner =
409        store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0);
410      scanner.peek();
411    }
412  }
413
414  /**
415   * Verify that compression and data block encoding are respected by the createWriter method, used
416   * on store flush.
417   */
418  @Test
419  public void testCreateWriter() throws Exception {
420    Configuration conf = HBaseConfiguration.create();
421    FileSystem fs = FileSystem.get(conf);
422
423    ColumnFamilyDescriptor hcd =
424      ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ)
425        .setDataBlockEncoding(DataBlockEncoding.DIFF).build();
426    init(name.getMethodName(), conf, hcd);
427
428    // Test createWriter
429    StoreFileWriter writer = store.getStoreEngine()
430      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
431        .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
432        .includesTag(false).shouldDropBehind(false));
433    Path path = writer.getPath();
434    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
435    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
436    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
437    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
438    writer.close();
439
440    // Verify that compression and encoding settings are respected
441    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
442    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
443    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
444    reader.close();
445  }
446
447  @Test
448  public void testDeleteExpiredStoreFiles() throws Exception {
449    testDeleteExpiredStoreFiles(0);
450    testDeleteExpiredStoreFiles(1);
451  }
452
453  /**
454   * @param minVersions the MIN_VERSIONS for the column family
455   */
456  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
457    int storeFileNum = 4;
458    int ttl = 4;
459    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
460    EnvironmentEdgeManagerTestHelper.injectEdge(edge);
461
462    Configuration conf = HBaseConfiguration.create();
463    // Enable the expired store file deletion
464    conf.setBoolean("hbase.store.delete.expired.storefile", true);
465    // Set the compaction threshold higher to avoid normal compactions.
466    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
467
468    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
469      .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());
470
471    long storeTtl = this.store.getScanInfo().getTtl();
472    long sleepTime = storeTtl / storeFileNum;
473    long timeStamp;
474    // There are 4 store files and the max time stamp difference among these
475    // store files will be (this.store.ttl / storeFileNum)
476    for (int i = 1; i <= storeFileNum; i++) {
477      LOG.info("Adding some data for the store file #" + i);
478      timeStamp = EnvironmentEdgeManager.currentTime();
479      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
480      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
481      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
482      flush(i);
483      edge.incrementTime(sleepTime);
484    }
485
486    // Verify the total number of store files
487    assertEquals(storeFileNum, this.store.getStorefiles().size());
488
489    // Each call will find one expired store file and delete it before compaction happens.
490    // There will be no compaction due to threshold above. Last file will not be replaced.
491    for (int i = 1; i <= storeFileNum - 1; i++) {
492      // verify the expired store file.
493      assertFalse(this.store.requestCompaction().isPresent());
494      Collection<HStoreFile> sfs = this.store.getStorefiles();
495      // Ensure i files are gone.
496      if (minVersions == 0) {
497        assertEquals(storeFileNum - i, sfs.size());
498        // Ensure only non-expired files remain.
499        for (HStoreFile sf : sfs) {
500          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
501        }
502      } else {
503        assertEquals(storeFileNum, sfs.size());
504      }
505      // Let the next store file expired.
506      edge.incrementTime(sleepTime);
507    }
508    assertFalse(this.store.requestCompaction().isPresent());
509
510    Collection<HStoreFile> sfs = this.store.getStorefiles();
511    // Assert the last expired file is not removed.
512    if (minVersions == 0) {
513      assertEquals(1, sfs.size());
514    }
515    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
516    assertTrue(ts < (edge.currentTime() - storeTtl));
517
518    for (HStoreFile sf : sfs) {
519      sf.closeStoreFile(true);
520    }
521  }
522
523  @Test
524  public void testLowestModificationTime() throws Exception {
525    Configuration conf = HBaseConfiguration.create();
526    FileSystem fs = FileSystem.get(conf);
527    // Initialize region
528    init(name.getMethodName(), conf);
529
530    int storeFileNum = 4;
531    for (int i = 1; i <= storeFileNum; i++) {
532      LOG.info("Adding some data for the store file #" + i);
533      this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null);
534      this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null);
535      this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null);
536      flush(i);
537    }
538    // after flush; check the lowest time stamp
539    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
540    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
541    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
542
543    // after compact; check the lowest time stamp
544    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
545    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
546    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
547    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
548  }
549
550  private static long getLowestTimeStampFromFS(FileSystem fs,
551    final Collection<HStoreFile> candidates) throws IOException {
552    long minTs = Long.MAX_VALUE;
553    if (candidates.isEmpty()) {
554      return minTs;
555    }
556    Path[] p = new Path[candidates.size()];
557    int i = 0;
558    for (HStoreFile sf : candidates) {
559      p[i] = sf.getPath();
560      ++i;
561    }
562
563    FileStatus[] stats = fs.listStatus(p);
564    if (stats == null || stats.length == 0) {
565      return minTs;
566    }
567    for (FileStatus s : stats) {
568      minTs = Math.min(minTs, s.getModificationTime());
569    }
570    return minTs;
571  }
572
573  //////////////////////////////////////////////////////////////////////////////
574  // Get tests
575  //////////////////////////////////////////////////////////////////////////////
576
577  private static final int BLOCKSIZE_SMALL = 8192;
578
579  /**
580   * Test for hbase-1686.
581   */
582  @Test
583  public void testEmptyStoreFile() throws IOException {
584    init(this.name.getMethodName());
585    // Write a store file.
586    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
587    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
588    flush(1);
589    // Now put in place an empty store file. Its a little tricky. Have to
590    // do manually with hacked in sequence id.
591    HStoreFile f = this.store.getStorefiles().iterator().next();
592    Path storedir = f.getPath().getParent();
593    long seqid = f.getMaxSequenceId();
594    Configuration c = HBaseConfiguration.create();
595    FileSystem fs = FileSystem.get(c);
596    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
597    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
598      .withOutputDir(storedir).withFileContext(meta).build();
599    w.appendMetadata(seqid + 1, false);
600    w.close();
601    this.store.close();
602    // Reopen it... should pick up two files
603    this.store =
604      new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
605    assertEquals(2, this.store.getStorefilesCount());
606
607    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
608    assertEquals(1, result.size());
609  }
610
611  /**
612   * Getting data from memstore only
613   */
614  @Test
615  public void testGet_FromMemStoreOnly() throws IOException {
616    init(this.name.getMethodName());
617
618    // Put data in memstore
619    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
620    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
621    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
622    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
623    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
624    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
625
626    // Get
627    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
628
629    // Compare
630    assertCheck();
631  }
632
633  @Test
634  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
635    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
636    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
637    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
638  }
639
640  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
641    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
642      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
643    long currentTs = 100;
644    long minTs = currentTs;
645    // the extra cell won't be flushed to disk,
646    // so the min of timerange will be different between memStore and hfile.
647    for (int i = 0; i != (maxVersion + 1); ++i) {
648      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
649      if (i == 1) {
650        minTs = currentTs;
651      }
652    }
653    flushStore(store, id++);
654
655    Collection<HStoreFile> files = store.getStorefiles();
656    assertEquals(1, files.size());
657    HStoreFile f = files.iterator().next();
658    f.initReader();
659    StoreFileReader reader = f.getReader();
660    assertEquals(minTs, reader.timeRange.getMin());
661    assertEquals(currentTs, reader.timeRange.getMax());
662  }
663
664  /**
665   * Getting data from files only
666   */
667  @Test
668  public void testGet_FromFilesOnly() throws IOException {
669    init(this.name.getMethodName());
670
671    // Put data in memstore
672    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
673    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
674    // flush
675    flush(1);
676
677    // Add more data
678    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
679    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
680    // flush
681    flush(2);
682
683    // Add more data
684    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
685    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
686    // flush
687    flush(3);
688
689    // Get
690    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
691    // this.store.get(get, qualifiers, result);
692
693    // Need to sort the result since multiple files
694    Collections.sort(result, CellComparatorImpl.COMPARATOR);
695
696    // Compare
697    assertCheck();
698  }
699
700  /**
701   * Getting data from memstore and files
702   */
703  @Test
704  public void testGet_FromMemStoreAndFiles() throws IOException {
705    init(this.name.getMethodName());
706
707    // Put data in memstore
708    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
709    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
710    // flush
711    flush(1);
712
713    // Add more data
714    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
715    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
716    // flush
717    flush(2);
718
719    // Add more data
720    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
721    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
722
723    // Get
724    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
725
726    // Need to sort the result since multiple files
727    Collections.sort(result, CellComparatorImpl.COMPARATOR);
728
729    // Compare
730    assertCheck();
731  }
732
733  private void flush(int storeFilessize) throws IOException {
734    flushStore(store, id++);
735    assertEquals(storeFilessize, this.store.getStorefiles().size());
736    assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount());
737  }
738
739  private void assertCheck() {
740    assertEquals(expected.size(), result.size());
741    for (int i = 0; i < expected.size(); i++) {
742      assertEquals(expected.get(i), result.get(i));
743    }
744  }
745
746  @After
747  public void tearDown() throws Exception {
748    EnvironmentEdgeManagerTestHelper.reset();
749    if (store != null) {
750      try {
751        store.close();
752      } catch (IOException e) {
753      }
754      store = null;
755    }
756    if (region != null) {
757      region.close();
758      region = null;
759    }
760  }
761
762  @AfterClass
763  public static void tearDownAfterClass() throws IOException {
764    TEST_UTIL.cleanupTestDir();
765  }
766
767  @Test
768  public void testHandleErrorsInFlush() throws Exception {
769    LOG.info("Setting up a faulty file system that cannot write");
770
771    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
772    User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" });
773    // Inject our faulty LocalFileSystem
774    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
775    user.runAs(new PrivilegedExceptionAction<Object>() {
776      @Override
777      public Object run() throws Exception {
778        // Make sure it worked (above is sensitive to caching details in hadoop core)
779        FileSystem fs = FileSystem.get(conf);
780        assertEquals(FaultyFileSystem.class, fs.getClass());
781
782        // Initialize region
783        init(name.getMethodName(), conf);
784
785        LOG.info("Adding some data");
786        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
787        store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
788        store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
789
790        LOG.info("Before flush, we should have no files");
791
792        StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, store.getStoreContext());
793        Collection<StoreFileInfo> files = sft.load();
794        assertEquals(0, files != null ? files.size() : 0);
795
796        // flush
797        try {
798          LOG.info("Flushing");
799          flush(1);
800          fail("Didn't bubble up IOE!");
801        } catch (IOException ioe) {
802          assertTrue(ioe.getMessage().contains("Fault injected"));
803        }
804
805        LOG.info("After failed flush, we should still have no files!");
806        files = sft.load();
807        assertEquals(0, files != null ? files.size() : 0);
808        store.getHRegion().getWAL().close();
809        return null;
810      }
811    });
812    FileSystem.closeAllForUGI(user.getUGI());
813  }
814
815  /**
816   * Faulty file system that will fail if you write past its fault position the FIRST TIME only;
817   * thereafter it will succeed. Used by {@link TestHRegion} too.
818   */
819  static class FaultyFileSystem extends FilterFileSystem {
820    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
821    private long faultPos = 200;
822    AtomicBoolean fault = new AtomicBoolean(true);
823
824    public FaultyFileSystem() {
825      super(new LocalFileSystem());
826      LOG.info("Creating faulty!");
827    }
828
829    @Override
830    public FSDataOutputStream create(Path p) throws IOException {
831      return new FaultyOutputStream(super.create(p), faultPos, fault);
832    }
833
834    @Override
835    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
836      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
837      return new FaultyOutputStream(
838        super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress),
839        faultPos, fault);
840    }
841
842    @Override
843    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
844      short replication, long blockSize, Progressable progress) throws IOException {
845      // Fake it. Call create instead. The default implementation throws an IOE
846      // that this is not supported.
847      return create(f, overwrite, bufferSize, replication, blockSize, progress);
848    }
849  }
850
851  static class FaultyOutputStream extends FSDataOutputStream {
852    volatile long faultPos = Long.MAX_VALUE;
853    private final AtomicBoolean fault;
854
855    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
856      throws IOException {
857      super(out, null);
858      this.faultPos = faultPos;
859      this.fault = fault;
860    }
861
862    @Override
863    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
864      LOG.info("faulty stream write at pos " + getPos());
865      injectFault();
866      super.write(buf, offset, length);
867    }
868
869    private void injectFault() throws IOException {
870      if (this.fault.get() && getPos() >= faultPos) {
871        throw new IOException("Fault injected");
872      }
873    }
874  }
875
876  private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
877    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
878    storeFlushCtx.prepare();
879    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
880    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
881    return storeFlushCtx;
882  }
883
884  /**
885   * Generate a list of KeyValues for testing based on given parameters
886   * @return the rows key-value list
887   */
888  private List<ExtendedCell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier,
889    byte[] family) {
890    List<ExtendedCell> kvList = new ArrayList<>();
891    for (int i = 1; i <= numRows; i++) {
892      byte[] b = Bytes.toBytes(i);
893      for (long timestamp : timestamps) {
894        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
895      }
896    }
897    return kvList;
898  }
899
900  /**
901   * Test to ensure correctness when using Stores with multiple timestamps
902   */
903  @Test
904  public void testMultipleTimestamps() throws IOException {
905    int numRows = 1;
906    long[] timestamps1 = new long[] { 1, 5, 10, 20 };
907    long[] timestamps2 = new long[] { 30, 80 };
908
909    init(this.name.getMethodName());
910
911    List<ExtendedCell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family);
912    for (ExtendedCell kv : kvList1) {
913      this.store.add(kv, null);
914    }
915
916    flushStore(store, id++);
917
918    List<ExtendedCell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family);
919    for (ExtendedCell kv : kvList2) {
920      this.store.add(kv, null);
921    }
922
923    List<Cell> result;
924    Get get = new Get(Bytes.toBytes(1));
925    get.addColumn(family, qf1);
926
927    get.setTimeRange(0, 15);
928    result = HBaseTestingUtil.getFromStoreFile(store, get);
929    assertTrue(result.size() > 0);
930
931    get.setTimeRange(40, 90);
932    result = HBaseTestingUtil.getFromStoreFile(store, get);
933    assertTrue(result.size() > 0);
934
935    get.setTimeRange(10, 45);
936    result = HBaseTestingUtil.getFromStoreFile(store, get);
937    assertTrue(result.size() > 0);
938
939    get.setTimeRange(80, 145);
940    result = HBaseTestingUtil.getFromStoreFile(store, get);
941    assertTrue(result.size() > 0);
942
943    get.setTimeRange(1, 2);
944    result = HBaseTestingUtil.getFromStoreFile(store, get);
945    assertTrue(result.size() > 0);
946
947    get.setTimeRange(90, 200);
948    result = HBaseTestingUtil.getFromStoreFile(store, get);
949    assertTrue(result.size() == 0);
950  }
951
952  /**
953   * Test for HBASE-3492 - Test split on empty colfam (no store files).
954   * @throws IOException When the IO operations fail.
955   */
956  @Test
957  public void testSplitWithEmptyColFam() throws IOException {
958    init(this.name.getMethodName());
959    assertFalse(store.getSplitPoint().isPresent());
960  }
961
962  @Test
963  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
964    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
965    long anyValue = 10;
966
967    // We'll check that it uses correct config and propagates it appropriately by going thru
968    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
969    // a number we pass in is higher than some config value, inside compactionPolicy.
970    Configuration conf = HBaseConfiguration.create();
971    conf.setLong(CONFIG_KEY, anyValue);
972    init(name.getMethodName() + "-xml", conf);
973    assertTrue(store.throttleCompaction(anyValue + 1));
974    assertFalse(store.throttleCompaction(anyValue));
975
976    // HTD overrides XML.
977    --anyValue;
978    init(
979      name.getMethodName() + "-htd", conf, TableDescriptorBuilder
980        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
981      ColumnFamilyDescriptorBuilder.of(family));
982    assertTrue(store.throttleCompaction(anyValue + 1));
983    assertFalse(store.throttleCompaction(anyValue));
984
985    // HCD overrides them both.
986    --anyValue;
987    init(name.getMethodName() + "-hcd", conf,
988      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
989        Long.toString(anyValue)),
990      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
991        .build());
992    assertTrue(store.throttleCompaction(anyValue + 1));
993    assertFalse(store.throttleCompaction(anyValue));
994  }
995
996  public static class DummyStoreEngine extends DefaultStoreEngine {
997    public static DefaultCompactor lastCreatedCompactor = null;
998
999    @Override
1000    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
1001      throws IOException {
1002      super.createComponents(conf, store, comparator);
1003      lastCreatedCompactor = this.compactor;
1004    }
1005  }
1006
1007  @Test
1008  public void testStoreUsesSearchEngineOverride() throws Exception {
1009    Configuration conf = HBaseConfiguration.create();
1010    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
1011    init(this.name.getMethodName(), conf);
1012    assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
1013  }
1014
1015  private void addStoreFile() throws IOException {
1016    HStoreFile f = this.store.getStorefiles().iterator().next();
1017    Path storedir = f.getPath().getParent();
1018    long seqid = this.store.getMaxSequenceId().orElse(0L);
1019    Configuration c = TEST_UTIL.getConfiguration();
1020    FileSystem fs = FileSystem.get(c);
1021    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
1022    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
1023      .withOutputDir(storedir).withFileContext(fileContext).build();
1024    w.appendMetadata(seqid + 1, false);
1025    w.close();
1026    LOG.info("Added store file:" + w.getPath());
1027  }
1028
1029  private void archiveStoreFile(int index) throws IOException {
1030    Collection<HStoreFile> files = this.store.getStorefiles();
1031    HStoreFile sf = null;
1032    Iterator<HStoreFile> it = files.iterator();
1033    for (int i = 0; i <= index; i++) {
1034      sf = it.next();
1035    }
1036    StoreFileTracker sft =
1037      StoreFileTrackerFactory.create(store.getRegionFileSystem().getFileSystem().getConf(),
1038        store.isPrimaryReplicaStore(), store.getStoreContext());
1039    sft.removeStoreFiles(Lists.newArrayList(sf));
1040  }
1041
1042  private void closeCompactedFile(int index) throws IOException {
1043    Collection<HStoreFile> files =
1044      this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
1045    if (files.size() > 0) {
1046      HStoreFile sf = null;
1047      Iterator<HStoreFile> it = files.iterator();
1048      for (int i = 0; i <= index; i++) {
1049        sf = it.next();
1050      }
1051      sf.closeStoreFile(true);
1052      store.getStoreEngine().getStoreFileManager()
1053        .removeCompactedFiles(Collections.singletonList(sf));
1054    }
1055  }
1056
1057  @Test
1058  public void testRefreshStoreFiles() throws Exception {
1059    init(name.getMethodName());
1060
1061    assertEquals(0, this.store.getStorefilesCount());
1062
1063    // Test refreshing store files when no store files are there
1064    store.refreshStoreFiles();
1065    assertEquals(0, this.store.getStorefilesCount());
1066
1067    // add some data, flush
1068    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1069    flush(1);
1070    assertEquals(1, this.store.getStorefilesCount());
1071
1072    // add one more file
1073    addStoreFile();
1074
1075    assertEquals(1, this.store.getStorefilesCount());
1076    store.refreshStoreFiles();
1077    assertEquals(2, this.store.getStorefilesCount());
1078
1079    // add three more files
1080    addStoreFile();
1081    addStoreFile();
1082    addStoreFile();
1083
1084    assertEquals(2, this.store.getStorefilesCount());
1085    store.refreshStoreFiles();
1086    assertEquals(5, this.store.getStorefilesCount());
1087
1088    closeCompactedFile(0);
1089    archiveStoreFile(0);
1090
1091    assertEquals(5, this.store.getStorefilesCount());
1092    store.refreshStoreFiles();
1093    assertEquals(4, this.store.getStorefilesCount());
1094
1095    archiveStoreFile(0);
1096    archiveStoreFile(1);
1097    archiveStoreFile(2);
1098
1099    assertEquals(4, this.store.getStorefilesCount());
1100    store.refreshStoreFiles();
1101    assertEquals(1, this.store.getStorefilesCount());
1102
1103    archiveStoreFile(0);
1104    store.refreshStoreFiles();
1105    assertEquals(0, this.store.getStorefilesCount());
1106  }
1107
1108  @Test
1109  public void testRefreshStoreFilesNotChanged() throws IOException {
1110    init(name.getMethodName());
1111
1112    assertEquals(0, this.store.getStorefilesCount());
1113
1114    // add some data, flush
1115    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1116    flush(1);
1117    // add one more file
1118    addStoreFile();
1119
1120    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());
1121
1122    // call first time after files changed
1123    spiedStoreEngine.refreshStoreFiles();
1124    assertEquals(2, this.store.getStorefilesCount());
1125    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
1126
1127    // call second time
1128    spiedStoreEngine.refreshStoreFiles();
1129
1130    // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
1131    // refreshed,
1132    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
1133  }
1134
1135  @Test
1136  public void testScanWithCompactionAfterFlush() throws Exception {
1137    TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1138      EverythingPolicy.class.getName());
1139    init(name.getMethodName());
1140
1141    assertEquals(0, this.store.getStorefilesCount());
1142
1143    KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
1144    // add some data, flush
1145    this.store.add(kv, null);
1146    flush(1);
1147    kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
1148    // add some data, flush
1149    this.store.add(kv, null);
1150    flush(2);
1151    kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
1152    // add some data, flush
1153    this.store.add(kv, null);
1154    flush(3);
1155
1156    ExecutorService service = Executors.newFixedThreadPool(2);
1157
1158    Scan scan = new Scan(new Get(row));
1159    Future<KeyValueScanner> scanFuture = service.submit(() -> {
1160      try {
1161        LOG.info(">>>> creating scanner");
1162        return this.store.createScanner(scan,
1163          new ScanInfo(HBaseConfiguration.create(),
1164            ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
1165            Long.MAX_VALUE, 0, CellComparator.getInstance()),
1166          scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
1167      } catch (IOException e) {
1168        e.printStackTrace();
1169        return null;
1170      }
1171    });
1172    Future compactFuture = service.submit(() -> {
1173      try {
1174        LOG.info(">>>>>> starting compaction");
1175        Optional<CompactionContext> opCompaction = this.store.requestCompaction();
1176        assertTrue(opCompaction.isPresent());
1177        store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
1178        LOG.info(">>>>>> Compaction is finished");
1179        this.store.closeAndArchiveCompactedFiles();
1180        LOG.info(">>>>>> Compacted files deleted");
1181      } catch (IOException e) {
1182        e.printStackTrace();
1183      }
1184    });
1185
1186    KeyValueScanner kvs = scanFuture.get();
1187    compactFuture.get();
1188    ((StoreScanner) kvs).currentScanners.forEach(s -> {
1189      if (s instanceof StoreFileScanner) {
1190        assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
1191      }
1192    });
1193    kvs.seek(kv);
1194    service.shutdownNow();
1195  }
1196
1197  private long countMemStoreScanner(StoreScanner scanner) {
1198    if (scanner.currentScanners == null) {
1199      return 0;
1200    }
1201    return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count();
1202  }
1203
1204  @Test
1205  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1206    long seqId = 100;
1207    long timestamp = EnvironmentEdgeManager.currentTime();
1208    ExtendedCell cell0 =
1209      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1210        .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1211    PrivateCellUtil.setSequenceId(cell0, seqId);
1212    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1213
1214    ExtendedCell cell1 =
1215      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1216        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1217    PrivateCellUtil.setSequenceId(cell1, seqId);
1218    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1219
1220    seqId = 101;
1221    timestamp = EnvironmentEdgeManager.currentTime();
1222    ExtendedCell cell2 =
1223      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1224        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1225    PrivateCellUtil.setSequenceId(cell2, seqId);
1226    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1227  }
1228
1229  private void testNumberOfMemStoreScannersAfterFlush(List<ExtendedCell> inputCellsBeforeSnapshot,
1230    List<ExtendedCell> inputCellsAfterSnapshot) throws IOException {
1231    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
1232    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1233    long seqId = Long.MIN_VALUE;
1234    for (ExtendedCell c : inputCellsBeforeSnapshot) {
1235      quals.add(CellUtil.cloneQualifier(c));
1236      seqId = Math.max(seqId, c.getSequenceId());
1237    }
1238    for (ExtendedCell c : inputCellsAfterSnapshot) {
1239      quals.add(CellUtil.cloneQualifier(c));
1240      seqId = Math.max(seqId, c.getSequenceId());
1241    }
1242    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
1243    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1244    storeFlushCtx.prepare();
1245    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
1246    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
1247    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
1248      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
1249      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
1250      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1251      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1252      // snapshot has no data after flush
1253      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
1254      boolean more;
1255      int cellCount = 0;
1256      do {
1257        List<Cell> cells = new ArrayList<>();
1258        more = s.next(cells);
1259        cellCount += cells.size();
1260        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
1261      } while (more);
1262      assertEquals(
1263        "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
1264          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
1265        inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
1266      // the current scanners is cleared
1267      assertEquals(0, countMemStoreScanner(s));
1268    }
1269  }
1270
1271  private ExtendedCell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
1272    throws IOException {
1273    return createCell(row, qualifier, ts, sequenceId, value);
1274  }
1275
1276  private ExtendedCell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId,
1277    byte[] value) throws IOException {
1278    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row)
1279      .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
1280      .setValue(value).setSequenceId(sequenceId).build();
1281  }
1282
1283  private ExtendedCell createDeleteCell(byte[] row, byte[] qualifier, long ts, long sequenceId) {
1284    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row)
1285      .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.DeleteColumn)
1286      .setSequenceId(sequenceId).build();
1287  }
1288
1289  @Test
1290  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1291    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1292    final int expectedSize = 3;
1293    testFlushBeforeCompletingScan(new MyListHook() {
1294      @Override
1295      public void hook(int currentSize) {
1296        if (currentSize == expectedSize - 1) {
1297          try {
1298            flushStore(store, id++);
1299            timeToGoNextRow.set(true);
1300          } catch (IOException e) {
1301            throw new RuntimeException(e);
1302          }
1303        }
1304      }
1305    }, new FilterBase() {
1306      @Override
1307      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1308        return ReturnCode.INCLUDE;
1309      }
1310    }, expectedSize);
1311  }
1312
1313  @Test
1314  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
1315    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1316    final int expectedSize = 2;
1317    testFlushBeforeCompletingScan(new MyListHook() {
1318      @Override
1319      public void hook(int currentSize) {
1320        if (currentSize == expectedSize - 1) {
1321          try {
1322            flushStore(store, id++);
1323            timeToGoNextRow.set(true);
1324          } catch (IOException e) {
1325            throw new RuntimeException(e);
1326          }
1327        }
1328      }
1329    }, new FilterBase() {
1330      @Override
1331      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1332        if (timeToGoNextRow.get()) {
1333          timeToGoNextRow.set(false);
1334          return ReturnCode.NEXT_ROW;
1335        } else {
1336          return ReturnCode.INCLUDE;
1337        }
1338      }
1339    }, expectedSize);
1340  }
1341
1342  @Test
1343  public void testFlushBeforeCompletingScanWithFilterHint()
1344    throws IOException, InterruptedException {
1345    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
1346    final int expectedSize = 2;
1347    testFlushBeforeCompletingScan(new MyListHook() {
1348      @Override
1349      public void hook(int currentSize) {
1350        if (currentSize == expectedSize - 1) {
1351          try {
1352            flushStore(store, id++);
1353            timeToGetHint.set(true);
1354          } catch (IOException e) {
1355            throw new RuntimeException(e);
1356          }
1357        }
1358      }
1359    }, new FilterBase() {
1360      @Override
1361      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1362        if (timeToGetHint.get()) {
1363          timeToGetHint.set(false);
1364          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
1365        } else {
1366          return Filter.ReturnCode.INCLUDE;
1367        }
1368      }
1369
1370      @Override
1371      public Cell getNextCellHint(Cell currentCell) throws IOException {
1372        return currentCell;
1373      }
1374    }, expectedSize);
1375  }
1376
1377  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
1378    throws IOException, InterruptedException {
1379    Configuration conf = HBaseConfiguration.create();
1380    byte[] r0 = Bytes.toBytes("row0");
1381    byte[] r1 = Bytes.toBytes("row1");
1382    byte[] r2 = Bytes.toBytes("row2");
1383    byte[] value0 = Bytes.toBytes("value0");
1384    byte[] value1 = Bytes.toBytes("value1");
1385    byte[] value2 = Bytes.toBytes("value2");
1386    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1387    long ts = EnvironmentEdgeManager.currentTime();
1388    long seqId = 100;
1389    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1390      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1391      new MyStoreHook() {
1392        @Override
1393        public long getSmallestReadPoint(HStore store) {
1394          return seqId + 3;
1395        }
1396      });
1397    // The cells having the value0 won't be flushed to disk because the value of max version is 1
1398    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
1399    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
1400    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
1401    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
1402    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
1403    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
1404    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
1405    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
1406    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
1407    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1408    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1409    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1410    List<Cell> myList = new MyList<>(hook);
1411    Scan scan = new Scan().withStartRow(r1).setFilter(filter);
1412    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
1413      // r1
1414      scanner.next(myList);
1415      assertEquals(expectedSize, myList.size());
1416      for (Cell c : myList) {
1417        byte[] actualValue = CellUtil.cloneValue(c);
1418        assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
1419          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
1420      }
1421      List<Cell> normalList = new ArrayList<>(3);
1422      // r2
1423      scanner.next(normalList);
1424      assertEquals(3, normalList.size());
1425      for (Cell c : normalList) {
1426        byte[] actualValue = CellUtil.cloneValue(c);
1427        assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:"
1428          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2));
1429      }
1430    }
1431  }
1432
1433  @Test
1434  public void testFlushBeforeCompletingScanWithDeleteCell() throws IOException {
1435    final Configuration conf = HBaseConfiguration.create();
1436
1437    byte[] r1 = Bytes.toBytes("row1");
1438    byte[] r2 = Bytes.toBytes("row2");
1439
1440    byte[] value1 = Bytes.toBytes("value1");
1441    byte[] value2 = Bytes.toBytes("value2");
1442
1443    final MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1444    final long ts = EnvironmentEdgeManager.currentTime();
1445    final long seqId = 100;
1446
1447    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1448      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1449      new MyStoreHook() {
1450        @Override
1451        long getSmallestReadPoint(HStore store) {
1452          return seqId + 3;
1453        }
1454      });
1455
1456    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value2), memStoreSizing);
1457    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value2), memStoreSizing);
1458    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value2), memStoreSizing);
1459
1460    store.add(createDeleteCell(r1, qf1, ts + 2, seqId + 2), memStoreSizing);
1461    store.add(createDeleteCell(r1, qf2, ts + 2, seqId + 2), memStoreSizing);
1462    store.add(createDeleteCell(r1, qf3, ts + 2, seqId + 2), memStoreSizing);
1463
1464    store.add(createCell(r2, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1465    store.add(createCell(r2, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1466    store.add(createCell(r2, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1467
1468    Scan scan = new Scan().withStartRow(r1);
1469
1470    try (final InternalScanner scanner =
1471      new StoreScanner(store, store.getScanInfo(), scan, null, seqId + 3) {
1472        @Override
1473        protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
1474          CellComparator comparator) throws IOException {
1475          return new MyKeyValueHeap(scanners, comparator, recordBlockSizeCallCount -> {
1476            if (recordBlockSizeCallCount == 6) {
1477              try {
1478                flushStore(store, id++);
1479              } catch (IOException e) {
1480                throw new RuntimeException(e);
1481              }
1482            }
1483          });
1484        }
1485      }) {
1486      List<Cell> cellResult = new ArrayList<>();
1487
1488      scanner.next(cellResult);
1489      assertEquals(0, cellResult.size());
1490
1491      cellResult.clear();
1492
1493      scanner.next(cellResult);
1494      assertEquals(3, cellResult.size());
1495      for (Cell cell : cellResult) {
1496        assertArrayEquals(r2, CellUtil.cloneRow(cell));
1497      }
1498    }
1499  }
1500
1501  @Test
1502  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
1503    Configuration conf = HBaseConfiguration.create();
1504    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
1505    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1506      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1507    byte[] value = Bytes.toBytes("value");
1508    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1509    long ts = EnvironmentEdgeManager.currentTime();
1510    long seqId = 100;
1511    // older data whihc shouldn't be "seen" by client
1512    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1513    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1514    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1515    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1516    quals.add(qf1);
1517    quals.add(qf2);
1518    quals.add(qf3);
1519    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1520    MyCompactingMemStore.START_TEST.set(true);
1521    Runnable flush = () -> {
1522      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
1523      // recreate the active memstore -- phase (4/5)
1524      storeFlushCtx.prepare();
1525    };
1526    ExecutorService service = Executors.newSingleThreadExecutor();
1527    service.execute(flush);
1528    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
1529    // this is blocked until we recreate the active memstore -- phase (3/5)
1530    // we get scanner from active memstore but it is empty -- phase (5/5)
1531    InternalScanner scanner =
1532      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
1533    service.shutdown();
1534    service.awaitTermination(20, TimeUnit.SECONDS);
1535    try {
1536      try {
1537        List<Cell> results = new ArrayList<>();
1538        scanner.next(results);
1539        assertEquals(3, results.size());
1540        for (Cell c : results) {
1541          byte[] actualValue = CellUtil.cloneValue(c);
1542          assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
1543            + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
1544        }
1545      } finally {
1546        scanner.close();
1547      }
1548    } finally {
1549      MyCompactingMemStore.START_TEST.set(false);
1550      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1551      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1552    }
1553  }
1554
1555  @Test
1556  public void testScanWithDoubleFlush() throws IOException {
1557    Configuration conf = HBaseConfiguration.create();
1558    // Initialize region
1559    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1560      @Override
1561      public void getScanners(MyStore store) throws IOException {
1562        final long tmpId = id++;
1563        ExecutorService s = Executors.newSingleThreadExecutor();
1564        s.execute(() -> {
1565          try {
1566            // flush the store before storescanner updates the scanners from store.
1567            // The current data will be flushed into files, and the memstore will
1568            // be clear.
1569            // -- phase (4/4)
1570            flushStore(store, tmpId);
1571          } catch (IOException ex) {
1572            throw new RuntimeException(ex);
1573          }
1574        });
1575        s.shutdown();
1576        try {
1577          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1578          s.awaitTermination(3, TimeUnit.SECONDS);
1579        } catch (InterruptedException ex) {
1580        }
1581      }
1582    });
1583    byte[] oldValue = Bytes.toBytes("oldValue");
1584    byte[] currentValue = Bytes.toBytes("currentValue");
1585    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1586    long ts = EnvironmentEdgeManager.currentTime();
1587    long seqId = 100;
1588    // older data whihc shouldn't be "seen" by client
1589    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1590    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1591    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1592    long snapshotId = id++;
1593    // push older data into snapshot -- phase (1/4)
1594    StoreFlushContext storeFlushCtx =
1595      store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
1596    storeFlushCtx.prepare();
1597
1598    // insert current data into active -- phase (2/4)
1599    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1600    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1601    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1602    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1603    quals.add(qf1);
1604    quals.add(qf2);
1605    quals.add(qf3);
1606    try (InternalScanner scanner =
1607      (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) {
1608      // complete the flush -- phase (3/4)
1609      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1610      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1611
1612      List<Cell> results = new ArrayList<>();
1613      scanner.next(results);
1614      assertEquals(3, results.size());
1615      for (Cell c : results) {
1616        byte[] actualValue = CellUtil.cloneValue(c);
1617        assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:"
1618          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue));
1619      }
1620    }
1621  }
1622
1623  /**
1624   * This test is for HBASE-27519, when the {@link StoreScanner} is scanning,the Flush and the
1625   * Compaction execute concurrently and theCcompaction compact and archive the flushed
1626   * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}.Before
1627   * HBASE-27519,{@link StoreScanner.updateReaders} would throw {@link FileNotFoundException}.
1628   */
1629  @Test
1630  public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
1631    Configuration conf = HBaseConfiguration.create();
1632    conf.setBoolean(WALFactory.WAL_ENABLED, false);
1633    conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
1634    byte[] r0 = Bytes.toBytes("row0");
1635    byte[] r1 = Bytes.toBytes("row1");
1636    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
1637    final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
1638    // Initialize region
1639    final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1640      @Override
1641      public void getScanners(MyStore store) throws IOException {
1642        try {
1643          // Here this method is called by StoreScanner.updateReaders which is invoked by the
1644          // following TestHStore.flushStore
1645          if (shouldWaitRef.get()) {
1646            // wait the following compaction Task start
1647            cyclicBarrier.await();
1648            // wait the following HStore.closeAndArchiveCompactedFiles end.
1649            cyclicBarrier.await();
1650          }
1651        } catch (BrokenBarrierException | InterruptedException e) {
1652          throw new RuntimeException(e);
1653        }
1654      }
1655    });
1656
1657    final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
1658    Runnable compactionTask = () -> {
1659      try {
1660        // Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for
1661        // entering the MyStore.getScanners, compactionTask could start.
1662        cyclicBarrier.await();
1663        region.compactStore(family, new NoLimitThroughputController());
1664        myStore.closeAndArchiveCompactedFiles();
1665        // Notify StoreScanner.updateReaders could enter MyStore.getScanners.
1666        cyclicBarrier.await();
1667      } catch (Throwable e) {
1668        compactionExceptionRef.set(e);
1669      }
1670    };
1671
1672    long ts = EnvironmentEdgeManager.currentTime();
1673    long seqId = 100;
1674    byte[] value = Bytes.toBytes("value");
1675    // older data whihc shouldn't be "seen" by client
1676    myStore.add(createCell(r0, qf1, ts, seqId, value), null);
1677    flushStore(myStore, id++);
1678    myStore.add(createCell(r0, qf2, ts, seqId, value), null);
1679    flushStore(myStore, id++);
1680    myStore.add(createCell(r0, qf3, ts, seqId, value), null);
1681    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1682    quals.add(qf1);
1683    quals.add(qf2);
1684    quals.add(qf3);
1685
1686    myStore.add(createCell(r1, qf1, ts, seqId, value), null);
1687    myStore.add(createCell(r1, qf2, ts, seqId, value), null);
1688    myStore.add(createCell(r1, qf3, ts, seqId, value), null);
1689
1690    Thread.currentThread()
1691      .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
1692    Scan scan = new Scan();
1693    scan.withStartRow(r0, true);
1694    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
1695      List<Cell> results = new MyList<>(size -> {
1696        switch (size) {
1697          case 1:
1698            shouldWaitRef.set(true);
1699            Thread thread = new Thread(compactionTask);
1700            thread.setName("MyCompacting Thread.");
1701            thread.start();
1702            try {
1703              flushStore(myStore, id++);
1704              thread.join();
1705            } catch (IOException | InterruptedException e) {
1706              throw new RuntimeException(e);
1707            }
1708            shouldWaitRef.set(false);
1709            break;
1710          default:
1711            break;
1712        }
1713      });
1714      // Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile
1715      // which used by StoreScanner.updateReaders is deleted by compactionTask.
1716      scanner.next(results);
1717      // The results is r0 row cells.
1718      assertEquals(3, results.size());
1719      assertTrue(compactionExceptionRef.get() == null);
1720    }
1721  }
1722
1723  @Test
1724  public void testReclaimChunkWhenScaning() throws IOException {
1725    init("testReclaimChunkWhenScaning");
1726    long ts = EnvironmentEdgeManager.currentTime();
1727    long seqId = 100;
1728    byte[] value = Bytes.toBytes("value");
1729    // older data whihc shouldn't be "seen" by client
1730    store.add(createCell(qf1, ts, seqId, value), null);
1731    store.add(createCell(qf2, ts, seqId, value), null);
1732    store.add(createCell(qf3, ts, seqId, value), null);
1733    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1734    quals.add(qf1);
1735    quals.add(qf2);
1736    quals.add(qf3);
1737    try (InternalScanner scanner =
1738      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) {
1739      List<Cell> results = new MyList<>(size -> {
1740        switch (size) {
1741          // 1) we get the first cell (qf1)
1742          // 2) flush the data to have StoreScanner update inner scanners
1743          // 3) the chunk will be reclaimed after updaing
1744          case 1:
1745            try {
1746              flushStore(store, id++);
1747            } catch (IOException e) {
1748              throw new RuntimeException(e);
1749            }
1750            break;
1751          // 1) we get the second cell (qf2)
1752          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
1753          case 2:
1754            try {
1755              byte[] newValue = Bytes.toBytes("newValue");
1756              // older data whihc shouldn't be "seen" by client
1757              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1758              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1759              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1760            } catch (IOException e) {
1761              throw new RuntimeException(e);
1762            }
1763            break;
1764          default:
1765            break;
1766        }
1767      });
1768      scanner.next(results);
1769      assertEquals(3, results.size());
1770      for (Cell c : results) {
1771        byte[] actualValue = CellUtil.cloneValue(c);
1772        assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
1773          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
1774      }
1775    }
1776  }
1777
1778  /**
1779   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the
1780   * versionedList. And the first InMemoryFlushRunnable will use the chagned versionedList to remove
1781   * the corresponding segments. In short, there will be some segements which isn't in merge are
1782   * removed.
1783   */
1784  @Test
1785  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1786    int flushSize = 500;
1787    Configuration conf = HBaseConfiguration.create();
1788    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1789    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1790    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1791    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1792    // Set the lower threshold to invoke the "MERGE" policy
1793    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1794    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1795      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1796    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1797    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1798    long ts = EnvironmentEdgeManager.currentTime();
1799    long seqId = 100;
1800    // older data whihc shouldn't be "seen" by client
1801    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1802    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1803    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1804    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1805    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1806    storeFlushCtx.prepare();
1807    // This shouldn't invoke another in-memory flush because the first compactor thread
1808    // hasn't accomplished the in-memory compaction.
1809    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1810    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1811    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1812    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1813    // okay. Let the compaction be completed
1814    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1815    CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore;
1816    while (mem.isMemStoreFlushingInMemory()) {
1817      TimeUnit.SECONDS.sleep(1);
1818    }
1819    // This should invoke another in-memory flush.
1820    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1821    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1822    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1823    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1824    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1825      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1826    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1827    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1828  }
1829
1830  @Test
1831  public void testAge() throws IOException {
1832    long currentTime = EnvironmentEdgeManager.currentTime();
1833    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1834    edge.setValue(currentTime);
1835    EnvironmentEdgeManager.injectEdge(edge);
1836    Configuration conf = TEST_UTIL.getConfiguration();
1837    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1838    initHRegion(name.getMethodName(), conf,
1839      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1840    HStore store = new HStore(region, hcd, conf, false) {
1841
1842      @Override
1843      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1844        CellComparator kvComparator) throws IOException {
1845        List<HStoreFile> storefiles =
1846          Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1847            mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1848        StoreFileManager sfm = mock(StoreFileManager.class);
1849        when(sfm.getStoreFiles()).thenReturn(storefiles);
1850        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1851        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1852        return storeEngine;
1853      }
1854    };
1855    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1856    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1857    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1858  }
1859
1860  private HStoreFile mockStoreFile(long createdTime) {
1861    StoreFileInfo info = mock(StoreFileInfo.class);
1862    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1863    HStoreFile sf = mock(HStoreFile.class);
1864    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1865    when(sf.isHFile()).thenReturn(true);
1866    when(sf.getFileInfo()).thenReturn(info);
1867    return sf;
1868  }
1869
1870  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1871    throws IOException {
1872    return (MyStore) init(methodName, conf,
1873      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1874      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1875  }
1876
1877  private static class MyStore extends HStore {
1878    private final MyStoreHook hook;
1879
1880    MyStore(final HRegion region, final ColumnFamilyDescriptor family,
1881      final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1882      super(region, family, confParam, false);
1883      this.hook = hook;
1884    }
1885
1886    @Override
1887    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1888      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1889      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1890      boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException {
1891      hook.getScanners(this);
1892      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1893        stopRow, false, readPt, includeMemstoreScanner, onlyLatestVersion);
1894    }
1895
1896    @Override
1897    public long getSmallestReadPoint() {
1898      return hook.getSmallestReadPoint(this);
1899    }
1900  }
1901
1902  private abstract static class MyStoreHook {
1903
1904    void getScanners(MyStore store) throws IOException {
1905    }
1906
1907    long getSmallestReadPoint(HStore store) {
1908      return store.getHRegion().getSmallestReadPoint();
1909    }
1910  }
1911
1912  @Test
1913  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
1914    Configuration conf = HBaseConfiguration.create();
1915    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1916    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1917    // Set the lower threshold to invoke the "MERGE" policy
1918    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1919    });
1920    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1921    long ts = EnvironmentEdgeManager.currentTime();
1922    long seqID = 1L;
1923    // Add some data to the region and do some flushes
1924    for (int i = 1; i < 10; i++) {
1925      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1926        memStoreSizing);
1927    }
1928    // flush them
1929    flushStore(store, seqID);
1930    for (int i = 11; i < 20; i++) {
1931      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1932        memStoreSizing);
1933    }
1934    // flush them
1935    flushStore(store, seqID);
1936    for (int i = 21; i < 30; i++) {
1937      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1938        memStoreSizing);
1939    }
1940    // flush them
1941    flushStore(store, seqID);
1942
1943    assertEquals(3, store.getStorefilesCount());
1944    Scan scan = new Scan();
1945    scan.addFamily(family);
1946    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1947    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1948    StoreScanner storeScanner =
1949      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1950    // get the current heap
1951    KeyValueHeap heap = storeScanner.heap;
1952    // create more store files
1953    for (int i = 31; i < 40; i++) {
1954      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1955        memStoreSizing);
1956    }
1957    // flush them
1958    flushStore(store, seqID);
1959
1960    for (int i = 41; i < 50; i++) {
1961      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1962        memStoreSizing);
1963    }
1964    // flush them
1965    flushStore(store, seqID);
1966    storefiles2 = store.getStorefiles();
1967    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1968    actualStorefiles1.removeAll(actualStorefiles);
1969    // Do compaction
1970    MyThread thread = new MyThread(storeScanner);
1971    thread.start();
1972    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
1973    thread.join();
1974    KeyValueHeap heap2 = thread.getHeap();
1975    assertFalse(heap.equals(heap2));
1976  }
1977
1978  @Test
1979  public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
1980    Configuration conf = HBaseConfiguration.create();
1981    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1982    // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
1983    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
1984    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1985    });
1986    Scan scan = new Scan();
1987    scan.addFamily(family);
1988    // ReadType on Scan is still DEFAULT only.
1989    assertEquals(ReadType.DEFAULT, scan.getReadType());
1990    StoreScanner storeScanner =
1991      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1992    assertFalse(storeScanner.isScanUsePread());
1993  }
1994
1995  @Test
1996  public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException {
1997    Configuration conf = HBaseConfiguration.create();
1998    conf.set("hbase.systemtables.compacting.memstore.type", "eager");
1999    init(name.getMethodName(), conf,
2000      TableDescriptorBuilder.newBuilder(
2001        TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())),
2002      ColumnFamilyDescriptorBuilder.newBuilder(family)
2003        .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build());
2004    assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString()
2005      .startsWith("eager".toUpperCase()));
2006  }
2007
2008  @Test
2009  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
2010    final TableName tn = TableName.valueOf(name.getMethodName());
2011    init(name.getMethodName());
2012
2013    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
2014
2015    HStoreFile sf1 = mockStoreFileWithLength(1024L);
2016    HStoreFile sf2 = mockStoreFileWithLength(2048L);
2017    HStoreFile sf3 = mockStoreFileWithLength(4096L);
2018    HStoreFile sf4 = mockStoreFileWithLength(8192L);
2019
2020    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
2021      .setEndKey(Bytes.toBytes("b")).build();
2022
2023    // Compacting two files down to one, reducing size
2024    sizeStore.put(regionInfo, 1024L + 4096L);
2025    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3),
2026      Arrays.asList(sf2));
2027
2028    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
2029
2030    // The same file length in and out should have no change
2031    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
2032      Arrays.asList(sf2));
2033
2034    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
2035
2036    // Increase the total size used
2037    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
2038      Arrays.asList(sf3));
2039
2040    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
2041
2042    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
2043      .setEndKey(Bytes.toBytes("c")).build();
2044    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
2045
2046    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
2047  }
2048
2049  @Test
2050  public void testHFileContextSetWithCFAndTable() throws Exception {
2051    init(this.name.getMethodName());
2052    StoreFileWriter writer = store.getStoreEngine()
2053      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
2054        .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
2055        .includesTag(false).shouldDropBehind(true));
2056    HFileContext hFileContext = writer.getLiveFileWriter().getFileContext();
2057    assertArrayEquals(family, hFileContext.getColumnFamily());
2058    assertArrayEquals(table, hFileContext.getTableName());
2059  }
2060
2061  // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell
2062  // but its dataSize exceeds inmemoryFlushSize
2063  @Test
2064  public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
2065    throws IOException, InterruptedException {
2066    Configuration conf = HBaseConfiguration.create();
2067
2068    byte[] smallValue = new byte[3];
2069    byte[] largeValue = new byte[9];
2070    final long timestamp = EnvironmentEdgeManager.currentTime();
2071    final long seqId = 100;
2072    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2073    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2074    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2075    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2076    int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
2077
2078    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2079    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
2080    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2081    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2082
2083    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2084      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2085
2086    MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
2087    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2088    myCompactingMemStore.smallCellPreUpdateCounter.set(0);
2089    myCompactingMemStore.largeCellPreUpdateCounter.set(0);
2090
2091    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2092    Thread smallCellThread = new Thread(() -> {
2093      try {
2094        store.add(smallCell, new NonThreadSafeMemStoreSizing());
2095      } catch (Throwable exception) {
2096        exceptionRef.set(exception);
2097      }
2098    });
2099    smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
2100    smallCellThread.start();
2101
2102    String oldThreadName = Thread.currentThread().getName();
2103    try {
2104      /**
2105       * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
2106       * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread
2107       * invokes flushInMemory.
2108       * <p/>
2109       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
2110       * can add cell to currentActive . That is to say when largeCellThread called flushInMemory
2111       * method, CompactingMemStore.active has no cell.
2112       */
2113      Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
2114      store.add(largeCell, new NonThreadSafeMemStoreSizing());
2115      smallCellThread.join();
2116
2117      for (int i = 0; i < 100; i++) {
2118        long currentTimestamp = timestamp + 100 + i;
2119        ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
2120        store.add(cell, new NonThreadSafeMemStoreSizing());
2121      }
2122    } finally {
2123      Thread.currentThread().setName(oldThreadName);
2124    }
2125
2126    assertTrue(exceptionRef.get() == null);
2127
2128  }
2129
2130  // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds
2131  // InmemoryFlushSize
2132  @Test(timeout = 60000)
2133  public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception {
2134    Configuration conf = HBaseConfiguration.create();
2135    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
2136
2137    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2138      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2139
2140    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
2141
2142    int size = (int) (myCompactingMemStore.getInmemoryFlushSize());
2143    byte[] value = new byte[size + 1];
2144
2145    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2146    long timestamp = EnvironmentEdgeManager.currentTime();
2147    long seqId = 100;
2148    ExtendedCell cell = createCell(qf1, timestamp, seqId, value);
2149    int cellByteSize = MutableSegment.getCellLength(cell);
2150    store.add(cell, memStoreSizing);
2151    assertTrue(memStoreSizing.getCellsCount() == 1);
2152    assertTrue(memStoreSizing.getDataSize() == cellByteSize);
2153    // Waiting the in memory compaction completed, see HBASE-26438
2154    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2155  }
2156
2157  /**
2158   * This test is for HBASE-27464, before this JIRA,when init {@link CellChunkImmutableSegment} for
2159   * 'COMPACT' action, we not force copy to current MSLab. When cell size bigger than
2160   * {@link MemStoreLABImpl#maxAlloc}, cell will stay in previous chunk which will recycle after
2161   * segment replace, and we may read wrong data when these chunk reused by others.
2162   */
2163  @Test
2164  public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception {
2165    Configuration conf = HBaseConfiguration.create();
2166    int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT);
2167
2168    // Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}.
2169    byte[] cellValue = new byte[maxAllocByteSize + 1];
2170    final long timestamp = EnvironmentEdgeManager.currentTime();
2171    final long seqId = 100;
2172    final byte[] rowKey1 = Bytes.toBytes("rowKey1");
2173    final ExtendedCell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue);
2174    final byte[] rowKey2 = Bytes.toBytes("rowKey2");
2175    final ExtendedCell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue);
2176    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2177    quals.add(qf1);
2178
2179    int cellByteSize = MutableSegment.getCellLength(originalCell1);
2180    int inMemoryFlushByteSize = cellByteSize - 1;
2181
2182    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2183    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
2184    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2185    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200));
2186    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2187
2188    // Use {@link MemoryCompactionPolicy#EAGER} for always compacting.
2189    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2190      .setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build());
2191
2192    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
2193    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize);
2194
2195    // Data chunk Pool is disabled.
2196    assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0);
2197
2198    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2199
2200    // First compact
2201    store.add(originalCell1, memStoreSizing);
2202    // Waiting for the first in-memory compaction finished
2203    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2204
2205    StoreScanner storeScanner =
2206      (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
2207    SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2208    ExtendedCell resultCell1 = segmentScanner.next();
2209    assertTrue(PrivateCellUtil.equals(resultCell1, originalCell1));
2210    int cell1ChunkId = resultCell1.getChunkId();
2211    assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK);
2212    assertNull(segmentScanner.next());
2213    segmentScanner.close();
2214    storeScanner.close();
2215    Segment segment = segmentScanner.segment;
2216    assertTrue(segment instanceof CellChunkImmutableSegment);
2217    MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2218    assertTrue(!memStoreLAB1.isClosed());
2219    assertTrue(!memStoreLAB1.chunks.isEmpty());
2220    assertTrue(!memStoreLAB1.isReclaimed());
2221
2222    // Second compact
2223    store.add(originalCell2, memStoreSizing);
2224    // Waiting for the second in-memory compaction finished
2225    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2226
2227    // Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell
2228    // must be associated with chunk.. We were looking for a cell at index 0.
2229    // The cause for this exception is because the data chunk Pool is disabled,when the data chunks
2230    // are recycled after the second in-memory compaction finished,the
2231    // {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk
2232    // pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in
2233    // {@link CellChunkMap#getCell} we could not get the data chunk by chunkId.
2234    storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
2235    segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2236    ExtendedCell newResultCell1 = segmentScanner.next();
2237    assertTrue(newResultCell1 != resultCell1);
2238    assertTrue(PrivateCellUtil.equals(newResultCell1, originalCell1));
2239
2240    ExtendedCell resultCell2 = segmentScanner.next();
2241    assertTrue(PrivateCellUtil.equals(resultCell2, originalCell2));
2242    assertNull(segmentScanner.next());
2243    segmentScanner.close();
2244    storeScanner.close();
2245
2246    segment = segmentScanner.segment;
2247    assertTrue(segment instanceof CellChunkImmutableSegment);
2248    MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2249    assertTrue(!memStoreLAB2.isClosed());
2250    assertTrue(!memStoreLAB2.chunks.isEmpty());
2251    assertTrue(!memStoreLAB2.isReclaimed());
2252    assertTrue(memStoreLAB1.isClosed());
2253    assertTrue(memStoreLAB1.chunks.isEmpty());
2254    assertTrue(memStoreLAB1.isReclaimed());
2255  }
2256
2257  // This test is for HBASE-26210 also, test write large cell and small cell concurrently when
2258  // InmemoryFlushSize is smaller,equal with and larger than cell size.
2259  @Test
2260  public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
2261    throws IOException, InterruptedException {
2262    doWriteTestLargeCellAndSmallCellConcurrently(
2263      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
2264    doWriteTestLargeCellAndSmallCellConcurrently(
2265      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
2266    doWriteTestLargeCellAndSmallCellConcurrently(
2267      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1);
2268    doWriteTestLargeCellAndSmallCellConcurrently(
2269      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize);
2270    doWriteTestLargeCellAndSmallCellConcurrently(
2271      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1);
2272  }
2273
2274  private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize)
2275    throws IOException, InterruptedException {
2276
2277    Configuration conf = HBaseConfiguration.create();
2278
2279    byte[] smallValue = new byte[3];
2280    byte[] largeValue = new byte[100];
2281    final long timestamp = EnvironmentEdgeManager.currentTime();
2282    final long seqId = 100;
2283    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2284    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2285    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2286    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2287    int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize);
2288    boolean flushByteSizeLessThanSmallAndLargeCellSize =
2289      flushByteSize < (smallCellByteSize + largeCellByteSize);
2290
2291    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName());
2292    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2293    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2294
2295    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2296      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2297
2298    MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore);
2299    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2300    myCompactingMemStore.disableCompaction();
2301    if (flushByteSizeLessThanSmallAndLargeCellSize) {
2302      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true;
2303    } else {
2304      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false;
2305    }
2306
2307    final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
2308    final AtomicLong totalCellByteSize = new AtomicLong(0);
2309    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2310    Thread smallCellThread = new Thread(() -> {
2311      try {
2312        for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
2313          long currentTimestamp = timestamp + i;
2314          ExtendedCell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
2315          totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
2316          store.add(cell, memStoreSizing);
2317        }
2318      } catch (Throwable exception) {
2319        exceptionRef.set(exception);
2320
2321      }
2322    });
2323    smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
2324    smallCellThread.start();
2325
2326    String oldThreadName = Thread.currentThread().getName();
2327    try {
2328      /**
2329       * When flushByteSizeLessThanSmallAndLargeCellSize is true:
2330       * </p>
2331       * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then
2332       * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then
2333       * largeCellThread invokes flushInMemory.
2334       * <p/>
2335       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
2336       * can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
2337       * <p/>
2338       * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and
2339       * largeCellThread concurrently write one cell and wait each other, and then write another
2340       * cell etc.
2341       */
2342      Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
2343      for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
2344        long currentTimestamp = timestamp + i;
2345        ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
2346        totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
2347        store.add(cell, memStoreSizing);
2348      }
2349      smallCellThread.join();
2350
2351      assertTrue(exceptionRef.get() == null);
2352      assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2));
2353      assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
2354      if (flushByteSizeLessThanSmallAndLargeCellSize) {
2355        assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT);
2356      } else {
2357        assertTrue(
2358          myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1));
2359      }
2360    } finally {
2361      Thread.currentThread().setName(oldThreadName);
2362    }
2363  }
2364
2365  /**
2366   * <pre>
2367   * This test is for HBASE-26384,
2368   * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
2369   * execute concurrently.
2370   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2371   * for both branch-2 and master):
2372   * 1. The {@link CompactingMemStore} size exceeds
2373   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2374   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2375   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2376   * 2. The in memory compact thread starts and then stopping before
2377   *    {@link CompactingMemStore#flattenOneSegment}.
2378   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2379   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2380   *    compact thread continues.
2381   *    Assuming {@link VersionedSegmentsList#version} returned from
2382   *    {@link CompactingMemStore#getImmutableSegments} is v.
2383   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2384   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2385   *    {@link CompactionPipeline#version} is still v.
2386   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2387   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2388   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2389   *    {@link CompactionPipeline} has changed because
2390   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2391   *    removed in fact and still remaining in {@link CompactionPipeline}.
2392   *
2393   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
2394   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2395   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2396   *    v+1.
2397   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2398   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2399   *    failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
2400   *    again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
2401   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
2402   * </pre>
2403   */
2404  @Test
2405  public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
2406    Configuration conf = HBaseConfiguration.create();
2407
2408    byte[] smallValue = new byte[3];
2409    byte[] largeValue = new byte[9];
2410    final long timestamp = EnvironmentEdgeManager.currentTime();
2411    final long seqId = 100;
2412    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2413    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2414    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2415    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2416    int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
2417    int flushByteSize = totalCellByteSize - 2;
2418
2419    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2420    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
2421    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2422    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2423
2424    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2425      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2426
2427    MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
2428    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2429
2430    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2431    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2432
2433    String oldThreadName = Thread.currentThread().getName();
2434    try {
2435      Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
2436      /**
2437       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2438       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2439       * would invoke {@link CompactingMemStore#stopCompaction}.
2440       */
2441      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2442
2443      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2444      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2445
2446      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2447      assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
2448      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2449      assertTrue(segments.getNumOfSegments() == 0);
2450      assertTrue(segments.getNumOfCells() == 0);
2451      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
2452      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2453    } finally {
2454      Thread.currentThread().setName(oldThreadName);
2455    }
2456  }
2457
2458  /**
2459   * <pre>
2460   * This test is for HBASE-26384,
2461   * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()}
2462   * and writeMemStore execute concurrently.
2463   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2464   * for both branch-2 and master):
2465   * 1. The {@link CompactingMemStore} size exceeds
2466   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2467   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2468   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2469   * 2. The in memory compact thread starts and then stopping before
2470   *    {@link CompactingMemStore#flattenOneSegment}.
2471   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2472   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2473   *    compact thread continues.
2474   *    Assuming {@link VersionedSegmentsList#version} returned from
2475   *    {@link CompactingMemStore#getImmutableSegments} is v.
2476   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2477   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2478   *    {@link CompactionPipeline#version} is still v.
2479   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2480   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2481   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2482   *    {@link CompactionPipeline} has changed because
2483   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2484   *    removed in fact and still remaining in {@link CompactionPipeline}.
2485   *
2486   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
2487   * and I add step 7-8 to test there is new segment added before retry.
2488   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2489   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2490   *     v+1.
2491   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2492   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2493   *    failed and retry,{@link VersionedSegmentsList#version} returned from
2494   *    {@link CompactingMemStore#getImmutableSegments} is v+1.
2495   * 7. The write thread continues writing to {@link CompactingMemStore} and
2496   *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
2497   *    {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
2498   *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
2499   *    {@link CompactionPipeline#version} is still v+1.
2500   * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2501   *    {@link CompactionPipeline#version} is still v+1,
2502   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
2503   *    remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
2504   *    {@link CompactingMemStore#swapPipelineWithNull}.
2505   * </pre>
2506   */
2507  @Test
2508  public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
2509    Configuration conf = HBaseConfiguration.create();
2510
2511    byte[] smallValue = new byte[3];
2512    byte[] largeValue = new byte[9];
2513    final long timestamp = EnvironmentEdgeManager.currentTime();
2514    final long seqId = 100;
2515    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2516    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2517    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2518    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2519    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2520    int flushByteSize = firstWriteCellByteSize - 2;
2521
2522    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2523    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
2524    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2525    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2526
2527    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2528      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2529
2530    final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
2531    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2532
2533    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2534    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2535
2536    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2537    final ExtendedCell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
2538    final ExtendedCell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2539    final int writeAgainCellByteSize =
2540      MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2);
2541    final Thread writeAgainThread = new Thread(() -> {
2542      try {
2543        myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
2544
2545        store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
2546        store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
2547
2548        myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
2549      } catch (Throwable exception) {
2550        exceptionRef.set(exception);
2551      }
2552    });
2553    writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
2554    writeAgainThread.start();
2555
2556    String oldThreadName = Thread.currentThread().getName();
2557    try {
2558      Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
2559      /**
2560       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2561       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2562       * would invoke {@link CompactingMemStore#stopCompaction}.
2563       */
2564      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2565      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2566      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2567      writeAgainThread.join();
2568
2569      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2570      assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
2571      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2572      assertTrue(segments.getNumOfSegments() == 1);
2573      assertTrue(
2574        ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
2575      assertTrue(segments.getNumOfCells() == 2);
2576      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
2577      assertTrue(exceptionRef.get() == null);
2578      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2579    } finally {
2580      Thread.currentThread().setName(oldThreadName);
2581    }
2582  }
2583
2584  /**
2585   * <pre>
2586   * This test is for HBASE-26465,
2587   * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute
2588   * concurrently. The threads sequence before HBASE-26465 is:
2589   * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have be added to
2590   *  {@link DefaultMemStore}.
2591   * 2.The flush thread stopping before {@link DefaultMemStore#clearSnapshot} in
2592   *   {@link HStore#updateStorefiles} after completed flushing memStore to hfile.
2593   * 3.The scan thread starts and stopping after {@link DefaultMemStore#getSnapshotSegments} in
2594   *   {@link DefaultMemStore#getScanners},here the scan thread gets the
2595   *   {@link DefaultMemStore#snapshot} which is created by the flush thread.
2596   * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close
2597   *   {@link DefaultMemStore#snapshot},because the reference count of the corresponding
2598   *   {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl}
2599   *   are recycled.
2600   * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a
2601   *   {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the
2602   *   reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in
2603   *   corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may
2604   *   be overwritten by other write threads,which may cause serious problem.
2605   * After HBASE-26465,{@link DefaultMemStore#getScanners} and
2606   * {@link DefaultMemStore#clearSnapshot} could not execute concurrently.
2607   * </pre>
2608   */
2609  @Test
2610  public void testClearSnapshotGetScannerConcurrently() throws Exception {
2611    Configuration conf = HBaseConfiguration.create();
2612
2613    byte[] smallValue = new byte[3];
2614    byte[] largeValue = new byte[9];
2615    final long timestamp = EnvironmentEdgeManager.currentTime();
2616    final long seqId = 100;
2617    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2618    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2619    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2620    quals.add(qf1);
2621    quals.add(qf2);
2622
2623    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
2624    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2625
2626    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2627    MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore);
2628    myDefaultMemStore.store = store;
2629
2630    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2631    store.add(smallCell, memStoreSizing);
2632    store.add(largeCell, memStoreSizing);
2633
2634    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2635    final Thread flushThread = new Thread(() -> {
2636      try {
2637        flushStore(store, id++);
2638      } catch (Throwable exception) {
2639        exceptionRef.set(exception);
2640      }
2641    });
2642    flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
2643    flushThread.start();
2644
2645    String oldThreadName = Thread.currentThread().getName();
2646    StoreScanner storeScanner = null;
2647    try {
2648      Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
2649
2650      /**
2651       * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot}
2652       */
2653      myDefaultMemStore.getScannerCyclicBarrier.await();
2654
2655      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2656      flushThread.join();
2657
2658      if (myDefaultMemStore.shouldWait) {
2659        SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2660        MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2661        assertTrue(memStoreLAB.isClosed());
2662        assertTrue(!memStoreLAB.chunks.isEmpty());
2663        assertTrue(!memStoreLAB.isReclaimed());
2664
2665        ExtendedCell cell1 = segmentScanner.next();
2666        PrivateCellUtil.equals(smallCell, cell1);
2667        ExtendedCell cell2 = segmentScanner.next();
2668        PrivateCellUtil.equals(largeCell, cell2);
2669        assertNull(segmentScanner.next());
2670      } else {
2671        List<ExtendedCell> results = new ArrayList<>();
2672        storeScanner.next(results);
2673        assertEquals(2, results.size());
2674        PrivateCellUtil.equals(smallCell, results.get(0));
2675        PrivateCellUtil.equals(largeCell, results.get(1));
2676      }
2677      assertTrue(exceptionRef.get() == null);
2678    } finally {
2679      if (storeScanner != null) {
2680        storeScanner.close();
2681      }
2682      Thread.currentThread().setName(oldThreadName);
2683    }
2684  }
2685
2686  @SuppressWarnings("unchecked")
2687  private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
2688    List<T> resultScanners = new ArrayList<T>();
2689    for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
2690      if (keyValueScannerClass.isInstance(keyValueScanner)) {
2691        resultScanners.add((T) keyValueScanner);
2692      }
2693    }
2694    assertTrue(resultScanners.size() == 1);
2695    return resultScanners.get(0);
2696  }
2697
2698  @Test
2699  public void testOnConfigurationChange() throws IOException {
2700    final int COMMON_MAX_FILES_TO_COMPACT = 10;
2701    final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8;
2702    final int STORE_MAX_FILES_TO_COMPACT = 6;
2703
2704    // Build a table that its maxFileToCompact different from common configuration.
2705    Configuration conf = HBaseConfiguration.create();
2706    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2707      COMMON_MAX_FILES_TO_COMPACT);
2708    conf.setBoolean(CACHE_DATA_ON_READ_KEY, false);
2709    conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true);
2710    conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
2711    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
2712      .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2713        String.valueOf(STORE_MAX_FILES_TO_COMPACT))
2714      .build();
2715    init(this.name.getMethodName(), conf, hcd);
2716
2717    // After updating common configuration, the conf in HStore itself must not be changed.
2718    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2719      NEW_COMMON_MAX_FILES_TO_COMPACT);
2720    this.store.onConfigurationChange(conf);
2721
2722    assertEquals(STORE_MAX_FILES_TO_COMPACT,
2723      store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact());
2724
2725    assertEquals(conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ), false);
2726    assertEquals(conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), true);
2727    assertEquals(conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), true);
2728
2729    // reset to default values
2730    conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ);
2731    conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE);
2732    conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE);
2733    this.store.onConfigurationChange(conf);
2734  }
2735
2736  /**
2737   * This test is for HBASE-26476
2738   */
2739  @Test
2740  public void testExtendsDefaultMemStore() throws Exception {
2741    Configuration conf = HBaseConfiguration.create();
2742    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2743
2744    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2745    assertTrue(this.store.memstore.getClass() == DefaultMemStore.class);
2746    tearDown();
2747
2748    conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName());
2749    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2750    assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class);
2751  }
2752
2753  static class CustomDefaultMemStore extends DefaultMemStore {
2754
2755    public CustomDefaultMemStore(Configuration conf, CellComparator c,
2756      RegionServicesForStores regionServices) {
2757      super(conf, c, regionServices);
2758    }
2759
2760  }
2761
2762  /**
2763   * This test is for HBASE-26488
2764   */
2765  @Test
2766  public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
2767    Configuration conf = HBaseConfiguration.create();
2768
2769    byte[] smallValue = new byte[3];
2770    byte[] largeValue = new byte[9];
2771    final long timestamp = EnvironmentEdgeManager.currentTime();
2772    final long seqId = 100;
2773    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2774    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2775    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2776    quals.add(qf1);
2777    quals.add(qf2);
2778
2779    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
2780    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2781    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
2782      MyDefaultStoreFlusher.class.getName());
2783
2784    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2785    MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
2786    assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
2787
2788    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2789    store.add(smallCell, memStoreSizing);
2790    store.add(largeCell, memStoreSizing);
2791    flushStore(store, id++);
2792
2793    MemStoreLABImpl memStoreLAB =
2794      (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
2795    assertTrue(memStoreLAB.isClosed());
2796    assertTrue(memStoreLAB.getRefCntValue() == 0);
2797    assertTrue(memStoreLAB.isReclaimed());
2798    assertTrue(memStoreLAB.chunks.isEmpty());
2799    StoreScanner storeScanner = null;
2800    try {
2801      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2802      assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
2803      assertTrue(store.memstore.size().getCellsCount() == 0);
2804      assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
2805      assertTrue(storeScanner.currentScanners.size() == 1);
2806      assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
2807
2808      List<ExtendedCell> results = new ArrayList<>();
2809      storeScanner.next(results);
2810      assertEquals(2, results.size());
2811      PrivateCellUtil.equals(smallCell, results.get(0));
2812      PrivateCellUtil.equals(largeCell, results.get(1));
2813    } finally {
2814      if (storeScanner != null) {
2815        storeScanner.close();
2816      }
2817    }
2818  }
2819
2820  static class MyDefaultMemStore1 extends DefaultMemStore {
2821
2822    private ImmutableSegment snapshotImmutableSegment;
2823
2824    public MyDefaultMemStore1(Configuration conf, CellComparator c,
2825      RegionServicesForStores regionServices) {
2826      super(conf, c, regionServices);
2827    }
2828
2829    @Override
2830    public MemStoreSnapshot snapshot() {
2831      MemStoreSnapshot result = super.snapshot();
2832      this.snapshotImmutableSegment = snapshot;
2833      return result;
2834    }
2835
2836  }
2837
2838  public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
2839    private static final AtomicInteger failCounter = new AtomicInteger(1);
2840    private static final AtomicInteger counter = new AtomicInteger(0);
2841
2842    public MyDefaultStoreFlusher(Configuration conf, HStore store) {
2843      super(conf, store);
2844    }
2845
2846    @Override
2847    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
2848      MonitoredTask status, ThroughputController throughputController,
2849      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
2850      counter.incrementAndGet();
2851      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
2852        writerCreationTracker);
2853    }
2854
2855    @Override
2856    protected void performFlush(InternalScanner scanner, final CellSink sink,
2857      ThroughputController throughputController) throws IOException {
2858
2859      final int currentCount = counter.get();
2860      CellSink newCellSink = (cell) -> {
2861        if (currentCount <= failCounter.get()) {
2862          throw new IOException("Simulated exception by tests");
2863        }
2864        sink.append(cell);
2865      };
2866      super.performFlush(scanner, newCellSink, throughputController);
2867    }
2868  }
2869
2870  /**
2871   * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB}
2872   */
2873  @Test
2874  public void testImmutableMemStoreLABRefCnt() throws Exception {
2875    Configuration conf = HBaseConfiguration.create();
2876
2877    byte[] smallValue = new byte[3];
2878    byte[] largeValue = new byte[9];
2879    final long timestamp = EnvironmentEdgeManager.currentTime();
2880    final long seqId = 100;
2881    final ExtendedCell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
2882    final ExtendedCell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
2883    final ExtendedCell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue);
2884    final ExtendedCell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2885    final ExtendedCell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue);
2886    final ExtendedCell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue);
2887
2888    int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
2889    int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
2890    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2891    int flushByteSize = firstWriteCellByteSize - 2;
2892
2893    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2894    conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
2895    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2896    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2897    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2898
2899    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2900      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2901
2902    final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
2903    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2904    myCompactingMemStore.allowCompaction.set(false);
2905
2906    NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2907    store.add(smallCell1, memStoreSizing);
2908    store.add(largeCell1, memStoreSizing);
2909    store.add(smallCell2, memStoreSizing);
2910    store.add(largeCell2, memStoreSizing);
2911    store.add(smallCell3, memStoreSizing);
2912    store.add(largeCell3, memStoreSizing);
2913    VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2914    assertTrue(versionedSegmentsList.getNumOfSegments() == 3);
2915    List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
2916    List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size());
2917    for (ImmutableSegment segment : segments) {
2918      memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
2919    }
2920    List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2921    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2922      assertTrue(memStoreLAB.getRefCntValue() == 2);
2923    }
2924
2925    myCompactingMemStore.allowCompaction.set(true);
2926    myCompactingMemStore.flushInMemory();
2927
2928    versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2929    assertTrue(versionedSegmentsList.getNumOfSegments() == 1);
2930    ImmutableMemStoreLAB immutableMemStoreLAB =
2931      (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
2932    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2933      assertTrue(memStoreLAB.getRefCntValue() == 2);
2934    }
2935
2936    List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2937    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2938      assertTrue(memStoreLAB.getRefCntValue() == 2);
2939    }
2940    assertTrue(immutableMemStoreLAB.getRefCntValue() == 2);
2941    for (KeyValueScanner scanner : scanners1) {
2942      scanner.close();
2943    }
2944    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2945      assertTrue(memStoreLAB.getRefCntValue() == 1);
2946    }
2947    for (KeyValueScanner scanner : scanners2) {
2948      scanner.close();
2949    }
2950    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2951      assertTrue(memStoreLAB.getRefCntValue() == 1);
2952    }
2953    assertTrue(immutableMemStoreLAB.getRefCntValue() == 1);
2954    flushStore(store, id++);
2955    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2956      assertTrue(memStoreLAB.getRefCntValue() == 0);
2957    }
2958    assertTrue(immutableMemStoreLAB.getRefCntValue() == 0);
2959    assertTrue(immutableMemStoreLAB.isClosed());
2960    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2961      assertTrue(memStoreLAB.isClosed());
2962      assertTrue(memStoreLAB.isReclaimed());
2963      assertTrue(memStoreLAB.chunks.isEmpty());
2964    }
2965  }
2966
2967  private HStoreFile mockStoreFileWithLength(long length) {
2968    HStoreFile sf = mock(HStoreFile.class);
2969    StoreFileReader sfr = mock(StoreFileReader.class);
2970    when(sf.isHFile()).thenReturn(true);
2971    when(sf.getReader()).thenReturn(sfr);
2972    when(sfr.length()).thenReturn(length);
2973    return sf;
2974  }
2975
2976  private static class MyThread extends Thread {
2977    private StoreScanner scanner;
2978    private KeyValueHeap heap;
2979
2980    public MyThread(StoreScanner scanner) {
2981      this.scanner = scanner;
2982    }
2983
2984    public KeyValueHeap getHeap() {
2985      return this.heap;
2986    }
2987
2988    @Override
2989    public void run() {
2990      scanner.trySwitchToStreamRead();
2991      heap = scanner.heap;
2992    }
2993  }
2994
2995  private static class MyMemStoreCompactor extends MemStoreCompactor {
2996    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2997    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
2998
2999    public MyMemStoreCompactor(CompactingMemStore compactingMemStore,
3000      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
3001      super(compactingMemStore, compactionPolicy);
3002    }
3003
3004    @Override
3005    public boolean start() throws IOException {
3006      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
3007      if (isFirst) {
3008        try {
3009          START_COMPACTOR_LATCH.await();
3010          return super.start();
3011        } catch (InterruptedException ex) {
3012          throw new RuntimeException(ex);
3013        }
3014      }
3015      return super.start();
3016    }
3017  }
3018
3019  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
3020    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
3021
3022    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
3023      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3024      throws IOException {
3025      super(conf, c, store, regionServices, compactionPolicy);
3026    }
3027
3028    @Override
3029    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
3030      throws IllegalArgumentIOException {
3031      return new MyMemStoreCompactor(this, compactionPolicy);
3032    }
3033
3034    @Override
3035    protected boolean setInMemoryCompactionFlag() {
3036      boolean rval = super.setInMemoryCompactionFlag();
3037      if (rval) {
3038        RUNNER_COUNT.incrementAndGet();
3039        if (LOG.isDebugEnabled()) {
3040          LOG.debug("runner count: " + RUNNER_COUNT.get());
3041        }
3042      }
3043      return rval;
3044    }
3045  }
3046
3047  public static class MyCompactingMemStore extends CompactingMemStore {
3048    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
3049    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
3050    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
3051
3052    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store,
3053      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3054      throws IOException {
3055      super(conf, c, store, regionServices, compactionPolicy);
3056    }
3057
3058    @Override
3059    protected List<KeyValueScanner> createList(int capacity) {
3060      if (START_TEST.get()) {
3061        try {
3062          getScannerLatch.countDown();
3063          snapshotLatch.await();
3064        } catch (InterruptedException e) {
3065          throw new RuntimeException(e);
3066        }
3067      }
3068      return new ArrayList<>(capacity);
3069    }
3070
3071    @Override
3072    protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
3073      if (START_TEST.get()) {
3074        try {
3075          getScannerLatch.await();
3076        } catch (InterruptedException e) {
3077          throw new RuntimeException(e);
3078        }
3079      }
3080
3081      super.pushActiveToPipeline(active, checkEmpty);
3082      if (START_TEST.get()) {
3083        snapshotLatch.countDown();
3084      }
3085    }
3086  }
3087
3088  interface MyListHook {
3089    void hook(int currentSize);
3090  }
3091
3092  private static class MyList<T> implements List<T> {
3093    private final List<T> delegatee = new ArrayList<>();
3094    private final MyListHook hookAtAdd;
3095
3096    MyList(final MyListHook hookAtAdd) {
3097      this.hookAtAdd = hookAtAdd;
3098    }
3099
3100    @Override
3101    public int size() {
3102      return delegatee.size();
3103    }
3104
3105    @Override
3106    public boolean isEmpty() {
3107      return delegatee.isEmpty();
3108    }
3109
3110    @Override
3111    public boolean contains(Object o) {
3112      return delegatee.contains(o);
3113    }
3114
3115    @Override
3116    public Iterator<T> iterator() {
3117      return delegatee.iterator();
3118    }
3119
3120    @Override
3121    public Object[] toArray() {
3122      return delegatee.toArray();
3123    }
3124
3125    @Override
3126    public <R> R[] toArray(R[] a) {
3127      return delegatee.toArray(a);
3128    }
3129
3130    @Override
3131    public boolean add(T e) {
3132      hookAtAdd.hook(size());
3133      return delegatee.add(e);
3134    }
3135
3136    @Override
3137    public boolean remove(Object o) {
3138      return delegatee.remove(o);
3139    }
3140
3141    @Override
3142    public boolean containsAll(Collection<?> c) {
3143      return delegatee.containsAll(c);
3144    }
3145
3146    @Override
3147    public boolean addAll(Collection<? extends T> c) {
3148      return delegatee.addAll(c);
3149    }
3150
3151    @Override
3152    public boolean addAll(int index, Collection<? extends T> c) {
3153      return delegatee.addAll(index, c);
3154    }
3155
3156    @Override
3157    public boolean removeAll(Collection<?> c) {
3158      return delegatee.removeAll(c);
3159    }
3160
3161    @Override
3162    public boolean retainAll(Collection<?> c) {
3163      return delegatee.retainAll(c);
3164    }
3165
3166    @Override
3167    public void clear() {
3168      delegatee.clear();
3169    }
3170
3171    @Override
3172    public T get(int index) {
3173      return delegatee.get(index);
3174    }
3175
3176    @Override
3177    public T set(int index, T element) {
3178      return delegatee.set(index, element);
3179    }
3180
3181    @Override
3182    public void add(int index, T element) {
3183      delegatee.add(index, element);
3184    }
3185
3186    @Override
3187    public T remove(int index) {
3188      return delegatee.remove(index);
3189    }
3190
3191    @Override
3192    public int indexOf(Object o) {
3193      return delegatee.indexOf(o);
3194    }
3195
3196    @Override
3197    public int lastIndexOf(Object o) {
3198      return delegatee.lastIndexOf(o);
3199    }
3200
3201    @Override
3202    public ListIterator<T> listIterator() {
3203      return delegatee.listIterator();
3204    }
3205
3206    @Override
3207    public ListIterator<T> listIterator(int index) {
3208      return delegatee.listIterator(index);
3209    }
3210
3211    @Override
3212    public List<T> subList(int fromIndex, int toIndex) {
3213      return delegatee.subList(fromIndex, toIndex);
3214    }
3215  }
3216
3217  private interface MyKeyValueHeapHook {
3218    void onRecordBlockSize(int recordBlockSizeCallCount);
3219  }
3220
3221  private static class MyKeyValueHeap extends KeyValueHeap {
3222    private final MyKeyValueHeapHook hook;
3223    private int recordBlockSizeCallCount;
3224
3225    public MyKeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator,
3226      MyKeyValueHeapHook hook) throws IOException {
3227      super(scanners, comparator);
3228      this.hook = hook;
3229    }
3230
3231    @Override
3232    public void recordBlockSize(IntConsumer blockSizeConsumer) {
3233      recordBlockSizeCallCount++;
3234      hook.onRecordBlockSize(recordBlockSizeCallCount);
3235      super.recordBlockSize(blockSizeConsumer);
3236    }
3237  }
3238
3239  public static class MyCompactingMemStore2 extends CompactingMemStore {
3240    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
3241    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
3242    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
3243    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
3244    private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
3245    private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
3246
3247    public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
3248      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3249      throws IOException {
3250      super(conf, cellComparator, store, regionServices, compactionPolicy);
3251    }
3252
3253    @Override
3254    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
3255      MemStoreSizing memstoreSizing) {
3256      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3257        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
3258        if (currentCount <= 1) {
3259          try {
3260            /**
3261             * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
3262             * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
3263             * largeCellThread invokes flushInMemory.
3264             */
3265            preCyclicBarrier.await();
3266          } catch (Throwable e) {
3267            throw new RuntimeException(e);
3268          }
3269        }
3270      }
3271
3272      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3273      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3274        try {
3275          preCyclicBarrier.await();
3276        } catch (Throwable e) {
3277          throw new RuntimeException(e);
3278        }
3279      }
3280      return returnValue;
3281    }
3282
3283    @Override
3284    protected void doAdd(MutableSegment currentActive, ExtendedCell cell,
3285      MemStoreSizing memstoreSizing) {
3286      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3287        try {
3288          /**
3289           * After largeCellThread finished flushInMemory method, smallCellThread can add cell to
3290           * currentActive . That is to say when largeCellThread called flushInMemory method,
3291           * currentActive has no cell.
3292           */
3293          postCyclicBarrier.await();
3294        } catch (Throwable e) {
3295          throw new RuntimeException(e);
3296        }
3297      }
3298      super.doAdd(currentActive, cell, memstoreSizing);
3299    }
3300
3301    @Override
3302    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
3303      super.flushInMemory(currentActiveMutableSegment);
3304      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3305        if (largeCellPreUpdateCounter.get() <= 1) {
3306          try {
3307            postCyclicBarrier.await();
3308          } catch (Throwable e) {
3309            throw new RuntimeException(e);
3310          }
3311        }
3312      }
3313    }
3314
3315  }
3316
3317  public static class MyCompactingMemStore3 extends CompactingMemStore {
3318    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
3319    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
3320
3321    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
3322    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
3323    private final AtomicInteger flushCounter = new AtomicInteger(0);
3324    private static final int CELL_COUNT = 5;
3325    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;
3326
3327    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
3328      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3329      throws IOException {
3330      super(conf, cellComparator, store, regionServices, compactionPolicy);
3331    }
3332
3333    @Override
3334    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
3335      MemStoreSizing memstoreSizing) {
3336      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3337        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3338      }
3339      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3340        try {
3341          preCyclicBarrier.await();
3342        } catch (Throwable e) {
3343          throw new RuntimeException(e);
3344        }
3345      }
3346
3347      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3348      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3349        try {
3350          preCyclicBarrier.await();
3351        } catch (Throwable e) {
3352          throw new RuntimeException(e);
3353        }
3354      }
3355      return returnValue;
3356    }
3357
3358    @Override
3359    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
3360      super.postUpdate(currentActiveMutableSegment);
3361      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3362        try {
3363          postCyclicBarrier.await();
3364        } catch (Throwable e) {
3365          throw new RuntimeException(e);
3366        }
3367        return;
3368      }
3369
3370      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3371        try {
3372          postCyclicBarrier.await();
3373        } catch (Throwable e) {
3374          throw new RuntimeException(e);
3375        }
3376      }
3377    }
3378
3379    @Override
3380    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
3381      super.flushInMemory(currentActiveMutableSegment);
3382      flushCounter.incrementAndGet();
3383      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3384        return;
3385      }
3386
3387      assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME));
3388      try {
3389        postCyclicBarrier.await();
3390      } catch (Throwable e) {
3391        throw new RuntimeException(e);
3392      }
3393
3394    }
3395
3396    void disableCompaction() {
3397      allowCompaction.set(false);
3398    }
3399
3400    void enableCompaction() {
3401      allowCompaction.set(true);
3402    }
3403
3404  }
3405
3406  public static class MyCompactingMemStore4 extends CompactingMemStore {
3407    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3408    /**
3409     * {@link CompactingMemStore#flattenOneSegment} must execute after
3410     * {@link CompactingMemStore#getImmutableSegments}
3411     */
3412    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3413    /**
3414     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3415     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3416     */
3417    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3418    /**
3419     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3420     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3421     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3422     */
3423    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3424    /**
3425     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3426     */
3427    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3428    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3429    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3430    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3431    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3432
3433    public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
3434      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3435      throws IOException {
3436      super(conf, cellComparator, store, regionServices, compactionPolicy);
3437    }
3438
3439    @Override
3440    public VersionedSegmentsList getImmutableSegments() {
3441      VersionedSegmentsList result = super.getImmutableSegments();
3442      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3443        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3444        if (currentCount <= 1) {
3445          try {
3446            flattenOneSegmentPreCyclicBarrier.await();
3447          } catch (Throwable e) {
3448            throw new RuntimeException(e);
3449          }
3450        }
3451      }
3452      return result;
3453    }
3454
3455    @Override
3456    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3457      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3458        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3459        if (currentCount <= 1) {
3460          try {
3461            flattenOneSegmentPostCyclicBarrier.await();
3462          } catch (Throwable e) {
3463            throw new RuntimeException(e);
3464          }
3465        }
3466      }
3467      boolean result = super.swapPipelineWithNull(segments);
3468      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3469        int currentCount = swapPipelineWithNullCounter.get();
3470        if (currentCount <= 1) {
3471          assertTrue(!result);
3472        }
3473        if (currentCount == 2) {
3474          assertTrue(result);
3475        }
3476      }
3477      return result;
3478
3479    }
3480
3481    @Override
3482    public void flattenOneSegment(long requesterVersion, Action action) {
3483      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3484      if (currentCount <= 1) {
3485        try {
3486          /**
3487           * {@link CompactingMemStore#snapshot} could start.
3488           */
3489          snapShotStartCyclicCyclicBarrier.await();
3490          flattenOneSegmentPreCyclicBarrier.await();
3491        } catch (Throwable e) {
3492          throw new RuntimeException(e);
3493        }
3494      }
3495      super.flattenOneSegment(requesterVersion, action);
3496      if (currentCount <= 1) {
3497        try {
3498          flattenOneSegmentPostCyclicBarrier.await();
3499        } catch (Throwable e) {
3500          throw new RuntimeException(e);
3501        }
3502      }
3503    }
3504
3505    @Override
3506    protected boolean setInMemoryCompactionFlag() {
3507      boolean result = super.setInMemoryCompactionFlag();
3508      assertTrue(result);
3509      setInMemoryCompactionFlagCounter.incrementAndGet();
3510      return result;
3511    }
3512
3513    @Override
3514    void inMemoryCompaction() {
3515      try {
3516        super.inMemoryCompaction();
3517      } finally {
3518        try {
3519          inMemoryCompactionEndCyclicBarrier.await();
3520        } catch (Throwable e) {
3521          throw new RuntimeException(e);
3522        }
3523
3524      }
3525    }
3526
3527  }
3528
3529  public static class MyCompactingMemStore5 extends CompactingMemStore {
3530    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3531    private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
3532    /**
3533     * {@link CompactingMemStore#flattenOneSegment} must execute after
3534     * {@link CompactingMemStore#getImmutableSegments}
3535     */
3536    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3537    /**
3538     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3539     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3540     */
3541    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3542    /**
3543     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3544     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3545     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3546     */
3547    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3548    /**
3549     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3550     */
3551    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3552    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3553    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3554    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3555    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3556    /**
3557     * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain
3558     * thread could start.
3559     */
3560    private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
3561    /**
3562     * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the
3563     * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would
3564     * execute,and in memory compact thread would exit,because we expect that in memory compact
3565     * executing only once.
3566     */
3567    private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
3568
3569    public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
3570      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3571      throws IOException {
3572      super(conf, cellComparator, store, regionServices, compactionPolicy);
3573    }
3574
3575    @Override
3576    public VersionedSegmentsList getImmutableSegments() {
3577      VersionedSegmentsList result = super.getImmutableSegments();
3578      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3579        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3580        if (currentCount <= 1) {
3581          try {
3582            flattenOneSegmentPreCyclicBarrier.await();
3583          } catch (Throwable e) {
3584            throw new RuntimeException(e);
3585          }
3586        }
3587
3588      }
3589
3590      return result;
3591    }
3592
3593    @Override
3594    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3595      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3596        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3597        if (currentCount <= 1) {
3598          try {
3599            flattenOneSegmentPostCyclicBarrier.await();
3600          } catch (Throwable e) {
3601            throw new RuntimeException(e);
3602          }
3603        }
3604
3605        if (currentCount == 2) {
3606          try {
3607            /**
3608             * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull},
3609             * writeAgain thread could start.
3610             */
3611            writeMemStoreAgainStartCyclicBarrier.await();
3612            /**
3613             * Only the writeAgain thread completes, retry
3614             * {@link CompactingMemStore#swapPipelineWithNull} would execute.
3615             */
3616            writeMemStoreAgainEndCyclicBarrier.await();
3617          } catch (Throwable e) {
3618            throw new RuntimeException(e);
3619          }
3620        }
3621
3622      }
3623      boolean result = super.swapPipelineWithNull(segments);
3624      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3625        int currentCount = swapPipelineWithNullCounter.get();
3626        if (currentCount <= 1) {
3627          assertTrue(!result);
3628        }
3629        if (currentCount == 2) {
3630          assertTrue(result);
3631        }
3632      }
3633      return result;
3634
3635    }
3636
3637    @Override
3638    public void flattenOneSegment(long requesterVersion, Action action) {
3639      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3640      if (currentCount <= 1) {
3641        try {
3642          /**
3643           * {@link CompactingMemStore#snapshot} could start.
3644           */
3645          snapShotStartCyclicCyclicBarrier.await();
3646          flattenOneSegmentPreCyclicBarrier.await();
3647        } catch (Throwable e) {
3648          throw new RuntimeException(e);
3649        }
3650      }
3651      super.flattenOneSegment(requesterVersion, action);
3652      if (currentCount <= 1) {
3653        try {
3654          flattenOneSegmentPostCyclicBarrier.await();
3655          /**
3656           * Only the writeAgain thread completes, in memory compact thread would exit,because we
3657           * expect that in memory compact executing only once.
3658           */
3659          writeMemStoreAgainEndCyclicBarrier.await();
3660        } catch (Throwable e) {
3661          throw new RuntimeException(e);
3662        }
3663
3664      }
3665    }
3666
3667    @Override
3668    protected boolean setInMemoryCompactionFlag() {
3669      boolean result = super.setInMemoryCompactionFlag();
3670      int count = setInMemoryCompactionFlagCounter.incrementAndGet();
3671      if (count <= 1) {
3672        assertTrue(result);
3673      }
3674      if (count == 2) {
3675        assertTrue(!result);
3676      }
3677      return result;
3678    }
3679
3680    @Override
3681    void inMemoryCompaction() {
3682      try {
3683        super.inMemoryCompaction();
3684      } finally {
3685        try {
3686          inMemoryCompactionEndCyclicBarrier.await();
3687        } catch (Throwable e) {
3688          throw new RuntimeException(e);
3689        }
3690
3691      }
3692    }
3693  }
3694
3695  public static class MyCompactingMemStore6 extends CompactingMemStore {
3696    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3697
3698    public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator,
3699      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3700      throws IOException {
3701      super(conf, cellComparator, store, regionServices, compactionPolicy);
3702    }
3703
3704    @Override
3705    void inMemoryCompaction() {
3706      try {
3707        super.inMemoryCompaction();
3708      } finally {
3709        try {
3710          inMemoryCompactionEndCyclicBarrier.await();
3711        } catch (Throwable e) {
3712          throw new RuntimeException(e);
3713        }
3714
3715      }
3716    }
3717  }
3718
3719  public static class MyDefaultMemStore extends DefaultMemStore {
3720    private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
3721    private static final String FLUSH_THREAD_NAME = "flushMyThread";
3722    /**
3723     * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread
3724     * could start.
3725     */
3726    private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
3727    /**
3728     * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments}
3729     * completed, {@link DefaultMemStore#doClearSnapShot} could continue.
3730     */
3731    private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3732    /**
3733     * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot}
3734     * completed, {@link DefaultMemStore#getScanners} could continue.
3735     */
3736    private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3737    private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
3738    private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
3739    private volatile boolean shouldWait = true;
3740    private volatile HStore store = null;
3741
3742    public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
3743      RegionServicesForStores regionServices) throws IOException {
3744      super(conf, cellComparator, regionServices);
3745    }
3746
3747    @Override
3748    protected List<Segment> getSnapshotSegments() {
3749
3750      List<Segment> result = super.getSnapshotSegments();
3751
3752      if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
3753        int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
3754        if (currentCount == 1) {
3755          if (this.shouldWait) {
3756            try {
3757              /**
3758               * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed,
3759               * {@link DefaultMemStore#doClearSnapShot} could continue.
3760               */
3761              preClearSnapShotCyclicBarrier.await();
3762              /**
3763               * Wait for {@link DefaultMemStore#doClearSnapShot} completed.
3764               */
3765              postClearSnapShotCyclicBarrier.await();
3766
3767            } catch (Throwable e) {
3768              throw new RuntimeException(e);
3769            }
3770          }
3771        }
3772      }
3773      return result;
3774    }
3775
3776    @Override
3777    protected void doClearSnapShot() {
3778      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3779        int currentCount = clearSnapshotCounter.incrementAndGet();
3780        if (currentCount == 1) {
3781          try {
3782            if (
3783              ((ReentrantReadWriteLock) store.getStoreEngine().getLock())
3784                .isWriteLockedByCurrentThread()
3785            ) {
3786              shouldWait = false;
3787            }
3788            /**
3789             * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner
3790             * thread could start.
3791             */
3792            getScannerCyclicBarrier.await();
3793
3794            if (shouldWait) {
3795              /**
3796               * Wait for {@link DefaultMemStore#getSnapshotSegments} completed.
3797               */
3798              preClearSnapShotCyclicBarrier.await();
3799            }
3800          } catch (Throwable e) {
3801            throw new RuntimeException(e);
3802          }
3803        }
3804      }
3805      super.doClearSnapShot();
3806
3807      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3808        int currentCount = clearSnapshotCounter.get();
3809        if (currentCount == 1) {
3810          if (shouldWait) {
3811            try {
3812              /**
3813               * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed,
3814               * {@link DefaultMemStore#getScanners} could continue.
3815               */
3816              postClearSnapShotCyclicBarrier.await();
3817            } catch (Throwable e) {
3818              throw new RuntimeException(e);
3819            }
3820          }
3821        }
3822      }
3823    }
3824  }
3825}