001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY;
021import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_ON_READ_KEY;
022import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_READ;
023import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_WRITE;
024import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE;
025import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
026import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
027import static org.junit.Assert.assertArrayEquals;
028import static org.junit.Assert.assertEquals;
029import static org.junit.Assert.assertFalse;
030import static org.junit.Assert.assertNull;
031import static org.junit.Assert.assertTrue;
032import static org.junit.Assert.fail;
033import static org.mockito.ArgumentMatchers.any;
034import static org.mockito.Mockito.mock;
035import static org.mockito.Mockito.spy;
036import static org.mockito.Mockito.times;
037import static org.mockito.Mockito.verify;
038import static org.mockito.Mockito.when;
039
040import java.io.FileNotFoundException;
041import java.io.IOException;
042import java.lang.ref.SoftReference;
043import java.security.PrivilegedExceptionAction;
044import java.util.ArrayList;
045import java.util.Arrays;
046import java.util.Collection;
047import java.util.Collections;
048import java.util.Iterator;
049import java.util.List;
050import java.util.ListIterator;
051import java.util.NavigableSet;
052import java.util.Optional;
053import java.util.TreeSet;
054import java.util.concurrent.BrokenBarrierException;
055import java.util.concurrent.ConcurrentSkipListSet;
056import java.util.concurrent.CountDownLatch;
057import java.util.concurrent.CyclicBarrier;
058import java.util.concurrent.ExecutorService;
059import java.util.concurrent.Executors;
060import java.util.concurrent.Future;
061import java.util.concurrent.ThreadPoolExecutor;
062import java.util.concurrent.TimeUnit;
063import java.util.concurrent.atomic.AtomicBoolean;
064import java.util.concurrent.atomic.AtomicInteger;
065import java.util.concurrent.atomic.AtomicLong;
066import java.util.concurrent.atomic.AtomicReference;
067import java.util.concurrent.locks.ReentrantReadWriteLock;
068import java.util.function.Consumer;
069import java.util.function.IntBinaryOperator;
070import org.apache.hadoop.conf.Configuration;
071import org.apache.hadoop.fs.FSDataOutputStream;
072import org.apache.hadoop.fs.FileStatus;
073import org.apache.hadoop.fs.FileSystem;
074import org.apache.hadoop.fs.FilterFileSystem;
075import org.apache.hadoop.fs.LocalFileSystem;
076import org.apache.hadoop.fs.Path;
077import org.apache.hadoop.fs.permission.FsPermission;
078import org.apache.hadoop.hbase.Cell;
079import org.apache.hadoop.hbase.CellBuilderType;
080import org.apache.hadoop.hbase.CellComparator;
081import org.apache.hadoop.hbase.CellComparatorImpl;
082import org.apache.hadoop.hbase.CellUtil;
083import org.apache.hadoop.hbase.ExtendedCell;
084import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
085import org.apache.hadoop.hbase.HBaseClassTestRule;
086import org.apache.hadoop.hbase.HBaseConfiguration;
087import org.apache.hadoop.hbase.HBaseTestingUtil;
088import org.apache.hadoop.hbase.HConstants;
089import org.apache.hadoop.hbase.KeyValue;
090import org.apache.hadoop.hbase.MemoryCompactionPolicy;
091import org.apache.hadoop.hbase.NamespaceDescriptor;
092import org.apache.hadoop.hbase.PrivateCellUtil;
093import org.apache.hadoop.hbase.TableName;
094import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
095import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
096import org.apache.hadoop.hbase.client.Get;
097import org.apache.hadoop.hbase.client.RegionInfo;
098import org.apache.hadoop.hbase.client.RegionInfoBuilder;
099import org.apache.hadoop.hbase.client.Scan;
100import org.apache.hadoop.hbase.client.Scan.ReadType;
101import org.apache.hadoop.hbase.client.TableDescriptor;
102import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
103import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
104import org.apache.hadoop.hbase.filter.Filter;
105import org.apache.hadoop.hbase.filter.FilterBase;
106import org.apache.hadoop.hbase.io.compress.Compression;
107import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
108import org.apache.hadoop.hbase.io.hfile.CacheConfig;
109import org.apache.hadoop.hbase.io.hfile.HFile;
110import org.apache.hadoop.hbase.io.hfile.HFileContext;
111import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
112import org.apache.hadoop.hbase.monitoring.MonitoredTask;
113import org.apache.hadoop.hbase.nio.RefCnt;
114import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
115import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
116import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
117import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
118import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
119import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
120import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy;
121import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
122import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
123import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
124import org.apache.hadoop.hbase.security.User;
125import org.apache.hadoop.hbase.testclassification.MediumTests;
126import org.apache.hadoop.hbase.testclassification.RegionServerTests;
127import org.apache.hadoop.hbase.util.BloomFilterUtil;
128import org.apache.hadoop.hbase.util.Bytes;
129import org.apache.hadoop.hbase.util.CommonFSUtils;
130import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
131import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
132import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
133import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
134import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
135import org.apache.hadoop.hbase.wal.WALFactory;
136import org.apache.hadoop.util.Progressable;
137import org.junit.After;
138import org.junit.AfterClass;
139import org.junit.Before;
140import org.junit.ClassRule;
141import org.junit.Rule;
142import org.junit.Test;
143import org.junit.experimental.categories.Category;
144import org.junit.rules.TestName;
145import org.mockito.Mockito;
146import org.slf4j.Logger;
147import org.slf4j.LoggerFactory;
148
149import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
150
151/**
152 * Test class for the HStore
153 */
154@Category({ RegionServerTests.class, MediumTests.class })
155public class TestHStore {
156
  // Class-level HBase test rule bound to this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
  // Name of the currently running test method; the tests pass it to init() to derive directories.
  @Rule
  public TestName name = new TestName();

  // Region and store under test; both are assigned by the init(...) helpers.
  HRegion region;
  HStore store;
  // Table and column family names used when building descriptors in the init(...) helpers.
  byte[] table = Bytes.toBytes("table");
  byte[] family = Bytes.toBytes("family");

  // Rows and qualifiers shared by the tests.
  byte[] row = Bytes.toBytes("row");
  byte[] row2 = Bytes.toBytes("row2");
  byte[] qf1 = Bytes.toBytes("qf1");
  byte[] qf2 = Bytes.toBytes("qf2");
  byte[] qf3 = Bytes.toBytes("qf3");
  byte[] qf4 = Bytes.toBytes("qf4");
  byte[] qf5 = Bytes.toBytes("qf5");
  byte[] qf6 = Bytes.toBytes("qf6");

  // Qualifiers requested by the Get-style tests; setUp() fills it with qf1, qf3 and qf5.
  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);

  // Cells the Get-style tests expect (built in setUp()) and the cells actually read back;
  // assertCheck() compares the two element by element.
  List<Cell> expected = new ArrayList<>();
  List<Cell> result = new ArrayList<>();

  // Passed to flushStore(store, id++) and post-incremented on every flush.
  long id = EnvironmentEdgeManager.currentTime();
  // Get for 'row'; setUp() adds the selected qualifiers as columns.
  Get get = new Get(row);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  // Prefix of each test's base directory; initHRegion() appends the method name directly to it.
  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
188
189  @Before
190  public void setUp() throws IOException {
191    qualifiers.clear();
192    qualifiers.add(qf1);
193    qualifiers.add(qf3);
194    qualifiers.add(qf5);
195
196    Iterator<byte[]> iter = qualifiers.iterator();
197    while (iter.hasNext()) {
198      byte[] next = iter.next();
199      expected.add(new KeyValue(row, family, next, 1, (byte[]) null));
200      get.addColumn(family, next);
201    }
202  }
203
  /**
   * Initializes {@link #region} and {@link #store} for a test, using the configuration held by
   * {@code TEST_UTIL}.
   * @param methodName test name, used to derive the test's directories
   */
  private void init(String methodName) throws IOException {
    init(methodName, TEST_UTIL.getConfiguration());
  }
207
  /**
   * Initializes the store with the given configuration and a column family descriptor for
   * {@link #family} that keeps up to 4 versions.
   * @param methodName test name, used to derive the test's directories
   * @param conf       configuration handed to the region and store
   * @return the newly created store (also assigned to {@link #store})
   */
  private HStore init(String methodName, Configuration conf) throws IOException {
    // some of the tests write 4 versions and then flush
    // (with HBASE-4241, lower versions are collected on flush)
    return init(methodName, conf,
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
  }
214
  /**
   * Initializes the store for the given column family descriptor, using a fresh table descriptor
   * builder for {@link #table}.
   * @param methodName test name, used to derive the test's directories
   * @param conf       configuration handed to the region and store
   * @param hcd        column family descriptor of the store to create
   * @return the newly created store (also assigned to {@link #store})
   */
  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
    throws IOException {
    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
  }
219
  /**
   * Initializes the store from a caller-supplied table descriptor builder, without a store hook.
   * @param methodName test name, used to derive the test's directories
   * @param conf       configuration handed to the region and store
   * @param builder    table descriptor builder the column family is added to
   * @param hcd        column family descriptor of the store to create
   * @return the newly created store (also assigned to {@link #store})
   */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd) throws IOException {
    return init(methodName, conf, builder, hcd, null);
  }
224
  /**
   * Initializes the store with an optional hook and {@code switchToPread} set to {@code false}.
   * @param methodName test name, used to derive the test's directories
   * @param conf       configuration handed to the region and store
   * @param builder    table descriptor builder the column family is added to
   * @param hcd        column family descriptor of the store to create
   * @param hook       store hook, or {@code null} to create a plain {@link HStore}
   * @return the newly created store (also assigned to {@link #store})
   */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
    return init(methodName, conf, builder, hcd, hook, false);
  }
229
230  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
231    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
232    TableDescriptor htd = builder.setColumnFamily(hcd).build();
233    Path basedir = new Path(DIR + methodName);
234    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
235    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
236
237    FileSystem fs = FileSystem.get(conf);
238
239    fs.delete(logdir, true);
240    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
241      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null,
242      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
243    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
244    Configuration walConf = new Configuration(conf);
245    CommonFSUtils.setRootDir(walConf, basedir);
246    WALFactory wals = new WALFactory(walConf, methodName);
247    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
248      htd, null);
249    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
250    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
251    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
252  }
253
254  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
255    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
256    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
257    if (hook == null) {
258      store = new HStore(region, hcd, conf, false);
259    } else {
260      store = new MyStore(region, hcd, conf, hook, switchToPread);
261    }
262    region.stores.put(store.getColumnFamilyDescriptor().getName(), store);
263    return store;
264  }
265
266  /**
267   * Test we do not lose data if we fail a flush and then close. Part of HBase-10466
268   */
269  @Test
270  public void testFlushSizeSizing() throws Exception {
271    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
272    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
273    // Only retry once.
274    conf.setInt("hbase.hstore.flush.retries.number", 1);
275    User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" });
276    // Inject our faulty LocalFileSystem
277    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
278    user.runAs(new PrivilegedExceptionAction<Object>() {
279      @Override
280      public Object run() throws Exception {
281        // Make sure it worked (above is sensitive to caching details in hadoop core)
282        FileSystem fs = FileSystem.get(conf);
283        assertEquals(FaultyFileSystem.class, fs.getClass());
284        FaultyFileSystem ffs = (FaultyFileSystem) fs;
285
286        // Initialize region
287        init(name.getMethodName(), conf);
288
289        MemStoreSize mss = store.memstore.getFlushableSize();
290        assertEquals(0, mss.getDataSize());
291        LOG.info("Adding some data");
292        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
293        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
294        // add the heap size of active (mutable) segment
295        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
296        mss = store.memstore.getFlushableSize();
297        assertEquals(kvSize.getMemStoreSize(), mss);
298        // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
299        try {
300          LOG.info("Flushing");
301          flushStore(store, id++);
302          fail("Didn't bubble up IOE!");
303        } catch (IOException ioe) {
304          assertTrue(ioe.getMessage().contains("Fault injected"));
305        }
306        // due to snapshot, change mutable to immutable segment
307        kvSize.incMemStoreSize(0,
308          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
309        mss = store.memstore.getFlushableSize();
310        assertEquals(kvSize.getMemStoreSize(), mss);
311        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
312        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
313        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
314        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
315        // not yet cleared the snapshot -- the above flush failed.
316        assertEquals(kvSize.getMemStoreSize(), mss);
317        ffs.fault.set(false);
318        flushStore(store, id++);
319        mss = store.memstore.getFlushableSize();
320        // Size should be the foreground kv size.
321        assertEquals(kvSize2.getMemStoreSize(), mss);
322        flushStore(store, id++);
323        mss = store.memstore.getFlushableSize();
324        assertEquals(0, mss.getDataSize());
325        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
326        return null;
327      }
328    });
329  }
330
331  @Test
332  public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException {
333    int numStoreFiles = 5;
334    writeAndRead(BloomType.ROWCOL, numStoreFiles);
335
336    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
337    // hard to know exactly the numbers here, we are just trying to
338    // prove that they are incrementing
339    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
340    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
341  }
342
343  @Test
344  public void testStoreBloomFilterMetricsWithBloomRow() throws IOException {
345    int numStoreFiles = 5;
346    writeAndRead(BloomType.ROWCOL, numStoreFiles);
347
348    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
349    // hard to know exactly the numbers here, we are just trying to
350    // prove that they are incrementing
351    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
352    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
353  }
354
355  @Test
356  public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException {
357    int numStoreFiles = 5;
358    writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles);
359
360    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
361    // hard to know exactly the numbers here, we are just trying to
362    // prove that they are incrementing
363    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
364  }
365
366  @Test
367  public void testStoreBloomFilterMetricsWithBloomNone() throws IOException {
368    int numStoreFiles = 5;
369    writeAndRead(BloomType.NONE, numStoreFiles);
370
371    assertEquals(0, store.getBloomFilterRequestsCount());
372    assertEquals(0, store.getBloomFilterNegativeResultsCount());
373
374    // hard to know exactly the numbers here, we are just trying to
375    // prove that they are incrementing
376    assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles);
377  }
378
379  private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException {
380    Configuration conf = HBaseConfiguration.create();
381    FileSystem fs = FileSystem.get(conf);
382
383    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
384      .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType)
385      .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build();
386    init(name.getMethodName(), conf, hcd);
387
388    for (int i = 1; i <= numStoreFiles; i++) {
389      byte[] row = Bytes.toBytes("row" + i);
390      LOG.info("Adding some data for the store file #" + i);
391      long timeStamp = EnvironmentEdgeManager.currentTime();
392      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
393      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
394      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
395      flush(i);
396    }
397
398    // Verify the total number of store files
399    assertEquals(numStoreFiles, this.store.getStorefiles().size());
400
401    TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
402    columns.add(qf1);
403
404    for (int i = 1; i <= numStoreFiles; i++) {
405      KeyValueScanner scanner =
406        store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0);
407      scanner.peek();
408    }
409  }
410
411  /**
412   * Verify that compression and data block encoding are respected by the createWriter method, used
413   * on store flush.
414   */
415  @Test
416  public void testCreateWriter() throws Exception {
417    Configuration conf = HBaseConfiguration.create();
418    FileSystem fs = FileSystem.get(conf);
419
420    ColumnFamilyDescriptor hcd =
421      ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ)
422        .setDataBlockEncoding(DataBlockEncoding.DIFF).build();
423    init(name.getMethodName(), conf, hcd);
424
425    // Test createWriter
426    StoreFileWriter writer = store.getStoreEngine()
427      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
428        .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
429        .includesTag(false).shouldDropBehind(false));
430    Path path = writer.getPath();
431    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
432    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
433    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
434    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
435    writer.close();
436
437    // Verify that compression and encoding settings are respected
438    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
439    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
440    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
441    reader.close();
442  }
443
444  @Test
445  public void testDeleteExpiredStoreFiles() throws Exception {
446    testDeleteExpiredStoreFiles(0);
447    testDeleteExpiredStoreFiles(1);
448  }
449
  /**
   * Writes four store files spaced a quarter of the store TTL apart on an injected clock, then
   * repeatedly requests compaction while advancing the clock and checks how many store files
   * remain. With {@code minVersions == 0} one file is expected to disappear per request until a
   * single file is left; otherwise all files are expected to stay.
   * <p>
   * NOTE(review): the clock is an {@link IncrementingEnvironmentEdge}, which presumably advances
   * on every {@code currentTime()} call -- confirm before adding, removing or hoisting any clock
   * reads in this method.
   * @param minVersions the MIN_VERSIONS for the column family
   */
  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
    int storeFileNum = 4;
    int ttl = 4;
    // Replace the global clock; tearDown() resets it.
    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
    EnvironmentEdgeManagerTestHelper.injectEdge(edge);

    Configuration conf = HBaseConfiguration.create();
    // Enable the expired store file deletion
    conf.setBoolean("hbase.store.delete.expired.storefile", true);
    // Set the compaction threshold higher to avoid normal compactions.
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);

    // The directory name carries minVersions so the two invocations do not share a directory.
    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
      .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());

    long storeTtl = this.store.getScanInfo().getTtl();
    long sleepTime = storeTtl / storeFileNum;
    long timeStamp;
    // There are 4 store files and the max time stamp difference among these
    // store files will be (this.store.ttl / storeFileNum)
    for (int i = 1; i <= storeFileNum; i++) {
      LOG.info("Adding some data for the store file #" + i);
      timeStamp = EnvironmentEdgeManager.currentTime();
      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
      flush(i);
      edge.incrementTime(sleepTime);
    }

    // Verify the total number of store files
    assertEquals(storeFileNum, this.store.getStorefiles().size());

    // Each call will find one expired store file and delete it before compaction happens.
    // There will be no compaction due to threshold above. Last file will not be replaced.
    for (int i = 1; i <= storeFileNum - 1; i++) {
      // No compaction may be selected; the request is only expected to drop expired files.
      assertFalse(this.store.requestCompaction().isPresent());
      Collection<HStoreFile> sfs = this.store.getStorefiles();
      if (minVersions == 0) {
        // Ensure i files are gone.
        assertEquals(storeFileNum - i, sfs.size());
        // Ensure only non-expired files remain.
        for (HStoreFile sf : sfs) {
          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
        }
      } else {
        // With MIN_VERSIONS set, no store file is expected to be removed.
        assertEquals(storeFileNum, sfs.size());
      }
      // Let the next store file expired.
      edge.incrementTime(sleepTime);
    }
    assertFalse(this.store.requestCompaction().isPresent());

    Collection<HStoreFile> sfs = this.store.getStorefiles();
    // Assert the last expired file is not removed.
    if (minVersions == 0) {
      assertEquals(1, sfs.size());
    }
    // The first remaining file must be past its TTL and yet still present.
    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
    assertTrue(ts < (edge.currentTime() - storeTtl));

    // Close the readers of the remaining store files.
    for (HStoreFile sf : sfs) {
      sf.closeStoreFile(true);
    }
  }
519
520  @Test
521  public void testLowestModificationTime() throws Exception {
522    Configuration conf = HBaseConfiguration.create();
523    FileSystem fs = FileSystem.get(conf);
524    // Initialize region
525    init(name.getMethodName(), conf);
526
527    int storeFileNum = 4;
528    for (int i = 1; i <= storeFileNum; i++) {
529      LOG.info("Adding some data for the store file #" + i);
530      this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null);
531      this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null);
532      this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null);
533      flush(i);
534    }
535    // after flush; check the lowest time stamp
536    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
537    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
538    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
539
540    // after compact; check the lowest time stamp
541    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
542    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
543    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
544    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
545  }
546
547  private static long getLowestTimeStampFromFS(FileSystem fs,
548    final Collection<HStoreFile> candidates) throws IOException {
549    long minTs = Long.MAX_VALUE;
550    if (candidates.isEmpty()) {
551      return minTs;
552    }
553    Path[] p = new Path[candidates.size()];
554    int i = 0;
555    for (HStoreFile sf : candidates) {
556      p[i] = sf.getPath();
557      ++i;
558    }
559
560    FileStatus[] stats = fs.listStatus(p);
561    if (stats == null || stats.length == 0) {
562      return minTs;
563    }
564    for (FileStatus s : stats) {
565      minTs = Math.min(minTs, s.getModificationTime());
566    }
567    return minTs;
568  }
569
570  //////////////////////////////////////////////////////////////////////////////
571  // Get tests
572  //////////////////////////////////////////////////////////////////////////////
573
  // Block size passed to HFileContextBuilder.withBlockSize() when writing the empty store file.
  private static final int BLOCKSIZE_SMALL = 8192;
575
576  /**
577   * Test for hbase-1686.
578   */
579  @Test
580  public void testEmptyStoreFile() throws IOException {
581    init(this.name.getMethodName());
582    // Write a store file.
583    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
584    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
585    flush(1);
586    // Now put in place an empty store file. Its a little tricky. Have to
587    // do manually with hacked in sequence id.
588    HStoreFile f = this.store.getStorefiles().iterator().next();
589    Path storedir = f.getPath().getParent();
590    long seqid = f.getMaxSequenceId();
591    Configuration c = HBaseConfiguration.create();
592    FileSystem fs = FileSystem.get(c);
593    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
594    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
595      .withOutputDir(storedir).withFileContext(meta).build();
596    w.appendMetadata(seqid + 1, false);
597    w.close();
598    this.store.close();
599    // Reopen it... should pick up two files
600    this.store =
601      new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
602    assertEquals(2, this.store.getStorefilesCount());
603
604    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
605    assertEquals(1, result.size());
606  }
607
608  /**
609   * Getting data from memstore only
610   */
611  @Test
612  public void testGet_FromMemStoreOnly() throws IOException {
613    init(this.name.getMethodName());
614
615    // Put data in memstore
616    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
617    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
618    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
619    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
620    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
621    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
622
623    // Get
624    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
625
626    // Compare
627    assertCheck();
628  }
629
630  @Test
631  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
632    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
633    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
634    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
635  }
636
637  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
638    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
639      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
640    long currentTs = 100;
641    long minTs = currentTs;
642    // the extra cell won't be flushed to disk,
643    // so the min of timerange will be different between memStore and hfile.
644    for (int i = 0; i != (maxVersion + 1); ++i) {
645      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
646      if (i == 1) {
647        minTs = currentTs;
648      }
649    }
650    flushStore(store, id++);
651
652    Collection<HStoreFile> files = store.getStorefiles();
653    assertEquals(1, files.size());
654    HStoreFile f = files.iterator().next();
655    f.initReader();
656    StoreFileReader reader = f.getReader();
657    assertEquals(minTs, reader.timeRange.getMin());
658    assertEquals(currentTs, reader.timeRange.getMax());
659  }
660
661  /**
662   * Getting data from files only
663   */
664  @Test
665  public void testGet_FromFilesOnly() throws IOException {
666    init(this.name.getMethodName());
667
668    // Put data in memstore
669    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
670    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
671    // flush
672    flush(1);
673
674    // Add more data
675    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
676    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
677    // flush
678    flush(2);
679
680    // Add more data
681    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
682    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
683    // flush
684    flush(3);
685
686    // Get
687    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
688    // this.store.get(get, qualifiers, result);
689
690    // Need to sort the result since multiple files
691    Collections.sort(result, CellComparatorImpl.COMPARATOR);
692
693    // Compare
694    assertCheck();
695  }
696
697  /**
698   * Getting data from memstore and files
699   */
700  @Test
701  public void testGet_FromMemStoreAndFiles() throws IOException {
702    init(this.name.getMethodName());
703
704    // Put data in memstore
705    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
706    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
707    // flush
708    flush(1);
709
710    // Add more data
711    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
712    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
713    // flush
714    flush(2);
715
716    // Add more data
717    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
718    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
719
720    // Get
721    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
722
723    // Need to sort the result since multiple files
724    Collections.sort(result, CellComparatorImpl.COMPARATOR);
725
726    // Compare
727    assertCheck();
728  }
729
730  private void flush(int storeFilessize) throws IOException {
731    flushStore(store, id++);
732    assertEquals(storeFilessize, this.store.getStorefiles().size());
733    assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount());
734  }
735
736  private void assertCheck() {
737    assertEquals(expected.size(), result.size());
738    for (int i = 0; i < expected.size(); i++) {
739      assertEquals(expected.get(i), result.get(i));
740    }
741  }
742
743  @After
744  public void tearDown() throws Exception {
745    EnvironmentEdgeManagerTestHelper.reset();
746    if (store != null) {
747      try {
748        store.close();
749      } catch (IOException e) {
750      }
751      store = null;
752    }
753    if (region != null) {
754      region.close();
755      region = null;
756    }
757  }
758
  /**
   * Runs once after all tests of this class and asks the shared testing utility to clean up its
   * test directory.
   * @throws IOException if the cleanup fails
   */
  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }
763
764  @Test
765  public void testHandleErrorsInFlush() throws Exception {
766    LOG.info("Setting up a faulty file system that cannot write");
767
768    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
769    User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" });
770    // Inject our faulty LocalFileSystem
771    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
772    user.runAs(new PrivilegedExceptionAction<Object>() {
773      @Override
774      public Object run() throws Exception {
775        // Make sure it worked (above is sensitive to caching details in hadoop core)
776        FileSystem fs = FileSystem.get(conf);
777        assertEquals(FaultyFileSystem.class, fs.getClass());
778
779        // Initialize region
780        init(name.getMethodName(), conf);
781
782        LOG.info("Adding some data");
783        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
784        store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
785        store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
786
787        LOG.info("Before flush, we should have no files");
788
789        Collection<StoreFileInfo> files =
790          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
791        assertEquals(0, files != null ? files.size() : 0);
792
793        // flush
794        try {
795          LOG.info("Flushing");
796          flush(1);
797          fail("Didn't bubble up IOE!");
798        } catch (IOException ioe) {
799          assertTrue(ioe.getMessage().contains("Fault injected"));
800        }
801
802        LOG.info("After failed flush, we should still have no files!");
803        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
804        assertEquals(0, files != null ? files.size() : 0);
805        store.getHRegion().getWAL().close();
806        return null;
807      }
808    });
809    FileSystem.closeAllForUGI(user.getUGI());
810  }
811
812  /**
813   * Faulty file system that will fail if you write past its fault position the FIRST TIME only;
814   * thereafter it will succeed. Used by {@link TestHRegion} too.
815   */
816  static class FaultyFileSystem extends FilterFileSystem {
817    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
818    private long faultPos = 200;
819    AtomicBoolean fault = new AtomicBoolean(true);
820
821    public FaultyFileSystem() {
822      super(new LocalFileSystem());
823      LOG.info("Creating faulty!");
824    }
825
826    @Override
827    public FSDataOutputStream create(Path p) throws IOException {
828      return new FaultyOutputStream(super.create(p), faultPos, fault);
829    }
830
831    @Override
832    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
833      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
834      return new FaultyOutputStream(
835        super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress),
836        faultPos, fault);
837    }
838
839    @Override
840    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
841      short replication, long blockSize, Progressable progress) throws IOException {
842      // Fake it. Call create instead. The default implementation throws an IOE
843      // that this is not supported.
844      return create(f, overwrite, bufferSize, replication, blockSize, progress);
845    }
846  }
847
848  static class FaultyOutputStream extends FSDataOutputStream {
849    volatile long faultPos = Long.MAX_VALUE;
850    private final AtomicBoolean fault;
851
852    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
853      throws IOException {
854      super(out, null);
855      this.faultPos = faultPos;
856      this.fault = fault;
857    }
858
859    @Override
860    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
861      LOG.info("faulty stream write at pos " + getPos());
862      injectFault();
863      super.write(buf, offset, length);
864    }
865
866    private void injectFault() throws IOException {
867      if (this.fault.get() && getPos() >= faultPos) {
868        throw new IOException("Fault injected");
869      }
870    }
871  }
872
873  private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
874    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
875    storeFlushCtx.prepare();
876    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
877    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
878    return storeFlushCtx;
879  }
880
881  /**
882   * Generate a list of KeyValues for testing based on given parameters
883   * @return the rows key-value list
884   */
885  private List<ExtendedCell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier,
886    byte[] family) {
887    List<ExtendedCell> kvList = new ArrayList<>();
888    for (int i = 1; i <= numRows; i++) {
889      byte[] b = Bytes.toBytes(i);
890      for (long timestamp : timestamps) {
891        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
892      }
893    }
894    return kvList;
895  }
896
897  /**
898   * Test to ensure correctness when using Stores with multiple timestamps
899   */
900  @Test
901  public void testMultipleTimestamps() throws IOException {
902    int numRows = 1;
903    long[] timestamps1 = new long[] { 1, 5, 10, 20 };
904    long[] timestamps2 = new long[] { 30, 80 };
905
906    init(this.name.getMethodName());
907
908    List<ExtendedCell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family);
909    for (ExtendedCell kv : kvList1) {
910      this.store.add(kv, null);
911    }
912
913    flushStore(store, id++);
914
915    List<ExtendedCell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family);
916    for (ExtendedCell kv : kvList2) {
917      this.store.add(kv, null);
918    }
919
920    List<Cell> result;
921    Get get = new Get(Bytes.toBytes(1));
922    get.addColumn(family, qf1);
923
924    get.setTimeRange(0, 15);
925    result = HBaseTestingUtil.getFromStoreFile(store, get);
926    assertTrue(result.size() > 0);
927
928    get.setTimeRange(40, 90);
929    result = HBaseTestingUtil.getFromStoreFile(store, get);
930    assertTrue(result.size() > 0);
931
932    get.setTimeRange(10, 45);
933    result = HBaseTestingUtil.getFromStoreFile(store, get);
934    assertTrue(result.size() > 0);
935
936    get.setTimeRange(80, 145);
937    result = HBaseTestingUtil.getFromStoreFile(store, get);
938    assertTrue(result.size() > 0);
939
940    get.setTimeRange(1, 2);
941    result = HBaseTestingUtil.getFromStoreFile(store, get);
942    assertTrue(result.size() > 0);
943
944    get.setTimeRange(90, 200);
945    result = HBaseTestingUtil.getFromStoreFile(store, get);
946    assertTrue(result.size() == 0);
947  }
948
  /**
   * Test for HBASE-3492 - Test split on empty colfam (no store files). A freshly initialized
   * store must not report a split point.
   * @throws IOException When the IO operations fail.
   */
  @Test
  public void testSplitWithEmptyColFam() throws IOException {
    init(this.name.getMethodName());
    assertFalse(store.getSplitPoint().isPresent());
  }
958
959  @Test
960  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
961    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
962    long anyValue = 10;
963
964    // We'll check that it uses correct config and propagates it appropriately by going thru
965    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
966    // a number we pass in is higher than some config value, inside compactionPolicy.
967    Configuration conf = HBaseConfiguration.create();
968    conf.setLong(CONFIG_KEY, anyValue);
969    init(name.getMethodName() + "-xml", conf);
970    assertTrue(store.throttleCompaction(anyValue + 1));
971    assertFalse(store.throttleCompaction(anyValue));
972
973    // HTD overrides XML.
974    --anyValue;
975    init(
976      name.getMethodName() + "-htd", conf, TableDescriptorBuilder
977        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
978      ColumnFamilyDescriptorBuilder.of(family));
979    assertTrue(store.throttleCompaction(anyValue + 1));
980    assertFalse(store.throttleCompaction(anyValue));
981
982    // HCD overrides them both.
983    --anyValue;
984    init(name.getMethodName() + "-hcd", conf,
985      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
986        Long.toString(anyValue)),
987      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
988        .build());
989    assertTrue(store.throttleCompaction(anyValue + 1));
990    assertFalse(store.throttleCompaction(anyValue));
991  }
992
  /**
   * Store engine that behaves like {@link DefaultStoreEngine} but remembers, in a static field,
   * the compactor it ended up with after its components were created.
   */
  public static class DummyStoreEngine extends DefaultStoreEngine {
    // Compactor of the most recently initialized engine; null until an engine was created.
    public static DefaultCompactor lastCreatedCompactor = null;

    @Override
    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
      throws IOException {
      super.createComponents(conf, store, comparator);
      // Publish the compactor built by the parent class.
      lastCreatedCompactor = this.compactor;
    }
  }
1003
1004  @Test
1005  public void testStoreUsesSearchEngineOverride() throws Exception {
1006    Configuration conf = HBaseConfiguration.create();
1007    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
1008    init(this.name.getMethodName(), conf);
1009    assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
1010  }
1011
1012  private void addStoreFile() throws IOException {
1013    HStoreFile f = this.store.getStorefiles().iterator().next();
1014    Path storedir = f.getPath().getParent();
1015    long seqid = this.store.getMaxSequenceId().orElse(0L);
1016    Configuration c = TEST_UTIL.getConfiguration();
1017    FileSystem fs = FileSystem.get(c);
1018    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
1019    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
1020      .withOutputDir(storedir).withFileContext(fileContext).build();
1021    w.appendMetadata(seqid + 1, false);
1022    w.close();
1023    LOG.info("Added store file:" + w.getPath());
1024  }
1025
1026  private void archiveStoreFile(int index) throws IOException {
1027    Collection<HStoreFile> files = this.store.getStorefiles();
1028    HStoreFile sf = null;
1029    Iterator<HStoreFile> it = files.iterator();
1030    for (int i = 0; i <= index; i++) {
1031      sf = it.next();
1032    }
1033    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(),
1034      Lists.newArrayList(sf));
1035  }
1036
1037  private void closeCompactedFile(int index) throws IOException {
1038    Collection<HStoreFile> files =
1039      this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
1040    if (files.size() > 0) {
1041      HStoreFile sf = null;
1042      Iterator<HStoreFile> it = files.iterator();
1043      for (int i = 0; i <= index; i++) {
1044        sf = it.next();
1045      }
1046      sf.closeStoreFile(true);
1047      store.getStoreEngine().getStoreFileManager()
1048        .removeCompactedFiles(Collections.singletonList(sf));
1049    }
1050  }
1051
1052  @Test
1053  public void testRefreshStoreFiles() throws Exception {
1054    init(name.getMethodName());
1055
1056    assertEquals(0, this.store.getStorefilesCount());
1057
1058    // Test refreshing store files when no store files are there
1059    store.refreshStoreFiles();
1060    assertEquals(0, this.store.getStorefilesCount());
1061
1062    // add some data, flush
1063    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1064    flush(1);
1065    assertEquals(1, this.store.getStorefilesCount());
1066
1067    // add one more file
1068    addStoreFile();
1069
1070    assertEquals(1, this.store.getStorefilesCount());
1071    store.refreshStoreFiles();
1072    assertEquals(2, this.store.getStorefilesCount());
1073
1074    // add three more files
1075    addStoreFile();
1076    addStoreFile();
1077    addStoreFile();
1078
1079    assertEquals(2, this.store.getStorefilesCount());
1080    store.refreshStoreFiles();
1081    assertEquals(5, this.store.getStorefilesCount());
1082
1083    closeCompactedFile(0);
1084    archiveStoreFile(0);
1085
1086    assertEquals(5, this.store.getStorefilesCount());
1087    store.refreshStoreFiles();
1088    assertEquals(4, this.store.getStorefilesCount());
1089
1090    archiveStoreFile(0);
1091    archiveStoreFile(1);
1092    archiveStoreFile(2);
1093
1094    assertEquals(4, this.store.getStorefilesCount());
1095    store.refreshStoreFiles();
1096    assertEquals(1, this.store.getStorefilesCount());
1097
1098    archiveStoreFile(0);
1099    store.refreshStoreFiles();
1100    assertEquals(0, this.store.getStorefilesCount());
1101  }
1102
1103  @Test
1104  public void testRefreshStoreFilesNotChanged() throws IOException {
1105    init(name.getMethodName());
1106
1107    assertEquals(0, this.store.getStorefilesCount());
1108
1109    // add some data, flush
1110    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1111    flush(1);
1112    // add one more file
1113    addStoreFile();
1114
1115    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());
1116
1117    // call first time after files changed
1118    spiedStoreEngine.refreshStoreFiles();
1119    assertEquals(2, this.store.getStorefilesCount());
1120    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
1121
1122    // call second time
1123    spiedStoreEngine.refreshStoreFiles();
1124
1125    // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
1126    // refreshed,
1127    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
1128  }
1129
1130  @Test
1131  public void testScanWithCompactionAfterFlush() throws Exception {
1132    TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1133      EverythingPolicy.class.getName());
1134    init(name.getMethodName());
1135
1136    assertEquals(0, this.store.getStorefilesCount());
1137
1138    KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
1139    // add some data, flush
1140    this.store.add(kv, null);
1141    flush(1);
1142    kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
1143    // add some data, flush
1144    this.store.add(kv, null);
1145    flush(2);
1146    kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
1147    // add some data, flush
1148    this.store.add(kv, null);
1149    flush(3);
1150
1151    ExecutorService service = Executors.newFixedThreadPool(2);
1152
1153    Scan scan = new Scan(new Get(row));
1154    Future<KeyValueScanner> scanFuture = service.submit(() -> {
1155      try {
1156        LOG.info(">>>> creating scanner");
1157        return this.store.createScanner(scan,
1158          new ScanInfo(HBaseConfiguration.create(),
1159            ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
1160            Long.MAX_VALUE, 0, CellComparator.getInstance()),
1161          scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
1162      } catch (IOException e) {
1163        e.printStackTrace();
1164        return null;
1165      }
1166    });
1167    Future compactFuture = service.submit(() -> {
1168      try {
1169        LOG.info(">>>>>> starting compaction");
1170        Optional<CompactionContext> opCompaction = this.store.requestCompaction();
1171        assertTrue(opCompaction.isPresent());
1172        store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
1173        LOG.info(">>>>>> Compaction is finished");
1174        this.store.closeAndArchiveCompactedFiles();
1175        LOG.info(">>>>>> Compacted files deleted");
1176      } catch (IOException e) {
1177        e.printStackTrace();
1178      }
1179    });
1180
1181    KeyValueScanner kvs = scanFuture.get();
1182    compactFuture.get();
1183    ((StoreScanner) kvs).currentScanners.forEach(s -> {
1184      if (s instanceof StoreFileScanner) {
1185        assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
1186      }
1187    });
1188    kvs.seek(kv);
1189    service.shutdownNow();
1190  }
1191
1192  private long countMemStoreScanner(StoreScanner scanner) {
1193    if (scanner.currentScanners == null) {
1194      return 0;
1195    }
1196    return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count();
1197  }
1198
1199  @Test
1200  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1201    long seqId = 100;
1202    long timestamp = EnvironmentEdgeManager.currentTime();
1203    ExtendedCell cell0 =
1204      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1205        .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1206    PrivateCellUtil.setSequenceId(cell0, seqId);
1207    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1208
1209    ExtendedCell cell1 =
1210      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1211        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1212    PrivateCellUtil.setSequenceId(cell1, seqId);
1213    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1214
1215    seqId = 101;
1216    timestamp = EnvironmentEdgeManager.currentTime();
1217    ExtendedCell cell2 =
1218      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1219        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1220    PrivateCellUtil.setSequenceId(cell2, seqId);
1221    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1222  }
1223
1224  private void testNumberOfMemStoreScannersAfterFlush(List<ExtendedCell> inputCellsBeforeSnapshot,
1225    List<ExtendedCell> inputCellsAfterSnapshot) throws IOException {
1226    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
1227    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1228    long seqId = Long.MIN_VALUE;
1229    for (ExtendedCell c : inputCellsBeforeSnapshot) {
1230      quals.add(CellUtil.cloneQualifier(c));
1231      seqId = Math.max(seqId, c.getSequenceId());
1232    }
1233    for (ExtendedCell c : inputCellsAfterSnapshot) {
1234      quals.add(CellUtil.cloneQualifier(c));
1235      seqId = Math.max(seqId, c.getSequenceId());
1236    }
1237    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
1238    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1239    storeFlushCtx.prepare();
1240    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
1241    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
1242    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
1243      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
1244      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
1245      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1246      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1247      // snapshot has no data after flush
1248      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
1249      boolean more;
1250      int cellCount = 0;
1251      do {
1252        List<Cell> cells = new ArrayList<>();
1253        more = s.next(cells);
1254        cellCount += cells.size();
1255        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
1256      } while (more);
1257      assertEquals(
1258        "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
1259          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
1260        inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
1261      // the current scanners is cleared
1262      assertEquals(0, countMemStoreScanner(s));
1263    }
1264  }
1265
  /**
   * Creates a cell in the test's default row by delegating to the overload that takes an
   * explicit row.
   * @param qualifier  qualifier of the cell
   * @param ts         timestamp of the cell
   * @param sequenceId sequence id of the cell
   * @param value      value of the cell
   * @return the new cell
   */
  private ExtendedCell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
    throws IOException {
    return createCell(row, qualifier, ts, sequenceId, value);
  }
1270
  /**
   * Builds a Put cell in the test's column family using a {@code DEEP_COPY} cell builder.
   * @param row        row of the cell
   * @param qualifier  qualifier of the cell
   * @param ts         timestamp of the cell
   * @param sequenceId sequence id of the cell
   * @param value      value of the cell
   * @return the new cell
   */
  private ExtendedCell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId,
    byte[] value) throws IOException {
    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row)
      .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
      .setValue(value).setSequenceId(sequenceId).build();
  }
1277
1278  @Test
1279  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1280    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1281    final int expectedSize = 3;
1282    testFlushBeforeCompletingScan(new MyListHook() {
1283      @Override
1284      public void hook(int currentSize) {
1285        if (currentSize == expectedSize - 1) {
1286          try {
1287            flushStore(store, id++);
1288            timeToGoNextRow.set(true);
1289          } catch (IOException e) {
1290            throw new RuntimeException(e);
1291          }
1292        }
1293      }
1294    }, new FilterBase() {
1295      @Override
1296      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1297        return ReturnCode.INCLUDE;
1298      }
1299    }, expectedSize);
1300  }
1301
1302  @Test
1303  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
1304    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1305    final int expectedSize = 2;
1306    testFlushBeforeCompletingScan(new MyListHook() {
1307      @Override
1308      public void hook(int currentSize) {
1309        if (currentSize == expectedSize - 1) {
1310          try {
1311            flushStore(store, id++);
1312            timeToGoNextRow.set(true);
1313          } catch (IOException e) {
1314            throw new RuntimeException(e);
1315          }
1316        }
1317      }
1318    }, new FilterBase() {
1319      @Override
1320      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1321        if (timeToGoNextRow.get()) {
1322          timeToGoNextRow.set(false);
1323          return ReturnCode.NEXT_ROW;
1324        } else {
1325          return ReturnCode.INCLUDE;
1326        }
1327      }
1328    }, expectedSize);
1329  }
1330
1331  @Test
1332  public void testFlushBeforeCompletingScanWithFilterHint()
1333    throws IOException, InterruptedException {
1334    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
1335    final int expectedSize = 2;
1336    testFlushBeforeCompletingScan(new MyListHook() {
1337      @Override
1338      public void hook(int currentSize) {
1339        if (currentSize == expectedSize - 1) {
1340          try {
1341            flushStore(store, id++);
1342            timeToGetHint.set(true);
1343          } catch (IOException e) {
1344            throw new RuntimeException(e);
1345          }
1346        }
1347      }
1348    }, new FilterBase() {
1349      @Override
1350      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1351        if (timeToGetHint.get()) {
1352          timeToGetHint.set(false);
1353          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
1354        } else {
1355          return Filter.ReturnCode.INCLUDE;
1356        }
1357      }
1358
1359      @Override
1360      public Cell getNextCellHint(Cell currentCell) throws IOException {
1361        return currentCell;
1362      }
1363    }, expectedSize);
1364  }
1365
1366  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
1367    throws IOException, InterruptedException {
1368    Configuration conf = HBaseConfiguration.create();
1369    byte[] r0 = Bytes.toBytes("row0");
1370    byte[] r1 = Bytes.toBytes("row1");
1371    byte[] r2 = Bytes.toBytes("row2");
1372    byte[] value0 = Bytes.toBytes("value0");
1373    byte[] value1 = Bytes.toBytes("value1");
1374    byte[] value2 = Bytes.toBytes("value2");
1375    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1376    long ts = EnvironmentEdgeManager.currentTime();
1377    long seqId = 100;
1378    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1379      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1380      new MyStoreHook() {
1381        @Override
1382        public long getSmallestReadPoint(HStore store) {
1383          return seqId + 3;
1384        }
1385      });
1386    // The cells having the value0 won't be flushed to disk because the value of max version is 1
1387    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
1388    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
1389    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
1390    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
1391    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
1392    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
1393    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
1394    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
1395    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
1396    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1397    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1398    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1399    List<Cell> myList = new MyList<>(hook);
1400    Scan scan = new Scan().withStartRow(r1).setFilter(filter);
1401    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
1402      // r1
1403      scanner.next(myList);
1404      assertEquals(expectedSize, myList.size());
1405      for (Cell c : myList) {
1406        byte[] actualValue = CellUtil.cloneValue(c);
1407        assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
1408          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
1409      }
1410      List<Cell> normalList = new ArrayList<>(3);
1411      // r2
1412      scanner.next(normalList);
1413      assertEquals(3, normalList.size());
1414      for (Cell c : normalList) {
1415        byte[] actualValue = CellUtil.cloneValue(c);
1416        assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:"
1417          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2));
1418      }
1419    }
1420  }
1421
1422  @Test
1423  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
1424    Configuration conf = HBaseConfiguration.create();
1425    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
1426    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1427      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1428    byte[] value = Bytes.toBytes("value");
1429    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1430    long ts = EnvironmentEdgeManager.currentTime();
1431    long seqId = 100;
1432    // older data whihc shouldn't be "seen" by client
1433    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1434    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1435    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1436    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1437    quals.add(qf1);
1438    quals.add(qf2);
1439    quals.add(qf3);
1440    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1441    MyCompactingMemStore.START_TEST.set(true);
1442    Runnable flush = () -> {
1443      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
1444      // recreate the active memstore -- phase (4/5)
1445      storeFlushCtx.prepare();
1446    };
1447    ExecutorService service = Executors.newSingleThreadExecutor();
1448    service.execute(flush);
1449    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
1450    // this is blocked until we recreate the active memstore -- phase (3/5)
1451    // we get scanner from active memstore but it is empty -- phase (5/5)
1452    InternalScanner scanner =
1453      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
1454    service.shutdown();
1455    service.awaitTermination(20, TimeUnit.SECONDS);
1456    try {
1457      try {
1458        List<Cell> results = new ArrayList<>();
1459        scanner.next(results);
1460        assertEquals(3, results.size());
1461        for (Cell c : results) {
1462          byte[] actualValue = CellUtil.cloneValue(c);
1463          assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
1464            + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
1465        }
1466      } finally {
1467        scanner.close();
1468      }
1469    } finally {
1470      MyCompactingMemStore.START_TEST.set(false);
1471      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1472      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1473    }
1474  }
1475
1476  @Test
1477  public void testScanWithDoubleFlush() throws IOException {
1478    Configuration conf = HBaseConfiguration.create();
1479    // Initialize region
1480    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1481      @Override
1482      public void getScanners(MyStore store) throws IOException {
1483        final long tmpId = id++;
1484        ExecutorService s = Executors.newSingleThreadExecutor();
1485        s.execute(() -> {
1486          try {
1487            // flush the store before storescanner updates the scanners from store.
1488            // The current data will be flushed into files, and the memstore will
1489            // be clear.
1490            // -- phase (4/4)
1491            flushStore(store, tmpId);
1492          } catch (IOException ex) {
1493            throw new RuntimeException(ex);
1494          }
1495        });
1496        s.shutdown();
1497        try {
1498          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1499          s.awaitTermination(3, TimeUnit.SECONDS);
1500        } catch (InterruptedException ex) {
1501        }
1502      }
1503    });
1504    byte[] oldValue = Bytes.toBytes("oldValue");
1505    byte[] currentValue = Bytes.toBytes("currentValue");
1506    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1507    long ts = EnvironmentEdgeManager.currentTime();
1508    long seqId = 100;
1509    // older data whihc shouldn't be "seen" by client
1510    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1511    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1512    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1513    long snapshotId = id++;
1514    // push older data into snapshot -- phase (1/4)
1515    StoreFlushContext storeFlushCtx =
1516      store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
1517    storeFlushCtx.prepare();
1518
1519    // insert current data into active -- phase (2/4)
1520    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1521    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1522    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1523    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1524    quals.add(qf1);
1525    quals.add(qf2);
1526    quals.add(qf3);
1527    try (InternalScanner scanner =
1528      (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) {
1529      // complete the flush -- phase (3/4)
1530      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1531      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1532
1533      List<Cell> results = new ArrayList<>();
1534      scanner.next(results);
1535      assertEquals(3, results.size());
1536      for (Cell c : results) {
1537        byte[] actualValue = CellUtil.cloneValue(c);
1538        assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:"
1539          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue));
1540      }
1541    }
1542  }
1543
1544  /**
1545   * This test is for HBASE-27519, when the {@link StoreScanner} is scanning,the Flush and the
1546   * Compaction execute concurrently and theCcompaction compact and archive the flushed
1547   * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}.Before
1548   * HBASE-27519,{@link StoreScanner.updateReaders} would throw {@link FileNotFoundException}.
1549   */
1550  @Test
1551  public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
1552    Configuration conf = HBaseConfiguration.create();
1553    conf.setBoolean(WALFactory.WAL_ENABLED, false);
1554    conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
1555    byte[] r0 = Bytes.toBytes("row0");
1556    byte[] r1 = Bytes.toBytes("row1");
1557    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
1558    final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
1559    // Initialize region
1560    final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1561      @Override
1562      public void getScanners(MyStore store) throws IOException {
1563        try {
1564          // Here this method is called by StoreScanner.updateReaders which is invoked by the
1565          // following TestHStore.flushStore
1566          if (shouldWaitRef.get()) {
1567            // wait the following compaction Task start
1568            cyclicBarrier.await();
1569            // wait the following HStore.closeAndArchiveCompactedFiles end.
1570            cyclicBarrier.await();
1571          }
1572        } catch (BrokenBarrierException | InterruptedException e) {
1573          throw new RuntimeException(e);
1574        }
1575      }
1576    });
1577
1578    final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
1579    Runnable compactionTask = () -> {
1580      try {
1581        // Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for
1582        // entering the MyStore.getScanners, compactionTask could start.
1583        cyclicBarrier.await();
1584        region.compactStore(family, new NoLimitThroughputController());
1585        myStore.closeAndArchiveCompactedFiles();
1586        // Notify StoreScanner.updateReaders could enter MyStore.getScanners.
1587        cyclicBarrier.await();
1588      } catch (Throwable e) {
1589        compactionExceptionRef.set(e);
1590      }
1591    };
1592
1593    long ts = EnvironmentEdgeManager.currentTime();
1594    long seqId = 100;
1595    byte[] value = Bytes.toBytes("value");
1596    // older data whihc shouldn't be "seen" by client
1597    myStore.add(createCell(r0, qf1, ts, seqId, value), null);
1598    flushStore(myStore, id++);
1599    myStore.add(createCell(r0, qf2, ts, seqId, value), null);
1600    flushStore(myStore, id++);
1601    myStore.add(createCell(r0, qf3, ts, seqId, value), null);
1602    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1603    quals.add(qf1);
1604    quals.add(qf2);
1605    quals.add(qf3);
1606
1607    myStore.add(createCell(r1, qf1, ts, seqId, value), null);
1608    myStore.add(createCell(r1, qf2, ts, seqId, value), null);
1609    myStore.add(createCell(r1, qf3, ts, seqId, value), null);
1610
1611    Thread.currentThread()
1612      .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
1613    Scan scan = new Scan();
1614    scan.withStartRow(r0, true);
1615    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
1616      List<Cell> results = new MyList<>(size -> {
1617        switch (size) {
1618          case 1:
1619            shouldWaitRef.set(true);
1620            Thread thread = new Thread(compactionTask);
1621            thread.setName("MyCompacting Thread.");
1622            thread.start();
1623            try {
1624              flushStore(myStore, id++);
1625              thread.join();
1626            } catch (IOException | InterruptedException e) {
1627              throw new RuntimeException(e);
1628            }
1629            shouldWaitRef.set(false);
1630            break;
1631          default:
1632            break;
1633        }
1634      });
1635      // Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile
1636      // which used by StoreScanner.updateReaders is deleted by compactionTask.
1637      scanner.next(results);
1638      // The results is r0 row cells.
1639      assertEquals(3, results.size());
1640      assertTrue(compactionExceptionRef.get() == null);
1641    }
1642  }
1643
1644  @Test
1645  public void testReclaimChunkWhenScaning() throws IOException {
1646    init("testReclaimChunkWhenScaning");
1647    long ts = EnvironmentEdgeManager.currentTime();
1648    long seqId = 100;
1649    byte[] value = Bytes.toBytes("value");
1650    // older data whihc shouldn't be "seen" by client
1651    store.add(createCell(qf1, ts, seqId, value), null);
1652    store.add(createCell(qf2, ts, seqId, value), null);
1653    store.add(createCell(qf3, ts, seqId, value), null);
1654    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1655    quals.add(qf1);
1656    quals.add(qf2);
1657    quals.add(qf3);
1658    try (InternalScanner scanner =
1659      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) {
1660      List<Cell> results = new MyList<>(size -> {
1661        switch (size) {
1662          // 1) we get the first cell (qf1)
1663          // 2) flush the data to have StoreScanner update inner scanners
1664          // 3) the chunk will be reclaimed after updaing
1665          case 1:
1666            try {
1667              flushStore(store, id++);
1668            } catch (IOException e) {
1669              throw new RuntimeException(e);
1670            }
1671            break;
1672          // 1) we get the second cell (qf2)
1673          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
1674          case 2:
1675            try {
1676              byte[] newValue = Bytes.toBytes("newValue");
1677              // older data whihc shouldn't be "seen" by client
1678              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1679              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1680              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1681            } catch (IOException e) {
1682              throw new RuntimeException(e);
1683            }
1684            break;
1685          default:
1686            break;
1687        }
1688      });
1689      scanner.next(results);
1690      assertEquals(3, results.size());
1691      for (Cell c : results) {
1692        byte[] actualValue = CellUtil.cloneValue(c);
1693        assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
1694          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
1695      }
1696    }
1697  }
1698
1699  /**
1700   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the
1701   * versionedList. And the first InMemoryFlushRunnable will use the chagned versionedList to remove
1702   * the corresponding segments. In short, there will be some segements which isn't in merge are
1703   * removed.
1704   */
1705  @Test
1706  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1707    int flushSize = 500;
1708    Configuration conf = HBaseConfiguration.create();
1709    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1710    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1711    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1712    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1713    // Set the lower threshold to invoke the "MERGE" policy
1714    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1715    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1716      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1717    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1718    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1719    long ts = EnvironmentEdgeManager.currentTime();
1720    long seqId = 100;
1721    // older data whihc shouldn't be "seen" by client
1722    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1723    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1724    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1725    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1726    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1727    storeFlushCtx.prepare();
1728    // This shouldn't invoke another in-memory flush because the first compactor thread
1729    // hasn't accomplished the in-memory compaction.
1730    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1731    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1732    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1733    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1734    // okay. Let the compaction be completed
1735    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1736    CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore;
1737    while (mem.isMemStoreFlushingInMemory()) {
1738      TimeUnit.SECONDS.sleep(1);
1739    }
1740    // This should invoke another in-memory flush.
1741    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1742    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1743    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1744    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1745    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1746      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1747    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1748    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1749  }
1750
1751  @Test
1752  public void testAge() throws IOException {
1753    long currentTime = EnvironmentEdgeManager.currentTime();
1754    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1755    edge.setValue(currentTime);
1756    EnvironmentEdgeManager.injectEdge(edge);
1757    Configuration conf = TEST_UTIL.getConfiguration();
1758    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1759    initHRegion(name.getMethodName(), conf,
1760      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1761    HStore store = new HStore(region, hcd, conf, false) {
1762
1763      @Override
1764      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1765        CellComparator kvComparator) throws IOException {
1766        List<HStoreFile> storefiles =
1767          Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1768            mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1769        StoreFileManager sfm = mock(StoreFileManager.class);
1770        when(sfm.getStoreFiles()).thenReturn(storefiles);
1771        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1772        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1773        return storeEngine;
1774      }
1775    };
1776    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1777    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1778    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1779  }
1780
1781  private HStoreFile mockStoreFile(long createdTime) {
1782    StoreFileInfo info = mock(StoreFileInfo.class);
1783    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1784    HStoreFile sf = mock(HStoreFile.class);
1785    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1786    when(sf.isHFile()).thenReturn(true);
1787    when(sf.getFileInfo()).thenReturn(info);
1788    return sf;
1789  }
1790
1791  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1792    throws IOException {
1793    return (MyStore) init(methodName, conf,
1794      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1795      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1796  }
1797
1798  private static class MyStore extends HStore {
1799    private final MyStoreHook hook;
1800
1801    MyStore(final HRegion region, final ColumnFamilyDescriptor family,
1802      final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1803      super(region, family, confParam, false);
1804      this.hook = hook;
1805    }
1806
1807    @Override
1808    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1809      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1810      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1811      boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException {
1812      hook.getScanners(this);
1813      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1814        stopRow, false, readPt, includeMemstoreScanner, onlyLatestVersion);
1815    }
1816
1817    @Override
1818    public long getSmallestReadPoint() {
1819      return hook.getSmallestReadPoint(this);
1820    }
1821  }
1822
1823  private abstract static class MyStoreHook {
1824
1825    void getScanners(MyStore store) throws IOException {
1826    }
1827
1828    long getSmallestReadPoint(HStore store) {
1829      return store.getHRegion().getSmallestReadPoint();
1830    }
1831  }
1832
1833  @Test
1834  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
1835    Configuration conf = HBaseConfiguration.create();
1836    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1837    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1838    // Set the lower threshold to invoke the "MERGE" policy
1839    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1840    });
1841    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1842    long ts = EnvironmentEdgeManager.currentTime();
1843    long seqID = 1L;
1844    // Add some data to the region and do some flushes
1845    for (int i = 1; i < 10; i++) {
1846      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1847        memStoreSizing);
1848    }
1849    // flush them
1850    flushStore(store, seqID);
1851    for (int i = 11; i < 20; i++) {
1852      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1853        memStoreSizing);
1854    }
1855    // flush them
1856    flushStore(store, seqID);
1857    for (int i = 21; i < 30; i++) {
1858      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1859        memStoreSizing);
1860    }
1861    // flush them
1862    flushStore(store, seqID);
1863
1864    assertEquals(3, store.getStorefilesCount());
1865    Scan scan = new Scan();
1866    scan.addFamily(family);
1867    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1868    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1869    StoreScanner storeScanner =
1870      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1871    // get the current heap
1872    KeyValueHeap heap = storeScanner.heap;
1873    // create more store files
1874    for (int i = 31; i < 40; i++) {
1875      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1876        memStoreSizing);
1877    }
1878    // flush them
1879    flushStore(store, seqID);
1880
1881    for (int i = 41; i < 50; i++) {
1882      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1883        memStoreSizing);
1884    }
1885    // flush them
1886    flushStore(store, seqID);
1887    storefiles2 = store.getStorefiles();
1888    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1889    actualStorefiles1.removeAll(actualStorefiles);
1890    // Do compaction
1891    MyThread thread = new MyThread(storeScanner);
1892    thread.start();
1893    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
1894    thread.join();
1895    KeyValueHeap heap2 = thread.getHeap();
1896    assertFalse(heap.equals(heap2));
1897  }
1898
1899  @Test
1900  public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
1901    Configuration conf = HBaseConfiguration.create();
1902    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1903    // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
1904    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
1905    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1906    });
1907    Scan scan = new Scan();
1908    scan.addFamily(family);
1909    // ReadType on Scan is still DEFAULT only.
1910    assertEquals(ReadType.DEFAULT, scan.getReadType());
1911    StoreScanner storeScanner =
1912      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1913    assertFalse(storeScanner.isScanUsePread());
1914  }
1915
1916  @Test
1917  public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException {
1918    Configuration conf = HBaseConfiguration.create();
1919    conf.set("hbase.systemtables.compacting.memstore.type", "eager");
1920    init(name.getMethodName(), conf,
1921      TableDescriptorBuilder.newBuilder(
1922        TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())),
1923      ColumnFamilyDescriptorBuilder.newBuilder(family)
1924        .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build());
1925    assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString()
1926      .startsWith("eager".toUpperCase()));
1927  }
1928
1929  @Test
1930  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
1931    final TableName tn = TableName.valueOf(name.getMethodName());
1932    init(name.getMethodName());
1933
1934    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
1935
1936    HStoreFile sf1 = mockStoreFileWithLength(1024L);
1937    HStoreFile sf2 = mockStoreFileWithLength(2048L);
1938    HStoreFile sf3 = mockStoreFileWithLength(4096L);
1939    HStoreFile sf4 = mockStoreFileWithLength(8192L);
1940
1941    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
1942      .setEndKey(Bytes.toBytes("b")).build();
1943
1944    // Compacting two files down to one, reducing size
1945    sizeStore.put(regionInfo, 1024L + 4096L);
1946    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3),
1947      Arrays.asList(sf2));
1948
1949    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1950
1951    // The same file length in and out should have no change
1952    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
1953      Arrays.asList(sf2));
1954
1955    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1956
1957    // Increase the total size used
1958    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
1959      Arrays.asList(sf3));
1960
1961    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
1962
1963    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
1964      .setEndKey(Bytes.toBytes("c")).build();
1965    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
1966
1967    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
1968  }
1969
1970  @Test
1971  public void testHFileContextSetWithCFAndTable() throws Exception {
1972    init(this.name.getMethodName());
1973    StoreFileWriter writer = store.getStoreEngine()
1974      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
1975        .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
1976        .includesTag(false).shouldDropBehind(true));
1977    HFileContext hFileContext = writer.getLiveFileWriter().getFileContext();
1978    assertArrayEquals(family, hFileContext.getColumnFamily());
1979    assertArrayEquals(table, hFileContext.getTableName());
1980  }
1981
1982  // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell
1983  // but its dataSize exceeds inmemoryFlushSize
1984  @Test
1985  public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
1986    throws IOException, InterruptedException {
1987    Configuration conf = HBaseConfiguration.create();
1988
1989    byte[] smallValue = new byte[3];
1990    byte[] largeValue = new byte[9];
1991    final long timestamp = EnvironmentEdgeManager.currentTime();
1992    final long seqId = 100;
1993    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
1994    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
1995    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
1996    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
1997    int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
1998
1999    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2000    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
2001    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2002    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2003
2004    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2005      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2006
2007    MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
2008    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2009    myCompactingMemStore.smallCellPreUpdateCounter.set(0);
2010    myCompactingMemStore.largeCellPreUpdateCounter.set(0);
2011
2012    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2013    Thread smallCellThread = new Thread(() -> {
2014      try {
2015        store.add(smallCell, new NonThreadSafeMemStoreSizing());
2016      } catch (Throwable exception) {
2017        exceptionRef.set(exception);
2018      }
2019    });
2020    smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
2021    smallCellThread.start();
2022
2023    String oldThreadName = Thread.currentThread().getName();
2024    try {
2025      /**
2026       * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
2027       * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread
2028       * invokes flushInMemory.
2029       * <p/>
2030       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
2031       * can add cell to currentActive . That is to say when largeCellThread called flushInMemory
2032       * method, CompactingMemStore.active has no cell.
2033       */
2034      Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
2035      store.add(largeCell, new NonThreadSafeMemStoreSizing());
2036      smallCellThread.join();
2037
2038      for (int i = 0; i < 100; i++) {
2039        long currentTimestamp = timestamp + 100 + i;
2040        ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
2041        store.add(cell, new NonThreadSafeMemStoreSizing());
2042      }
2043    } finally {
2044      Thread.currentThread().setName(oldThreadName);
2045    }
2046
2047    assertTrue(exceptionRef.get() == null);
2048
2049  }
2050
2051  // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds
2052  // InmemoryFlushSize
2053  @Test(timeout = 60000)
2054  public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception {
2055    Configuration conf = HBaseConfiguration.create();
2056    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
2057
2058    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2059      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2060
2061    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
2062
2063    int size = (int) (myCompactingMemStore.getInmemoryFlushSize());
2064    byte[] value = new byte[size + 1];
2065
2066    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2067    long timestamp = EnvironmentEdgeManager.currentTime();
2068    long seqId = 100;
2069    ExtendedCell cell = createCell(qf1, timestamp, seqId, value);
2070    int cellByteSize = MutableSegment.getCellLength(cell);
2071    store.add(cell, memStoreSizing);
2072    assertTrue(memStoreSizing.getCellsCount() == 1);
2073    assertTrue(memStoreSizing.getDataSize() == cellByteSize);
2074    // Waiting the in memory compaction completed, see HBASE-26438
2075    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2076  }
2077
2078  /**
   * This test is for HBASE-27464. Before this JIRA, when initializing a
   * {@link CellChunkImmutableSegment} for the 'COMPACT' action, we did not force a copy into the
   * current MSLAB. When a cell is bigger than {@link MemStoreLABImpl#maxAlloc}, the cell stays in
   * the previous chunk, which is recycled after the segment is replaced, so we may read wrong
   * data when that chunk is reused by others.
2083   */
2084  @Test
2085  public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception {
2086    Configuration conf = HBaseConfiguration.create();
2087    int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT);
2088
2089    // Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}.
2090    byte[] cellValue = new byte[maxAllocByteSize + 1];
2091    final long timestamp = EnvironmentEdgeManager.currentTime();
2092    final long seqId = 100;
2093    final byte[] rowKey1 = Bytes.toBytes("rowKey1");
2094    final ExtendedCell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue);
2095    final byte[] rowKey2 = Bytes.toBytes("rowKey2");
2096    final ExtendedCell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue);
2097    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2098    quals.add(qf1);
2099
2100    int cellByteSize = MutableSegment.getCellLength(originalCell1);
2101    int inMemoryFlushByteSize = cellByteSize - 1;
2102
2103    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2104    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
2105    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2106    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200));
2107    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2108
2109    // Use {@link MemoryCompactionPolicy#EAGER} for always compacting.
2110    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2111      .setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build());
2112
2113    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
2114    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize);
2115
2116    // Data chunk Pool is disabled.
2117    assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0);
2118
2119    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2120
2121    // First compact
2122    store.add(originalCell1, memStoreSizing);
2123    // Waiting for the first in-memory compaction finished
2124    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2125
2126    StoreScanner storeScanner =
2127      (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
2128    SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2129    ExtendedCell resultCell1 = segmentScanner.next();
2130    assertTrue(PrivateCellUtil.equals(resultCell1, originalCell1));
2131    int cell1ChunkId = resultCell1.getChunkId();
2132    assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK);
2133    assertNull(segmentScanner.next());
2134    segmentScanner.close();
2135    storeScanner.close();
2136    Segment segment = segmentScanner.segment;
2137    assertTrue(segment instanceof CellChunkImmutableSegment);
2138    MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2139    assertTrue(!memStoreLAB1.isClosed());
2140    assertTrue(!memStoreLAB1.chunks.isEmpty());
2141    assertTrue(!memStoreLAB1.isReclaimed());
2142
2143    // Second compact
2144    store.add(originalCell2, memStoreSizing);
2145    // Waiting for the second in-memory compaction finished
2146    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2147
2148    // Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell
2149    // must be associated with chunk.. We were looking for a cell at index 0.
2150    // The cause for this exception is because the data chunk Pool is disabled,when the data chunks
2151    // are recycled after the second in-memory compaction finished,the
2152    // {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk
2153    // pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in
2154    // {@link CellChunkMap#getCell} we could not get the data chunk by chunkId.
2155    storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
2156    segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2157    ExtendedCell newResultCell1 = segmentScanner.next();
2158    assertTrue(newResultCell1 != resultCell1);
2159    assertTrue(PrivateCellUtil.equals(newResultCell1, originalCell1));
2160
2161    ExtendedCell resultCell2 = segmentScanner.next();
2162    assertTrue(PrivateCellUtil.equals(resultCell2, originalCell2));
2163    assertNull(segmentScanner.next());
2164    segmentScanner.close();
2165    storeScanner.close();
2166
2167    segment = segmentScanner.segment;
2168    assertTrue(segment instanceof CellChunkImmutableSegment);
2169    MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2170    assertTrue(!memStoreLAB2.isClosed());
2171    assertTrue(!memStoreLAB2.chunks.isEmpty());
2172    assertTrue(!memStoreLAB2.isReclaimed());
2173    assertTrue(memStoreLAB1.isClosed());
2174    assertTrue(memStoreLAB1.chunks.isEmpty());
2175    assertTrue(memStoreLAB1.isReclaimed());
2176  }
2177
2178  // This test is for HBASE-26210 also, test write large cell and small cell concurrently when
2179  // InmemoryFlushSize is smaller,equal with and larger than cell size.
2180  @Test
2181  public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
2182    throws IOException, InterruptedException {
2183    doWriteTestLargeCellAndSmallCellConcurrently(
2184      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
2185    doWriteTestLargeCellAndSmallCellConcurrently(
2186      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
2187    doWriteTestLargeCellAndSmallCellConcurrently(
2188      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1);
2189    doWriteTestLargeCellAndSmallCellConcurrently(
2190      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize);
2191    doWriteTestLargeCellAndSmallCellConcurrently(
2192      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1);
2193  }
2194
2195  private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize)
2196    throws IOException, InterruptedException {
2197
2198    Configuration conf = HBaseConfiguration.create();
2199
2200    byte[] smallValue = new byte[3];
2201    byte[] largeValue = new byte[100];
2202    final long timestamp = EnvironmentEdgeManager.currentTime();
2203    final long seqId = 100;
2204    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2205    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2206    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2207    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2208    int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize);
2209    boolean flushByteSizeLessThanSmallAndLargeCellSize =
2210      flushByteSize < (smallCellByteSize + largeCellByteSize);
2211
2212    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName());
2213    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2214    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2215
2216    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2217      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2218
2219    MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore);
2220    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2221    myCompactingMemStore.disableCompaction();
2222    if (flushByteSizeLessThanSmallAndLargeCellSize) {
2223      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true;
2224    } else {
2225      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false;
2226    }
2227
2228    final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
2229    final AtomicLong totalCellByteSize = new AtomicLong(0);
2230    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2231    Thread smallCellThread = new Thread(() -> {
2232      try {
2233        for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
2234          long currentTimestamp = timestamp + i;
2235          ExtendedCell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
2236          totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
2237          store.add(cell, memStoreSizing);
2238        }
2239      } catch (Throwable exception) {
2240        exceptionRef.set(exception);
2241
2242      }
2243    });
2244    smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
2245    smallCellThread.start();
2246
2247    String oldThreadName = Thread.currentThread().getName();
2248    try {
2249      /**
2250       * When flushByteSizeLessThanSmallAndLargeCellSize is true:
2251       * </p>
2252       * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then
2253       * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then
2254       * largeCellThread invokes flushInMemory.
2255       * <p/>
2256       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
2257       * can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
2258       * <p/>
2259       * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and
2260       * largeCellThread concurrently write one cell and wait each other, and then write another
2261       * cell etc.
2262       */
2263      Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
2264      for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
2265        long currentTimestamp = timestamp + i;
2266        ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
2267        totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
2268        store.add(cell, memStoreSizing);
2269      }
2270      smallCellThread.join();
2271
2272      assertTrue(exceptionRef.get() == null);
2273      assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2));
2274      assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
2275      if (flushByteSizeLessThanSmallAndLargeCellSize) {
2276        assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT);
2277      } else {
2278        assertTrue(
2279          myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1));
2280      }
2281    } finally {
2282      Thread.currentThread().setName(oldThreadName);
2283    }
2284  }
2285
2286  /**
2287   * <pre>
2288   * This test is for HBASE-26384,
2289   * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
2290   * execute concurrently.
2291   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2292   * for both branch-2 and master):
2293   * 1. The {@link CompactingMemStore} size exceeds
2294   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2295   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2296   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2297   * 2. The in memory compact thread starts and then stopping before
2298   *    {@link CompactingMemStore#flattenOneSegment}.
2299   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2300   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2301   *    compact thread continues.
2302   *    Assuming {@link VersionedSegmentsList#version} returned from
2303   *    {@link CompactingMemStore#getImmutableSegments} is v.
2304   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2305   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2306   *    {@link CompactionPipeline#version} is still v.
2307   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2308   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2309   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2310   *    {@link CompactionPipeline} has changed because
2311   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2312   *    removed in fact and still remaining in {@link CompactionPipeline}.
2313   *
2314   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
2315   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2316   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2317   *    v+1.
2318   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2319   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2320   *    failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
2321   *    again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
2322   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
2323   * </pre>
2324   */
2325  @Test
2326  public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
2327    Configuration conf = HBaseConfiguration.create();
2328
2329    byte[] smallValue = new byte[3];
2330    byte[] largeValue = new byte[9];
2331    final long timestamp = EnvironmentEdgeManager.currentTime();
2332    final long seqId = 100;
2333    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2334    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2335    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2336    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2337    int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
2338    int flushByteSize = totalCellByteSize - 2;
2339
2340    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2341    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
2342    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2343    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2344
2345    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2346      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2347
2348    MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
2349    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2350
2351    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2352    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2353
2354    String oldThreadName = Thread.currentThread().getName();
2355    try {
2356      Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
2357      /**
2358       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2359       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2360       * would invoke {@link CompactingMemStore#stopCompaction}.
2361       */
2362      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2363
2364      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2365      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2366
2367      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2368      assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
2369      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2370      assertTrue(segments.getNumOfSegments() == 0);
2371      assertTrue(segments.getNumOfCells() == 0);
2372      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
2373      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2374    } finally {
2375      Thread.currentThread().setName(oldThreadName);
2376    }
2377  }
2378
2379  /**
2380   * <pre>
2381   * This test is for HBASE-26384,
2382   * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()}
2383   * and writeMemStore execute concurrently.
2384   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2385   * for both branch-2 and master):
2386   * 1. The {@link CompactingMemStore} size exceeds
2387   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2388   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2389   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2390   * 2. The in memory compact thread starts and then stopping before
2391   *    {@link CompactingMemStore#flattenOneSegment}.
2392   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2393   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2394   *    compact thread continues.
2395   *    Assuming {@link VersionedSegmentsList#version} returned from
2396   *    {@link CompactingMemStore#getImmutableSegments} is v.
2397   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2398   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2399   *    {@link CompactionPipeline#version} is still v.
2400   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2401   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2402   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2403   *    {@link CompactionPipeline} has changed because
2404   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2405   *    removed in fact and still remaining in {@link CompactionPipeline}.
2406   *
2407   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
2408   * and I add step 7-8 to test there is new segment added before retry.
2409   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2410   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2411   *     v+1.
2412   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2413   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2414   *    failed and retry,{@link VersionedSegmentsList#version} returned from
2415   *    {@link CompactingMemStore#getImmutableSegments} is v+1.
2416   * 7. The write thread continues writing to {@link CompactingMemStore} and
2417   *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
2418   *    {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
2419   *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
2420   *    {@link CompactionPipeline#version} is still v+1.
2421   * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2422   *    {@link CompactionPipeline#version} is still v+1,
2423   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
2424   *    remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
2425   *    {@link CompactingMemStore#swapPipelineWithNull}.
2426   * </pre>
2427   */
2428  @Test
2429  public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
2430    Configuration conf = HBaseConfiguration.create();
2431
2432    byte[] smallValue = new byte[3];
2433    byte[] largeValue = new byte[9];
2434    final long timestamp = EnvironmentEdgeManager.currentTime();
2435    final long seqId = 100;
2436    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2437    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2438    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2439    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2440    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2441    int flushByteSize = firstWriteCellByteSize - 2;
2442
2443    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2444    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
2445    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2446    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2447
2448    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2449      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2450
2451    final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
2452    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2453
2454    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2455    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2456
2457    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2458    final ExtendedCell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
2459    final ExtendedCell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2460    final int writeAgainCellByteSize =
2461      MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2);
2462    final Thread writeAgainThread = new Thread(() -> {
2463      try {
2464        myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
2465
2466        store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
2467        store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
2468
2469        myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
2470      } catch (Throwable exception) {
2471        exceptionRef.set(exception);
2472      }
2473    });
2474    writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
2475    writeAgainThread.start();
2476
2477    String oldThreadName = Thread.currentThread().getName();
2478    try {
2479      Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
2480      /**
2481       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2482       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2483       * would invoke {@link CompactingMemStore#stopCompaction}.
2484       */
2485      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2486      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2487      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2488      writeAgainThread.join();
2489
2490      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2491      assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
2492      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2493      assertTrue(segments.getNumOfSegments() == 1);
2494      assertTrue(
2495        ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
2496      assertTrue(segments.getNumOfCells() == 2);
2497      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
2498      assertTrue(exceptionRef.get() == null);
2499      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2500    } finally {
2501      Thread.currentThread().setName(oldThreadName);
2502    }
2503  }
2504
2505  /**
2506   * <pre>
2507   * This test is for HBASE-26465,
2508   * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute
2509   * concurrently. The threads sequence before HBASE-26465 is:
2510   * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have be added to
2511   *  {@link DefaultMemStore}.
2512   * 2.The flush thread stopping before {@link DefaultMemStore#clearSnapshot} in
2513   *   {@link HStore#updateStorefiles} after completed flushing memStore to hfile.
2514   * 3.The scan thread starts and stopping after {@link DefaultMemStore#getSnapshotSegments} in
2515   *   {@link DefaultMemStore#getScanners},here the scan thread gets the
2516   *   {@link DefaultMemStore#snapshot} which is created by the flush thread.
2517   * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close
2518   *   {@link DefaultMemStore#snapshot},because the reference count of the corresponding
2519   *   {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl}
2520   *   are recycled.
2521   * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a
2522   *   {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the
2523   *   reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in
2524   *   corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may
2525   *   be overwritten by other write threads,which may cause serious problem.
2526   * After HBASE-26465,{@link DefaultMemStore#getScanners} and
2527   * {@link DefaultMemStore#clearSnapshot} could not execute concurrently.
2528   * </pre>
2529   */
2530  @Test
2531  public void testClearSnapshotGetScannerConcurrently() throws Exception {
2532    Configuration conf = HBaseConfiguration.create();
2533
2534    byte[] smallValue = new byte[3];
2535    byte[] largeValue = new byte[9];
2536    final long timestamp = EnvironmentEdgeManager.currentTime();
2537    final long seqId = 100;
2538    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2539    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2540    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2541    quals.add(qf1);
2542    quals.add(qf2);
2543
2544    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
2545    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2546
2547    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2548    MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore);
2549    myDefaultMemStore.store = store;
2550
2551    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2552    store.add(smallCell, memStoreSizing);
2553    store.add(largeCell, memStoreSizing);
2554
2555    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2556    final Thread flushThread = new Thread(() -> {
2557      try {
2558        flushStore(store, id++);
2559      } catch (Throwable exception) {
2560        exceptionRef.set(exception);
2561      }
2562    });
2563    flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
2564    flushThread.start();
2565
2566    String oldThreadName = Thread.currentThread().getName();
2567    StoreScanner storeScanner = null;
2568    try {
2569      Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
2570
2571      /**
2572       * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot}
2573       */
2574      myDefaultMemStore.getScannerCyclicBarrier.await();
2575
2576      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2577      flushThread.join();
2578
2579      if (myDefaultMemStore.shouldWait) {
2580        SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2581        MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2582        assertTrue(memStoreLAB.isClosed());
2583        assertTrue(!memStoreLAB.chunks.isEmpty());
2584        assertTrue(!memStoreLAB.isReclaimed());
2585
2586        ExtendedCell cell1 = segmentScanner.next();
2587        PrivateCellUtil.equals(smallCell, cell1);
2588        ExtendedCell cell2 = segmentScanner.next();
2589        PrivateCellUtil.equals(largeCell, cell2);
2590        assertNull(segmentScanner.next());
2591      } else {
2592        List<ExtendedCell> results = new ArrayList<>();
2593        storeScanner.next(results);
2594        assertEquals(2, results.size());
2595        PrivateCellUtil.equals(smallCell, results.get(0));
2596        PrivateCellUtil.equals(largeCell, results.get(1));
2597      }
2598      assertTrue(exceptionRef.get() == null);
2599    } finally {
2600      if (storeScanner != null) {
2601        storeScanner.close();
2602      }
2603      Thread.currentThread().setName(oldThreadName);
2604    }
2605  }
2606
2607  @SuppressWarnings("unchecked")
2608  private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
2609    List<T> resultScanners = new ArrayList<T>();
2610    for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
2611      if (keyValueScannerClass.isInstance(keyValueScanner)) {
2612        resultScanners.add((T) keyValueScanner);
2613      }
2614    }
2615    assertTrue(resultScanners.size() == 1);
2616    return resultScanners.get(0);
2617  }
2618
2619  @Test
2620  public void testOnConfigurationChange() throws IOException {
2621    final int COMMON_MAX_FILES_TO_COMPACT = 10;
2622    final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8;
2623    final int STORE_MAX_FILES_TO_COMPACT = 6;
2624
2625    // Build a table that its maxFileToCompact different from common configuration.
2626    Configuration conf = HBaseConfiguration.create();
2627    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2628      COMMON_MAX_FILES_TO_COMPACT);
2629    conf.setBoolean(CACHE_DATA_ON_READ_KEY, false);
2630    conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true);
2631    conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
2632    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
2633      .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2634        String.valueOf(STORE_MAX_FILES_TO_COMPACT))
2635      .build();
2636    init(this.name.getMethodName(), conf, hcd);
2637
2638    // After updating common configuration, the conf in HStore itself must not be changed.
2639    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2640      NEW_COMMON_MAX_FILES_TO_COMPACT);
2641    this.store.onConfigurationChange(conf);
2642
2643    assertEquals(STORE_MAX_FILES_TO_COMPACT,
2644      store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact());
2645
2646    assertEquals(conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ), false);
2647    assertEquals(conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), true);
2648    assertEquals(conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), true);
2649
2650    // reset to default values
2651    conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ);
2652    conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE);
2653    conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE);
2654    this.store.onConfigurationChange(conf);
2655  }
2656
2657  /**
2658   * This test is for HBASE-26476
2659   */
2660  @Test
2661  public void testExtendsDefaultMemStore() throws Exception {
2662    Configuration conf = HBaseConfiguration.create();
2663    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2664
2665    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2666    assertTrue(this.store.memstore.getClass() == DefaultMemStore.class);
2667    tearDown();
2668
2669    conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName());
2670    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2671    assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class);
2672  }
2673
2674  static class CustomDefaultMemStore extends DefaultMemStore {
2675
2676    public CustomDefaultMemStore(Configuration conf, CellComparator c,
2677      RegionServicesForStores regionServices) {
2678      super(conf, c, regionServices);
2679    }
2680
2681  }
2682
2683  /**
2684   * This test is for HBASE-26488
2685   */
2686  @Test
2687  public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
2688    Configuration conf = HBaseConfiguration.create();
2689
2690    byte[] smallValue = new byte[3];
2691    byte[] largeValue = new byte[9];
2692    final long timestamp = EnvironmentEdgeManager.currentTime();
2693    final long seqId = 100;
2694    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2695    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2696    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2697    quals.add(qf1);
2698    quals.add(qf2);
2699
2700    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
2701    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2702    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
2703      MyDefaultStoreFlusher.class.getName());
2704
2705    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2706    MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
2707    assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
2708
2709    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2710    store.add(smallCell, memStoreSizing);
2711    store.add(largeCell, memStoreSizing);
2712    flushStore(store, id++);
2713
2714    MemStoreLABImpl memStoreLAB =
2715      (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
2716    assertTrue(memStoreLAB.isClosed());
2717    assertTrue(memStoreLAB.getRefCntValue() == 0);
2718    assertTrue(memStoreLAB.isReclaimed());
2719    assertTrue(memStoreLAB.chunks.isEmpty());
2720    StoreScanner storeScanner = null;
2721    try {
2722      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2723      assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
2724      assertTrue(store.memstore.size().getCellsCount() == 0);
2725      assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
2726      assertTrue(storeScanner.currentScanners.size() == 1);
2727      assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
2728
2729      List<ExtendedCell> results = new ArrayList<>();
2730      storeScanner.next(results);
2731      assertEquals(2, results.size());
2732      PrivateCellUtil.equals(smallCell, results.get(0));
2733      PrivateCellUtil.equals(largeCell, results.get(1));
2734    } finally {
2735      if (storeScanner != null) {
2736        storeScanner.close();
2737      }
2738    }
2739  }
2740
2741  static class MyDefaultMemStore1 extends DefaultMemStore {
2742
2743    private ImmutableSegment snapshotImmutableSegment;
2744
2745    public MyDefaultMemStore1(Configuration conf, CellComparator c,
2746      RegionServicesForStores regionServices) {
2747      super(conf, c, regionServices);
2748    }
2749
2750    @Override
2751    public MemStoreSnapshot snapshot() {
2752      MemStoreSnapshot result = super.snapshot();
2753      this.snapshotImmutableSegment = snapshot;
2754      return result;
2755    }
2756
2757  }
2758
2759  public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
2760    private static final AtomicInteger failCounter = new AtomicInteger(1);
2761    private static final AtomicInteger counter = new AtomicInteger(0);
2762
2763    public MyDefaultStoreFlusher(Configuration conf, HStore store) {
2764      super(conf, store);
2765    }
2766
2767    @Override
2768    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
2769      MonitoredTask status, ThroughputController throughputController,
2770      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
2771      counter.incrementAndGet();
2772      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
2773        writerCreationTracker);
2774    }
2775
2776    @Override
2777    protected void performFlush(InternalScanner scanner, final CellSink sink,
2778      ThroughputController throughputController) throws IOException {
2779
2780      final int currentCount = counter.get();
2781      CellSink newCellSink = (cell) -> {
2782        if (currentCount <= failCounter.get()) {
2783          throw new IOException("Simulated exception by tests");
2784        }
2785        sink.append(cell);
2786      };
2787      super.performFlush(scanner, newCellSink, throughputController);
2788    }
2789  }
2790
2791  /**
2792   * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB}
2793   */
2794  @Test
2795  public void testImmutableMemStoreLABRefCnt() throws Exception {
2796    Configuration conf = HBaseConfiguration.create();
2797
2798    byte[] smallValue = new byte[3];
2799    byte[] largeValue = new byte[9];
2800    final long timestamp = EnvironmentEdgeManager.currentTime();
2801    final long seqId = 100;
2802    final ExtendedCell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
2803    final ExtendedCell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
2804    final ExtendedCell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue);
2805    final ExtendedCell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2806    final ExtendedCell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue);
2807    final ExtendedCell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue);
2808
2809    int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
2810    int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
2811    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2812    int flushByteSize = firstWriteCellByteSize - 2;
2813
2814    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2815    conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
2816    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2817    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2818    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2819
2820    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2821      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2822
2823    final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
2824    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2825    myCompactingMemStore.allowCompaction.set(false);
2826
2827    NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2828    store.add(smallCell1, memStoreSizing);
2829    store.add(largeCell1, memStoreSizing);
2830    store.add(smallCell2, memStoreSizing);
2831    store.add(largeCell2, memStoreSizing);
2832    store.add(smallCell3, memStoreSizing);
2833    store.add(largeCell3, memStoreSizing);
2834    VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2835    assertTrue(versionedSegmentsList.getNumOfSegments() == 3);
2836    List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
2837    List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size());
2838    for (ImmutableSegment segment : segments) {
2839      memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
2840    }
2841    List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2842    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2843      assertTrue(memStoreLAB.getRefCntValue() == 2);
2844    }
2845
2846    myCompactingMemStore.allowCompaction.set(true);
2847    myCompactingMemStore.flushInMemory();
2848
2849    versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2850    assertTrue(versionedSegmentsList.getNumOfSegments() == 1);
2851    ImmutableMemStoreLAB immutableMemStoreLAB =
2852      (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
2853    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2854      assertTrue(memStoreLAB.getRefCntValue() == 2);
2855    }
2856
2857    List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2858    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2859      assertTrue(memStoreLAB.getRefCntValue() == 2);
2860    }
2861    assertTrue(immutableMemStoreLAB.getRefCntValue() == 2);
2862    for (KeyValueScanner scanner : scanners1) {
2863      scanner.close();
2864    }
2865    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2866      assertTrue(memStoreLAB.getRefCntValue() == 1);
2867    }
2868    for (KeyValueScanner scanner : scanners2) {
2869      scanner.close();
2870    }
2871    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2872      assertTrue(memStoreLAB.getRefCntValue() == 1);
2873    }
2874    assertTrue(immutableMemStoreLAB.getRefCntValue() == 1);
2875    flushStore(store, id++);
2876    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2877      assertTrue(memStoreLAB.getRefCntValue() == 0);
2878    }
2879    assertTrue(immutableMemStoreLAB.getRefCntValue() == 0);
2880    assertTrue(immutableMemStoreLAB.isClosed());
2881    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2882      assertTrue(memStoreLAB.isClosed());
2883      assertTrue(memStoreLAB.isReclaimed());
2884      assertTrue(memStoreLAB.chunks.isEmpty());
2885    }
2886  }
2887
2888  private HStoreFile mockStoreFileWithLength(long length) {
2889    HStoreFile sf = mock(HStoreFile.class);
2890    StoreFileReader sfr = mock(StoreFileReader.class);
2891    when(sf.isHFile()).thenReturn(true);
2892    when(sf.getReader()).thenReturn(sfr);
2893    when(sfr.length()).thenReturn(length);
2894    return sf;
2895  }
2896
2897  private static class MyThread extends Thread {
2898    private StoreScanner scanner;
2899    private KeyValueHeap heap;
2900
2901    public MyThread(StoreScanner scanner) {
2902      this.scanner = scanner;
2903    }
2904
2905    public KeyValueHeap getHeap() {
2906      return this.heap;
2907    }
2908
2909    @Override
2910    public void run() {
2911      scanner.trySwitchToStreamRead();
2912      heap = scanner.heap;
2913    }
2914  }
2915
2916  private static class MyMemStoreCompactor extends MemStoreCompactor {
2917    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2918    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
2919
2920    public MyMemStoreCompactor(CompactingMemStore compactingMemStore,
2921      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
2922      super(compactingMemStore, compactionPolicy);
2923    }
2924
2925    @Override
2926    public boolean start() throws IOException {
2927      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
2928      if (isFirst) {
2929        try {
2930          START_COMPACTOR_LATCH.await();
2931          return super.start();
2932        } catch (InterruptedException ex) {
2933          throw new RuntimeException(ex);
2934        }
2935      }
2936      return super.start();
2937    }
2938  }
2939
2940  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
2941    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2942
2943    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
2944      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2945      throws IOException {
2946      super(conf, c, store, regionServices, compactionPolicy);
2947    }
2948
2949    @Override
2950    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
2951      throws IllegalArgumentIOException {
2952      return new MyMemStoreCompactor(this, compactionPolicy);
2953    }
2954
2955    @Override
2956    protected boolean setInMemoryCompactionFlag() {
2957      boolean rval = super.setInMemoryCompactionFlag();
2958      if (rval) {
2959        RUNNER_COUNT.incrementAndGet();
2960        if (LOG.isDebugEnabled()) {
2961          LOG.debug("runner count: " + RUNNER_COUNT.get());
2962        }
2963      }
2964      return rval;
2965    }
2966  }
2967
2968  public static class MyCompactingMemStore extends CompactingMemStore {
2969    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
2970    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
2971    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
2972
2973    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store,
2974      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2975      throws IOException {
2976      super(conf, c, store, regionServices, compactionPolicy);
2977    }
2978
2979    @Override
2980    protected List<KeyValueScanner> createList(int capacity) {
2981      if (START_TEST.get()) {
2982        try {
2983          getScannerLatch.countDown();
2984          snapshotLatch.await();
2985        } catch (InterruptedException e) {
2986          throw new RuntimeException(e);
2987        }
2988      }
2989      return new ArrayList<>(capacity);
2990    }
2991
2992    @Override
2993    protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
2994      if (START_TEST.get()) {
2995        try {
2996          getScannerLatch.await();
2997        } catch (InterruptedException e) {
2998          throw new RuntimeException(e);
2999        }
3000      }
3001
3002      super.pushActiveToPipeline(active, checkEmpty);
3003      if (START_TEST.get()) {
3004        snapshotLatch.countDown();
3005      }
3006    }
3007  }
3008
3009  interface MyListHook {
3010    void hook(int currentSize);
3011  }
3012
3013  private static class MyList<T> implements List<T> {
3014    private final List<T> delegatee = new ArrayList<>();
3015    private final MyListHook hookAtAdd;
3016
3017    MyList(final MyListHook hookAtAdd) {
3018      this.hookAtAdd = hookAtAdd;
3019    }
3020
3021    @Override
3022    public int size() {
3023      return delegatee.size();
3024    }
3025
3026    @Override
3027    public boolean isEmpty() {
3028      return delegatee.isEmpty();
3029    }
3030
3031    @Override
3032    public boolean contains(Object o) {
3033      return delegatee.contains(o);
3034    }
3035
3036    @Override
3037    public Iterator<T> iterator() {
3038      return delegatee.iterator();
3039    }
3040
3041    @Override
3042    public Object[] toArray() {
3043      return delegatee.toArray();
3044    }
3045
3046    @Override
3047    public <R> R[] toArray(R[] a) {
3048      return delegatee.toArray(a);
3049    }
3050
3051    @Override
3052    public boolean add(T e) {
3053      hookAtAdd.hook(size());
3054      return delegatee.add(e);
3055    }
3056
3057    @Override
3058    public boolean remove(Object o) {
3059      return delegatee.remove(o);
3060    }
3061
3062    @Override
3063    public boolean containsAll(Collection<?> c) {
3064      return delegatee.containsAll(c);
3065    }
3066
3067    @Override
3068    public boolean addAll(Collection<? extends T> c) {
3069      return delegatee.addAll(c);
3070    }
3071
3072    @Override
3073    public boolean addAll(int index, Collection<? extends T> c) {
3074      return delegatee.addAll(index, c);
3075    }
3076
3077    @Override
3078    public boolean removeAll(Collection<?> c) {
3079      return delegatee.removeAll(c);
3080    }
3081
3082    @Override
3083    public boolean retainAll(Collection<?> c) {
3084      return delegatee.retainAll(c);
3085    }
3086
3087    @Override
3088    public void clear() {
3089      delegatee.clear();
3090    }
3091
3092    @Override
3093    public T get(int index) {
3094      return delegatee.get(index);
3095    }
3096
3097    @Override
3098    public T set(int index, T element) {
3099      return delegatee.set(index, element);
3100    }
3101
3102    @Override
3103    public void add(int index, T element) {
3104      delegatee.add(index, element);
3105    }
3106
3107    @Override
3108    public T remove(int index) {
3109      return delegatee.remove(index);
3110    }
3111
3112    @Override
3113    public int indexOf(Object o) {
3114      return delegatee.indexOf(o);
3115    }
3116
3117    @Override
3118    public int lastIndexOf(Object o) {
3119      return delegatee.lastIndexOf(o);
3120    }
3121
3122    @Override
3123    public ListIterator<T> listIterator() {
3124      return delegatee.listIterator();
3125    }
3126
3127    @Override
3128    public ListIterator<T> listIterator(int index) {
3129      return delegatee.listIterator(index);
3130    }
3131
3132    @Override
3133    public List<T> subList(int fromIndex, int toIndex) {
3134      return delegatee.subList(fromIndex, toIndex);
3135    }
3136  }
3137
3138  public static class MyCompactingMemStore2 extends CompactingMemStore {
3139    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
3140    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
3141    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
3142    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
3143    private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
3144    private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
3145
3146    public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
3147      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3148      throws IOException {
3149      super(conf, cellComparator, store, regionServices, compactionPolicy);
3150    }
3151
3152    @Override
3153    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
3154      MemStoreSizing memstoreSizing) {
3155      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3156        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
3157        if (currentCount <= 1) {
3158          try {
3159            /**
3160             * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
3161             * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
3162             * largeCellThread invokes flushInMemory.
3163             */
3164            preCyclicBarrier.await();
3165          } catch (Throwable e) {
3166            throw new RuntimeException(e);
3167          }
3168        }
3169      }
3170
3171      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3172      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3173        try {
3174          preCyclicBarrier.await();
3175        } catch (Throwable e) {
3176          throw new RuntimeException(e);
3177        }
3178      }
3179      return returnValue;
3180    }
3181
3182    @Override
3183    protected void doAdd(MutableSegment currentActive, ExtendedCell cell,
3184      MemStoreSizing memstoreSizing) {
3185      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3186        try {
3187          /**
3188           * After largeCellThread finished flushInMemory method, smallCellThread can add cell to
3189           * currentActive . That is to say when largeCellThread called flushInMemory method,
3190           * currentActive has no cell.
3191           */
3192          postCyclicBarrier.await();
3193        } catch (Throwable e) {
3194          throw new RuntimeException(e);
3195        }
3196      }
3197      super.doAdd(currentActive, cell, memstoreSizing);
3198    }
3199
3200    @Override
3201    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
3202      super.flushInMemory(currentActiveMutableSegment);
3203      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3204        if (largeCellPreUpdateCounter.get() <= 1) {
3205          try {
3206            postCyclicBarrier.await();
3207          } catch (Throwable e) {
3208            throw new RuntimeException(e);
3209          }
3210        }
3211      }
3212    }
3213
3214  }
3215
3216  public static class MyCompactingMemStore3 extends CompactingMemStore {
3217    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
3218    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
3219
3220    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
3221    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
3222    private final AtomicInteger flushCounter = new AtomicInteger(0);
3223    private static final int CELL_COUNT = 5;
3224    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;
3225
3226    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
3227      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3228      throws IOException {
3229      super(conf, cellComparator, store, regionServices, compactionPolicy);
3230    }
3231
3232    @Override
3233    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
3234      MemStoreSizing memstoreSizing) {
3235      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3236        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3237      }
3238      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3239        try {
3240          preCyclicBarrier.await();
3241        } catch (Throwable e) {
3242          throw new RuntimeException(e);
3243        }
3244      }
3245
3246      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3247      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3248        try {
3249          preCyclicBarrier.await();
3250        } catch (Throwable e) {
3251          throw new RuntimeException(e);
3252        }
3253      }
3254      return returnValue;
3255    }
3256
3257    @Override
3258    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
3259      super.postUpdate(currentActiveMutableSegment);
3260      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3261        try {
3262          postCyclicBarrier.await();
3263        } catch (Throwable e) {
3264          throw new RuntimeException(e);
3265        }
3266        return;
3267      }
3268
3269      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3270        try {
3271          postCyclicBarrier.await();
3272        } catch (Throwable e) {
3273          throw new RuntimeException(e);
3274        }
3275      }
3276    }
3277
3278    @Override
3279    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
3280      super.flushInMemory(currentActiveMutableSegment);
3281      flushCounter.incrementAndGet();
3282      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3283        return;
3284      }
3285
3286      assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME));
3287      try {
3288        postCyclicBarrier.await();
3289      } catch (Throwable e) {
3290        throw new RuntimeException(e);
3291      }
3292
3293    }
3294
3295    void disableCompaction() {
3296      allowCompaction.set(false);
3297    }
3298
3299    void enableCompaction() {
3300      allowCompaction.set(true);
3301    }
3302
3303  }
3304
3305  public static class MyCompactingMemStore4 extends CompactingMemStore {
3306    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3307    /**
3308     * {@link CompactingMemStore#flattenOneSegment} must execute after
3309     * {@link CompactingMemStore#getImmutableSegments}
3310     */
3311    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3312    /**
3313     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3314     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3315     */
3316    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3317    /**
3318     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3319     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3320     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3321     */
3322    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3323    /**
3324     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3325     */
3326    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3327    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3328    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3329    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3330    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3331
3332    public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
3333      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3334      throws IOException {
3335      super(conf, cellComparator, store, regionServices, compactionPolicy);
3336    }
3337
3338    @Override
3339    public VersionedSegmentsList getImmutableSegments() {
3340      VersionedSegmentsList result = super.getImmutableSegments();
3341      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3342        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3343        if (currentCount <= 1) {
3344          try {
3345            flattenOneSegmentPreCyclicBarrier.await();
3346          } catch (Throwable e) {
3347            throw new RuntimeException(e);
3348          }
3349        }
3350      }
3351      return result;
3352    }
3353
3354    @Override
3355    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3356      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3357        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3358        if (currentCount <= 1) {
3359          try {
3360            flattenOneSegmentPostCyclicBarrier.await();
3361          } catch (Throwable e) {
3362            throw new RuntimeException(e);
3363          }
3364        }
3365      }
3366      boolean result = super.swapPipelineWithNull(segments);
3367      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3368        int currentCount = swapPipelineWithNullCounter.get();
3369        if (currentCount <= 1) {
3370          assertTrue(!result);
3371        }
3372        if (currentCount == 2) {
3373          assertTrue(result);
3374        }
3375      }
3376      return result;
3377
3378    }
3379
3380    @Override
3381    public void flattenOneSegment(long requesterVersion, Action action) {
3382      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3383      if (currentCount <= 1) {
3384        try {
3385          /**
3386           * {@link CompactingMemStore#snapshot} could start.
3387           */
3388          snapShotStartCyclicCyclicBarrier.await();
3389          flattenOneSegmentPreCyclicBarrier.await();
3390        } catch (Throwable e) {
3391          throw new RuntimeException(e);
3392        }
3393      }
3394      super.flattenOneSegment(requesterVersion, action);
3395      if (currentCount <= 1) {
3396        try {
3397          flattenOneSegmentPostCyclicBarrier.await();
3398        } catch (Throwable e) {
3399          throw new RuntimeException(e);
3400        }
3401      }
3402    }
3403
3404    @Override
3405    protected boolean setInMemoryCompactionFlag() {
3406      boolean result = super.setInMemoryCompactionFlag();
3407      assertTrue(result);
3408      setInMemoryCompactionFlagCounter.incrementAndGet();
3409      return result;
3410    }
3411
3412    @Override
3413    void inMemoryCompaction() {
3414      try {
3415        super.inMemoryCompaction();
3416      } finally {
3417        try {
3418          inMemoryCompactionEndCyclicBarrier.await();
3419        } catch (Throwable e) {
3420          throw new RuntimeException(e);
3421        }
3422
3423      }
3424    }
3425
3426  }
3427
3428  public static class MyCompactingMemStore5 extends CompactingMemStore {
3429    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3430    private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
3431    /**
3432     * {@link CompactingMemStore#flattenOneSegment} must execute after
3433     * {@link CompactingMemStore#getImmutableSegments}
3434     */
3435    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3436    /**
3437     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3438     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3439     */
3440    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3441    /**
3442     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3443     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3444     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3445     */
3446    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3447    /**
3448     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3449     */
3450    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3451    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3452    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3453    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3454    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3455    /**
3456     * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain
3457     * thread could start.
3458     */
3459    private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
3460    /**
3461     * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the
3462     * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would
3463     * execute,and in memory compact thread would exit,because we expect that in memory compact
3464     * executing only once.
3465     */
3466    private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
3467
3468    public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
3469      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3470      throws IOException {
3471      super(conf, cellComparator, store, regionServices, compactionPolicy);
3472    }
3473
3474    @Override
3475    public VersionedSegmentsList getImmutableSegments() {
3476      VersionedSegmentsList result = super.getImmutableSegments();
3477      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3478        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3479        if (currentCount <= 1) {
3480          try {
3481            flattenOneSegmentPreCyclicBarrier.await();
3482          } catch (Throwable e) {
3483            throw new RuntimeException(e);
3484          }
3485        }
3486
3487      }
3488
3489      return result;
3490    }
3491
3492    @Override
3493    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3494      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3495        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3496        if (currentCount <= 1) {
3497          try {
3498            flattenOneSegmentPostCyclicBarrier.await();
3499          } catch (Throwable e) {
3500            throw new RuntimeException(e);
3501          }
3502        }
3503
3504        if (currentCount == 2) {
3505          try {
3506            /**
3507             * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull},
3508             * writeAgain thread could start.
3509             */
3510            writeMemStoreAgainStartCyclicBarrier.await();
3511            /**
3512             * Only the writeAgain thread completes, retry
3513             * {@link CompactingMemStore#swapPipelineWithNull} would execute.
3514             */
3515            writeMemStoreAgainEndCyclicBarrier.await();
3516          } catch (Throwable e) {
3517            throw new RuntimeException(e);
3518          }
3519        }
3520
3521      }
3522      boolean result = super.swapPipelineWithNull(segments);
3523      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3524        int currentCount = swapPipelineWithNullCounter.get();
3525        if (currentCount <= 1) {
3526          assertTrue(!result);
3527        }
3528        if (currentCount == 2) {
3529          assertTrue(result);
3530        }
3531      }
3532      return result;
3533
3534    }
3535
3536    @Override
3537    public void flattenOneSegment(long requesterVersion, Action action) {
3538      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3539      if (currentCount <= 1) {
3540        try {
3541          /**
3542           * {@link CompactingMemStore#snapshot} could start.
3543           */
3544          snapShotStartCyclicCyclicBarrier.await();
3545          flattenOneSegmentPreCyclicBarrier.await();
3546        } catch (Throwable e) {
3547          throw new RuntimeException(e);
3548        }
3549      }
3550      super.flattenOneSegment(requesterVersion, action);
3551      if (currentCount <= 1) {
3552        try {
3553          flattenOneSegmentPostCyclicBarrier.await();
3554          /**
3555           * Only the writeAgain thread completes, in memory compact thread would exit,because we
3556           * expect that in memory compact executing only once.
3557           */
3558          writeMemStoreAgainEndCyclicBarrier.await();
3559        } catch (Throwable e) {
3560          throw new RuntimeException(e);
3561        }
3562
3563      }
3564    }
3565
3566    @Override
3567    protected boolean setInMemoryCompactionFlag() {
3568      boolean result = super.setInMemoryCompactionFlag();
3569      int count = setInMemoryCompactionFlagCounter.incrementAndGet();
3570      if (count <= 1) {
3571        assertTrue(result);
3572      }
3573      if (count == 2) {
3574        assertTrue(!result);
3575      }
3576      return result;
3577    }
3578
3579    @Override
3580    void inMemoryCompaction() {
3581      try {
3582        super.inMemoryCompaction();
3583      } finally {
3584        try {
3585          inMemoryCompactionEndCyclicBarrier.await();
3586        } catch (Throwable e) {
3587          throw new RuntimeException(e);
3588        }
3589
3590      }
3591    }
3592  }
3593
3594  public static class MyCompactingMemStore6 extends CompactingMemStore {
3595    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3596
3597    public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator,
3598      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3599      throws IOException {
3600      super(conf, cellComparator, store, regionServices, compactionPolicy);
3601    }
3602
3603    @Override
3604    void inMemoryCompaction() {
3605      try {
3606        super.inMemoryCompaction();
3607      } finally {
3608        try {
3609          inMemoryCompactionEndCyclicBarrier.await();
3610        } catch (Throwable e) {
3611          throw new RuntimeException(e);
3612        }
3613
3614      }
3615    }
3616  }
3617
3618  public static class MyDefaultMemStore extends DefaultMemStore {
3619    private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
3620    private static final String FLUSH_THREAD_NAME = "flushMyThread";
3621    /**
3622     * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread
3623     * could start.
3624     */
3625    private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
3626    /**
3627     * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments}
3628     * completed, {@link DefaultMemStore#doClearSnapShot} could continue.
3629     */
3630    private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3631    /**
3632     * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot}
3633     * completed, {@link DefaultMemStore#getScanners} could continue.
3634     */
3635    private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3636    private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
3637    private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
3638    private volatile boolean shouldWait = true;
3639    private volatile HStore store = null;
3640
3641    public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
3642      RegionServicesForStores regionServices) throws IOException {
3643      super(conf, cellComparator, regionServices);
3644    }
3645
3646    @Override
3647    protected List<Segment> getSnapshotSegments() {
3648
3649      List<Segment> result = super.getSnapshotSegments();
3650
3651      if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
3652        int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
3653        if (currentCount == 1) {
3654          if (this.shouldWait) {
3655            try {
3656              /**
3657               * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed,
3658               * {@link DefaultMemStore#doClearSnapShot} could continue.
3659               */
3660              preClearSnapShotCyclicBarrier.await();
3661              /**
3662               * Wait for {@link DefaultMemStore#doClearSnapShot} completed.
3663               */
3664              postClearSnapShotCyclicBarrier.await();
3665
3666            } catch (Throwable e) {
3667              throw new RuntimeException(e);
3668            }
3669          }
3670        }
3671      }
3672      return result;
3673    }
3674
3675    @Override
3676    protected void doClearSnapShot() {
3677      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3678        int currentCount = clearSnapshotCounter.incrementAndGet();
3679        if (currentCount == 1) {
3680          try {
3681            if (
3682              ((ReentrantReadWriteLock) store.getStoreEngine().getLock())
3683                .isWriteLockedByCurrentThread()
3684            ) {
3685              shouldWait = false;
3686            }
3687            /**
3688             * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner
3689             * thread could start.
3690             */
3691            getScannerCyclicBarrier.await();
3692
3693            if (shouldWait) {
3694              /**
3695               * Wait for {@link DefaultMemStore#getSnapshotSegments} completed.
3696               */
3697              preClearSnapShotCyclicBarrier.await();
3698            }
3699          } catch (Throwable e) {
3700            throw new RuntimeException(e);
3701          }
3702        }
3703      }
3704      super.doClearSnapShot();
3705
3706      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3707        int currentCount = clearSnapshotCounter.get();
3708        if (currentCount == 1) {
3709          if (shouldWait) {
3710            try {
3711              /**
3712               * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed,
3713               * {@link DefaultMemStore#getScanners} could continue.
3714               */
3715              postClearSnapShotCyclicBarrier.await();
3716            } catch (Throwable e) {
3717              throw new RuntimeException(e);
3718            }
3719          }
3720        }
3721      }
3722    }
3723  }
3724}