001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY;
021import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_ON_READ_KEY;
022import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_READ;
023import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_WRITE;
024import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE;
025import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
026import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
027import static org.junit.jupiter.api.Assertions.assertArrayEquals;
028import static org.junit.jupiter.api.Assertions.assertEquals;
029import static org.junit.jupiter.api.Assertions.assertFalse;
030import static org.junit.jupiter.api.Assertions.assertNull;
031import static org.junit.jupiter.api.Assertions.assertTrue;
032import static org.junit.jupiter.api.Assertions.fail;
033import static org.mockito.ArgumentMatchers.any;
034import static org.mockito.Mockito.mock;
035import static org.mockito.Mockito.spy;
036import static org.mockito.Mockito.times;
037import static org.mockito.Mockito.verify;
038import static org.mockito.Mockito.when;
039
040import java.io.FileNotFoundException;
041import java.io.IOException;
042import java.lang.ref.SoftReference;
043import java.security.PrivilegedExceptionAction;
044import java.util.ArrayList;
045import java.util.Arrays;
046import java.util.Collection;
047import java.util.Collections;
048import java.util.Iterator;
049import java.util.List;
050import java.util.ListIterator;
051import java.util.NavigableSet;
052import java.util.Optional;
053import java.util.TreeSet;
054import java.util.concurrent.BrokenBarrierException;
055import java.util.concurrent.ConcurrentSkipListSet;
056import java.util.concurrent.CountDownLatch;
057import java.util.concurrent.CyclicBarrier;
058import java.util.concurrent.ExecutorService;
059import java.util.concurrent.Executors;
060import java.util.concurrent.Future;
061import java.util.concurrent.ThreadPoolExecutor;
062import java.util.concurrent.TimeUnit;
063import java.util.concurrent.atomic.AtomicBoolean;
064import java.util.concurrent.atomic.AtomicInteger;
065import java.util.concurrent.atomic.AtomicLong;
066import java.util.concurrent.atomic.AtomicReference;
067import java.util.concurrent.locks.ReentrantReadWriteLock;
068import java.util.function.Consumer;
069import java.util.function.IntBinaryOperator;
070import java.util.function.IntConsumer;
071import org.apache.hadoop.conf.Configuration;
072import org.apache.hadoop.fs.FSDataOutputStream;
073import org.apache.hadoop.fs.FileStatus;
074import org.apache.hadoop.fs.FileSystem;
075import org.apache.hadoop.fs.FilterFileSystem;
076import org.apache.hadoop.fs.LocalFileSystem;
077import org.apache.hadoop.fs.Path;
078import org.apache.hadoop.fs.permission.FsPermission;
079import org.apache.hadoop.hbase.Cell;
080import org.apache.hadoop.hbase.CellBuilderType;
081import org.apache.hadoop.hbase.CellComparator;
082import org.apache.hadoop.hbase.CellComparatorImpl;
083import org.apache.hadoop.hbase.CellUtil;
084import org.apache.hadoop.hbase.ExtendedCell;
085import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
086import org.apache.hadoop.hbase.HBaseConfiguration;
087import org.apache.hadoop.hbase.HBaseTestingUtil;
088import org.apache.hadoop.hbase.HConstants;
089import org.apache.hadoop.hbase.KeyValue;
090import org.apache.hadoop.hbase.MemoryCompactionPolicy;
091import org.apache.hadoop.hbase.NamespaceDescriptor;
092import org.apache.hadoop.hbase.PrivateCellUtil;
093import org.apache.hadoop.hbase.TableName;
094import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
095import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
096import org.apache.hadoop.hbase.client.Get;
097import org.apache.hadoop.hbase.client.RegionInfo;
098import org.apache.hadoop.hbase.client.RegionInfoBuilder;
099import org.apache.hadoop.hbase.client.Scan;
100import org.apache.hadoop.hbase.client.Scan.ReadType;
101import org.apache.hadoop.hbase.client.TableDescriptor;
102import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
103import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
104import org.apache.hadoop.hbase.filter.Filter;
105import org.apache.hadoop.hbase.filter.FilterBase;
106import org.apache.hadoop.hbase.io.compress.Compression;
107import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
108import org.apache.hadoop.hbase.io.hfile.CacheConfig;
109import org.apache.hadoop.hbase.io.hfile.HFile;
110import org.apache.hadoop.hbase.io.hfile.HFileContext;
111import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
112import org.apache.hadoop.hbase.monitoring.MonitoredTask;
113import org.apache.hadoop.hbase.nio.RefCnt;
114import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
115import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
116import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
117import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
118import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
119import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
120import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy;
121import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
122import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
123import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
124import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
125import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
126import org.apache.hadoop.hbase.security.User;
127import org.apache.hadoop.hbase.testclassification.MediumTests;
128import org.apache.hadoop.hbase.testclassification.RegionServerTests;
129import org.apache.hadoop.hbase.util.BloomFilterUtil;
130import org.apache.hadoop.hbase.util.Bytes;
131import org.apache.hadoop.hbase.util.CommonFSUtils;
132import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
133import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
134import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
135import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
136import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
137import org.apache.hadoop.hbase.wal.WALFactory;
138import org.apache.hadoop.util.Progressable;
139import org.junit.jupiter.api.AfterAll;
140import org.junit.jupiter.api.AfterEach;
141import org.junit.jupiter.api.BeforeEach;
142import org.junit.jupiter.api.Tag;
143import org.junit.jupiter.api.Test;
144import org.junit.jupiter.api.TestInfo;
145import org.junit.jupiter.api.Timeout;
146import org.mockito.Mockito;
147import org.slf4j.Logger;
148import org.slf4j.LoggerFactory;
149
150import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
151
152/**
153 * Test class for the HStore
154 */
155@Tag(RegionServerTests.TAG)
156@Tag(MediumTests.TAG)
157public class TestHStore {
158
  /** Logger shared by all tests in this class. */
  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
  /** Name of the currently running test method; assigned in {@code setUp}. */
  private String name;

  // Region and store under test. Both are created by the init(...) helpers and released in
  // tearDown().
  HRegion region;
  HStore store;
  // Table and column family names used for every region created by this class.
  byte[] table = Bytes.toBytes("table");
  byte[] family = Bytes.toBytes("family");

  // Row keys and column qualifiers used to build test cells.
  byte[] row = Bytes.toBytes("row");
  byte[] row2 = Bytes.toBytes("row2");
  byte[] qf1 = Bytes.toBytes("qf1");
  byte[] qf2 = Bytes.toBytes("qf2");
  byte[] qf3 = Bytes.toBytes("qf3");
  byte[] qf4 = Bytes.toBytes("qf4");
  byte[] qf5 = Bytes.toBytes("qf5");
  byte[] qf6 = Bytes.toBytes("qf6");

  // Qualifiers requested by the Get-style tests; filled in setUp() with qf1, qf3 and qf5.
  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);

  // Cells the Get-style tests expect (built in setUp()) and the cells they actually read back;
  // compared by assertCheck().
  List<Cell> expected = new ArrayList<>();
  List<Cell> result = new ArrayList<>();

  // Counter handed to flushStore(...) and post-incremented on every flush.
  long id = EnvironmentEdgeManager.currentTime();
  // Get on 'row'; setUp() adds one column per entry of 'qualifiers'.
  Get get = new Get(row);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  // Base directory for test data. initHRegion() appends the method name directly to this string
  // (no path separator in between).
  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
186
187  @BeforeEach
188  public void setUp(TestInfo testInfo) throws IOException {
189    this.name = testInfo.getTestMethod().get().getName();
190    qualifiers.clear();
191    qualifiers.add(qf1);
192    qualifiers.add(qf3);
193    qualifiers.add(qf5);
194
195    Iterator<byte[]> iter = qualifiers.iterator();
196    while (iter.hasNext()) {
197      byte[] next = iter.next();
198      expected.add(new KeyValue(row, family, next, 1, (byte[]) null));
199      get.addColumn(family, next);
200    }
201  }
202
  /**
   * Creates the region and store for a test using the shared {@code TEST_UTIL} configuration.
   * @param methodName name used to derive the test's directories
   */
  private void init(String methodName) throws IOException {
    init(methodName, TEST_UTIL.getConfiguration());
  }
206
207  private HStore init(String methodName, Configuration conf) throws IOException {
208    // some of the tests write 4 versions and then flush
209    // (with HBASE-4241, lower versions are collected on flush)
210    return init(methodName, conf,
211      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
212  }
213
214  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
215    throws IOException {
216    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
217  }
218
  /**
   * Creates the region and store for a test without a store hook.
   * @param methodName name used to derive the test's directories
   * @param conf       configuration for the region and store
   * @param builder    table descriptor builder the column family is added to
   * @param hcd        column family descriptor of the store
   * @return the newly created store (also assigned to {@code store})
   */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd) throws IOException {
    return init(methodName, conf, builder, hcd, null);
  }
223
  /**
   * Creates the region and store for a test with {@code switchToPread} set to {@code false}.
   * @param methodName name used to derive the test's directories
   * @param conf       configuration for the region and store
   * @param builder    table descriptor builder the column family is added to
   * @param hcd        column family descriptor of the store
   * @param hook       optional store hook; {@code null} selects a plain {@code HStore}
   * @return the newly created store (also assigned to {@code store})
   */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
    return init(methodName, conf, builder, hcd, hook, false);
  }
228
229  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
230    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
231    TableDescriptor htd = builder.setColumnFamily(hcd).build();
232    Path basedir = new Path(DIR + methodName);
233    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
234    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
235
236    FileSystem fs = FileSystem.get(conf);
237
238    fs.delete(logdir, true);
239    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
240      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null,
241      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
242    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
243    Configuration walConf = new Configuration(conf);
244    CommonFSUtils.setRootDir(walConf, basedir);
245    WALFactory wals = new WALFactory(walConf, methodName);
246    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
247      htd, null);
248    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
249    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
250    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
251  }
252
253  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
254    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
255    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
256    if (hook == null) {
257      store = new HStore(region, hcd, conf, false);
258    } else {
259      store = new MyStore(region, hcd, conf, hook, switchToPread);
260    }
261    region.stores.put(store.getColumnFamilyDescriptor().getName(), store);
262    return store;
263  }
264
265  /**
266   * Test we do not lose data if we fail a flush and then close. Part of HBase-10466
267   */
268
269  @Test
270  public void testFlushSizeSizing() throws Exception {
271    LOG.info("Setting up a faulty file system that cannot write in " + name);
272    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
273    // Only retry once.
274    conf.setInt("hbase.hstore.flush.retries.number", 1);
275    User user = User.createUserForTesting(conf, name, new String[] { "foo" });
276    // Inject our faulty LocalFileSystem
277    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
278    user.runAs(new PrivilegedExceptionAction<Object>() {
279      @Override
280      public Object run() throws Exception {
281        // Make sure it worked (above is sensitive to caching details in hadoop core)
282        FileSystem fs = FileSystem.get(conf);
283        assertEquals(FaultyFileSystem.class, fs.getClass());
284        FaultyFileSystem ffs = (FaultyFileSystem) fs;
285
286        // Initialize region
287        init(name, conf);
288
289        MemStoreSize mss = store.memstore.getFlushableSize();
290        assertEquals(0, mss.getDataSize());
291        LOG.info("Adding some data");
292        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
293        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
294        // add the heap size of active (mutable) segment
295        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
296        mss = store.memstore.getFlushableSize();
297        assertEquals(kvSize.getMemStoreSize(), mss);
298        // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
299        try {
300          LOG.info("Flushing");
301          flushStore(store, id++);
302          fail("Didn't bubble up IOE!");
303        } catch (IOException ioe) {
304          assertTrue(ioe.getMessage().contains("Fault injected"));
305        }
306        // due to snapshot, change mutable to immutable segment
307        kvSize.incMemStoreSize(0,
308          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
309        mss = store.memstore.getFlushableSize();
310        assertEquals(kvSize.getMemStoreSize(), mss);
311        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
312        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
313        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
314        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
315        // not yet cleared the snapshot -- the above flush failed.
316        assertEquals(kvSize.getMemStoreSize(), mss);
317        ffs.fault.set(false);
318        flushStore(store, id++);
319        mss = store.memstore.getFlushableSize();
320        // Size should be the foreground kv size.
321        assertEquals(kvSize2.getMemStoreSize(), mss);
322        flushStore(store, id++);
323        mss = store.memstore.getFlushableSize();
324        assertEquals(0, mss.getDataSize());
325        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
326        return null;
327      }
328    });
329  }
330
331  @Test
332  public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException {
333    int numStoreFiles = 5;
334    writeAndRead(BloomType.ROWCOL, numStoreFiles);
335
336    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
337    // hard to know exactly the numbers here, we are just trying to
338    // prove that they are incrementing
339    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
340    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
341  }
342
343  @Test
344  public void testStoreBloomFilterMetricsWithBloomRow() throws IOException {
345    int numStoreFiles = 5;
346    writeAndRead(BloomType.ROWCOL, numStoreFiles);
347
348    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
349    // hard to know exactly the numbers here, we are just trying to
350    // prove that they are incrementing
351    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
352    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
353  }
354
355  @Test
356  public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException {
357    int numStoreFiles = 5;
358    writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles);
359
360    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
361    // hard to know exactly the numbers here, we are just trying to
362    // prove that they are incrementing
363    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
364  }
365
366  @Test
367  public void testStoreBloomFilterMetricsWithBloomNone() throws IOException {
368    int numStoreFiles = 5;
369    writeAndRead(BloomType.NONE, numStoreFiles);
370
371    assertEquals(0, store.getBloomFilterRequestsCount());
372    assertEquals(0, store.getBloomFilterNegativeResultsCount());
373
374    // hard to know exactly the numbers here, we are just trying to
375    // prove that they are incrementing
376    assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles);
377  }
378
379  private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException {
380    Configuration conf = HBaseConfiguration.create();
381    FileSystem fs = FileSystem.get(conf);
382
383    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
384      .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType)
385      .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build();
386    init(name, conf, hcd);
387
388    for (int i = 1; i <= numStoreFiles; i++) {
389      byte[] row = Bytes.toBytes("row" + i);
390      LOG.info("Adding some data for the store file #" + i);
391      long timeStamp = EnvironmentEdgeManager.currentTime();
392      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
393      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
394      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
395      flush(i);
396    }
397
398    // Verify the total number of store files
399    assertEquals(numStoreFiles, this.store.getStorefiles().size());
400
401    TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
402    columns.add(qf1);
403
404    for (int i = 1; i <= numStoreFiles; i++) {
405      KeyValueScanner scanner =
406        store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0);
407      scanner.peek();
408    }
409  }
410
411  /**
412   * Verify that compression and data block encoding are respected by the createWriter method, used
413   * on store flush.
414   */
415  @Test
416  public void testCreateWriter() throws Exception {
417    Configuration conf = HBaseConfiguration.create();
418    FileSystem fs = FileSystem.get(conf);
419
420    ColumnFamilyDescriptor hcd =
421      ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ)
422        .setDataBlockEncoding(DataBlockEncoding.DIFF).build();
423    init(name, conf, hcd);
424
425    // Test createWriter
426    StoreFileWriter writer = store.getStoreEngine()
427      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
428        .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
429        .includesTag(false).shouldDropBehind(false));
430    Path path = writer.getPath();
431    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
432    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
433    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
434    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
435    writer.close();
436
437    // Verify that compression and encoding settings are respected
438    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
439    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
440    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
441    reader.close();
442  }
443
444  @Test
445  public void testDeleteExpiredStoreFiles() throws Exception {
446    testDeleteExpiredStoreFiles(0);
447    testDeleteExpiredStoreFiles(1);
448  }
449
  /**
   * Writes four store files spread over the column family's TTL using an injected, manually
   * advanced clock, then repeatedly requests compaction and checks how many store files remain.
   * With {@code minVersions == 0} one file is expected to disappear per iteration and the last
   * one to be kept; otherwise all files are expected to stay.
   * <p>
   * NOTE(review): the scenario depends on the injected {@code IncrementingEnvironmentEdge};
   * presumably each clock read also advances it, so the number and order of
   * {@code currentTime()} calls should not be changed casually - confirm before refactoring.
   * @param minVersions the MIN_VERSIONS for the column family
   */
  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
    int storeFileNum = 4;
    int ttl = 4;
    // Replace the global clock; tearDown() resets it.
    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
    EnvironmentEdgeManagerTestHelper.injectEdge(edge);

    Configuration conf = HBaseConfiguration.create();
    // Enable the expired store file deletion
    conf.setBoolean("hbase.store.delete.expired.storefile", true);
    // Set the compaction threshold higher to avoid normal compactions.
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);

    // A distinct directory name per minVersions value, since the public test calls this twice.
    init(name + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
      .setMinVersions(minVersions).setTimeToLive(ttl).build());

    long storeTtl = this.store.getScanInfo().getTtl();
    long sleepTime = storeTtl / storeFileNum;
    long timeStamp;
    // There are 4 store files and the max time stamp difference among these
    // store files will be (this.store.ttl / storeFileNum)
    for (int i = 1; i <= storeFileNum; i++) {
      LOG.info("Adding some data for the store file #" + i);
      timeStamp = EnvironmentEdgeManager.currentTime();
      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
      flush(i);
      // Advance the clock instead of sleeping between store files.
      edge.incrementTime(sleepTime);
    }

    // Verify the total number of store files
    assertEquals(storeFileNum, this.store.getStorefiles().size());

    // Each call will find one expired store file and delete it before compaction happens.
    // There will be no compaction due to threshold above. Last file will not be replaced.
    for (int i = 1; i <= storeFileNum - 1; i++) {
      // verify the expired store file.
      assertFalse(this.store.requestCompaction().isPresent());
      Collection<HStoreFile> sfs = this.store.getStorefiles();
      // Ensure i files are gone.
      if (minVersions == 0) {
        assertEquals(storeFileNum - i, sfs.size());
        // Ensure only non-expired files remain.
        for (HStoreFile sf : sfs) {
          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
        }
      } else {
        // With minVersions > 0 no store file is expected to be removed.
        assertEquals(storeFileNum, sfs.size());
      }
      // Let the next store file expired.
      edge.incrementTime(sleepTime);
    }
    assertFalse(this.store.requestCompaction().isPresent());

    Collection<HStoreFile> sfs = this.store.getStorefiles();
    // Assert the last expired file is not removed.
    if (minVersions == 0) {
      assertEquals(1, sfs.size());
    }
    // The first remaining file must itself be past its TTL by now.
    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
    assertTrue(ts < (edge.currentTime() - storeTtl));

    for (HStoreFile sf : sfs) {
      sf.closeStoreFile(true);
    }
  }
519
520  @Test
521  public void testLowestModificationTime() throws Exception {
522    Configuration conf = HBaseConfiguration.create();
523    FileSystem fs = FileSystem.get(conf);
524    // Initialize region
525    init(name, conf);
526
527    int storeFileNum = 4;
528    for (int i = 1; i <= storeFileNum; i++) {
529      LOG.info("Adding some data for the store file #" + i);
530      this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null);
531      this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null);
532      this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null);
533      flush(i);
534    }
535    // after flush; check the lowest time stamp
536    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
537    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
538    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
539
540    // after compact; check the lowest time stamp
541    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
542    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
543    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
544    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
545  }
546
547  private static long getLowestTimeStampFromFS(FileSystem fs,
548    final Collection<HStoreFile> candidates) throws IOException {
549    long minTs = Long.MAX_VALUE;
550    if (candidates.isEmpty()) {
551      return minTs;
552    }
553    Path[] p = new Path[candidates.size()];
554    int i = 0;
555    for (HStoreFile sf : candidates) {
556      p[i] = sf.getPath();
557      ++i;
558    }
559
560    FileStatus[] stats = fs.listStatus(p);
561    if (stats == null || stats.length == 0) {
562      return minTs;
563    }
564    for (FileStatus s : stats) {
565      minTs = Math.min(minTs, s.getModificationTime());
566    }
567    return minTs;
568  }
569
570  //////////////////////////////////////////////////////////////////////////////
571  // Get tests
572  //////////////////////////////////////////////////////////////////////////////
573
  /** Block size, in bytes, passed to the HFile context when a test writes a store file by hand. */
  private static final int BLOCKSIZE_SMALL = 8192;
575
576  /**
577   * Test for hbase-1686.
578   */
579  @Test
580  public void testEmptyStoreFile() throws IOException {
581    init(name);
582    // Write a store file.
583    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
584    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
585    flush(1);
586    // Now put in place an empty store file. Its a little tricky. Have to
587    // do manually with hacked in sequence id.
588    HStoreFile f = this.store.getStorefiles().iterator().next();
589    Path storedir = f.getPath().getParent();
590    long seqid = f.getMaxSequenceId();
591    Configuration c = HBaseConfiguration.create();
592    FileSystem fs = FileSystem.get(c);
593    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
594    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
595      .withOutputDir(storedir).withFileContext(meta).build();
596    w.appendMetadata(seqid + 1, false);
597    w.close();
598    this.store.close();
599    // Reopen it... should pick up two files
600    this.store =
601      new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
602    assertEquals(2, this.store.getStorefilesCount());
603
604    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
605    assertEquals(1, result.size());
606  }
607
608  /**
609   * Getting data from memstore only
610   */
611  @Test
612  public void testGet_FromMemStoreOnly() throws IOException {
613    init(name);
614
615    // Put data in memstore
616    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
617    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
618    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
619    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
620    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
621    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
622
623    // Get
624    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
625
626    // Compare
627    assertCheck();
628  }
629
630  @Test
631  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
632    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
633    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
634    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
635  }
636
637  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
638    init(name, TEST_UTIL.getConfiguration(),
639      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
640    long currentTs = 100;
641    long minTs = currentTs;
642    // the extra cell won't be flushed to disk,
643    // so the min of timerange will be different between memStore and hfile.
644    for (int i = 0; i != (maxVersion + 1); ++i) {
645      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
646      if (i == 1) {
647        minTs = currentTs;
648      }
649    }
650    flushStore(store, id++);
651
652    Collection<HStoreFile> files = store.getStorefiles();
653    assertEquals(1, files.size());
654    HStoreFile f = files.iterator().next();
655    f.initReader();
656    StoreFileReader reader = f.getReader();
657    assertEquals(minTs, reader.timeRange.getMin());
658    assertEquals(currentTs, reader.timeRange.getMax());
659  }
660
661  /**
662   * Getting data from files only
663   */
664  @Test
665  public void testGet_FromFilesOnly() throws IOException {
666    init(name);
667
668    // Put data in memstore
669    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
670    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
671    // flush
672    flush(1);
673
674    // Add more data
675    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
676    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
677    // flush
678    flush(2);
679
680    // Add more data
681    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
682    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
683    // flush
684    flush(3);
685
686    // Get
687    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
688    // this.store.get(get, qualifiers, result);
689
690    // Need to sort the result since multiple files
691    Collections.sort(result, CellComparatorImpl.COMPARATOR);
692
693    // Compare
694    assertCheck();
695  }
696
697  /**
698   * Getting data from memstore and files
699   */
700  @Test
701  public void testGet_FromMemStoreAndFiles() throws IOException {
702    init(name);
703
704    // Put data in memstore
705    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
706    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
707    // flush
708    flush(1);
709
710    // Add more data
711    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
712    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
713    // flush
714    flush(2);
715
716    // Add more data
717    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
718    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
719
720    // Get
721    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
722
723    // Need to sort the result since multiple files
724    Collections.sort(result, CellComparatorImpl.COMPARATOR);
725
726    // Compare
727    assertCheck();
728  }
729
730  private void flush(int storeFilessize) throws IOException {
731    flushStore(store, id++);
732    assertEquals(storeFilessize, this.store.getStorefiles().size());
733    assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount());
734  }
735
736  private void assertCheck() {
737    assertEquals(expected.size(), result.size());
738    for (int i = 0; i < expected.size(); i++) {
739      assertEquals(expected.get(i), result.get(i));
740    }
741  }
742
743  @AfterEach
744  public void tearDown() throws Exception {
745    EnvironmentEdgeManagerTestHelper.reset();
746    if (store != null) {
747      try {
748        store.close();
749      } catch (IOException e) {
750      }
751      store = null;
752    }
753    if (region != null) {
754      region.close();
755      region = null;
756    }
757  }
758
  /**
   * Class-level cleanup, run once after all tests of this class: asks the shared
   * {@code TEST_UTIL} to clean up its test directory.
   * @throws IOException if the cleanup fails
   */
  @AfterAll
  public static void tearDownAfterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }
763
764  @Test
765  public void testHandleErrorsInFlush() throws Exception {
766    LOG.info("Setting up a faulty file system that cannot write");
767
768    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
769    User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" });
770    // Inject our faulty LocalFileSystem
771    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
772    user.runAs(new PrivilegedExceptionAction<Object>() {
773      @Override
774      public Object run() throws Exception {
775        // Make sure it worked (above is sensitive to caching details in hadoop core)
776        FileSystem fs = FileSystem.get(conf);
777        assertEquals(FaultyFileSystem.class, fs.getClass());
778
779        // Initialize region
780        init(name, conf);
781
782        LOG.info("Adding some data");
783        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
784        store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
785        store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
786
787        LOG.info("Before flush, we should have no files");
788
789        StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, store.getStoreContext());
790        Collection<StoreFileInfo> files = sft.load();
791        assertEquals(0, files != null ? files.size() : 0);
792
793        // flush
794        try {
795          LOG.info("Flushing");
796          flush(1);
797          fail("Didn't bubble up IOE!");
798        } catch (IOException ioe) {
799          assertTrue(ioe.getMessage().contains("Fault injected"));
800        }
801
802        LOG.info("After failed flush, we should still have no files!");
803        files = sft.load();
804        assertEquals(0, files != null ? files.size() : 0);
805        store.getHRegion().getWAL().close();
806        return null;
807      }
808    });
809    FileSystem.closeAllForUGI(user.getUGI());
810  }
811
812  /**
813   * Faulty file system that will fail if you write past its fault position the FIRST TIME only;
814   * thereafter it will succeed. Used by {@link TestHRegion} too.
815   */
816  static class FaultyFileSystem extends FilterFileSystem {
817    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
818    private long faultPos = 200;
819    AtomicBoolean fault = new AtomicBoolean(true);
820
821    public FaultyFileSystem() {
822      super(new LocalFileSystem());
823      LOG.info("Creating faulty!");
824    }
825
826    @Override
827    public FSDataOutputStream create(Path p) throws IOException {
828      return new FaultyOutputStream(super.create(p), faultPos, fault);
829    }
830
831    @Override
832    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
833      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
834      return new FaultyOutputStream(
835        super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress),
836        faultPos, fault);
837    }
838
839    @Override
840    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
841      short replication, long blockSize, Progressable progress) throws IOException {
842      // Fake it. Call create instead. The default implementation throws an IOE
843      // that this is not supported.
844      return create(f, overwrite, bufferSize, replication, blockSize, progress);
845    }
846  }
847
848  static class FaultyOutputStream extends FSDataOutputStream {
849    volatile long faultPos = Long.MAX_VALUE;
850    private final AtomicBoolean fault;
851
852    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
853      throws IOException {
854      super(out, null);
855      this.faultPos = faultPos;
856      this.fault = fault;
857    }
858
859    @Override
860    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
861      LOG.info("faulty stream write at pos " + getPos());
862      injectFault();
863      super.write(buf, offset, length);
864    }
865
866    private void injectFault() throws IOException {
867      if (this.fault.get() && getPos() >= faultPos) {
868        throw new IOException("Fault injected");
869      }
870    }
871  }
872
873  private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
874    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
875    storeFlushCtx.prepare();
876    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
877    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
878    return storeFlushCtx;
879  }
880
881  /**
882   * Generate a list of KeyValues for testing based on given parameters
883   * @return the rows key-value list
884   */
885  private List<ExtendedCell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier,
886    byte[] family) {
887    List<ExtendedCell> kvList = new ArrayList<>();
888    for (int i = 1; i <= numRows; i++) {
889      byte[] b = Bytes.toBytes(i);
890      for (long timestamp : timestamps) {
891        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
892      }
893    }
894    return kvList;
895  }
896
897  /**
898   * Test to ensure correctness when using Stores with multiple timestamps
899   */
900  @Test
901  public void testMultipleTimestamps() throws IOException {
902    int numRows = 1;
903    long[] timestamps1 = new long[] { 1, 5, 10, 20 };
904    long[] timestamps2 = new long[] { 30, 80 };
905
906    init(name);
907
908    List<ExtendedCell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family);
909    for (ExtendedCell kv : kvList1) {
910      this.store.add(kv, null);
911    }
912
913    flushStore(store, id++);
914
915    List<ExtendedCell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family);
916    for (ExtendedCell kv : kvList2) {
917      this.store.add(kv, null);
918    }
919
920    List<Cell> result;
921    Get get = new Get(Bytes.toBytes(1));
922    get.addColumn(family, qf1);
923
924    get.setTimeRange(0, 15);
925    result = HBaseTestingUtil.getFromStoreFile(store, get);
926    assertTrue(result.size() > 0);
927
928    get.setTimeRange(40, 90);
929    result = HBaseTestingUtil.getFromStoreFile(store, get);
930    assertTrue(result.size() > 0);
931
932    get.setTimeRange(10, 45);
933    result = HBaseTestingUtil.getFromStoreFile(store, get);
934    assertTrue(result.size() > 0);
935
936    get.setTimeRange(80, 145);
937    result = HBaseTestingUtil.getFromStoreFile(store, get);
938    assertTrue(result.size() > 0);
939
940    get.setTimeRange(1, 2);
941    result = HBaseTestingUtil.getFromStoreFile(store, get);
942    assertTrue(result.size() > 0);
943
944    get.setTimeRange(90, 200);
945    result = HBaseTestingUtil.getFromStoreFile(store, get);
946    assertTrue(result.size() == 0);
947  }
948
949  /**
950   * Test for HBASE-3492 - Test split on empty colfam (no store files).
951   * @throws IOException When the IO operations fail.
952   */
953  @Test
954  public void testSplitWithEmptyColFam() throws IOException {
955    init(name);
956    assertFalse(store.getSplitPoint().isPresent());
957  }
958
959  @Test
960  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
961    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
962    long anyValue = 10;
963
964    // We'll check that it uses correct config and propagates it appropriately by going thru
965    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
966    // a number we pass in is higher than some config value, inside compactionPolicy.
967    Configuration conf = HBaseConfiguration.create();
968    conf.setLong(CONFIG_KEY, anyValue);
969    init(name + "-xml", conf);
970    assertTrue(store.throttleCompaction(anyValue + 1));
971    assertFalse(store.throttleCompaction(anyValue));
972
973    // HTD overrides XML.
974    --anyValue;
975    init(name + "-htd", conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table))
976      .setValue(CONFIG_KEY, Long.toString(anyValue)), ColumnFamilyDescriptorBuilder.of(family));
977    assertTrue(store.throttleCompaction(anyValue + 1));
978    assertFalse(store.throttleCompaction(anyValue));
979
980    // HCD overrides them both.
981    --anyValue;
982    init(name + "-hcd", conf,
983      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
984        Long.toString(anyValue)),
985      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
986        .build());
987    assertTrue(store.throttleCompaction(anyValue + 1));
988    assertFalse(store.throttleCompaction(anyValue));
989  }
990
  /**
   * Store engine used to observe which compactor an engine instance created: after delegating to
   * {@link DefaultStoreEngine}, it records the engine's compactor in a static field.
   */
  public static class DummyStoreEngine extends DefaultStoreEngine {
    // Compactor of the most recent engine instance whose components were created; shared by all
    // instances and never reset by this class.
    public static DefaultCompactor lastCreatedCompactor = null;

    @Override
    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
      throws IOException {
      super.createComponents(conf, store, comparator);
      // Read the field only after the superclass has set up its components
      lastCreatedCompactor = this.compactor;
    }
  }
1001
1002  @Test
1003  public void testStoreUsesSearchEngineOverride() throws Exception {
1004    Configuration conf = HBaseConfiguration.create();
1005    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
1006    init(name, conf);
1007    assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
1008  }
1009
1010  private void addStoreFile() throws IOException {
1011    HStoreFile f = this.store.getStorefiles().iterator().next();
1012    Path storedir = f.getPath().getParent();
1013    long seqid = this.store.getMaxSequenceId().orElse(0L);
1014    Configuration c = TEST_UTIL.getConfiguration();
1015    FileSystem fs = FileSystem.get(c);
1016    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
1017    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
1018      .withOutputDir(storedir).withFileContext(fileContext).build();
1019    w.appendMetadata(seqid + 1, false);
1020    w.close();
1021    LOG.info("Added store file:" + w.getPath());
1022  }
1023
1024  private void archiveStoreFile(int index) throws IOException {
1025    Collection<HStoreFile> files = this.store.getStorefiles();
1026    HStoreFile sf = null;
1027    Iterator<HStoreFile> it = files.iterator();
1028    for (int i = 0; i <= index; i++) {
1029      sf = it.next();
1030    }
1031    StoreFileTracker sft =
1032      StoreFileTrackerFactory.create(store.getRegionFileSystem().getFileSystem().getConf(),
1033        store.isPrimaryReplicaStore(), store.getStoreContext());
1034    sft.removeStoreFiles(Lists.newArrayList(sf));
1035  }
1036
1037  private void closeCompactedFile(int index) throws IOException {
1038    Collection<HStoreFile> files =
1039      this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
1040    if (files.size() > 0) {
1041      HStoreFile sf = null;
1042      Iterator<HStoreFile> it = files.iterator();
1043      for (int i = 0; i <= index; i++) {
1044        sf = it.next();
1045      }
1046      sf.closeStoreFile(true);
1047      store.getStoreEngine().getStoreFileManager()
1048        .removeCompactedFiles(Collections.singletonList(sf));
1049    }
1050  }
1051
1052  @Test
1053  public void testRefreshStoreFiles() throws Exception {
1054    init(name);
1055
1056    assertEquals(0, this.store.getStorefilesCount());
1057
1058    // Test refreshing store files when no store files are there
1059    store.refreshStoreFiles();
1060    assertEquals(0, this.store.getStorefilesCount());
1061
1062    // add some data, flush
1063    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1064    flush(1);
1065    assertEquals(1, this.store.getStorefilesCount());
1066
1067    // add one more file
1068    addStoreFile();
1069
1070    assertEquals(1, this.store.getStorefilesCount());
1071    store.refreshStoreFiles();
1072    assertEquals(2, this.store.getStorefilesCount());
1073
1074    // add three more files
1075    addStoreFile();
1076    addStoreFile();
1077    addStoreFile();
1078
1079    assertEquals(2, this.store.getStorefilesCount());
1080    store.refreshStoreFiles();
1081    assertEquals(5, this.store.getStorefilesCount());
1082
1083    closeCompactedFile(0);
1084    archiveStoreFile(0);
1085
1086    assertEquals(5, this.store.getStorefilesCount());
1087    store.refreshStoreFiles();
1088    assertEquals(4, this.store.getStorefilesCount());
1089
1090    archiveStoreFile(0);
1091    archiveStoreFile(1);
1092    archiveStoreFile(2);
1093
1094    assertEquals(4, this.store.getStorefilesCount());
1095    store.refreshStoreFiles();
1096    assertEquals(1, this.store.getStorefilesCount());
1097
1098    archiveStoreFile(0);
1099    store.refreshStoreFiles();
1100    assertEquals(0, this.store.getStorefilesCount());
1101  }
1102
1103  @Test
1104  public void testRefreshStoreFilesNotChanged() throws IOException {
1105    init(name);
1106
1107    assertEquals(0, this.store.getStorefilesCount());
1108
1109    // add some data, flush
1110    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1111    flush(1);
1112    // add one more file
1113    addStoreFile();
1114
1115    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());
1116
1117    // call first time after files changed
1118    spiedStoreEngine.refreshStoreFiles();
1119    assertEquals(2, this.store.getStorefilesCount());
1120    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
1121
1122    // call second time
1123    spiedStoreEngine.refreshStoreFiles();
1124
1125    // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
1126    // refreshed,
1127    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
1128  }
1129
1130  @Test
1131  public void testScanWithCompactionAfterFlush() throws Exception {
1132    TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1133      EverythingPolicy.class.getName());
1134    init(name);
1135
1136    assertEquals(0, this.store.getStorefilesCount());
1137
1138    KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
1139    // add some data, flush
1140    this.store.add(kv, null);
1141    flush(1);
1142    kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
1143    // add some data, flush
1144    this.store.add(kv, null);
1145    flush(2);
1146    kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
1147    // add some data, flush
1148    this.store.add(kv, null);
1149    flush(3);
1150
1151    ExecutorService service = Executors.newFixedThreadPool(2);
1152
1153    Scan scan = new Scan(new Get(row));
1154    Future<KeyValueScanner> scanFuture = service.submit(() -> {
1155      try {
1156        LOG.info(">>>> creating scanner");
1157        return this.store.createScanner(scan,
1158          new ScanInfo(HBaseConfiguration.create(),
1159            ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
1160            Long.MAX_VALUE, 0, CellComparator.getInstance()),
1161          scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
1162      } catch (IOException e) {
1163        e.printStackTrace();
1164        return null;
1165      }
1166    });
1167    Future compactFuture = service.submit(() -> {
1168      try {
1169        LOG.info(">>>>>> starting compaction");
1170        Optional<CompactionContext> opCompaction = this.store.requestCompaction();
1171        assertTrue(opCompaction.isPresent());
1172        store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
1173        LOG.info(">>>>>> Compaction is finished");
1174        this.store.closeAndArchiveCompactedFiles();
1175        LOG.info(">>>>>> Compacted files deleted");
1176      } catch (IOException e) {
1177        e.printStackTrace();
1178      }
1179    });
1180
1181    KeyValueScanner kvs = scanFuture.get();
1182    compactFuture.get();
1183    ((StoreScanner) kvs).currentScanners.forEach(s -> {
1184      if (s instanceof StoreFileScanner) {
1185        assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
1186      }
1187    });
1188    kvs.seek(kv);
1189    service.shutdownNow();
1190  }
1191
1192  private long countMemStoreScanner(StoreScanner scanner) {
1193    if (scanner.currentScanners == null) {
1194      return 0;
1195    }
1196    return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count();
1197  }
1198
1199  @Test
1200  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1201    long seqId = 100;
1202    long timestamp = EnvironmentEdgeManager.currentTime();
1203    ExtendedCell cell0 =
1204      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1205        .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1206    PrivateCellUtil.setSequenceId(cell0, seqId);
1207    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1208
1209    ExtendedCell cell1 =
1210      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1211        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1212    PrivateCellUtil.setSequenceId(cell1, seqId);
1213    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1214
1215    seqId = 101;
1216    timestamp = EnvironmentEdgeManager.currentTime();
1217    ExtendedCell cell2 =
1218      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1219        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1220    PrivateCellUtil.setSequenceId(cell2, seqId);
1221    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1222  }
1223
1224  private void testNumberOfMemStoreScannersAfterFlush(List<ExtendedCell> inputCellsBeforeSnapshot,
1225    List<ExtendedCell> inputCellsAfterSnapshot) throws IOException {
1226    init(name + "-" + inputCellsBeforeSnapshot.size());
1227    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1228    long seqId = Long.MIN_VALUE;
1229    for (ExtendedCell c : inputCellsBeforeSnapshot) {
1230      quals.add(CellUtil.cloneQualifier(c));
1231      seqId = Math.max(seqId, c.getSequenceId());
1232    }
1233    for (ExtendedCell c : inputCellsAfterSnapshot) {
1234      quals.add(CellUtil.cloneQualifier(c));
1235      seqId = Math.max(seqId, c.getSequenceId());
1236    }
1237    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
1238    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1239    storeFlushCtx.prepare();
1240    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
1241    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
1242    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
1243      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
1244      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
1245      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1246      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1247      // snapshot has no data after flush
1248      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
1249      boolean more;
1250      int cellCount = 0;
1251      do {
1252        List<Cell> cells = new ArrayList<>();
1253        more = s.next(cells);
1254        cellCount += cells.size();
1255        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
1256      } while (more);
1257      assertEquals(inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount,
1258        "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
1259          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size());
1260      // the current scanners is cleared
1261      assertEquals(0, countMemStoreScanner(s));
1262    }
1263  }
1264
1265  private ExtendedCell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
1266    throws IOException {
1267    return createCell(row, qualifier, ts, sequenceId, value);
1268  }
1269
1270  private ExtendedCell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId,
1271    byte[] value) throws IOException {
1272    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row)
1273      .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
1274      .setValue(value).setSequenceId(sequenceId).build();
1275  }
1276
1277  private ExtendedCell createDeleteCell(byte[] row, byte[] qualifier, long ts, long sequenceId) {
1278    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row)
1279      .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.DeleteColumn)
1280      .setSequenceId(sequenceId).build();
1281  }
1282
1283  @Test
1284  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1285    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1286    final int expectedSize = 3;
1287    testFlushBeforeCompletingScan(new MyListHook() {
1288      @Override
1289      public void hook(int currentSize) {
1290        if (currentSize == expectedSize - 1) {
1291          try {
1292            flushStore(store, id++);
1293            timeToGoNextRow.set(true);
1294          } catch (IOException e) {
1295            throw new RuntimeException(e);
1296          }
1297        }
1298      }
1299    }, new FilterBase() {
1300      @Override
1301      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1302        return ReturnCode.INCLUDE;
1303      }
1304    }, expectedSize);
1305  }
1306
1307  @Test
1308  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
1309    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1310    final int expectedSize = 2;
1311    testFlushBeforeCompletingScan(new MyListHook() {
1312      @Override
1313      public void hook(int currentSize) {
1314        if (currentSize == expectedSize - 1) {
1315          try {
1316            flushStore(store, id++);
1317            timeToGoNextRow.set(true);
1318          } catch (IOException e) {
1319            throw new RuntimeException(e);
1320          }
1321        }
1322      }
1323    }, new FilterBase() {
1324      @Override
1325      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1326        if (timeToGoNextRow.get()) {
1327          timeToGoNextRow.set(false);
1328          return ReturnCode.NEXT_ROW;
1329        } else {
1330          return ReturnCode.INCLUDE;
1331        }
1332      }
1333    }, expectedSize);
1334  }
1335
1336  @Test
1337  public void testFlushBeforeCompletingScanWithFilterHint()
1338    throws IOException, InterruptedException {
1339    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
1340    final int expectedSize = 2;
1341    testFlushBeforeCompletingScan(new MyListHook() {
1342      @Override
1343      public void hook(int currentSize) {
1344        if (currentSize == expectedSize - 1) {
1345          try {
1346            flushStore(store, id++);
1347            timeToGetHint.set(true);
1348          } catch (IOException e) {
1349            throw new RuntimeException(e);
1350          }
1351        }
1352      }
1353    }, new FilterBase() {
1354      @Override
1355      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1356        if (timeToGetHint.get()) {
1357          timeToGetHint.set(false);
1358          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
1359        } else {
1360          return Filter.ReturnCode.INCLUDE;
1361        }
1362      }
1363
1364      @Override
1365      public Cell getNextCellHint(Cell currentCell) throws IOException {
1366        return currentCell;
1367      }
1368    }, expectedSize);
1369  }
1370
1371  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
1372    throws IOException, InterruptedException {
1373    Configuration conf = HBaseConfiguration.create();
1374    byte[] r0 = Bytes.toBytes("row0");
1375    byte[] r1 = Bytes.toBytes("row1");
1376    byte[] r2 = Bytes.toBytes("row2");
1377    byte[] value0 = Bytes.toBytes("value0");
1378    byte[] value1 = Bytes.toBytes("value1");
1379    byte[] value2 = Bytes.toBytes("value2");
1380    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1381    long ts = EnvironmentEdgeManager.currentTime();
1382    long seqId = 100;
1383    init(name, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1384      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1385      new MyStoreHook() {
1386        @Override
1387        public long getSmallestReadPoint(HStore store) {
1388          return seqId + 3;
1389        }
1390      });
1391    // The cells having the value0 won't be flushed to disk because the value of max version is 1
1392    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
1393    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
1394    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
1395    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
1396    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
1397    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
1398    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
1399    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
1400    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
1401    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1402    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1403    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1404    List<Cell> myList = new MyList<>(hook);
1405    Scan scan = new Scan().withStartRow(r1).setFilter(filter);
1406    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
1407      // r1
1408      scanner.next(myList);
1409      assertEquals(expectedSize, myList.size());
1410      for (Cell c : myList) {
1411        byte[] actualValue = CellUtil.cloneValue(c);
1412        assertTrue(Bytes.equals(actualValue, value1), "expected:" + Bytes.toStringBinary(value1)
1413          + ", actual:" + Bytes.toStringBinary(actualValue));
1414      }
1415      List<Cell> normalList = new ArrayList<>(3);
1416      // r2
1417      scanner.next(normalList);
1418      assertEquals(3, normalList.size());
1419      for (Cell c : normalList) {
1420        byte[] actualValue = CellUtil.cloneValue(c);
1421        assertTrue(Bytes.equals(actualValue, value2), "expected:" + Bytes.toStringBinary(value2)
1422          + ", actual:" + Bytes.toStringBinary(actualValue));
1423      }
1424    }
1425  }
1426
1427  @Test
1428  public void testFlushBeforeCompletingScanWithDeleteCell() throws IOException {
1429    final Configuration conf = HBaseConfiguration.create();
1430
1431    byte[] r1 = Bytes.toBytes("row1");
1432    byte[] r2 = Bytes.toBytes("row2");
1433
1434    byte[] value1 = Bytes.toBytes("value1");
1435    byte[] value2 = Bytes.toBytes("value2");
1436
1437    final MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1438    final long ts = EnvironmentEdgeManager.currentTime();
1439    final long seqId = 100;
1440
1441    init(name, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1442      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1443      new MyStoreHook() {
1444        @Override
1445        long getSmallestReadPoint(HStore store) {
1446          return seqId + 3;
1447        }
1448      });
1449
1450    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value2), memStoreSizing);
1451    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value2), memStoreSizing);
1452    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value2), memStoreSizing);
1453
1454    store.add(createDeleteCell(r1, qf1, ts + 2, seqId + 2), memStoreSizing);
1455    store.add(createDeleteCell(r1, qf2, ts + 2, seqId + 2), memStoreSizing);
1456    store.add(createDeleteCell(r1, qf3, ts + 2, seqId + 2), memStoreSizing);
1457
1458    store.add(createCell(r2, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1459    store.add(createCell(r2, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1460    store.add(createCell(r2, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1461
1462    Scan scan = new Scan().withStartRow(r1);
1463
1464    try (final InternalScanner scanner =
1465      new StoreScanner(store, store.getScanInfo(), scan, null, seqId + 3) {
1466        @Override
1467        protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
1468          CellComparator comparator) throws IOException {
1469          return new MyKeyValueHeap(scanners, comparator, recordBlockSizeCallCount -> {
1470            if (recordBlockSizeCallCount == 6) {
1471              try {
1472                flushStore(store, id++);
1473              } catch (IOException e) {
1474                throw new RuntimeException(e);
1475              }
1476            }
1477          });
1478        }
1479      }) {
1480      List<Cell> cellResult = new ArrayList<>();
1481
1482      scanner.next(cellResult);
1483      assertEquals(0, cellResult.size());
1484
1485      cellResult.clear();
1486
1487      scanner.next(cellResult);
1488      assertEquals(3, cellResult.size());
1489      for (Cell cell : cellResult) {
1490        assertArrayEquals(r2, CellUtil.cloneRow(cell));
1491      }
1492    }
1493  }
1494
1495  @Test
1496  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
1497    Configuration conf = HBaseConfiguration.create();
1498    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
1499    init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1500      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1501    byte[] value = Bytes.toBytes("value");
1502    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1503    long ts = EnvironmentEdgeManager.currentTime();
1504    long seqId = 100;
1505    // older data whihc shouldn't be "seen" by client
1506    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1507    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1508    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1509    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1510    quals.add(qf1);
1511    quals.add(qf2);
1512    quals.add(qf3);
1513    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1514    MyCompactingMemStore.START_TEST.set(true);
1515    Runnable flush = () -> {
1516      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
1517      // recreate the active memstore -- phase (4/5)
1518      storeFlushCtx.prepare();
1519    };
1520    ExecutorService service = Executors.newSingleThreadExecutor();
1521    service.execute(flush);
1522    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
1523    // this is blocked until we recreate the active memstore -- phase (3/5)
1524    // we get scanner from active memstore but it is empty -- phase (5/5)
1525    InternalScanner scanner =
1526      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
1527    service.shutdown();
1528    service.awaitTermination(20, TimeUnit.SECONDS);
1529    try {
1530      try {
1531        List<Cell> results = new ArrayList<>();
1532        scanner.next(results);
1533        assertEquals(3, results.size());
1534        for (Cell c : results) {
1535          byte[] actualValue = CellUtil.cloneValue(c);
1536          assertTrue(Bytes.equals(actualValue, value), "expected:" + Bytes.toStringBinary(value)
1537            + ", actual:" + Bytes.toStringBinary(actualValue));
1538        }
1539      } finally {
1540        scanner.close();
1541      }
1542    } finally {
1543      MyCompactingMemStore.START_TEST.set(false);
1544      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1545      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1546    }
1547  }
1548
1549  @Test
1550  public void testScanWithDoubleFlush() throws IOException {
1551    Configuration conf = HBaseConfiguration.create();
1552    // Initialize region
1553    MyStore myStore = initMyStore(name, conf, new MyStoreHook() {
1554      @Override
1555      public void getScanners(MyStore store) throws IOException {
1556        final long tmpId = id++;
1557        ExecutorService s = Executors.newSingleThreadExecutor();
1558        s.execute(() -> {
1559          try {
1560            // flush the store before storescanner updates the scanners from store.
1561            // The current data will be flushed into files, and the memstore will
1562            // be clear.
1563            // -- phase (4/4)
1564            flushStore(store, tmpId);
1565          } catch (IOException ex) {
1566            throw new RuntimeException(ex);
1567          }
1568        });
1569        s.shutdown();
1570        try {
1571          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1572          s.awaitTermination(3, TimeUnit.SECONDS);
1573        } catch (InterruptedException ex) {
1574        }
1575      }
1576    });
1577    byte[] oldValue = Bytes.toBytes("oldValue");
1578    byte[] currentValue = Bytes.toBytes("currentValue");
1579    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1580    long ts = EnvironmentEdgeManager.currentTime();
1581    long seqId = 100;
1582    // older data whihc shouldn't be "seen" by client
1583    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1584    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1585    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1586    long snapshotId = id++;
1587    // push older data into snapshot -- phase (1/4)
1588    StoreFlushContext storeFlushCtx =
1589      store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
1590    storeFlushCtx.prepare();
1591
1592    // insert current data into active -- phase (2/4)
1593    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1594    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1595    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1596    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1597    quals.add(qf1);
1598    quals.add(qf2);
1599    quals.add(qf3);
1600    try (InternalScanner scanner =
1601      (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) {
1602      // complete the flush -- phase (3/4)
1603      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1604      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1605
1606      List<Cell> results = new ArrayList<>();
1607      scanner.next(results);
1608      assertEquals(3, results.size());
1609      for (Cell c : results) {
1610        byte[] actualValue = CellUtil.cloneValue(c);
1611        assertTrue(Bytes.equals(actualValue, currentValue), "expected:"
1612          + Bytes.toStringBinary(currentValue) + ", actual:" + Bytes.toStringBinary(actualValue));
1613      }
1614    }
1615  }
1616
1617  /**
1618   * This test is for HBASE-27519, when the {@link StoreScanner} is scanning,the Flush and the
1619   * Compaction execute concurrently and theCcompaction compact and archive the flushed
1620   * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}.Before
1621   * HBASE-27519,{@link StoreScanner.updateReaders} would throw {@link FileNotFoundException}.
1622   */
1623  @Test
1624  public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
1625    Configuration conf = HBaseConfiguration.create();
1626    conf.setBoolean(WALFactory.WAL_ENABLED, false);
1627    conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
1628    byte[] r0 = Bytes.toBytes("row0");
1629    byte[] r1 = Bytes.toBytes("row1");
1630    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
1631    final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
1632    // Initialize region
1633    final MyStore myStore = initMyStore(name, conf, new MyStoreHook() {
1634      @Override
1635      public void getScanners(MyStore store) throws IOException {
1636        try {
1637          // Here this method is called by StoreScanner.updateReaders which is invoked by the
1638          // following TestHStore.flushStore
1639          if (shouldWaitRef.get()) {
1640            // wait the following compaction Task start
1641            cyclicBarrier.await();
1642            // wait the following HStore.closeAndArchiveCompactedFiles end.
1643            cyclicBarrier.await();
1644          }
1645        } catch (BrokenBarrierException | InterruptedException e) {
1646          throw new RuntimeException(e);
1647        }
1648      }
1649    });
1650
1651    final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
1652    Runnable compactionTask = () -> {
1653      try {
1654        // Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for
1655        // entering the MyStore.getScanners, compactionTask could start.
1656        cyclicBarrier.await();
1657        region.compactStore(family, new NoLimitThroughputController());
1658        myStore.closeAndArchiveCompactedFiles();
1659        // Notify StoreScanner.updateReaders could enter MyStore.getScanners.
1660        cyclicBarrier.await();
1661      } catch (Throwable e) {
1662        compactionExceptionRef.set(e);
1663      }
1664    };
1665
1666    long ts = EnvironmentEdgeManager.currentTime();
1667    long seqId = 100;
1668    byte[] value = Bytes.toBytes("value");
1669    // older data whihc shouldn't be "seen" by client
1670    myStore.add(createCell(r0, qf1, ts, seqId, value), null);
1671    flushStore(myStore, id++);
1672    myStore.add(createCell(r0, qf2, ts, seqId, value), null);
1673    flushStore(myStore, id++);
1674    myStore.add(createCell(r0, qf3, ts, seqId, value), null);
1675    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1676    quals.add(qf1);
1677    quals.add(qf2);
1678    quals.add(qf3);
1679
1680    myStore.add(createCell(r1, qf1, ts, seqId, value), null);
1681    myStore.add(createCell(r1, qf2, ts, seqId, value), null);
1682    myStore.add(createCell(r1, qf3, ts, seqId, value), null);
1683
1684    Thread.currentThread()
1685      .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
1686    Scan scan = new Scan();
1687    scan.withStartRow(r0, true);
1688    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
1689      List<Cell> results = new MyList<>(size -> {
1690        switch (size) {
1691          case 1:
1692            shouldWaitRef.set(true);
1693            Thread thread = new Thread(compactionTask);
1694            thread.setName("MyCompacting Thread.");
1695            thread.start();
1696            try {
1697              flushStore(myStore, id++);
1698              thread.join();
1699            } catch (IOException | InterruptedException e) {
1700              throw new RuntimeException(e);
1701            }
1702            shouldWaitRef.set(false);
1703            break;
1704          default:
1705            break;
1706        }
1707      });
1708      // Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile
1709      // which used by StoreScanner.updateReaders is deleted by compactionTask.
1710      scanner.next(results);
1711      // The results is r0 row cells.
1712      assertEquals(3, results.size());
1713      assertTrue(compactionExceptionRef.get() == null);
1714    }
1715  }
1716
1717  @Test
1718  public void testReclaimChunkWhenScaning() throws IOException {
1719    init("testReclaimChunkWhenScaning");
1720    long ts = EnvironmentEdgeManager.currentTime();
1721    long seqId = 100;
1722    byte[] value = Bytes.toBytes("value");
1723    // older data whihc shouldn't be "seen" by client
1724    store.add(createCell(qf1, ts, seqId, value), null);
1725    store.add(createCell(qf2, ts, seqId, value), null);
1726    store.add(createCell(qf3, ts, seqId, value), null);
1727    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1728    quals.add(qf1);
1729    quals.add(qf2);
1730    quals.add(qf3);
1731    try (InternalScanner scanner =
1732      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) {
1733      List<Cell> results = new MyList<>(size -> {
1734        switch (size) {
1735          // 1) we get the first cell (qf1)
1736          // 2) flush the data to have StoreScanner update inner scanners
1737          // 3) the chunk will be reclaimed after updaing
1738          case 1:
1739            try {
1740              flushStore(store, id++);
1741            } catch (IOException e) {
1742              throw new RuntimeException(e);
1743            }
1744            break;
1745          // 1) we get the second cell (qf2)
1746          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
1747          case 2:
1748            try {
1749              byte[] newValue = Bytes.toBytes("newValue");
1750              // older data whihc shouldn't be "seen" by client
1751              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1752              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1753              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1754            } catch (IOException e) {
1755              throw new RuntimeException(e);
1756            }
1757            break;
1758          default:
1759            break;
1760        }
1761      });
1762      scanner.next(results);
1763      assertEquals(3, results.size());
1764      for (Cell c : results) {
1765        byte[] actualValue = CellUtil.cloneValue(c);
1766        assertTrue(Bytes.equals(actualValue, value), "expected:" + Bytes.toStringBinary(value)
1767          + ", actual:" + Bytes.toStringBinary(actualValue));
1768      }
1769    }
1770  }
1771
1772  /**
1773   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the
1774   * versionedList. And the first InMemoryFlushRunnable will use the chagned versionedList to remove
1775   * the corresponding segments. In short, there will be some segements which isn't in merge are
1776   * removed.
1777   */
1778  @Test
1779  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1780    int flushSize = 500;
1781    Configuration conf = HBaseConfiguration.create();
1782    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1783    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1784    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1785    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1786    // Set the lower threshold to invoke the "MERGE" policy
1787    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1788    init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1789      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1790    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1791    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1792    long ts = EnvironmentEdgeManager.currentTime();
1793    long seqId = 100;
1794    // older data whihc shouldn't be "seen" by client
1795    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1796    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1797    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1798    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1799    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1800    storeFlushCtx.prepare();
1801    // This shouldn't invoke another in-memory flush because the first compactor thread
1802    // hasn't accomplished the in-memory compaction.
1803    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1804    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1805    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1806    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1807    // okay. Let the compaction be completed
1808    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1809    CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore;
1810    while (mem.isMemStoreFlushingInMemory()) {
1811      TimeUnit.SECONDS.sleep(1);
1812    }
1813    // This should invoke another in-memory flush.
1814    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1815    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1816    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1817    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1818    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1819      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1820    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1821    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1822  }
1823
1824  @Test
1825  public void testAge() throws IOException {
1826    long currentTime = EnvironmentEdgeManager.currentTime();
1827    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1828    edge.setValue(currentTime);
1829    EnvironmentEdgeManager.injectEdge(edge);
1830    Configuration conf = TEST_UTIL.getConfiguration();
1831    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1832    initHRegion(name, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null,
1833      false);
1834    HStore store = new HStore(region, hcd, conf, false) {
1835
1836      @Override
1837      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1838        CellComparator kvComparator) throws IOException {
1839        List<HStoreFile> storefiles =
1840          Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1841            mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1842        StoreFileManager sfm = mock(StoreFileManager.class);
1843        when(sfm.getStoreFiles()).thenReturn(storefiles);
1844        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1845        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1846        return storeEngine;
1847      }
1848    };
1849    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1850    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1851    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1852  }
1853
1854  private HStoreFile mockStoreFile(long createdTime) {
1855    StoreFileInfo info = mock(StoreFileInfo.class);
1856    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1857    HStoreFile sf = mock(HStoreFile.class);
1858    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1859    when(sf.isHFile()).thenReturn(true);
1860    when(sf.getFileInfo()).thenReturn(info);
1861    return sf;
1862  }
1863
1864  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1865    throws IOException {
1866    return (MyStore) init(methodName, conf,
1867      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1868      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1869  }
1870
1871  private static class MyStore extends HStore {
1872    private final MyStoreHook hook;
1873
1874    MyStore(final HRegion region, final ColumnFamilyDescriptor family,
1875      final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1876      super(region, family, confParam, false);
1877      this.hook = hook;
1878    }
1879
1880    @Override
1881    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1882      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1883      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1884      boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException {
1885      hook.getScanners(this);
1886      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1887        stopRow, false, readPt, includeMemstoreScanner, onlyLatestVersion);
1888    }
1889
1890    @Override
1891    public long getSmallestReadPoint() {
1892      return hook.getSmallestReadPoint(this);
1893    }
1894  }
1895
1896  private abstract static class MyStoreHook {
1897
1898    void getScanners(MyStore store) throws IOException {
1899    }
1900
1901    long getSmallestReadPoint(HStore store) {
1902      return store.getHRegion().getSmallestReadPoint();
1903    }
1904  }
1905
1906  @Test
1907  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
1908    Configuration conf = HBaseConfiguration.create();
1909    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1910    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1911    // Set the lower threshold to invoke the "MERGE" policy
1912    MyStore store = initMyStore(name, conf, new MyStoreHook() {
1913    });
1914    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1915    long ts = EnvironmentEdgeManager.currentTime();
1916    long seqID = 1L;
1917    // Add some data to the region and do some flushes
1918    for (int i = 1; i < 10; i++) {
1919      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1920        memStoreSizing);
1921    }
1922    // flush them
1923    flushStore(store, seqID);
1924    for (int i = 11; i < 20; i++) {
1925      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1926        memStoreSizing);
1927    }
1928    // flush them
1929    flushStore(store, seqID);
1930    for (int i = 21; i < 30; i++) {
1931      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1932        memStoreSizing);
1933    }
1934    // flush them
1935    flushStore(store, seqID);
1936
1937    assertEquals(3, store.getStorefilesCount());
1938    Scan scan = new Scan();
1939    scan.addFamily(family);
1940    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1941    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1942    StoreScanner storeScanner =
1943      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1944    // get the current heap
1945    KeyValueHeap heap = storeScanner.heap;
1946    // create more store files
1947    for (int i = 31; i < 40; i++) {
1948      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1949        memStoreSizing);
1950    }
1951    // flush them
1952    flushStore(store, seqID);
1953
1954    for (int i = 41; i < 50; i++) {
1955      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1956        memStoreSizing);
1957    }
1958    // flush them
1959    flushStore(store, seqID);
1960    storefiles2 = store.getStorefiles();
1961    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1962    actualStorefiles1.removeAll(actualStorefiles);
1963    // Do compaction
1964    MyThread thread = new MyThread(storeScanner);
1965    thread.start();
1966    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
1967    thread.join();
1968    KeyValueHeap heap2 = thread.getHeap();
1969    assertFalse(heap.equals(heap2));
1970  }
1971
1972  @Test
1973  public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
1974    Configuration conf = HBaseConfiguration.create();
1975    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1976    // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
1977    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
1978    MyStore store = initMyStore(name, conf, new MyStoreHook() {
1979    });
1980    Scan scan = new Scan();
1981    scan.addFamily(family);
1982    // ReadType on Scan is still DEFAULT only.
1983    assertEquals(ReadType.DEFAULT, scan.getReadType());
1984    StoreScanner storeScanner =
1985      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1986    assertFalse(storeScanner.isScanUsePread());
1987  }
1988
1989  @Test
1990  public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException {
1991    Configuration conf = HBaseConfiguration.create();
1992    conf.set("hbase.systemtables.compacting.memstore.type", "eager");
1993    init(name, conf,
1994      TableDescriptorBuilder.newBuilder(
1995        TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())),
1996      ColumnFamilyDescriptorBuilder.newBuilder(family)
1997        .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build());
1998    assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString()
1999      .startsWith("eager".toUpperCase()));
2000  }
2001
2002  @Test
2003  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
2004    final TableName tn = TableName.valueOf(name);
2005    init(name);
2006
2007    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
2008
2009    HStoreFile sf1 = mockStoreFileWithLength(1024L);
2010    HStoreFile sf2 = mockStoreFileWithLength(2048L);
2011    HStoreFile sf3 = mockStoreFileWithLength(4096L);
2012    HStoreFile sf4 = mockStoreFileWithLength(8192L);
2013
2014    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
2015      .setEndKey(Bytes.toBytes("b")).build();
2016
2017    // Compacting two files down to one, reducing size
2018    sizeStore.put(regionInfo, 1024L + 4096L);
2019    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3),
2020      Arrays.asList(sf2));
2021
2022    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
2023
2024    // The same file length in and out should have no change
2025    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
2026      Arrays.asList(sf2));
2027
2028    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
2029
2030    // Increase the total size used
2031    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
2032      Arrays.asList(sf3));
2033
2034    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
2035
2036    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
2037      .setEndKey(Bytes.toBytes("c")).build();
2038    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
2039
2040    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
2041  }
2042
2043  @Test
2044  public void testHFileContextSetWithCFAndTable() throws Exception {
2045    init(name);
2046    StoreFileWriter writer = store.getStoreEngine()
2047      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
2048        .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
2049        .includesTag(false).shouldDropBehind(true));
2050    HFileContext hFileContext = writer.getLiveFileWriter().getFileContext();
2051    assertArrayEquals(family, hFileContext.getColumnFamily());
2052    assertArrayEquals(table, hFileContext.getTableName());
2053  }
2054
2055  // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell
2056  // but its dataSize exceeds inmemoryFlushSize
2057  @Test
2058  public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
2059    throws IOException, InterruptedException {
2060    Configuration conf = HBaseConfiguration.create();
2061
2062    byte[] smallValue = new byte[3];
2063    byte[] largeValue = new byte[9];
2064    final long timestamp = EnvironmentEdgeManager.currentTime();
2065    final long seqId = 100;
2066    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2067    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2068    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2069    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2070    int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
2071
2072    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2073    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
2074    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2075    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2076
2077    init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2078      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2079
2080    MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
2081    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2082    myCompactingMemStore.smallCellPreUpdateCounter.set(0);
2083    myCompactingMemStore.largeCellPreUpdateCounter.set(0);
2084
2085    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2086    Thread smallCellThread = new Thread(() -> {
2087      try {
2088        store.add(smallCell, new NonThreadSafeMemStoreSizing());
2089      } catch (Throwable exception) {
2090        exceptionRef.set(exception);
2091      }
2092    });
2093    smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
2094    smallCellThread.start();
2095
2096    String oldThreadName = Thread.currentThread().getName();
2097    try {
2098      /**
2099       * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
2100       * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread
2101       * invokes flushInMemory.
2102       * <p/>
2103       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
2104       * can add cell to currentActive . That is to say when largeCellThread called flushInMemory
2105       * method, CompactingMemStore.active has no cell.
2106       */
2107      Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
2108      store.add(largeCell, new NonThreadSafeMemStoreSizing());
2109      smallCellThread.join();
2110
2111      for (int i = 0; i < 100; i++) {
2112        long currentTimestamp = timestamp + 100 + i;
2113        ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
2114        store.add(cell, new NonThreadSafeMemStoreSizing());
2115      }
2116    } finally {
2117      Thread.currentThread().setName(oldThreadName);
2118    }
2119
2120    assertTrue(exceptionRef.get() == null);
2121
2122  }
2123
2124  // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds
2125  // InmemoryFlushSize
2126  @Test
2127  @Timeout(value = 60000, unit = TimeUnit.MILLISECONDS)
2128  public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception {
2129    Configuration conf = HBaseConfiguration.create();
2130    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
2131
2132    init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2133      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2134
2135    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
2136
2137    int size = (int) (myCompactingMemStore.getInmemoryFlushSize());
2138    byte[] value = new byte[size + 1];
2139
2140    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2141    long timestamp = EnvironmentEdgeManager.currentTime();
2142    long seqId = 100;
2143    ExtendedCell cell = createCell(qf1, timestamp, seqId, value);
2144    int cellByteSize = MutableSegment.getCellLength(cell);
2145    store.add(cell, memStoreSizing);
2146    assertTrue(memStoreSizing.getCellsCount() == 1);
2147    assertTrue(memStoreSizing.getDataSize() == cellByteSize);
2148    // Waiting the in memory compaction completed, see HBASE-26438
2149    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2150  }
2151
2152  /**
2153   * This test is for HBASE-27464, before this JIRA,when init {@link CellChunkImmutableSegment} for
2154   * 'COMPACT' action, we not force copy to current MSLab. When cell size bigger than
2155   * {@link MemStoreLABImpl#maxAlloc}, cell will stay in previous chunk which will recycle after
2156   * segment replace, and we may read wrong data when these chunk reused by others.
2157   */
2158  @Test
2159  public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception {
2160    Configuration conf = HBaseConfiguration.create();
2161    int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT);
2162
2163    // Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}.
2164    byte[] cellValue = new byte[maxAllocByteSize + 1];
2165    final long timestamp = EnvironmentEdgeManager.currentTime();
2166    final long seqId = 100;
2167    final byte[] rowKey1 = Bytes.toBytes("rowKey1");
2168    final ExtendedCell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue);
2169    final byte[] rowKey2 = Bytes.toBytes("rowKey2");
2170    final ExtendedCell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue);
2171    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2172    quals.add(qf1);
2173
2174    int cellByteSize = MutableSegment.getCellLength(originalCell1);
2175    int inMemoryFlushByteSize = cellByteSize - 1;
2176
2177    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2178    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
2179    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2180    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200));
2181    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2182
2183    // Use {@link MemoryCompactionPolicy#EAGER} for always compacting.
2184    init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2185      .setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build());
2186
2187    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
2188    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize);
2189
2190    // Data chunk Pool is disabled.
2191    assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0);
2192
2193    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2194
2195    // First compact
2196    store.add(originalCell1, memStoreSizing);
2197    // Waiting for the first in-memory compaction finished
2198    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2199
2200    StoreScanner storeScanner =
2201      (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
2202    SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2203    ExtendedCell resultCell1 = segmentScanner.next();
2204    assertTrue(PrivateCellUtil.equals(resultCell1, originalCell1));
2205    int cell1ChunkId = resultCell1.getChunkId();
2206    assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK);
2207    assertNull(segmentScanner.next());
2208    segmentScanner.close();
2209    storeScanner.close();
2210    Segment segment = segmentScanner.segment;
2211    assertTrue(segment instanceof CellChunkImmutableSegment);
2212    MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2213    assertTrue(!memStoreLAB1.isClosed());
2214    assertTrue(!memStoreLAB1.chunks.isEmpty());
2215    assertTrue(!memStoreLAB1.isReclaimed());
2216
2217    // Second compact
2218    store.add(originalCell2, memStoreSizing);
2219    // Waiting for the second in-memory compaction finished
2220    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2221
2222    // Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell
2223    // must be associated with chunk.. We were looking for a cell at index 0.
2224    // The cause for this exception is because the data chunk Pool is disabled,when the data chunks
2225    // are recycled after the second in-memory compaction finished,the
2226    // {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk
2227    // pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in
2228    // {@link CellChunkMap#getCell} we could not get the data chunk by chunkId.
2229    storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
2230    segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2231    ExtendedCell newResultCell1 = segmentScanner.next();
2232    assertTrue(newResultCell1 != resultCell1);
2233    assertTrue(PrivateCellUtil.equals(newResultCell1, originalCell1));
2234
2235    ExtendedCell resultCell2 = segmentScanner.next();
2236    assertTrue(PrivateCellUtil.equals(resultCell2, originalCell2));
2237    assertNull(segmentScanner.next());
2238    segmentScanner.close();
2239    storeScanner.close();
2240
2241    segment = segmentScanner.segment;
2242    assertTrue(segment instanceof CellChunkImmutableSegment);
2243    MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2244    assertTrue(!memStoreLAB2.isClosed());
2245    assertTrue(!memStoreLAB2.chunks.isEmpty());
2246    assertTrue(!memStoreLAB2.isReclaimed());
2247    assertTrue(memStoreLAB1.isClosed());
2248    assertTrue(memStoreLAB1.chunks.isEmpty());
2249    assertTrue(memStoreLAB1.isReclaimed());
2250  }
2251
2252  // This test is for HBASE-26210 also, test write large cell and small cell concurrently when
2253  // InmemoryFlushSize is smaller,equal with and larger than cell size.
2254  @Test
2255  public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
2256    throws IOException, InterruptedException {
2257    doWriteTestLargeCellAndSmallCellConcurrently(
2258      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
2259    doWriteTestLargeCellAndSmallCellConcurrently(
2260      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
2261    doWriteTestLargeCellAndSmallCellConcurrently(
2262      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1);
2263    doWriteTestLargeCellAndSmallCellConcurrently(
2264      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize);
2265    doWriteTestLargeCellAndSmallCellConcurrently(
2266      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1);
2267  }
2268
2269  private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize)
2270    throws IOException, InterruptedException {
2271
2272    Configuration conf = HBaseConfiguration.create();
2273
2274    byte[] smallValue = new byte[3];
2275    byte[] largeValue = new byte[100];
2276    final long timestamp = EnvironmentEdgeManager.currentTime();
2277    final long seqId = 100;
2278    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2279    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2280    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2281    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2282    int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize);
2283    boolean flushByteSizeLessThanSmallAndLargeCellSize =
2284      flushByteSize < (smallCellByteSize + largeCellByteSize);
2285
2286    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName());
2287    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2288    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2289
2290    init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2291      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2292
2293    MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore);
2294    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2295    myCompactingMemStore.disableCompaction();
2296    if (flushByteSizeLessThanSmallAndLargeCellSize) {
2297      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true;
2298    } else {
2299      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false;
2300    }
2301
2302    final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
2303    final AtomicLong totalCellByteSize = new AtomicLong(0);
2304    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2305    Thread smallCellThread = new Thread(() -> {
2306      try {
2307        for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
2308          long currentTimestamp = timestamp + i;
2309          ExtendedCell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
2310          totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
2311          store.add(cell, memStoreSizing);
2312        }
2313      } catch (Throwable exception) {
2314        exceptionRef.set(exception);
2315
2316      }
2317    });
2318    smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
2319    smallCellThread.start();
2320
2321    String oldThreadName = Thread.currentThread().getName();
2322    try {
2323      /**
2324       * When flushByteSizeLessThanSmallAndLargeCellSize is true:
2325       * </p>
2326       * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then
2327       * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then
2328       * largeCellThread invokes flushInMemory.
2329       * <p/>
2330       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
2331       * can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
2332       * <p/>
2333       * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and
2334       * largeCellThread concurrently write one cell and wait each other, and then write another
2335       * cell etc.
2336       */
2337      Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
2338      for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
2339        long currentTimestamp = timestamp + i;
2340        ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
2341        totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
2342        store.add(cell, memStoreSizing);
2343      }
2344      smallCellThread.join();
2345
2346      assertTrue(exceptionRef.get() == null);
2347      assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2));
2348      assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
2349      if (flushByteSizeLessThanSmallAndLargeCellSize) {
2350        assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT);
2351      } else {
2352        assertTrue(
2353          myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1));
2354      }
2355    } finally {
2356      Thread.currentThread().setName(oldThreadName);
2357    }
2358  }
2359
2360  /**
2361   * <pre>
2362   * This test is for HBASE-26384,
2363   * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
2364   * execute concurrently.
   * The thread sequence before HBASE-26384 is (the bug only exists for branch-2, but UTs were
   * added for both branch-2 and master):
2367   * 1. The {@link CompactingMemStore} size exceeds
2368   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2369   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2370   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2371   * 2. The in memory compact thread starts and then stopping before
2372   *    {@link CompactingMemStore#flattenOneSegment}.
2373   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2374   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2375   *    compact thread continues.
2376   *    Assuming {@link VersionedSegmentsList#version} returned from
2377   *    {@link CompactingMemStore#getImmutableSegments} is v.
2378   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2379   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2380   *    {@link CompactionPipeline#version} is still v.
2381   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2382   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2383   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2384   *    {@link CompactionPipeline} has changed because
2385   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2386   *    removed in fact and still remaining in {@link CompactionPipeline}.
2387   *
2388   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
2389   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2390   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2391   *    v+1.
2392   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2393   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2394   *    failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
2395   *    again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
2396   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
2397   * </pre>
2398   */
2399  @Test
2400  public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
2401    Configuration conf = HBaseConfiguration.create();
2402
2403    byte[] smallValue = new byte[3];
2404    byte[] largeValue = new byte[9];
2405    final long timestamp = EnvironmentEdgeManager.currentTime();
2406    final long seqId = 100;
2407    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2408    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2409    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2410    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2411    int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
2412    int flushByteSize = totalCellByteSize - 2;
2413
2414    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2415    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
2416    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2417    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2418
2419    init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2420      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2421
2422    MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
2423    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2424
2425    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2426    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2427
2428    String oldThreadName = Thread.currentThread().getName();
2429    try {
2430      Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
2431      /**
2432       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2433       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2434       * would invoke {@link CompactingMemStore#stopCompaction}.
2435       */
2436      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2437
2438      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2439      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2440
2441      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2442      assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
2443      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2444      assertTrue(segments.getNumOfSegments() == 0);
2445      assertTrue(segments.getNumOfCells() == 0);
2446      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
2447      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2448    } finally {
2449      Thread.currentThread().setName(oldThreadName);
2450    }
2451  }
2452
2453  /**
2454   * <pre>
2455   * This test is for HBASE-26384,
   * test {@link CompactingMemStore#flattenOneSegment}, {@link CompactingMemStore#snapshot()}
2457   * and writeMemStore execute concurrently.
   * The thread sequence before HBASE-26384 is (the bug only exists for branch-2, but UTs were
   * added for both branch-2 and master):
2460   * 1. The {@link CompactingMemStore} size exceeds
2461   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2462   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2463   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2464   * 2. The in memory compact thread starts and then stopping before
2465   *    {@link CompactingMemStore#flattenOneSegment}.
2466   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2467   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2468   *    compact thread continues.
2469   *    Assuming {@link VersionedSegmentsList#version} returned from
2470   *    {@link CompactingMemStore#getImmutableSegments} is v.
2471   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2472   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2473   *    {@link CompactionPipeline#version} is still v.
2474   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2475   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2476   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2477   *    {@link CompactionPipeline} has changed because
2478   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2479   *    removed in fact and still remaining in {@link CompactionPipeline}.
2480   *
2481   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
2482   * and I add step 7-8 to test there is new segment added before retry.
2483   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2484   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2485   *     v+1.
2486   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2487   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2488   *    failed and retry,{@link VersionedSegmentsList#version} returned from
2489   *    {@link CompactingMemStore#getImmutableSegments} is v+1.
2490   * 7. The write thread continues writing to {@link CompactingMemStore} and
2491   *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
2492   *    {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
2493   *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
2494   *    {@link CompactionPipeline#version} is still v+1.
2495   * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2496   *    {@link CompactionPipeline#version} is still v+1,
2497   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
2498   *    remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
2499   *    {@link CompactingMemStore#swapPipelineWithNull}.
2500   * </pre>
2501   */
2502  @Test
2503  public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
2504    Configuration conf = HBaseConfiguration.create();
2505
2506    byte[] smallValue = new byte[3];
2507    byte[] largeValue = new byte[9];
2508    final long timestamp = EnvironmentEdgeManager.currentTime();
2509    final long seqId = 100;
2510    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2511    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2512    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2513    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2514    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2515    int flushByteSize = firstWriteCellByteSize - 2;
2516
2517    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2518    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
2519    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2520    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2521
2522    init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2523      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2524
2525    final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
2526    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2527
2528    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2529    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2530
2531    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2532    final ExtendedCell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
2533    final ExtendedCell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2534    final int writeAgainCellByteSize =
2535      MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2);
2536    final Thread writeAgainThread = new Thread(() -> {
2537      try {
2538        myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
2539
2540        store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
2541        store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
2542
2543        myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
2544      } catch (Throwable exception) {
2545        exceptionRef.set(exception);
2546      }
2547    });
2548    writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
2549    writeAgainThread.start();
2550
2551    String oldThreadName = Thread.currentThread().getName();
2552    try {
2553      Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
2554      /**
2555       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2556       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2557       * would invoke {@link CompactingMemStore#stopCompaction}.
2558       */
2559      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2560      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2561      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2562      writeAgainThread.join();
2563
2564      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2565      assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
2566      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2567      assertTrue(segments.getNumOfSegments() == 1);
2568      assertTrue(
2569        ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
2570      assertTrue(segments.getNumOfCells() == 2);
2571      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
2572      assertTrue(exceptionRef.get() == null);
2573      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2574    } finally {
2575      Thread.currentThread().setName(oldThreadName);
2576    }
2577  }
2578
2579  /**
2580   * <pre>
2581   * This test is for HBASE-26465,
2582   * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute
2583   * concurrently. The threads sequence before HBASE-26465 is:
   * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have been added to
2585   *  {@link DefaultMemStore}.
2586   * 2.The flush thread stopping before {@link DefaultMemStore#clearSnapshot} in
2587   *   {@link HStore#updateStorefiles} after completed flushing memStore to hfile.
2588   * 3.The scan thread starts and stopping after {@link DefaultMemStore#getSnapshotSegments} in
2589   *   {@link DefaultMemStore#getScanners},here the scan thread gets the
2590   *   {@link DefaultMemStore#snapshot} which is created by the flush thread.
2591   * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close
2592   *   {@link DefaultMemStore#snapshot},because the reference count of the corresponding
2593   *   {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl}
2594   *   are recycled.
2595   * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a
2596   *   {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the
2597   *   reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in
2598   *   corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may
2599   *   be overwritten by other write threads,which may cause serious problem.
2600   * After HBASE-26465,{@link DefaultMemStore#getScanners} and
2601   * {@link DefaultMemStore#clearSnapshot} could not execute concurrently.
2602   * </pre>
2603   */
2604  @Test
2605  public void testClearSnapshotGetScannerConcurrently() throws Exception {
2606    Configuration conf = HBaseConfiguration.create();
2607
2608    byte[] smallValue = new byte[3];
2609    byte[] largeValue = new byte[9];
2610    final long timestamp = EnvironmentEdgeManager.currentTime();
2611    final long seqId = 100;
2612    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2613    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2614    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2615    quals.add(qf1);
2616    quals.add(qf2);
2617
2618    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
2619    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2620
2621    init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2622    MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore);
2623    myDefaultMemStore.store = store;
2624
2625    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2626    store.add(smallCell, memStoreSizing);
2627    store.add(largeCell, memStoreSizing);
2628
2629    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2630    final Thread flushThread = new Thread(() -> {
2631      try {
2632        flushStore(store, id++);
2633      } catch (Throwable exception) {
2634        exceptionRef.set(exception);
2635      }
2636    });
2637    flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
2638    flushThread.start();
2639
2640    String oldThreadName = Thread.currentThread().getName();
2641    StoreScanner storeScanner = null;
2642    try {
2643      Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
2644
2645      /**
2646       * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot}
2647       */
2648      myDefaultMemStore.getScannerCyclicBarrier.await();
2649
2650      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2651      flushThread.join();
2652
2653      if (myDefaultMemStore.shouldWait) {
2654        SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2655        MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2656        assertTrue(memStoreLAB.isClosed());
2657        assertTrue(!memStoreLAB.chunks.isEmpty());
2658        assertTrue(!memStoreLAB.isReclaimed());
2659
2660        ExtendedCell cell1 = segmentScanner.next();
2661        PrivateCellUtil.equals(smallCell, cell1);
2662        ExtendedCell cell2 = segmentScanner.next();
2663        PrivateCellUtil.equals(largeCell, cell2);
2664        assertNull(segmentScanner.next());
2665      } else {
2666        List<ExtendedCell> results = new ArrayList<>();
2667        storeScanner.next(results);
2668        assertEquals(2, results.size());
2669        PrivateCellUtil.equals(smallCell, results.get(0));
2670        PrivateCellUtil.equals(largeCell, results.get(1));
2671      }
2672      assertTrue(exceptionRef.get() == null);
2673    } finally {
2674      if (storeScanner != null) {
2675        storeScanner.close();
2676      }
2677      Thread.currentThread().setName(oldThreadName);
2678    }
2679  }
2680
2681  @SuppressWarnings("unchecked")
2682  private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
2683    List<T> resultScanners = new ArrayList<T>();
2684    for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
2685      if (keyValueScannerClass.isInstance(keyValueScanner)) {
2686        resultScanners.add((T) keyValueScanner);
2687      }
2688    }
2689    assertTrue(resultScanners.size() == 1);
2690    return resultScanners.get(0);
2691  }
2692
2693  @Test
2694  public void testOnConfigurationChange() throws IOException {
2695    final int COMMON_MAX_FILES_TO_COMPACT = 10;
2696    final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8;
2697    final int STORE_MAX_FILES_TO_COMPACT = 6;
2698
2699    // Build a table that its maxFileToCompact different from common configuration.
2700    Configuration conf = HBaseConfiguration.create();
2701    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2702      COMMON_MAX_FILES_TO_COMPACT);
2703    conf.setBoolean(CACHE_DATA_ON_READ_KEY, false);
2704    conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true);
2705    conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
2706    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
2707      .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2708        String.valueOf(STORE_MAX_FILES_TO_COMPACT))
2709      .build();
2710    init(name, conf, hcd);
2711
2712    // After updating common configuration, the conf in HStore itself must not be changed.
2713    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2714      NEW_COMMON_MAX_FILES_TO_COMPACT);
2715    this.store.onConfigurationChange(conf);
2716
2717    assertEquals(STORE_MAX_FILES_TO_COMPACT,
2718      store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact());
2719
2720    assertEquals(conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ), false);
2721    assertEquals(conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), true);
2722    assertEquals(conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), true);
2723
2724    // reset to default values
2725    conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ);
2726    conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE);
2727    conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE);
2728    this.store.onConfigurationChange(conf);
2729  }
2730
2731  /**
2732   * This test is for HBASE-26476
2733   */
2734  @Test
2735  public void testExtendsDefaultMemStore() throws Exception {
2736    Configuration conf = HBaseConfiguration.create();
2737    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2738
2739    init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2740    assertTrue(this.store.memstore.getClass() == DefaultMemStore.class);
2741    tearDown();
2742
2743    conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName());
2744    init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2745    assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class);
2746  }
2747
2748  static class CustomDefaultMemStore extends DefaultMemStore {
2749
2750    public CustomDefaultMemStore(Configuration conf, CellComparator c,
2751      RegionServicesForStores regionServices) {
2752      super(conf, c, regionServices);
2753    }
2754
2755  }
2756
2757  /**
2758   * This test is for HBASE-26488
2759   */
2760  @Test
2761  public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
2762    Configuration conf = HBaseConfiguration.create();
2763
2764    byte[] smallValue = new byte[3];
2765    byte[] largeValue = new byte[9];
2766    final long timestamp = EnvironmentEdgeManager.currentTime();
2767    final long seqId = 100;
2768    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2769    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2770    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2771    quals.add(qf1);
2772    quals.add(qf2);
2773
2774    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
2775    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2776    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
2777      MyDefaultStoreFlusher.class.getName());
2778
2779    init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2780    MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
2781    assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
2782
2783    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2784    store.add(smallCell, memStoreSizing);
2785    store.add(largeCell, memStoreSizing);
2786    flushStore(store, id++);
2787
2788    MemStoreLABImpl memStoreLAB =
2789      (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
2790    assertTrue(memStoreLAB.isClosed());
2791    assertTrue(memStoreLAB.getRefCntValue() == 0);
2792    assertTrue(memStoreLAB.isReclaimed());
2793    assertTrue(memStoreLAB.chunks.isEmpty());
2794    StoreScanner storeScanner = null;
2795    try {
2796      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2797      assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
2798      assertTrue(store.memstore.size().getCellsCount() == 0);
2799      assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
2800      assertTrue(storeScanner.currentScanners.size() == 1);
2801      assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
2802
2803      List<ExtendedCell> results = new ArrayList<>();
2804      storeScanner.next(results);
2805      assertEquals(2, results.size());
2806      PrivateCellUtil.equals(smallCell, results.get(0));
2807      PrivateCellUtil.equals(largeCell, results.get(1));
2808    } finally {
2809      if (storeScanner != null) {
2810        storeScanner.close();
2811      }
2812    }
2813  }
2814
2815  static class MyDefaultMemStore1 extends DefaultMemStore {
2816
2817    private ImmutableSegment snapshotImmutableSegment;
2818
2819    public MyDefaultMemStore1(Configuration conf, CellComparator c,
2820      RegionServicesForStores regionServices) {
2821      super(conf, c, regionServices);
2822    }
2823
2824    @Override
2825    public MemStoreSnapshot snapshot() {
2826      MemStoreSnapshot result = super.snapshot();
2827      this.snapshotImmutableSegment = snapshot;
2828      return result;
2829    }
2830
2831  }
2832
2833  public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
2834    private static final AtomicInteger failCounter = new AtomicInteger(1);
2835    private static final AtomicInteger counter = new AtomicInteger(0);
2836
2837    public MyDefaultStoreFlusher(Configuration conf, HStore store) {
2838      super(conf, store);
2839    }
2840
2841    @Override
2842    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
2843      MonitoredTask status, ThroughputController throughputController,
2844      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
2845      counter.incrementAndGet();
2846      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
2847        writerCreationTracker);
2848    }
2849
2850    @Override
2851    protected void performFlush(InternalScanner scanner, final CellSink sink,
2852      ThroughputController throughputController) throws IOException {
2853
2854      final int currentCount = counter.get();
2855      CellSink newCellSink = (cell) -> {
2856        if (currentCount <= failCounter.get()) {
2857          throw new IOException("Simulated exception by tests");
2858        }
2859        sink.append(cell);
2860      };
2861      super.performFlush(scanner, newCellSink, throughputController);
2862    }
2863  }
2864
  /**
   * This test is for HBASE-26494; it tests the {@link RefCnt} behavior of
   * {@link ImmutableMemStoreLAB}. Six cells are written so that the {@link CompactingMemStore}
   * pipeline holds three immutable segments, the segments are then merged by an in-memory flush,
   * and the reference counts of the three original {@link MemStoreLABImpl}s and of the merged
   * {@link ImmutableMemStoreLAB} are checked as scanners are opened and closed and finally after
   * the store is flushed.
   */
  @Test
  public void testImmutableMemStoreLABRefCnt() throws Exception {
    Configuration conf = HBaseConfiguration.create();

    byte[] smallValue = new byte[3];
    byte[] largeValue = new byte[9];
    final long timestamp = EnvironmentEdgeManager.currentTime();
    final long seqId = 100;
    final ExtendedCell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
    final ExtendedCell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
    final ExtendedCell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue);
    final ExtendedCell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
    final ExtendedCell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue);
    final ExtendedCell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue);

    int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
    int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
    // Two bytes less than one small + one large cell; presumably so that every small/large pair
    // exceeds the in-memory flush size (the test later expects three segments for three pairs).
    int flushByteSize = firstWriteCellByteSize - 2;

    // Set CompactingMemStore.inmemoryFlushSize to flushByteSize: the factor 0.005 applied to a
    // region flush size of flushByteSize * 200 yields flushByteSize (asserted after init below).
    conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
    conf.setBoolean(WALFactory.WAL_ENABLED, false);

    init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());

    final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
    // NOTE(review): presumably keeps in-memory compaction from merging the segments while the
    // cells are being written; confirm against CompactingMemStore.allowCompaction.
    myCompactingMemStore.allowCompaction.set(false);

    NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    store.add(smallCell1, memStoreSizing);
    store.add(largeCell1, memStoreSizing);
    store.add(smallCell2, memStoreSizing);
    store.add(largeCell2, memStoreSizing);
    store.add(smallCell3, memStoreSizing);
    store.add(largeCell3, memStoreSizing);
    // The six cells must have ended up in three immutable segments.
    VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
    assertTrue(versionedSegmentsList.getNumOfSegments() == 3);
    List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
    // Remember the LAB of each original segment so its ref count can be tracked below.
    List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size());
    for (ImmutableSegment segment : segments) {
      memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
    }
    // With the first set of scanners open, every original LAB is expected at ref count 2.
    List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
      assertTrue(memStoreLAB.getRefCntValue() == 2);
    }

    // Re-enable compaction and trigger an in-memory flush.
    myCompactingMemStore.allowCompaction.set(true);
    myCompactingMemStore.flushInMemory();

    // A single segment backed by an ImmutableMemStoreLAB is expected afterwards; the original LABs
    // must still be at ref count 2.
    versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
    assertTrue(versionedSegmentsList.getNumOfSegments() == 1);
    ImmutableMemStoreLAB immutableMemStoreLAB =
      (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
      assertTrue(memStoreLAB.getRefCntValue() == 2);
    }

    // Opening a second set of scanners must not change the original LABs' counts; the merged
    // ImmutableMemStoreLAB is expected at ref count 2.
    List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
      assertTrue(memStoreLAB.getRefCntValue() == 2);
    }
    assertTrue(immutableMemStoreLAB.getRefCntValue() == 2);
    // Closing the first set of scanners drops each original LAB to ref count 1.
    for (KeyValueScanner scanner : scanners1) {
      scanner.close();
    }
    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
      assertTrue(memStoreLAB.getRefCntValue() == 1);
    }
    // Closing the second set leaves the original LABs at 1 and drops the merged LAB to 1.
    for (KeyValueScanner scanner : scanners2) {
      scanner.close();
    }
    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
      assertTrue(memStoreLAB.getRefCntValue() == 1);
    }
    assertTrue(immutableMemStoreLAB.getRefCntValue() == 1);
    // After flushing the store, every LAB must be unreferenced, closed and reclaimed.
    flushStore(store, id++);
    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
      assertTrue(memStoreLAB.getRefCntValue() == 0);
    }
    assertTrue(immutableMemStoreLAB.getRefCntValue() == 0);
    assertTrue(immutableMemStoreLAB.isClosed());
    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
      assertTrue(memStoreLAB.isClosed());
      assertTrue(memStoreLAB.isReclaimed());
      assertTrue(memStoreLAB.chunks.isEmpty());
    }
  }
2961
2962  private HStoreFile mockStoreFileWithLength(long length) {
2963    HStoreFile sf = mock(HStoreFile.class);
2964    StoreFileReader sfr = mock(StoreFileReader.class);
2965    when(sf.isHFile()).thenReturn(true);
2966    when(sf.getReader()).thenReturn(sfr);
2967    when(sfr.length()).thenReturn(length);
2968    return sf;
2969  }
2970
2971  private static class MyThread extends Thread {
2972    private StoreScanner scanner;
2973    private KeyValueHeap heap;
2974
2975    public MyThread(StoreScanner scanner) {
2976      this.scanner = scanner;
2977    }
2978
2979    public KeyValueHeap getHeap() {
2980      return this.heap;
2981    }
2982
2983    @Override
2984    public void run() {
2985      scanner.trySwitchToStreamRead();
2986      heap = scanner.heap;
2987    }
2988  }
2989
2990  private static class MyMemStoreCompactor extends MemStoreCompactor {
2991    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2992    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
2993
2994    public MyMemStoreCompactor(CompactingMemStore compactingMemStore,
2995      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
2996      super(compactingMemStore, compactionPolicy);
2997    }
2998
2999    @Override
3000    public boolean start() throws IOException {
3001      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
3002      if (isFirst) {
3003        try {
3004          START_COMPACTOR_LATCH.await();
3005          return super.start();
3006        } catch (InterruptedException ex) {
3007          throw new RuntimeException(ex);
3008        }
3009      }
3010      return super.start();
3011    }
3012  }
3013
3014  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
3015    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
3016
3017    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
3018      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3019      throws IOException {
3020      super(conf, c, store, regionServices, compactionPolicy);
3021    }
3022
3023    @Override
3024    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
3025      throws IllegalArgumentIOException {
3026      return new MyMemStoreCompactor(this, compactionPolicy);
3027    }
3028
3029    @Override
3030    protected boolean setInMemoryCompactionFlag() {
3031      boolean rval = super.setInMemoryCompactionFlag();
3032      if (rval) {
3033        RUNNER_COUNT.incrementAndGet();
3034        if (LOG.isDebugEnabled()) {
3035          LOG.debug("runner count: " + RUNNER_COUNT.get());
3036        }
3037      }
3038      return rval;
3039    }
3040  }
3041
3042  public static class MyCompactingMemStore extends CompactingMemStore {
3043    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
3044    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
3045    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
3046
3047    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store,
3048      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3049      throws IOException {
3050      super(conf, c, store, regionServices, compactionPolicy);
3051    }
3052
3053    @Override
3054    protected List<KeyValueScanner> createList(int capacity) {
3055      if (START_TEST.get()) {
3056        try {
3057          getScannerLatch.countDown();
3058          snapshotLatch.await();
3059        } catch (InterruptedException e) {
3060          throw new RuntimeException(e);
3061        }
3062      }
3063      return new ArrayList<>(capacity);
3064    }
3065
3066    @Override
3067    protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
3068      if (START_TEST.get()) {
3069        try {
3070          getScannerLatch.await();
3071        } catch (InterruptedException e) {
3072          throw new RuntimeException(e);
3073        }
3074      }
3075
3076      super.pushActiveToPipeline(active, checkEmpty);
3077      if (START_TEST.get()) {
3078        snapshotLatch.countDown();
3079      }
3080    }
3081  }
3082
  /**
   * Callback used by {@link MyList}; it is invoked from {@code MyList.add(T)} before the element is
   * handed to the backing list.
   */
  interface MyListHook {
    /**
     * @param currentSize the size of the list at the time of the call, i.e. before the new element
     *                    has been added
     */
    void hook(int currentSize);
  }
3086
3087  private static class MyList<T> implements List<T> {
3088    private final List<T> delegatee = new ArrayList<>();
3089    private final MyListHook hookAtAdd;
3090
3091    MyList(final MyListHook hookAtAdd) {
3092      this.hookAtAdd = hookAtAdd;
3093    }
3094
3095    @Override
3096    public int size() {
3097      return delegatee.size();
3098    }
3099
3100    @Override
3101    public boolean isEmpty() {
3102      return delegatee.isEmpty();
3103    }
3104
3105    @Override
3106    public boolean contains(Object o) {
3107      return delegatee.contains(o);
3108    }
3109
3110    @Override
3111    public Iterator<T> iterator() {
3112      return delegatee.iterator();
3113    }
3114
3115    @Override
3116    public Object[] toArray() {
3117      return delegatee.toArray();
3118    }
3119
3120    @Override
3121    public <R> R[] toArray(R[] a) {
3122      return delegatee.toArray(a);
3123    }
3124
3125    @Override
3126    public boolean add(T e) {
3127      hookAtAdd.hook(size());
3128      return delegatee.add(e);
3129    }
3130
3131    @Override
3132    public boolean remove(Object o) {
3133      return delegatee.remove(o);
3134    }
3135
3136    @Override
3137    public boolean containsAll(Collection<?> c) {
3138      return delegatee.containsAll(c);
3139    }
3140
3141    @Override
3142    public boolean addAll(Collection<? extends T> c) {
3143      return delegatee.addAll(c);
3144    }
3145
3146    @Override
3147    public boolean addAll(int index, Collection<? extends T> c) {
3148      return delegatee.addAll(index, c);
3149    }
3150
3151    @Override
3152    public boolean removeAll(Collection<?> c) {
3153      return delegatee.removeAll(c);
3154    }
3155
3156    @Override
3157    public boolean retainAll(Collection<?> c) {
3158      return delegatee.retainAll(c);
3159    }
3160
3161    @Override
3162    public void clear() {
3163      delegatee.clear();
3164    }
3165
3166    @Override
3167    public T get(int index) {
3168      return delegatee.get(index);
3169    }
3170
3171    @Override
3172    public T set(int index, T element) {
3173      return delegatee.set(index, element);
3174    }
3175
3176    @Override
3177    public void add(int index, T element) {
3178      delegatee.add(index, element);
3179    }
3180
3181    @Override
3182    public T remove(int index) {
3183      return delegatee.remove(index);
3184    }
3185
3186    @Override
3187    public int indexOf(Object o) {
3188      return delegatee.indexOf(o);
3189    }
3190
3191    @Override
3192    public int lastIndexOf(Object o) {
3193      return delegatee.lastIndexOf(o);
3194    }
3195
3196    @Override
3197    public ListIterator<T> listIterator() {
3198      return delegatee.listIterator();
3199    }
3200
3201    @Override
3202    public ListIterator<T> listIterator(int index) {
3203      return delegatee.listIterator(index);
3204    }
3205
3206    @Override
3207    public List<T> subList(int fromIndex, int toIndex) {
3208      return delegatee.subList(fromIndex, toIndex);
3209    }
3210  }
3211
  /**
   * Callback used by {@link MyKeyValueHeap}; it is invoked at the start of every
   * {@code recordBlockSize} call, before the call is delegated to the superclass.
   */
  private interface MyKeyValueHeapHook {
    /**
     * @param recordBlockSizeCallCount how many times {@code recordBlockSize} has been called on the
     *                                 heap so far, including the current call
     */
    void onRecordBlockSize(int recordBlockSizeCallCount);
  }
3215
3216  private static class MyKeyValueHeap extends KeyValueHeap {
3217    private final MyKeyValueHeapHook hook;
3218    private int recordBlockSizeCallCount;
3219
3220    public MyKeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator,
3221      MyKeyValueHeapHook hook) throws IOException {
3222      super(scanners, comparator);
3223      this.hook = hook;
3224    }
3225
3226    @Override
3227    public void recordBlockSize(IntConsumer blockSizeConsumer) {
3228      recordBlockSizeCallCount++;
3229      hook.onRecordBlockSize(recordBlockSizeCallCount);
3230      super.recordBlockSize(blockSizeConsumer);
3231    }
3232  }
3233
3234  public static class MyCompactingMemStore2 extends CompactingMemStore {
3235    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
3236    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
3237    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
3238    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
3239    private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
3240    private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
3241
3242    public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
3243      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3244      throws IOException {
3245      super(conf, cellComparator, store, regionServices, compactionPolicy);
3246    }
3247
3248    @Override
3249    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
3250      MemStoreSizing memstoreSizing) {
3251      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3252        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
3253        if (currentCount <= 1) {
3254          try {
3255            /**
3256             * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
3257             * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
3258             * largeCellThread invokes flushInMemory.
3259             */
3260            preCyclicBarrier.await();
3261          } catch (Throwable e) {
3262            throw new RuntimeException(e);
3263          }
3264        }
3265      }
3266
3267      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3268      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3269        try {
3270          preCyclicBarrier.await();
3271        } catch (Throwable e) {
3272          throw new RuntimeException(e);
3273        }
3274      }
3275      return returnValue;
3276    }
3277
3278    @Override
3279    protected void doAdd(MutableSegment currentActive, ExtendedCell cell,
3280      MemStoreSizing memstoreSizing) {
3281      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3282        try {
3283          /**
3284           * After largeCellThread finished flushInMemory method, smallCellThread can add cell to
3285           * currentActive . That is to say when largeCellThread called flushInMemory method,
3286           * currentActive has no cell.
3287           */
3288          postCyclicBarrier.await();
3289        } catch (Throwable e) {
3290          throw new RuntimeException(e);
3291        }
3292      }
3293      super.doAdd(currentActive, cell, memstoreSizing);
3294    }
3295
3296    @Override
3297    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
3298      super.flushInMemory(currentActiveMutableSegment);
3299      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3300        if (largeCellPreUpdateCounter.get() <= 1) {
3301          try {
3302            postCyclicBarrier.await();
3303          } catch (Throwable e) {
3304            throw new RuntimeException(e);
3305          }
3306        }
3307      }
3308    }
3309
3310  }
3311
3312  public static class MyCompactingMemStore3 extends CompactingMemStore {
3313    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
3314    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
3315
3316    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
3317    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
3318    private final AtomicInteger flushCounter = new AtomicInteger(0);
3319    private static final int CELL_COUNT = 5;
3320    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;
3321
3322    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
3323      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3324      throws IOException {
3325      super(conf, cellComparator, store, regionServices, compactionPolicy);
3326    }
3327
3328    @Override
3329    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
3330      MemStoreSizing memstoreSizing) {
3331      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3332        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3333      }
3334      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3335        try {
3336          preCyclicBarrier.await();
3337        } catch (Throwable e) {
3338          throw new RuntimeException(e);
3339        }
3340      }
3341
3342      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3343      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3344        try {
3345          preCyclicBarrier.await();
3346        } catch (Throwable e) {
3347          throw new RuntimeException(e);
3348        }
3349      }
3350      return returnValue;
3351    }
3352
3353    @Override
3354    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
3355      super.postUpdate(currentActiveMutableSegment);
3356      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3357        try {
3358          postCyclicBarrier.await();
3359        } catch (Throwable e) {
3360          throw new RuntimeException(e);
3361        }
3362        return;
3363      }
3364
3365      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3366        try {
3367          postCyclicBarrier.await();
3368        } catch (Throwable e) {
3369          throw new RuntimeException(e);
3370        }
3371      }
3372    }
3373
3374    @Override
3375    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
3376      super.flushInMemory(currentActiveMutableSegment);
3377      flushCounter.incrementAndGet();
3378      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3379        return;
3380      }
3381
3382      assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME));
3383      try {
3384        postCyclicBarrier.await();
3385      } catch (Throwable e) {
3386        throw new RuntimeException(e);
3387      }
3388
3389    }
3390
3391    void disableCompaction() {
3392      allowCompaction.set(false);
3393    }
3394
3395    void enableCompaction() {
3396      allowCompaction.set(true);
3397    }
3398
3399  }
3400
  /**
   * {@link CompactingMemStore} that uses several {@link CyclicBarrier}s to force a fixed
   * interleaving between the thread named {@code takeSnapShotThread} and the thread that runs
   * {@link #flattenOneSegment}, and that counts how often the overridden methods are invoked.
   */
  public static class MyCompactingMemStore4 extends CompactingMemStore {
    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
    /**
     * {@link CompactingMemStore#flattenOneSegment} must execute after
     * {@link CompactingMemStore#getImmutableSegments}.
     */
    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only after {@link CompactingMemStore#flattenOneSegment} has completed may
     * {@link CompactingMemStore#swapPipelineWithNull} execute.
     */
    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only after the in-memory compaction thread has entered
     * {@link CompactingMemStore#flattenOneSegment} may the snapshot thread start
     * {@link CompactingMemStore#snapshot}, because {@link CompactingMemStore#snapshot} would invoke
     * {@link CompactingMemStore#stopCompaction}.
     */
    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
    /**
     * Used to wait for {@link CompactingMemStore.InMemoryCompactionRunnable} to stop.
     */
    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);

    public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    /**
     * Delegates to the superclass. On the snapshot thread the calls are counted, and the first call
     * waits on {@code flattenOneSegmentPreCyclicBarrier} after the segments have been fetched.
     */
    @Override
    public VersionedSegmentsList getImmutableSegments() {
      VersionedSegmentsList result = super.getImmutableSegments();
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
        if (currentCount <= 1) {
          try {
            flattenOneSegmentPreCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }
      return result;
    }

    /**
     * Delegates to the superclass. On the snapshot thread the first call waits on
     * {@code flattenOneSegmentPostCyclicBarrier} before delegating; the first call is asserted to
     * return {@code false} and the second call to return {@code true}.
     */
    @Override
    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
        if (currentCount <= 1) {
          try {
            flattenOneSegmentPostCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }
      boolean result = super.swapPipelineWithNull(segments);
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = swapPipelineWithNullCounter.get();
        // The first swap attempt is expected to fail, the second one to succeed.
        if (currentCount <= 1) {
          assertTrue(!result);
        }
        if (currentCount == 2) {
          assertTrue(result);
        }
      }
      return result;

    }

    /**
     * Delegates to the superclass. The first call (from any thread) waits on
     * {@code snapShotStartCyclicCyclicBarrier} and then on
     * {@code flattenOneSegmentPreCyclicBarrier} before delegating, and on
     * {@code flattenOneSegmentPostCyclicBarrier} afterwards.
     */
    @Override
    public void flattenOneSegment(long requesterVersion, Action action) {
      int currentCount = flattenOneSegmentCounter.incrementAndGet();
      if (currentCount <= 1) {
        try {
          /**
           * From here on {@link CompactingMemStore#snapshot} may start.
           */
          snapShotStartCyclicCyclicBarrier.await();
          flattenOneSegmentPreCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      super.flattenOneSegment(requesterVersion, action);
      if (currentCount <= 1) {
        try {
          flattenOneSegmentPostCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
    }

    /**
     * Delegates to the superclass, asserts that the result is {@code true} and counts the call.
     */
    @Override
    protected boolean setInMemoryCompactionFlag() {
      boolean result = super.setInMemoryCompactionFlag();
      assertTrue(result);
      setInMemoryCompactionFlagCounter.incrementAndGet();
      return result;
    }

    /**
     * Delegates to the superclass and then, whether or not it threw, waits on
     * {@code inMemoryCompactionEndCyclicBarrier}.
     */
    @Override
    void inMemoryCompaction() {
      try {
        super.inMemoryCompaction();
      } finally {
        try {
          inMemoryCompactionEndCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }

      }
    }

  }
3523
3524  public static class MyCompactingMemStore5 extends CompactingMemStore {
3525    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3526    private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
3527    /**
3528     * {@link CompactingMemStore#flattenOneSegment} must execute after
3529     * {@link CompactingMemStore#getImmutableSegments}
3530     */
3531    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3532    /**
3533     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3534     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3535     */
3536    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3537    /**
3538     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3539     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3540     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3541     */
3542    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3543    /**
3544     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3545     */
3546    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3547    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3548    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3549    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3550    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3551    /**
3552     * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain
3553     * thread could start.
3554     */
3555    private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
3556    /**
3557     * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the
3558     * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would
3559     * execute,and in memory compact thread would exit,because we expect that in memory compact
3560     * executing only once.
3561     */
3562    private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
3563
3564    public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
3565      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3566      throws IOException {
3567      super(conf, cellComparator, store, regionServices, compactionPolicy);
3568    }
3569
3570    @Override
3571    public VersionedSegmentsList getImmutableSegments() {
3572      VersionedSegmentsList result = super.getImmutableSegments();
3573      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3574        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3575        if (currentCount <= 1) {
3576          try {
3577            flattenOneSegmentPreCyclicBarrier.await();
3578          } catch (Throwable e) {
3579            throw new RuntimeException(e);
3580          }
3581        }
3582
3583      }
3584
3585      return result;
3586    }
3587
3588    @Override
3589    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3590      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3591        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3592        if (currentCount <= 1) {
3593          try {
3594            flattenOneSegmentPostCyclicBarrier.await();
3595          } catch (Throwable e) {
3596            throw new RuntimeException(e);
3597          }
3598        }
3599
3600        if (currentCount == 2) {
3601          try {
3602            /**
3603             * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull},
3604             * writeAgain thread could start.
3605             */
3606            writeMemStoreAgainStartCyclicBarrier.await();
3607            /**
3608             * Only the writeAgain thread completes, retry
3609             * {@link CompactingMemStore#swapPipelineWithNull} would execute.
3610             */
3611            writeMemStoreAgainEndCyclicBarrier.await();
3612          } catch (Throwable e) {
3613            throw new RuntimeException(e);
3614          }
3615        }
3616
3617      }
3618      boolean result = super.swapPipelineWithNull(segments);
3619      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3620        int currentCount = swapPipelineWithNullCounter.get();
3621        if (currentCount <= 1) {
3622          assertTrue(!result);
3623        }
3624        if (currentCount == 2) {
3625          assertTrue(result);
3626        }
3627      }
3628      return result;
3629
3630    }
3631
3632    @Override
3633    public void flattenOneSegment(long requesterVersion, Action action) {
3634      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3635      if (currentCount <= 1) {
3636        try {
3637          /**
3638           * {@link CompactingMemStore#snapshot} could start.
3639           */
3640          snapShotStartCyclicCyclicBarrier.await();
3641          flattenOneSegmentPreCyclicBarrier.await();
3642        } catch (Throwable e) {
3643          throw new RuntimeException(e);
3644        }
3645      }
3646      super.flattenOneSegment(requesterVersion, action);
3647      if (currentCount <= 1) {
3648        try {
3649          flattenOneSegmentPostCyclicBarrier.await();
3650          /**
3651           * Only the writeAgain thread completes, in memory compact thread would exit,because we
3652           * expect that in memory compact executing only once.
3653           */
3654          writeMemStoreAgainEndCyclicBarrier.await();
3655        } catch (Throwable e) {
3656          throw new RuntimeException(e);
3657        }
3658
3659      }
3660    }
3661
3662    @Override
3663    protected boolean setInMemoryCompactionFlag() {
3664      boolean result = super.setInMemoryCompactionFlag();
3665      int count = setInMemoryCompactionFlagCounter.incrementAndGet();
3666      if (count <= 1) {
3667        assertTrue(result);
3668      }
3669      if (count == 2) {
3670        assertTrue(!result);
3671      }
3672      return result;
3673    }
3674
3675    @Override
3676    void inMemoryCompaction() {
3677      try {
3678        super.inMemoryCompaction();
3679      } finally {
3680        try {
3681          inMemoryCompactionEndCyclicBarrier.await();
3682        } catch (Throwable e) {
3683          throw new RuntimeException(e);
3684        }
3685
3686      }
3687    }
3688  }
3689
3690  public static class MyCompactingMemStore6 extends CompactingMemStore {
3691    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3692
3693    public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator,
3694      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3695      throws IOException {
3696      super(conf, cellComparator, store, regionServices, compactionPolicy);
3697    }
3698
3699    @Override
3700    void inMemoryCompaction() {
3701      try {
3702        super.inMemoryCompaction();
3703      } finally {
3704        try {
3705          inMemoryCompactionEndCyclicBarrier.await();
3706        } catch (Throwable e) {
3707          throw new RuntimeException(e);
3708        }
3709
3710      }
3711    }
3712  }
3713
3714  public static class MyDefaultMemStore extends DefaultMemStore {
3715    private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
3716    private static final String FLUSH_THREAD_NAME = "flushMyThread";
3717    /**
3718     * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread
3719     * could start.
3720     */
3721    private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
3722    /**
3723     * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments}
3724     * completed, {@link DefaultMemStore#doClearSnapShot} could continue.
3725     */
3726    private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3727    /**
3728     * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot}
3729     * completed, {@link DefaultMemStore#getScanners} could continue.
3730     */
3731    private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3732    private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
3733    private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
3734    private volatile boolean shouldWait = true;
3735    private volatile HStore store = null;
3736
3737    public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
3738      RegionServicesForStores regionServices) throws IOException {
3739      super(conf, cellComparator, regionServices);
3740    }
3741
3742    @Override
3743    protected List<Segment> getSnapshotSegments() {
3744
3745      List<Segment> result = super.getSnapshotSegments();
3746
3747      if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
3748        int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
3749        if (currentCount == 1) {
3750          if (this.shouldWait) {
3751            try {
3752              /**
3753               * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed,
3754               * {@link DefaultMemStore#doClearSnapShot} could continue.
3755               */
3756              preClearSnapShotCyclicBarrier.await();
3757              /**
3758               * Wait for {@link DefaultMemStore#doClearSnapShot} completed.
3759               */
3760              postClearSnapShotCyclicBarrier.await();
3761
3762            } catch (Throwable e) {
3763              throw new RuntimeException(e);
3764            }
3765          }
3766        }
3767      }
3768      return result;
3769    }
3770
3771    @Override
3772    protected void doClearSnapShot() {
3773      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3774        int currentCount = clearSnapshotCounter.incrementAndGet();
3775        if (currentCount == 1) {
3776          try {
3777            if (
3778              ((ReentrantReadWriteLock) store.getStoreEngine().getLock())
3779                .isWriteLockedByCurrentThread()
3780            ) {
3781              shouldWait = false;
3782            }
3783            /**
3784             * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner
3785             * thread could start.
3786             */
3787            getScannerCyclicBarrier.await();
3788
3789            if (shouldWait) {
3790              /**
3791               * Wait for {@link DefaultMemStore#getSnapshotSegments} completed.
3792               */
3793              preClearSnapShotCyclicBarrier.await();
3794            }
3795          } catch (Throwable e) {
3796            throw new RuntimeException(e);
3797          }
3798        }
3799      }
3800      super.doClearSnapShot();
3801
3802      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3803        int currentCount = clearSnapshotCounter.get();
3804        if (currentCount == 1) {
3805          if (shouldWait) {
3806            try {
3807              /**
3808               * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed,
3809               * {@link DefaultMemStore#getScanners} could continue.
3810               */
3811              postClearSnapShotCyclicBarrier.await();
3812            } catch (Throwable e) {
3813              throw new RuntimeException(e);
3814            }
3815          }
3816        }
3817      }
3818    }
3819  }
3820}