001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026import static org.mockito.ArgumentMatchers.any;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.spy;
029import static org.mockito.Mockito.times;
030import static org.mockito.Mockito.verify;
031import static org.mockito.Mockito.when;
032
033import java.io.IOException;
034import java.lang.ref.SoftReference;
035import java.security.PrivilegedExceptionAction;
036import java.util.ArrayList;
037import java.util.Arrays;
038import java.util.Collection;
039import java.util.Collections;
040import java.util.Iterator;
041import java.util.List;
042import java.util.ListIterator;
043import java.util.NavigableSet;
044import java.util.TreeSet;
045import java.util.concurrent.ConcurrentSkipListSet;
046import java.util.concurrent.CountDownLatch;
047import java.util.concurrent.CyclicBarrier;
048import java.util.concurrent.ExecutorService;
049import java.util.concurrent.Executors;
050import java.util.concurrent.ThreadPoolExecutor;
051import java.util.concurrent.TimeUnit;
052import java.util.concurrent.atomic.AtomicBoolean;
053import java.util.concurrent.atomic.AtomicInteger;
054import java.util.concurrent.atomic.AtomicLong;
055import java.util.concurrent.atomic.AtomicReference;
056import java.util.concurrent.locks.ReentrantReadWriteLock;
057import java.util.function.IntBinaryOperator;
058import org.apache.hadoop.conf.Configuration;
059import org.apache.hadoop.fs.FSDataOutputStream;
060import org.apache.hadoop.fs.FileStatus;
061import org.apache.hadoop.fs.FileSystem;
062import org.apache.hadoop.fs.FilterFileSystem;
063import org.apache.hadoop.fs.LocalFileSystem;
064import org.apache.hadoop.fs.Path;
065import org.apache.hadoop.fs.permission.FsPermission;
066import org.apache.hadoop.hbase.Cell;
067import org.apache.hadoop.hbase.CellBuilderFactory;
068import org.apache.hadoop.hbase.CellBuilderType;
069import org.apache.hadoop.hbase.CellComparator;
070import org.apache.hadoop.hbase.CellComparatorImpl;
071import org.apache.hadoop.hbase.CellUtil;
072import org.apache.hadoop.hbase.HBaseClassTestRule;
073import org.apache.hadoop.hbase.HBaseConfiguration;
074import org.apache.hadoop.hbase.HBaseTestingUtil;
075import org.apache.hadoop.hbase.HConstants;
076import org.apache.hadoop.hbase.KeyValue;
077import org.apache.hadoop.hbase.MemoryCompactionPolicy;
078import org.apache.hadoop.hbase.PrivateCellUtil;
079import org.apache.hadoop.hbase.TableName;
080import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
081import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
082import org.apache.hadoop.hbase.client.Get;
083import org.apache.hadoop.hbase.client.RegionInfo;
084import org.apache.hadoop.hbase.client.RegionInfoBuilder;
085import org.apache.hadoop.hbase.client.Scan;
086import org.apache.hadoop.hbase.client.Scan.ReadType;
087import org.apache.hadoop.hbase.client.TableDescriptor;
088import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
089import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
090import org.apache.hadoop.hbase.filter.Filter;
091import org.apache.hadoop.hbase.filter.FilterBase;
092import org.apache.hadoop.hbase.io.compress.Compression;
093import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
094import org.apache.hadoop.hbase.io.hfile.CacheConfig;
095import org.apache.hadoop.hbase.io.hfile.HFile;
096import org.apache.hadoop.hbase.io.hfile.HFileContext;
097import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
098import org.apache.hadoop.hbase.monitoring.MonitoredTask;
099import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
100import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
101import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
102import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
103import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
104import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
105import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
106import org.apache.hadoop.hbase.security.User;
107import org.apache.hadoop.hbase.testclassification.MediumTests;
108import org.apache.hadoop.hbase.testclassification.RegionServerTests;
109import org.apache.hadoop.hbase.util.Bytes;
110import org.apache.hadoop.hbase.util.CommonFSUtils;
111import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
112import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
113import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
114import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
115import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
116import org.apache.hadoop.hbase.wal.WALFactory;
117import org.apache.hadoop.util.Progressable;
118import org.junit.After;
119import org.junit.AfterClass;
120import org.junit.Before;
121import org.junit.ClassRule;
122import org.junit.Rule;
123import org.junit.Test;
124import org.junit.experimental.categories.Category;
125import org.junit.rules.TestName;
126import org.mockito.Mockito;
127import org.slf4j.Logger;
128import org.slf4j.LoggerFactory;
129
130import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
131
132/**
133 * Test class for the HStore
134 */
135@Category({ RegionServerTests.class, MediumTests.class })
136public class TestHStore {
137
138  @ClassRule
139  public static final HBaseClassTestRule CLASS_RULE =
140      HBaseClassTestRule.forClass(TestHStore.class);
141
142  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
143  @Rule
144  public TestName name = new TestName();
145
146  HRegion region;
147  HStore store;
148  byte [] table = Bytes.toBytes("table");
149  byte [] family = Bytes.toBytes("family");
150
151  byte [] row = Bytes.toBytes("row");
152  byte [] row2 = Bytes.toBytes("row2");
153  byte [] qf1 = Bytes.toBytes("qf1");
154  byte [] qf2 = Bytes.toBytes("qf2");
155  byte [] qf3 = Bytes.toBytes("qf3");
156  byte [] qf4 = Bytes.toBytes("qf4");
157  byte [] qf5 = Bytes.toBytes("qf5");
158  byte [] qf6 = Bytes.toBytes("qf6");
159
160  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);
161
162  List<Cell> expected = new ArrayList<>();
163  List<Cell> result = new ArrayList<>();
164
165  long id = EnvironmentEdgeManager.currentTime();
166  Get get = new Get(row);
167
168  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
169  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
170
171  @Before
172  public void setUp() throws IOException {
173    qualifiers.clear();
174    qualifiers.add(qf1);
175    qualifiers.add(qf3);
176    qualifiers.add(qf5);
177
178    Iterator<byte[]> iter = qualifiers.iterator();
179    while(iter.hasNext()){
180      byte [] next = iter.next();
181      expected.add(new KeyValue(row, family, next, 1, (byte[])null));
182      get.addColumn(family, next);
183    }
184  }
185
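  /**
   * The init overloads below build the backing HRegion and HStore for the current test method,
   * optionally with a custom configuration, column family descriptor and {@link MyStoreHook}.
   */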
186  private void init(String methodName) throws IOException {
187    init(methodName, TEST_UTIL.getConfiguration());
188  }
189
190  private HStore init(String methodName, Configuration conf) throws IOException {
191    // some of the tests write 4 versions and then flush
192    // (with HBASE-4241, lower versions are collected on flush)
193    return init(methodName, conf,
194      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
195  }
196
197  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
198      throws IOException {
199    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
200  }
201
202  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
203      ColumnFamilyDescriptor hcd) throws IOException {
204    return init(methodName, conf, builder, hcd, null);
205  }
206
207  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
208      ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
209    return init(methodName, conf, builder, hcd, hook, false);
210  }
211
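  /**
   * Sets up {@link #region} on the local file system for the given method name, with a spied
   * RegionServicesForStores whose in-memory compaction pool is a single-threaded executor.
   */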
212  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
213      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
214    TableDescriptor htd = builder.setColumnFamily(hcd).build();
215    Path basedir = new Path(DIR + methodName);
216    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
217    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
218
219    FileSystem fs = FileSystem.get(conf);
220
221    fs.delete(logdir, true);
222    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
223      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0,
224      null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
225    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
226    Configuration walConf = new Configuration(conf);
227    CommonFSUtils.setRootDir(walConf, basedir);
228    WALFactory wals = new WALFactory(walConf, methodName);
229    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
230        htd, null);
231    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
232    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
233    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
234  }
235
236  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
237      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
238    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
239    if (hook == null) {
240      store = new HStore(region, hcd, conf, false);
241    } else {
242      store = new MyStore(region, hcd, conf, hook, switchToPread);
243    }
244    region.stores.put(store.getColumnFamilyDescriptor().getName(), store);
245    return store;
246  }
247
248  /**
249   * Test that we do not lose data if we fail a flush and then close.
250   * Part of HBASE-10466.
251   */
252  @Test
253  public void testFlushSizeSizing() throws Exception {
254    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
255    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
256    // Only retry once.
257    conf.setInt("hbase.hstore.flush.retries.number", 1);
258    User user = User.createUserForTesting(conf, this.name.getMethodName(),
259      new String[]{"foo"});
260    // Inject our faulty LocalFileSystem
261    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
262    user.runAs(new PrivilegedExceptionAction<Object>() {
263      @Override
264      public Object run() throws Exception {
265        // Make sure it worked (above is sensitive to caching details in hadoop core)
266        FileSystem fs = FileSystem.get(conf);
267        assertEquals(FaultyFileSystem.class, fs.getClass());
268        FaultyFileSystem ffs = (FaultyFileSystem)fs;
269
270        // Initialize region
271        init(name.getMethodName(), conf);
272
273        MemStoreSize mss = store.memstore.getFlushableSize();
274        assertEquals(0, mss.getDataSize());
275        LOG.info("Adding some data");
276        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
277        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
278        // add the heap size of active (mutable) segment
279        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
280        mss = store.memstore.getFlushableSize();
281        assertEquals(kvSize.getMemStoreSize(), mss);
282        // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
283        try {
284          LOG.info("Flushing");
285          flushStore(store, id++);
286          fail("Didn't bubble up IOE!");
287        } catch (IOException ioe) {
288          assertTrue(ioe.getMessage().contains("Fault injected"));
289        }
290        // due to the snapshot, the mutable segment is converted to an immutable segment
291        kvSize.incMemStoreSize(0,
292          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
293        mss = store.memstore.getFlushableSize();
294        assertEquals(kvSize.getMemStoreSize(), mss);
295        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
296        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
297        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
298        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
299        // not yet cleared the snapshot -- the above flush failed.
300        assertEquals(kvSize.getMemStoreSize(), mss);
301        ffs.fault.set(false);
302        flushStore(store, id++);
303        mss = store.memstore.getFlushableSize();
304        // Size should be the foreground kv size.
305        assertEquals(kvSize2.getMemStoreSize(), mss);
306        flushStore(store, id++);
307        mss = store.memstore.getFlushableSize();
308        assertEquals(0, mss.getDataSize());
309        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
310        return null;
311      }
312    });
313  }
314
315  /**
316   * Verify that compression and data block encoding are respected by the
317   * createWriter method, used on store flush.
318   */
319  @Test
320  public void testCreateWriter() throws Exception {
321    Configuration conf = HBaseConfiguration.create();
322    FileSystem fs = FileSystem.get(conf);
323
324    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
325        .setCompressionType(Compression.Algorithm.GZ).setDataBlockEncoding(DataBlockEncoding.DIFF)
326        .build();
327    init(name.getMethodName(), conf, hcd);
328
329    // Test createWriter
330    StoreFileWriter writer = store.getStoreEngine()
331      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
332        .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
333        .includesTag(false).shouldDropBehind(false));
334    Path path = writer.getPath();
335    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
336    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
337    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
338    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
339    writer.close();
340
341    // Verify that compression and encoding settings are respected
342    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
343    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
344    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
345    reader.close();
346  }
347
348  @Test
349  public void testDeleteExpiredStoreFiles() throws Exception {
350    testDeleteExpiredStoreFiles(0);
351    testDeleteExpiredStoreFiles(1);
352  }
353
354  /**
355   * @param minVersions the MIN_VERSIONS for the column family
356   */
357  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
358    int storeFileNum = 4;
359    int ttl = 4;
360    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
361    EnvironmentEdgeManagerTestHelper.injectEdge(edge);
362
363    Configuration conf = HBaseConfiguration.create();
364    // Enable the expired store file deletion
365    conf.setBoolean("hbase.store.delete.expired.storefile", true);
366    // Set the compaction threshold higher to avoid normal compactions.
367    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
368
369    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
370        .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());
371
372    long storeTtl = this.store.getScanInfo().getTtl();
373    long sleepTime = storeTtl / storeFileNum;
374    long timeStamp;
375    // There are 4 store files and the max time stamp difference among these
376    // store files will be (this.store.ttl / storeFileNum)
377    for (int i = 1; i <= storeFileNum; i++) {
378      LOG.info("Adding some data for the store file #" + i);
379      timeStamp = EnvironmentEdgeManager.currentTime();
380      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
381      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
382      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
383      flush(i);
384      edge.incrementTime(sleepTime);
385    }
386
387    // Verify the total number of store files
388    assertEquals(storeFileNum, this.store.getStorefiles().size());
389
390    // Each call will find one expired store file and delete it before compaction happens.
391    // There will be no compaction due to the raised threshold above. The last file will not be removed.
392    for (int i = 1; i <= storeFileNum - 1; i++) {
393      // verify the expired store file.
394      assertFalse(this.store.requestCompaction().isPresent());
395      Collection<HStoreFile> sfs = this.store.getStorefiles();
396      // Ensure i files are gone.
397      if (minVersions == 0) {
398        assertEquals(storeFileNum - i, sfs.size());
399        // Ensure only non-expired files remain.
400        for (HStoreFile sf : sfs) {
401          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
402        }
403      } else {
404        assertEquals(storeFileNum, sfs.size());
405      }
406      // Let the next store file expire.
407      edge.incrementTime(sleepTime);
408    }
409    assertFalse(this.store.requestCompaction().isPresent());
410
411    Collection<HStoreFile> sfs = this.store.getStorefiles();
412    // Assert the last expired file is not removed.
413    if (minVersions == 0) {
414      assertEquals(1, sfs.size());
415    }
416    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
417    assertTrue(ts < (edge.currentTime() - storeTtl));
418
419    for (HStoreFile sf : sfs) {
420      sf.closeStoreFile(true);
421    }
422  }
423
424  @Test
425  public void testLowestModificationTime() throws Exception {
426    Configuration conf = HBaseConfiguration.create();
427    FileSystem fs = FileSystem.get(conf);
428    // Initialize region
429    init(name.getMethodName(), conf);
430
431    int storeFileNum = 4;
432    for (int i = 1; i <= storeFileNum; i++) {
433      LOG.info("Adding some data for the store file #"+i);
434      this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null);
435      this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null);
436      this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null);
437      flush(i);
438    }
439    // after flush; check the lowest time stamp
440    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
441    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
442    assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
443
444    // after compact; check the lowest time stamp
445    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
446    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
447    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
448    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
449  }
450
451  private static long getLowestTimeStampFromFS(FileSystem fs,
452      final Collection<HStoreFile> candidates) throws IOException {
453    long minTs = Long.MAX_VALUE;
454    if (candidates.isEmpty()) {
455      return minTs;
456    }
457    Path[] p = new Path[candidates.size()];
458    int i = 0;
459    for (HStoreFile sf : candidates) {
460      p[i] = sf.getPath();
461      ++i;
462    }
463
464    FileStatus[] stats = fs.listStatus(p);
465    if (stats == null || stats.length == 0) {
466      return minTs;
467    }
468    for (FileStatus s : stats) {
469      minTs = Math.min(minTs, s.getModificationTime());
470    }
471    return minTs;
472  }
473
474  //////////////////////////////////////////////////////////////////////////////
475  // Get tests
476  //////////////////////////////////////////////////////////////////////////////
477
478  private static final int BLOCKSIZE_SMALL = 8192;
479
480  /**
481   * Test for hbase-1686.
482   */
483  @Test
484  public void testEmptyStoreFile() throws IOException {
485    init(this.name.getMethodName());
486    // Write a store file.
487    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
488    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
489    flush(1);
490    // Now put in place an empty store file. It's a little tricky. We have to
491    // do it manually with a hacked-in sequence id.
492    HStoreFile f = this.store.getStorefiles().iterator().next();
493    Path storedir = f.getPath().getParent();
494    long seqid = f.getMaxSequenceId();
495    Configuration c = HBaseConfiguration.create();
496    FileSystem fs = FileSystem.get(c);
497    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
498    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
499        fs)
500            .withOutputDir(storedir)
501            .withFileContext(meta)
502            .build();
503    w.appendMetadata(seqid + 1, false);
504    w.close();
505    this.store.close();
506    // Reopen it... should pick up two files
507    this.store =
508        new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
509    assertEquals(2, this.store.getStorefilesCount());
510
511    result = HBaseTestingUtil.getFromStoreFile(store,
512        get.getRow(),
513        qualifiers);
514    assertEquals(1, result.size());
515  }
516
517  /**
518   * Getting data from memstore only
519   */
520  @Test
521  public void testGet_FromMemStoreOnly() throws IOException {
522    init(this.name.getMethodName());
523
524    //Put data in memstore
525    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
526    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
527    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
528    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
529    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
530    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
531
532    //Get
533    result = HBaseTestingUtil.getFromStoreFile(store,
534        get.getRow(), qualifiers);
535
536    //Compare
537    assertCheck();
538  }
539
540  @Test
541  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
542    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
543    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
544    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
545  }
546
547  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
548    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
549    ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
550    long currentTs = 100;
551    long minTs = currentTs;
552    // the extra cell won't be flushed to disk,
553    // so the min of the time range will differ between the memstore and the hfile.
554    for (int i = 0; i != (maxVersion + 1); ++i) {
555      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
556      if (i == 1) {
557        minTs = currentTs;
558      }
559    }
560    flushStore(store, id++);
561
562    Collection<HStoreFile> files = store.getStorefiles();
563    assertEquals(1, files.size());
564    HStoreFile f = files.iterator().next();
565    f.initReader();
566    StoreFileReader reader = f.getReader();
567    assertEquals(minTs, reader.timeRange.getMin());
568    assertEquals(currentTs, reader.timeRange.getMax());
569  }
570
571  /**
572   * Getting data from files only
573   */
574  @Test
575  public void testGet_FromFilesOnly() throws IOException {
576    init(this.name.getMethodName());
577
578    //Put data in memstore
579    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
580    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
581    //flush
582    flush(1);
583
584    //Add more data
585    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
586    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
587    //flush
588    flush(2);
589
590    //Add more data
591    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
592    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
593    //flush
594    flush(3);
595
596    //Get
597    result = HBaseTestingUtil.getFromStoreFile(store,
598        get.getRow(),
599        qualifiers);
600    //this.store.get(get, qualifiers, result);
601
602    //Need to sort the result since multiple files
603    Collections.sort(result, CellComparatorImpl.COMPARATOR);
604
605    //Compare
606    assertCheck();
607  }
608
609  /**
610   * Getting data from memstore and files
611   */
612  @Test
613  public void testGet_FromMemStoreAndFiles() throws IOException {
614    init(this.name.getMethodName());
615
616    //Put data in memstore
617    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
618    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
619    //flush
620    flush(1);
621
622    //Add more data
623    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
624    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
625    //flush
626    flush(2);
627
628    //Add more data
629    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
630    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
631
632    //Get
633    result = HBaseTestingUtil.getFromStoreFile(store,
634        get.getRow(), qualifiers);
635
636    //Need to sort the result since multiple files
637    Collections.sort(result, CellComparatorImpl.COMPARATOR);
638
639    //Compare
640    assertCheck();
641  }
642
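  /**
   * Flushes the store and asserts that it now has the expected number of store files and an
   * empty active memstore segment.
   */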
643  private void flush(int storeFilesSize) throws IOException {
644    flushStore(store, id++);
645    assertEquals(storeFilesSize, this.store.getStorefiles().size());
646    assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
647  }
648
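  /** Asserts that the scan result matches {@link #expected}, cell by cell and in order. */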
649  private void assertCheck() {
650    assertEquals(expected.size(), result.size());
651    for (int i = 0; i < expected.size(); i++) {
652      assertEquals(expected.get(i), result.get(i));
653    }
654  }
655
656  @After
657  public void tearDown() throws Exception {
658    EnvironmentEdgeManagerTestHelper.reset();
659    if (store != null) {
660      try {
661        store.close();
662      } catch (IOException e) {
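        // Ignore close failures here; we are tearing down the test.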
663      }
664      store = null;
665    }
666    if (region != null) {
667      region.close();
668      region = null;
669    }
670  }
671
672  @AfterClass
673  public static void tearDownAfterClass() throws IOException {
674    TEST_UTIL.cleanupTestDir();
675  }
676
677  @Test
678  public void testHandleErrorsInFlush() throws Exception {
679    LOG.info("Setting up a faulty file system that cannot write");
680
681    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
682    User user = User.createUserForTesting(conf,
683        "testhandleerrorsinflush", new String[]{"foo"});
684    // Inject our faulty LocalFileSystem
685    conf.setClass("fs.file.impl", FaultyFileSystem.class,
686        FileSystem.class);
687    user.runAs(new PrivilegedExceptionAction<Object>() {
688      @Override
689      public Object run() throws Exception {
690        // Make sure it worked (above is sensitive to caching details in hadoop core)
691        FileSystem fs = FileSystem.get(conf);
692        assertEquals(FaultyFileSystem.class, fs.getClass());
693
694        // Initialize region
695        init(name.getMethodName(), conf);
696
697        LOG.info("Adding some data");
698        store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
699        store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
700        store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
701
702        LOG.info("Before flush, we should have no files");
703
704        Collection<StoreFileInfo> files =
705          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
706        assertEquals(0, files != null ? files.size() : 0);
707
708        //flush
709        try {
710          LOG.info("Flushing");
711          flush(1);
712          fail("Didn't bubble up IOE!");
713        } catch (IOException ioe) {
714          assertTrue(ioe.getMessage().contains("Fault injected"));
715        }
716
717        LOG.info("After failed flush, we should still have no files!");
718        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
719        assertEquals(0, files != null ? files.size() : 0);
720        store.getHRegion().getWAL().close();
721        return null;
722      }
723    });
724    FileSystem.closeAllForUGI(user.getUGI());
725  }
726
727  /**
728   * Faulty file system that will fail if you write past its fault position the FIRST TIME
729   * only; thereafter it will succeed.  Used by {@link TestHRegion} too.
730   */
731  static class FaultyFileSystem extends FilterFileSystem {
732    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
733    private long faultPos = 200;
734    AtomicBoolean fault = new AtomicBoolean(true);
735
736    public FaultyFileSystem() {
737      super(new LocalFileSystem());
738      LOG.info("Creating faulty!");
739    }
740
741    @Override
742    public FSDataOutputStream create(Path p) throws IOException {
743      return new FaultyOutputStream(super.create(p), faultPos, fault);
744    }
745
746    @Override
747    public FSDataOutputStream create(Path f, FsPermission permission,
748        boolean overwrite, int bufferSize, short replication, long blockSize,
749        Progressable progress) throws IOException {
750      return new FaultyOutputStream(super.create(f, permission,
751          overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
752    }
753
754    @Override
755    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
756        int bufferSize, short replication, long blockSize, Progressable progress)
757    throws IOException {
758      // Fake it. Call create instead. The default implementation throws an IOE
759      // saying this operation is not supported.
760      return create(f, overwrite, bufferSize, replication, blockSize, progress);
761    }
762  }
763
764  static class FaultyOutputStream extends FSDataOutputStream {
765    volatile long faultPos = Long.MAX_VALUE;
766    private final AtomicBoolean fault;
767
768    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
769    throws IOException {
770      super(out, null);
771      this.faultPos = faultPos;
772      this.fault = fault;
773    }
774
775    @Override
776    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
777      LOG.info("faulty stream write at pos " + getPos());
778      injectFault();
779      super.write(buf, offset, length);
780    }
781
782    private void injectFault() throws IOException {
783      if (this.fault.get() && getPos() >= faultPos) {
784        throw new IOException("Fault injected");
785      }
786    }
787  }
788
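  /**
   * Runs a full flush cycle (prepare, flushCache, commit) on the given store, using mocked
   * MonitoredTask instances for progress reporting.
   */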
789  private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
790    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
791    storeFlushCtx.prepare();
792    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
793    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
794    return storeFlushCtx;
795  }
796
797  /**
798   * Generate a list of KeyValues for testing based on given parameters
799   * @return the rows key-value list
800   */
801  private List<Cell> getKeyValueSet(long[] timestamps, int numRows,
802      byte[] qualifier, byte[] family) {
803    List<Cell> kvList = new ArrayList<>();
804    for (int i=1;i<=numRows;i++) {
805      byte[] b = Bytes.toBytes(i);
806      for (long timestamp: timestamps) {
807        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
808      }
809    }
810    return kvList;
811  }
812
813  /**
814   * Test to ensure correctness when using Stores with multiple timestamps
815   */
816  @Test
817  public void testMultipleTimestamps() throws IOException {
818    int numRows = 1;
819    long[] timestamps1 = new long[] {1,5,10,20};
820    long[] timestamps2 = new long[] {30,80};
821
822    init(this.name.getMethodName());
823
824    List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
825    for (Cell kv : kvList1) {
826      this.store.add(kv, null);
827    }
828
829    flushStore(store, id++);
830
831    List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
832    for(Cell kv : kvList2) {
833      this.store.add(kv, null);
834    }
835
836    List<Cell> result;
837    Get get = new Get(Bytes.toBytes(1));
838    get.addColumn(family,qf1);
839
840    get.setTimeRange(0,15);
841    result = HBaseTestingUtil.getFromStoreFile(store, get);
842    assertTrue(result.size()>0);
843
844    get.setTimeRange(40,90);
845    result = HBaseTestingUtil.getFromStoreFile(store, get);
846    assertTrue(result.size()>0);
847
848    get.setTimeRange(10,45);
849    result = HBaseTestingUtil.getFromStoreFile(store, get);
850    assertTrue(result.size()>0);
851
852    get.setTimeRange(80,145);
853    result = HBaseTestingUtil.getFromStoreFile(store, get);
854    assertTrue(result.size()>0);
855
856    get.setTimeRange(1,2);
857    result = HBaseTestingUtil.getFromStoreFile(store, get);
858    assertTrue(result.size()>0);
859
860    get.setTimeRange(90,200);
861    result = HBaseTestingUtil.getFromStoreFile(store, get);
862    assertTrue(result.size()==0);
863  }
864
865  /**
866   * Test for HBASE-3492 - Test split on empty colfam (no store files).
867   *
868   * @throws IOException When the IO operations fail.
869   */
870  @Test
871  public void testSplitWithEmptyColFam() throws IOException {
872    init(this.name.getMethodName());
873    assertFalse(store.getSplitPoint().isPresent());
874  }
875
876  @Test
877  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
878    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
879    long anyValue = 10;
880
881    // We'll check that the store uses the correct config and propagates it appropriately by
882    // going through the simplest "real" path available - "throttleCompaction", which just checks
883    // whether a number we pass in is higher than some config value, inside the compaction policy.
884    Configuration conf = HBaseConfiguration.create();
885    conf.setLong(CONFIG_KEY, anyValue);
886    init(name.getMethodName() + "-xml", conf);
887    assertTrue(store.throttleCompaction(anyValue + 1));
888    assertFalse(store.throttleCompaction(anyValue));
889
890    // HTD overrides XML.
891    --anyValue;
892    init(name.getMethodName() + "-htd", conf, TableDescriptorBuilder
893        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
894      ColumnFamilyDescriptorBuilder.of(family));
895    assertTrue(store.throttleCompaction(anyValue + 1));
896    assertFalse(store.throttleCompaction(anyValue));
897
898    // HCD overrides them both.
899    --anyValue;
900    init(name.getMethodName() + "-hcd", conf,
901      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
902        Long.toString(anyValue)),
903      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
904          .build());
905    assertTrue(store.throttleCompaction(anyValue + 1));
906    assertFalse(store.throttleCompaction(anyValue));
907  }
908
909  public static class DummyStoreEngine extends DefaultStoreEngine {
910    public static DefaultCompactor lastCreatedCompactor = null;
911
912    @Override
913    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
914        throws IOException {
915      super.createComponents(conf, store, comparator);
916      lastCreatedCompactor = this.compactor;
917    }
918  }
919
920  @Test
921  public void testStoreUsesSearchEngineOverride() throws Exception {
922    Configuration conf = HBaseConfiguration.create();
923    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
924    init(this.name.getMethodName(), conf);
925    assertEquals(DummyStoreEngine.lastCreatedCompactor,
926      this.store.storeEngine.getCompactor());
927  }
928
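  /**
   * Writes an extra (empty) store file directly into the store directory, bypassing the store
   * itself, so that a later refreshStoreFiles() call can discover it.
   */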
929  private void addStoreFile() throws IOException {
930    HStoreFile f = this.store.getStorefiles().iterator().next();
931    Path storedir = f.getPath().getParent();
932    long seqid = this.store.getMaxSequenceId().orElse(0L);
933    Configuration c = TEST_UTIL.getConfiguration();
934    FileSystem fs = FileSystem.get(c);
935    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
936    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
937        fs)
938            .withOutputDir(storedir)
939            .withFileContext(fileContext)
940            .build();
941    w.appendMetadata(seqid + 1, false);
942    w.close();
943    LOG.info("Added store file:" + w.getPath());
944  }
945
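  /** Archives (removes) the store file at the given index via the region file system. */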
946  private void archiveStoreFile(int index) throws IOException {
947    Collection<HStoreFile> files = this.store.getStorefiles();
948    HStoreFile sf = null;
949    Iterator<HStoreFile> it = files.iterator();
950    for (int i = 0; i <= index; i++) {
951      sf = it.next();
952    }
953    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
954  }
955
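  /**
   * Closes the compacted-away store file at the given index and drops it from the store file
   * manager.
   */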
956  private void closeCompactedFile(int index) throws IOException {
957    Collection<HStoreFile> files =
958        this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
959    HStoreFile sf = null;
960    Iterator<HStoreFile> it = files.iterator();
961    for (int i = 0; i <= index; i++) {
962      sf = it.next();
963    }
964    sf.closeStoreFile(true);
965    store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
966  }
967
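  /**
   * Verifies that refreshStoreFiles() picks up store files that were added or removed behind the
   * store's back (directly on the file system).
   */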
968  @Test
969  public void testRefreshStoreFiles() throws Exception {
970    init(name.getMethodName());
971
972    assertEquals(0, this.store.getStorefilesCount());
973
974    // Test refreshing store files when no store files are there
975    store.refreshStoreFiles();
976    assertEquals(0, this.store.getStorefilesCount());
977
978    // add some data, flush
979    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
980    flush(1);
981    assertEquals(1, this.store.getStorefilesCount());
982
983    // add one more file
984    addStoreFile();
985
986    assertEquals(1, this.store.getStorefilesCount());
987    store.refreshStoreFiles();
988    assertEquals(2, this.store.getStorefilesCount());
989
990    // add three more files
991    addStoreFile();
992    addStoreFile();
993    addStoreFile();
994
995    assertEquals(2, this.store.getStorefilesCount());
996    store.refreshStoreFiles();
997    assertEquals(5, this.store.getStorefilesCount());
998
999    closeCompactedFile(0);
1000    archiveStoreFile(0);
1001
1002    assertEquals(5, this.store.getStorefilesCount());
1003    store.refreshStoreFiles();
1004    assertEquals(4, this.store.getStorefilesCount());
1005
1006    archiveStoreFile(0);
1007    archiveStoreFile(1);
1008    archiveStoreFile(2);
1009
1010    assertEquals(4, this.store.getStorefilesCount());
1011    store.refreshStoreFiles();
1012    assertEquals(1, this.store.getStorefilesCount());
1013
1014    archiveStoreFile(0);
1015    store.refreshStoreFiles();
1016    assertEquals(0, this.store.getStorefilesCount());
1017  }
1018
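  /**
   * Verifies that refreshStoreFiles() does not replace the store file set again when nothing has
   * changed on the file system since the previous refresh.
   */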
1019  @Test
1020  public void testRefreshStoreFilesNotChanged() throws IOException {
1021    init(name.getMethodName());
1022
1023    assertEquals(0, this.store.getStorefilesCount());
1024
1025    // add some data, flush
1026    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1027    flush(1);
1028    // add one more file
1029    addStoreFile();
1030
1031    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());
1032
1033    // call first time after files changed
1034    spiedStoreEngine.refreshStoreFiles();
1035    assertEquals(2, this.store.getStorefilesCount());
1036    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any());
1037
1038    // call second time
1039    spiedStoreEngine.refreshStoreFiles();
1040
1041    // Ensure that replaceStoreFiles is not called again (the invocation count stays the same)
1042    // when the underlying files have not changed.
1043    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any());
1044  }
1045
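  /**
   * Counts the scanners in the StoreScanner heap that read from the memstore rather than from
   * store files.
   */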
1046  private long countMemStoreScanner(StoreScanner scanner) {
1047    if (scanner.currentScanners == null) {
1048      return 0;
1049    }
1050    return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count();
1051  }
1052
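  /**
   * Verifies that the number of memstore scanners held by an open StoreScanner shrinks as
   * expected once the snapshot is flushed: snapshot plus active before the flush, only the
   * active segment (if it has data) afterwards, and none once the scan is exhausted.
   */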
1053  @Test
1054  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1055    long seqId = 100;
1056    long timestamp = EnvironmentEdgeManager.currentTime();
1057    Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1058        .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put)
1059        .setValue(qf1).build();
1060    PrivateCellUtil.setSequenceId(cell0, seqId);
1061    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1062
1063    Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1064        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
1065        .setValue(qf1).build();
1066    PrivateCellUtil.setSequenceId(cell1, seqId);
1067    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1068
1069    seqId = 101;
1070    timestamp = EnvironmentEdgeManager.currentTime();
1071    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1072        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
1073        .setValue(qf1).build();
1074    PrivateCellUtil.setSequenceId(cell2, seqId);
1075    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1076  }
1077
1078  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
1079      List<Cell> inputCellsAfterSnapshot) throws IOException {
1080    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
1081    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1082    long seqId = Long.MIN_VALUE;
1083    for (Cell c : inputCellsBeforeSnapshot) {
1084      quals.add(CellUtil.cloneQualifier(c));
1085      seqId = Math.max(seqId, c.getSequenceId());
1086    }
1087    for (Cell c : inputCellsAfterSnapshot) {
1088      quals.add(CellUtil.cloneQualifier(c));
1089      seqId = Math.max(seqId, c.getSequenceId());
1090    }
1091    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
1092    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1093    storeFlushCtx.prepare();
1094    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
1095    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
1096    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
1097      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
1098      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
1099      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1100      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1101      // snapshot has no data after flush
1102      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
1103      boolean more;
1104      int cellCount = 0;
1105      do {
1106        List<Cell> cells = new ArrayList<>();
1107        more = s.next(cells);
1108        cellCount += cells.size();
1109        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
1110      } while (more);
1111      assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
1112          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
1113          inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
1114      // the current scanners are cleared
1115      assertEquals(0, countMemStoreScanner(s));
1116    }
1117  }
1118
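  /** Builds a Put cell for {@link #family} with an explicit timestamp and sequence id. */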
1119  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
1120      throws IOException {
1121    return createCell(row, qualifier, ts, sequenceId, value);
1122  }
1123
1124  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
1125      throws IOException {
1126    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1127        .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
1128        .setValue(value).build();
1129    PrivateCellUtil.setSequenceId(c, sequenceId);
1130    return c;
1131  }
1132
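  /**
   * Flushes the store while a row is being scanned without any filtering; the in-flight scan
   * should still return all three cells of that row.
   */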
1133  @Test
1134  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1135    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1136    final int expectedSize = 3;
1137    testFlushBeforeCompletingScan(new MyListHook() {
1138      @Override
1139      public void hook(int currentSize) {
1140        if (currentSize == expectedSize - 1) {
1141          try {
1142            flushStore(store, id++);
1143            timeToGoNextRow.set(true);
1144          } catch (IOException e) {
1145            throw new RuntimeException(e);
1146          }
1147        }
1148      }
1149    }, new FilterBase() {
1150      @Override
1151      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1152        return ReturnCode.INCLUDE;
1153      }
1154    }, expectedSize);
1155  }
1156
1157  @Test
1158  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
1159    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1160    final int expectedSize = 2;
1161    testFlushBeforeCompletingScan(new MyListHook() {
1162      @Override
1163      public void hook(int currentSize) {
1164        if (currentSize == expectedSize - 1) {
1165          try {
1166            flushStore(store, id++);
1167            timeToGoNextRow.set(true);
1168          } catch (IOException e) {
1169            throw new RuntimeException(e);
1170          }
1171        }
1172      }
1173    }, new FilterBase() {
1174      @Override
1175      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1176        if (timeToGoNextRow.get()) {
1177          timeToGoNextRow.set(false);
1178          return ReturnCode.NEXT_ROW;
1179        } else {
1180          return ReturnCode.INCLUDE;
1181        }
1182      }
1183    }, expectedSize);
1184  }
1185
1186  @Test
1187  public void testFlushBeforeCompletingScanWithFilterHint() throws IOException,
1188      InterruptedException {
1189    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
1190    final int expectedSize = 2;
1191    testFlushBeforeCompletingScan(new MyListHook() {
1192      @Override
1193      public void hook(int currentSize) {
1194        if (currentSize == expectedSize - 1) {
1195          try {
1196            flushStore(store, id++);
1197            timeToGetHint.set(true);
1198          } catch (IOException e) {
1199            throw new RuntimeException(e);
1200          }
1201        }
1202      }
1203    }, new FilterBase() {
1204      @Override
1205      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1206        if (timeToGetHint.get()) {
1207          timeToGetHint.set(false);
1208          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
1209        } else {
1210          return Filter.ReturnCode.INCLUDE;
1211        }
1212      }
1213      @Override
1214      public Cell getNextCellHint(Cell currentCell) throws IOException {
1215        return currentCell;
1216      }
1217    }, expectedSize);
1218  }
1219
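  /**
   * Loads three rows, then scans row "row1" while the supplied hook flushes the store part-way
   * through the row; asserts that the row still yields the expected number of cells with the
   * newest values, and that the following row is read normally.
   */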
1220  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
1221          throws IOException, InterruptedException {
1222    Configuration conf = HBaseConfiguration.create();
1223    byte[] r0 = Bytes.toBytes("row0");
1224    byte[] r1 = Bytes.toBytes("row1");
1225    byte[] r2 = Bytes.toBytes("row2");
1226    byte[] value0 = Bytes.toBytes("value0");
1227    byte[] value1 = Bytes.toBytes("value1");
1228    byte[] value2 = Bytes.toBytes("value2");
1229    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1230    long ts = EnvironmentEdgeManager.currentTime();
1231    long seqId = 100;
1232    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1233      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1234      new MyStoreHook() {
1235        @Override
1236        public long getSmallestReadPoint(HStore store) {
1237          return seqId + 3;
1238        }
1239      });
1240    // The cells carrying value0 won't be flushed to disk because max versions is set to 1
1241    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
1242    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
1243    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
1244    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
1245    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
1246    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
1247    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
1248    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
1249    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
1250    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1251    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1252    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1253    List<Cell> myList = new MyList<>(hook);
1254    Scan scan = new Scan()
1255            .withStartRow(r1)
1256            .setFilter(filter);
1257    try (InternalScanner scanner = (InternalScanner) store.getScanner(
1258          scan, null, seqId + 3)){
1259      // r1
1260      scanner.next(myList);
1261      assertEquals(expectedSize, myList.size());
1262      for (Cell c : myList) {
1263        byte[] actualValue = CellUtil.cloneValue(c);
1264        assertTrue("expected:" + Bytes.toStringBinary(value1)
1265          + ", actual:" + Bytes.toStringBinary(actualValue)
1266          , Bytes.equals(actualValue, value1));
1267      }
1268      List<Cell> normalList = new ArrayList<>(3);
1269      // r2
1270      scanner.next(normalList);
1271      assertEquals(3, normalList.size());
1272      for (Cell c : normalList) {
1273        byte[] actualValue = CellUtil.cloneValue(c);
1274        assertTrue("expected:" + Bytes.toStringBinary(value2)
1275          + ", actual:" + Bytes.toStringBinary(actualValue)
1276          , Bytes.equals(actualValue, value2));
1277      }
1278    }
1279  }
1280
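  /**
   * Exercises the race between creating a scanner and taking an in-memory snapshot when using
   * MyCompactingMemStore: the scanner created concurrently with prepare() must still see all the
   * data that was in the active segment.
   */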
1281  @Test
1282  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
1283    Configuration conf = HBaseConfiguration.create();
1284    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
1285    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1286        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1287    byte[] value = Bytes.toBytes("value");
1288    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1289    long ts = EnvironmentEdgeManager.currentTime();
1290    long seqId = 100;
1291    // older data which shouldn't be "seen" by the client
1292    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1293    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1294    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1295    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1296    quals.add(qf1);
1297    quals.add(qf2);
1298    quals.add(qf3);
1299    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1300    MyCompactingMemStore.START_TEST.set(true);
1301    Runnable flush = () -> {
1302      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
1303      // recreate the active memstore -- phase (4/5)
1304      storeFlushCtx.prepare();
1305    };
1306    ExecutorService service = Executors.newSingleThreadExecutor();
1307    service.submit(flush);
1308    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
1309    // this is blocked until we recreate the active memstore -- phase (3/5)
1310    // we get scanner from active memstore but it is empty -- phase (5/5)
1311    InternalScanner scanner = (InternalScanner) store.getScanner(
1312          new Scan(new Get(row)), quals, seqId + 1);
1313    service.shutdown();
1314    service.awaitTermination(20, TimeUnit.SECONDS);
1315    try {
1316      try {
1317        List<Cell> results = new ArrayList<>();
1318        scanner.next(results);
1319        assertEquals(3, results.size());
1320        for (Cell c : results) {
1321          byte[] actualValue = CellUtil.cloneValue(c);
1322          assertTrue("expected:" + Bytes.toStringBinary(value)
1323            + ", actual:" + Bytes.toStringBinary(actualValue)
1324            , Bytes.equals(actualValue, value));
1325        }
1326      } finally {
1327        scanner.close();
1328      }
1329    } finally {
1330      MyCompactingMemStore.START_TEST.set(false);
1331      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1332      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1333    }
1334  }
1335
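  /**
   * Verifies that a scan still returns the current values when a second flush sneaks in while the
   * StoreScanner is updating its scanners from the store (see the phase markers below).
   */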
1336  @Test
1337  public void testScanWithDoubleFlush() throws IOException {
1338    Configuration conf = HBaseConfiguration.create();
1339    // Initialize region
1340    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){
1341      @Override
1342      public void getScanners(MyStore store) throws IOException {
1343        final long tmpId = id++;
1344        ExecutorService s = Executors.newSingleThreadExecutor();
1345        s.submit(() -> {
1346          try {
1347            // flush the store before the StoreScanner updates its scanners from the store.
1348            // The current data will be flushed into files, and the memstore will
1349            // be cleared.
1350            // -- phase (4/4)
1351            flushStore(store, tmpId);
1352          }catch (IOException ex) {
1353            throw new RuntimeException(ex);
1354          }
1355        });
1356        s.shutdown();
1357        try {
1358          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1359          s.awaitTermination(3, TimeUnit.SECONDS);
1360        } catch (InterruptedException ex) {
1361        }
1362      }
1363    });
1364    byte[] oldValue = Bytes.toBytes("oldValue");
1365    byte[] currentValue = Bytes.toBytes("currentValue");
1366    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1367    long ts = EnvironmentEdgeManager.currentTime();
1368    long seqId = 100;
    // older data which shouldn't be "seen" by client
1370    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1371    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1372    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1373    long snapshotId = id++;
1374    // push older data into snapshot -- phase (1/4)
1375    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker
1376        .DUMMY);
1377    storeFlushCtx.prepare();
1378
1379    // insert current data into active -- phase (2/4)
1380    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1381    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1382    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1383    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1384    quals.add(qf1);
1385    quals.add(qf2);
1386    quals.add(qf3);
1387    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
1388        new Scan(new Get(row)), quals, seqId + 1)) {
1389      // complete the flush -- phase (3/4)
1390      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1391      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1392
1393      List<Cell> results = new ArrayList<>();
1394      scanner.next(results);
1395      assertEquals(3, results.size());
1396      for (Cell c : results) {
1397        byte[] actualValue = CellUtil.cloneValue(c);
1398        assertTrue("expected:" + Bytes.toStringBinary(currentValue)
1399          + ", actual:" + Bytes.toStringBinary(actualValue)
1400          , Bytes.equals(actualValue, currentValue));
1401      }
1402    }
1403  }
1404
1405  @Test
1406  public void testReclaimChunkWhenScaning() throws IOException {
1407    init("testReclaimChunkWhenScaning");
1408    long ts = EnvironmentEdgeManager.currentTime();
1409    long seqId = 100;
1410    byte[] value = Bytes.toBytes("value");
    // data that the scanner created below should see
1412    store.add(createCell(qf1, ts, seqId, value), null);
1413    store.add(createCell(qf2, ts, seqId, value), null);
1414    store.add(createCell(qf3, ts, seqId, value), null);
1415    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1416    quals.add(qf1);
1417    quals.add(qf2);
1418    quals.add(qf3);
1419    try (InternalScanner scanner = (InternalScanner) store.getScanner(
1420        new Scan(new Get(row)), quals, seqId)) {
1421      List<Cell> results = new MyList<>(size -> {
1422        switch (size) {
1423          // 1) we get the first cell (qf1)
1424          // 2) flush the data to have StoreScanner update inner scanners
          // 3) the chunk will be reclaimed after updating
1426          case 1:
1427            try {
1428              flushStore(store, id++);
1429            } catch (IOException e) {
1430              throw new RuntimeException(e);
1431            }
1432            break;
1433          // 1) we get the second cell (qf2)
          // 2) add some cells to fill some bytes into the chunk (we have only one chunk)
1435          case 2:
1436            try {
1437              byte[] newValue = Bytes.toBytes("newValue");
              // newer data beyond the scanner's read point, so it shouldn't be "seen"
1439              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1440              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1441              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1442            } catch (IOException e) {
1443              throw new RuntimeException(e);
1444            }
1445            break;
1446          default:
1447            break;
1448        }
1449      });
1450      scanner.next(results);
1451      assertEquals(3, results.size());
1452      for (Cell c : results) {
1453        byte[] actualValue = CellUtil.cloneValue(c);
1454        assertTrue("expected:" + Bytes.toStringBinary(value)
1455          + ", actual:" + Bytes.toStringBinary(actualValue)
1456          , Bytes.equals(actualValue, value));
1457      }
1458    }
1459  }
1460
1461  /**
   * If there are two running InMemoryFlushRunnables, the later one may change the versionedList,
   * and the first InMemoryFlushRunnable will then use the changed versionedList to remove the
   * corresponding segments. In short, some segments that are not part of the merge get removed.
1466   */
1467  @Test
1468  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1469    int flushSize = 500;
1470    Configuration conf = HBaseConfiguration.create();
1471    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1472    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1473    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1474    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1475    // Set the lower threshold to invoke the "MERGE" policy
1476    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1477    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1478        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1479    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1480    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1481    long ts = EnvironmentEdgeManager.currentTime();
1482    long seqId = 100;
    // add enough data to trigger the first in-memory flush
1484    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1485    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1486    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1487    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1488    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1489    storeFlushCtx.prepare();
    // This shouldn't invoke another in-memory flush because the first compactor thread
    // hasn't completed the in-memory compaction yet.
1492    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1493    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1494    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1495    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
    // okay, let the compaction complete
1497    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1498    CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore;
1499    while (mem.isMemStoreFlushingInMemory()) {
1500      TimeUnit.SECONDS.sleep(1);
1501    }
1502    // This should invoke another in-memory flush.
1503    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1504    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1505    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1506    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1507    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1508      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1509    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1510    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1511  }
1512
1513  @Test
1514  public void testAge() throws IOException {
1515    long currentTime = EnvironmentEdgeManager.currentTime();
1516    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1517    edge.setValue(currentTime);
1518    EnvironmentEdgeManager.injectEdge(edge);
1519    Configuration conf = TEST_UTIL.getConfiguration();
1520    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1521    initHRegion(name.getMethodName(), conf,
1522      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1523    HStore store = new HStore(region, hcd, conf, false) {
1524
1525      @Override
1526      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1527          CellComparator kvComparator) throws IOException {
1528        List<HStoreFile> storefiles =
1529            Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1530              mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1531        StoreFileManager sfm = mock(StoreFileManager.class);
1532        when(sfm.getStorefiles()).thenReturn(storefiles);
1533        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1534        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1535        return storeEngine;
1536      }
1537    };
1538    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1539    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1540    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1541  }
1542
1543  private HStoreFile mockStoreFile(long createdTime) {
1544    StoreFileInfo info = mock(StoreFileInfo.class);
1545    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1546    HStoreFile sf = mock(HStoreFile.class);
1547    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1548    when(sf.isHFile()).thenReturn(true);
1549    when(sf.getFileInfo()).thenReturn(info);
1550    return sf;
1551  }
1552
1553  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1554      throws IOException {
1555    return (MyStore) init(methodName, conf,
1556      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1557      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1558  }
1559
1560  private static class MyStore extends HStore {
1561    private final MyStoreHook hook;
1562
1563    MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration
1564        confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1565      super(region, family, confParam, false);
1566      this.hook = hook;
1567    }
1568
1569    @Override
1570    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1571        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1572        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1573        boolean includeMemstoreScanner) throws IOException {
1574      hook.getScanners(this);
1575      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1576        stopRow, false, readPt, includeMemstoreScanner);
1577    }
1578
1579    @Override
1580    public long getSmallestReadPoint() {
1581      return hook.getSmallestReadPoint(this);
1582    }
1583  }
1584
1585  private abstract static class MyStoreHook {
1586
1587    void getScanners(MyStore store) throws IOException {
1588    }
1589
1590    long getSmallestReadPoint(HStore store) {
1591      return store.getHRegion().getSmallestReadPoint();
1592    }
1593  }
1594
1595  @Test
1596  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
1597    Configuration conf = HBaseConfiguration.create();
1598    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1599    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1601    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
1602    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1603    long ts = EnvironmentEdgeManager.currentTime();
1604    long seqID = 1L;
1605    // Add some data to the region and do some flushes
1606    for (int i = 1; i < 10; i++) {
1607      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1608        memStoreSizing);
1609    }
1610    // flush them
1611    flushStore(store, seqID);
1612    for (int i = 11; i < 20; i++) {
1613      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1614        memStoreSizing);
1615    }
1616    // flush them
1617    flushStore(store, seqID);
1618    for (int i = 21; i < 30; i++) {
1619      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1620        memStoreSizing);
1621    }
1622    // flush them
1623    flushStore(store, seqID);
1624
1625    assertEquals(3, store.getStorefilesCount());
1626    Scan scan = new Scan();
1627    scan.addFamily(family);
1628    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1629    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1630    StoreScanner storeScanner =
1631        (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1632    // get the current heap
1633    KeyValueHeap heap = storeScanner.heap;
1634    // create more store files
1635    for (int i = 31; i < 40; i++) {
1636      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1637        memStoreSizing);
1638    }
1639    // flush them
1640    flushStore(store, seqID);
1641
1642    for (int i = 41; i < 50; i++) {
1643      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1644        memStoreSizing);
1645    }
1646    // flush them
1647    flushStore(store, seqID);
1648    storefiles2 = store.getStorefiles();
1649    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1650    actualStorefiles1.removeAll(actualStorefiles);
1651    // Do compaction
1652    MyThread thread = new MyThread(storeScanner);
1653    thread.start();
1654    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
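    // while the store files are swapped, the thread above switches the scanner from pread to
    // stream read, which forces it to rebuild its KeyValueHeap; hence the two heaps must differ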
1655    thread.join();
1656    KeyValueHeap heap2 = thread.getHeap();
1657    assertFalse(heap.equals(heap2));
1658  }
1659
1660  @Test
1661  public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
1662    Configuration conf = HBaseConfiguration.create();
1663    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1664    // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
1665    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
1666    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
1667    Scan scan = new Scan();
1668    scan.addFamily(family);
1669    // ReadType on Scan is still DEFAULT only.
1670    assertEquals(ReadType.DEFAULT, scan.getReadType());
1671    StoreScanner storeScanner = (StoreScanner) store.getScanner(scan,
1672        scan.getFamilyMap().get(family), Long.MAX_VALUE);
1673    assertFalse(storeScanner.isScanUsePread());
1674  }
1675
1676  @Test
1677  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
1678    final TableName tn = TableName.valueOf(name.getMethodName());
1679    init(name.getMethodName());
1680
1681    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
1682
1683    HStoreFile sf1 = mockStoreFileWithLength(1024L);
1684    HStoreFile sf2 = mockStoreFileWithLength(2048L);
1685    HStoreFile sf3 = mockStoreFileWithLength(4096L);
1686    HStoreFile sf4 = mockStoreFileWithLength(8192L);
1687
1688    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
1689        .setEndKey(Bytes.toBytes("b")).build();
1690
1691    // Compacting two files down to one, reducing size
1692    sizeStore.put(regionInfo, 1024L + 4096L);
1693    store.updateSpaceQuotaAfterFileReplacement(
1694        sizeStore, regionInfo, Arrays.asList(sf1, sf3), Arrays.asList(sf2));
1695
1696    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1697
1698    // The same file length in and out should have no change
1699    store.updateSpaceQuotaAfterFileReplacement(
1700        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf2));
1701
1702    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1703
1704    // Increase the total size used
1705    store.updateSpaceQuotaAfterFileReplacement(
1706        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf3));
1707
1708    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
1709
1710    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
1711        .setEndKey(Bytes.toBytes("c")).build();
1712    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
1713
1714    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
1715  }
1716
1717  @Test
1718  public void testHFileContextSetWithCFAndTable() throws Exception {
1719    init(this.name.getMethodName());
1720    StoreFileWriter writer = store.getStoreEngine()
1721      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
1722        .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
1723        .includesTag(false).shouldDropBehind(true));
1724    HFileContext hFileContext = writer.getHFileWriter().getFileContext();
1725    assertArrayEquals(family, hFileContext.getColumnFamily());
1726    assertArrayEquals(table, hFileContext.getTableName());
1727  }
1728
  // This test is for HBASE-26026: an HBase write can get stuck when the active segment has no
  // cell but its dataSize exceeds inmemoryFlushSize
1731  @Test
1732  public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
1733      throws IOException, InterruptedException {
1734    Configuration conf = HBaseConfiguration.create();
1735
1736    byte[] smallValue = new byte[3];
1737    byte[] largeValue = new byte[9];
1738    final long timestamp = EnvironmentEdgeManager.currentTime();
1739    final long seqId = 100;
1740    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
1741    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
1742    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
1743    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
1744    int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
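    // threshold just below the combined size of the two cells: each cell fits on its own, but
    // adding both crosses the in-memory flush size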
1745
1746    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
1747    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
1748    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
1749    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
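    // a region flush size of flushByteSize * 200 with a factor of 0.005 yields an in-memory
    // flush size of exactly flushByteSize, as asserted below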
1750
1751    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1752        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1753
1754    MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
1755    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
1756    myCompactingMemStore.smallCellPreUpdateCounter.set(0);
1757    myCompactingMemStore.largeCellPreUpdateCounter.set(0);
1758
1759    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
1760    Thread smallCellThread = new Thread(() -> {
1761      try {
1762        store.add(smallCell, new NonThreadSafeMemStoreSizing());
1763      } catch (Throwable exception) {
1764        exceptionRef.set(exception);
1765      }
1766    });
1767    smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
1768    smallCellThread.start();
1769
1770    String oldThreadName = Thread.currentThread().getName();
1771    try {
1772      /**
       * 1. smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
       * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
       * largeCellThread invokes flushInMemory.
       * <p/>
       * 2. After largeCellThread finishes the CompactingMemStore.flushInMemory method,
       * smallCellThread can add its cell to currentActive. That is to say, when largeCellThread
       * called flushInMemory, CompactingMemStore.active had no cell.
1780       */
1781      Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
1782      store.add(largeCell, new NonThreadSafeMemStoreSizing());
1783      smallCellThread.join();
1784
1785      for (int i = 0; i < 100; i++) {
1786        long currentTimestamp = timestamp + 100 + i;
1787        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
1788        store.add(cell, new NonThreadSafeMemStoreSizing());
1789      }
1790    } finally {
1791      Thread.currentThread().setName(oldThreadName);
1792    }
1793
    assertTrue(exceptionRef.get() == null);
  }
1797
  // This test is for HBASE-26210: an HBase write can get stuck when there is a cell whose size
  // exceeds inmemoryFlushSize
1800  @Test(timeout = 60000)
1801  public void testCompactingMemStoreCellExceedInmemoryFlushSize()
1802      throws Exception {
1803    Configuration conf = HBaseConfiguration.create();
1804    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
1805
1806    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1807        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1808
1809    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
1810
1811    int size = (int) (myCompactingMemStore.getInmemoryFlushSize());
1812    byte[] value = new byte[size + 1];
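    // a single cell whose value alone is larger than the in-memory flush size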
1813
1814    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1815    long timestamp = EnvironmentEdgeManager.currentTime();
1816    long seqId = 100;
1817    Cell cell = createCell(qf1, timestamp, seqId, value);
1818    int cellByteSize = MutableSegment.getCellLength(cell);
1819    store.add(cell, memStoreSizing);
1820    assertTrue(memStoreSizing.getCellsCount() == 1);
1821    assertTrue(memStoreSizing.getDataSize() == cellByteSize);
    // Wait for the in-memory compaction to complete, see HBASE-26438
1823    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
1824  }
1825
  // This test is also for HBASE-26210: write a large cell and a small cell concurrently when
  // inmemoryFlushSize is smaller than, equal to, and larger than the cell size.
1828  @Test
1829  public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
1830      throws IOException, InterruptedException {
1831    doWriteTestLargeCellAndSmallCellConcurrently(
1832      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
1833    doWriteTestLargeCellAndSmallCellConcurrently(
1834      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
1835    doWriteTestLargeCellAndSmallCellConcurrently(
1836      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1);
1837    doWriteTestLargeCellAndSmallCellConcurrently(
1838      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize);
1839    doWriteTestLargeCellAndSmallCellConcurrently(
1840      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1);
1841  }
1842
1843  private void doWriteTestLargeCellAndSmallCellConcurrently(
1844      IntBinaryOperator getFlushByteSize)
1845      throws IOException, InterruptedException {
1846
1847    Configuration conf = HBaseConfiguration.create();
1848
1849    byte[] smallValue = new byte[3];
1850    byte[] largeValue = new byte[100];
1851    final long timestamp = EnvironmentEdgeManager.currentTime();
1852    final long seqId = 100;
1853    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
1854    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
1855    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
1856    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
1857    int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize);
1858    boolean flushByteSizeLessThanSmallAndLargeCellSize =
1859        flushByteSize < (smallCellByteSize + largeCellByteSize);
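    // whether one small cell plus one large cell is enough to cross the in-memory flush threshold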
1860
1861    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName());
1862    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));

1866    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1867        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1868
1869    MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore);
1870    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
1871    myCompactingMemStore.disableCompaction();
1872    if (flushByteSizeLessThanSmallAndLargeCellSize) {
1873      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true;
1874    } else {
1875      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false;
1876    }
1877
1878
1879    final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
1880    final AtomicLong totalCellByteSize = new AtomicLong(0);
1881    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
1882    Thread smallCellThread = new Thread(() -> {
1883      try {
1884        for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
1885          long currentTimestamp = timestamp + i;
1886          Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
1887          totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
1888          store.add(cell, memStoreSizing);
1889        }
      } catch (Throwable exception) {
        exceptionRef.set(exception);
      }
1894    });
1895    smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
1896    smallCellThread.start();
1897
1898    String oldThreadName = Thread.currentThread().getName();
1899    try {
1900      /**
       * When flushByteSizeLessThanSmallAndLargeCellSize is true:
       * <p/>
       * 1. smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then
       * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then
       * largeCellThread invokes flushInMemory.
       * <p/>
       * 2. After largeCellThread finishes the CompactingMemStore.flushInMemory method,
       * smallCellThread can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
       * <p/>
       * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and
       * largeCellThread concurrently write one cell each, wait for each other, and then write
       * another cell, and so on.
1913       */
1914      Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
1915      for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
1916        long currentTimestamp = timestamp + i;
1917        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
1918        totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
1919        store.add(cell, memStoreSizing);
1920      }
1921      smallCellThread.join();
1922
1923      assertTrue(exceptionRef.get() == null);
1924      assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2));
1925      assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
1926      if (flushByteSizeLessThanSmallAndLargeCellSize) {
1927        assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT);
1928      } else {
1929        assertTrue(
1930          myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1));
1931      }
1932    } finally {
1933      Thread.currentThread().setName(oldThreadName);
1934    }
1935  }
1936
1937  /**
1938   * <pre>
   * This test is for HBASE-26384: it tests {@link CompactingMemStore#flattenOneSegment} and
   * {@link CompactingMemStore#snapshot()} executing concurrently.
   * The thread sequence before HBASE-26384 is (the bug only exists for branch-2, and UTs are
   * added for both branch-2 and master):
   * 1. The {@link CompactingMemStore} size exceeds
   *    {@link CompactingMemStore#getInmemoryFlushSize()}; the write thread adds a new
   *    {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline} and starts an
   *    in-memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
   * 2. The in-memory compact thread starts and then stops just before
   *    {@link CompactingMemStore#flattenOneSegment}.
   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently; after the
   *    snapshot thread executes {@link CompactingMemStore#getImmutableSegments}, the in-memory
   *    compact thread continues.
   *    Assume the {@link VersionedSegmentsList#version} returned from
   *    {@link CompactingMemStore#getImmutableSegments} is v.
   * 4. The snapshot thread stops just before {@link CompactingMemStore#swapPipelineWithNull}.
   * 5. The in-memory compact thread completes {@link CompactingMemStore#flattenOneSegment};
   *    {@link CompactionPipeline#version} is still v.
   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
   *    thinks it succeeded and continues flushing, but the {@link ImmutableSegment} in
   *    {@link CompactionPipeline} has changed because of
   *    {@link CompactingMemStore#flattenOneSegment}, so the {@link ImmutableSegment} is in fact
   *    not removed and still remains in {@link CompactionPipeline}.
   *
   * After HBASE-26384, steps 5-6 change to the following, which is the expected behavior:
   * 5. The in-memory compact thread completes {@link CompactingMemStore#flattenOneSegment};
   *    {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version} to
   *    v+1.
   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
   *    fails and retries the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
   *    again; because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
1974   * </pre>
1975   */
1976  @Test
1977  public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
1978    Configuration conf = HBaseConfiguration.create();
1979
1980    byte[] smallValue = new byte[3];
1981    byte[] largeValue = new byte[9];
1982    final long timestamp = EnvironmentEdgeManager.currentTime();
1983    final long seqId = 100;
1984    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
1985    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
1986    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
1987    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
1988    int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
1989    int flushByteSize = totalCellByteSize - 2;
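    // just below the combined size of the two cells, so adding both below triggers the in-memory
    // flush whose compaction this test races against the snapshot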
1990
1991    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
1992    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
1993    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
1994    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
1995
1996    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1997        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1998
1999    MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
2000    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2001
2002    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2003    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2004
2005    String oldThreadName = Thread.currentThread().getName();
2006    try {
2007      Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
2008      /**
       * {@link CompactingMemStore#snapshot} must wait until the in-memory compact thread enters
       * {@link CompactingMemStore#flattenOneSegment}, because {@link CompactingMemStore#snapshot}
       * invokes {@link CompactingMemStore#stopCompaction}.
2012       */
2013      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2014
2015      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2016      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2017
2018      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2019      assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
2020      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2021      assertTrue(segments.getNumOfSegments() == 0);
2022      assertTrue(segments.getNumOfCells() == 0);
2023      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
2024      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2025    } finally {
2026      Thread.currentThread().setName(oldThreadName);
2027    }
2028  }
2029
2030  /**
2031   * <pre>
   * This test is also for HBASE-26384: it tests {@link CompactingMemStore#flattenOneSegment},
   * {@link CompactingMemStore#snapshot()} and memstore writes executing concurrently.
   * The thread sequence before HBASE-26384 is (the bug only exists for branch-2, and UTs are
   * added for both branch-2 and master):
   * 1. The {@link CompactingMemStore} size exceeds
   *    {@link CompactingMemStore#getInmemoryFlushSize()}; the write thread adds a new
   *    {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline} and starts an
   *    in-memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
   * 2. The in-memory compact thread starts and then stops just before
   *    {@link CompactingMemStore#flattenOneSegment}.
   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently; after the
   *    snapshot thread executes {@link CompactingMemStore#getImmutableSegments}, the in-memory
   *    compact thread continues.
   *    Assume the {@link VersionedSegmentsList#version} returned from
   *    {@link CompactingMemStore#getImmutableSegments} is v.
   * 4. The snapshot thread stops just before {@link CompactingMemStore#swapPipelineWithNull}.
   * 5. The in-memory compact thread completes {@link CompactingMemStore#flattenOneSegment};
   *    {@link CompactionPipeline#version} is still v.
   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
   *    thinks it succeeded and continues flushing, but the {@link ImmutableSegment} in
   *    {@link CompactionPipeline} has changed because of
   *    {@link CompactingMemStore#flattenOneSegment}, so the {@link ImmutableSegment} is in fact
   *    not removed and still remains in {@link CompactionPipeline}.
   *
   * After HBASE-26384, steps 5-6 change to the following, which is the expected behavior; steps
   * 7-8 are added to test that a new segment added before the retry is handled correctly.
   * 5. The in-memory compact thread completes {@link CompactingMemStore#flattenOneSegment};
   *    {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version} to
   *    v+1.
   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
   *    fails and retries; the {@link VersionedSegmentsList#version} returned from
   *    {@link CompactingMemStore#getImmutableSegments} is now v+1.
   * 7. The write thread continues writing to the {@link CompactingMemStore}; when the
   *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
   *    {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
   *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline};
   *    {@link CompactionPipeline#version} is still v+1.
   * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   *    {@link CompactionPipeline#version} is still v+1,
   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds. The new {@link ImmutableSegment}
   *    remains at the head of {@link CompactingMemStore#pipeline}; the old one is removed by
   *    {@link CompactingMemStore#swapPipelineWithNull}.
2077   * </pre>
2078   */
2079  @Test
2080  public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
2081    Configuration conf = HBaseConfiguration.create();
2082
2083    byte[] smallValue = new byte[3];
2084    byte[] largeValue = new byte[9];
2085    final long timestamp = EnvironmentEdgeManager.currentTime();
2086    final long seqId = 100;
2087    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2088    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2089    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2090    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2091    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2092    int flushByteSize = firstWriteCellByteSize - 2;
2093
2094    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2095    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
2096    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2097    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2098
2099    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2100        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2101
2102    final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
2103    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2104
2105    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2106    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2107
2108    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2109    final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
2110    final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2111    final int writeAgainCellByteSize = MutableSegment.getCellLength(writeAgainCell1)
2112        + MutableSegment.getCellLength(writeAgainCell2);
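    // total size of the two cells written while the snapshot thread retries swapPipelineWithNull
    // (step 7 in the method comment above); compared against the remaining pipeline segment below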
2113    final Thread writeAgainThread = new Thread(() -> {
2114      try {
2115        myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
2116
2117        store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
2118        store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
2119
2120        myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
2121      } catch (Throwable exception) {
2122        exceptionRef.set(exception);
2123      }
2124    });
2125    writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
2126    writeAgainThread.start();
2127
2128    String oldThreadName = Thread.currentThread().getName();
2129    try {
2130      Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
2131      /**
       * {@link CompactingMemStore#snapshot} must wait until the in-memory compact thread enters
       * {@link CompactingMemStore#flattenOneSegment}, because {@link CompactingMemStore#snapshot}
       * invokes {@link CompactingMemStore#stopCompaction}.
2135       */
2136      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2137      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2138      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2139      writeAgainThread.join();
2140
2141      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2142      assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
2143      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2144      assertTrue(segments.getNumOfSegments() == 1);
2145      assertTrue(
2146        ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
2147      assertTrue(segments.getNumOfCells() == 2);
2148      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
2149      assertTrue(exceptionRef.get() == null);
2150      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2151    } finally {
2152      Thread.currentThread().setName(oldThreadName);
2153    }
2154  }
2155
2156  /**
2157   * <pre>
   * This test is for HBASE-26465: it tests {@link DefaultMemStore#clearSnapshot} and
   * {@link DefaultMemStore#getScanners} executing concurrently. The thread sequence before
   * HBASE-26465 is:
   * 1. The flush thread starts flushing the {@link DefaultMemStore} after some cells have been
   *    added to it.
   * 2. The flush thread stops just before {@link DefaultMemStore#clearSnapshot} in
   *    {@link HStore#updateStorefiles}, after it has completed flushing the memstore to an hfile.
   * 3. The scan thread starts and stops just after {@link DefaultMemStore#getSnapshotSegments} in
   *    {@link DefaultMemStore#getScanners}; here the scan thread gets the
   *    {@link DefaultMemStore#snapshot} which was created by the flush thread.
   * 4. The flush thread continues {@link DefaultMemStore#clearSnapshot} and closes
   *    {@link DefaultMemStore#snapshot}; because the reference count of the corresponding
   *    {@link MemStoreLABImpl} is 0, the {@link Chunk}s in the corresponding
   *    {@link MemStoreLABImpl} are recycled.
   * 5. The scan thread continues {@link DefaultMemStore#getScanners}, creates a
   *    {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increases the
   *    reference count of the corresponding {@link MemStoreLABImpl}, but the {@link Chunk}s in
   *    the corresponding {@link MemStoreLABImpl} were recycled in step 4 and may be overwritten
   *    by other write threads, which may cause serious problems.
   * After HBASE-26465, {@link DefaultMemStore#getScanners} and
   * {@link DefaultMemStore#clearSnapshot} can no longer execute concurrently.
2179   * </pre>
2180   */
2181  @Test
2182  public void testClearSnapshotGetScannerConcurrently() throws Exception {
2183    Configuration conf = HBaseConfiguration.create();
2184
2185    byte[] smallValue = new byte[3];
2186    byte[] largeValue = new byte[9];
2187    final long timestamp = EnvironmentEdgeManager.currentTime();
2188    final long seqId = 100;
2189    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2190    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2191    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2192    quals.add(qf1);
2193    quals.add(qf2);
2194
2195    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
2196    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2197
2198    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2199    MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore);
2200    myDefaultMemStore.store = store;
2201
2202    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2203    store.add(smallCell, memStoreSizing);
2204    store.add(largeCell, memStoreSizing);
2205
2206    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2207    final Thread flushThread = new Thread(() -> {
2208      try {
2209        flushStore(store, id++);
2210      } catch (Throwable exception) {
2211        exceptionRef.set(exception);
2212      }
2213    });
2214    flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
2215    flushThread.start();
2216
2217    String oldThreadName = Thread.currentThread().getName();
2218    StoreScanner storeScanner = null;
2219    try {
2220      Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
2221
2222      /**
       * Wait for the flush thread to stop just before {@link DefaultMemStore#doClearSnapshot}
2224       */
2225      myDefaultMemStore.getScannerCyclicBarrier.await();
2226
2227      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2228      flushThread.join();
2229
2230      if (myDefaultMemStore.shouldWait) {
2231        SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2232        MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2233        assertTrue(memStoreLAB.isClosed());
2234        assertTrue(!memStoreLAB.chunks.isEmpty());
2235        assertTrue(!memStoreLAB.isReclaimed());
2236
2237        Cell cell1 = segmentScanner.next();
        assertTrue(CellUtil.equals(smallCell, cell1));
        Cell cell2 = segmentScanner.next();
        assertTrue(CellUtil.equals(largeCell, cell2));
2241        assertNull(segmentScanner.next());
2242      } else {
2243        List<Cell> results = new ArrayList<>();
2244        storeScanner.next(results);
2245        assertEquals(2, results.size());
        assertTrue(CellUtil.equals(smallCell, results.get(0)));
        assertTrue(CellUtil.equals(largeCell, results.get(1)));
2248      }
2249      assertTrue(exceptionRef.get() == null);
2250    } finally {
2251      if (storeScanner != null) {
2252        storeScanner.close();
2253      }
2254      Thread.currentThread().setName(oldThreadName);
2255    }
2256  }
2257
2258  @SuppressWarnings("unchecked")
2259  private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
2260    List<T> resultScanners = new ArrayList<T>();
2261    for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
2262      if (keyValueScannerClass.isInstance(keyValueScanner)) {
2263        resultScanners.add((T) keyValueScanner);
2264      }
2265    }
2266    assertTrue(resultScanners.size() == 1);
2267    return resultScanners.get(0);
2268  }
2269
2270  @Test 
2271  public void testOnConfigurationChange() throws IOException {
2272    final int COMMON_MAX_FILES_TO_COMPACT = 10;
2273    final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8;
2274    final int STORE_MAX_FILES_TO_COMPACT = 6;
2275
    // Build a table whose max files to compact differs from the common configuration.
2277    Configuration conf = HBaseConfiguration.create();
2278    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2279      COMMON_MAX_FILES_TO_COMPACT);
2280    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
2281      .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2282        String.valueOf(STORE_MAX_FILES_TO_COMPACT)).build();
2283    init(this.name.getMethodName(), conf, hcd);
2284
    // After updating the common configuration, the conf in HStore itself must not be changed.
2286    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2287      NEW_COMMON_MAX_FILES_TO_COMPACT);
2288    this.store.onConfigurationChange(conf);
2289    assertEquals(STORE_MAX_FILES_TO_COMPACT,
2290      store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact());
2291  }
2292
2293  /**
2294   * This test is for HBASE-26476
2295   */
2296  @Test
2297  public void testExtendsDefaultMemStore() throws Exception {
2298    Configuration conf = HBaseConfiguration.create();
2299    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2300
2301    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2302    assertTrue(this.store.memstore.getClass() == DefaultMemStore.class);
2303    tearDown();
2304
2305    conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName());
2306    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2307    assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class);
2308  }
2309
2310  static class CustomDefaultMemStore extends DefaultMemStore {
2311
2312    public CustomDefaultMemStore(Configuration conf, CellComparator c,
2313        RegionServicesForStores regionServices) {
2314      super(conf, c, regionServices);
2315    }
2316
2317  }
2318
2319  /**
2320   * This test is for HBASE-26488
2321   */
2322  @Test
2323  public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
2324
2325    Configuration conf = HBaseConfiguration.create();
2326
2327    byte[] smallValue = new byte[3];
2328    byte[] largeValue = new byte[9];
2329    final long timestamp = EnvironmentEdgeManager.currentTime();
2330    final long seqId = 100;
2331    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2332    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2333    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2334    quals.add(qf1);
2335    quals.add(qf2);
2336
2337    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
2338    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2339    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
2340      MyDefaultStoreFlusher.class.getName());
2341
2342    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2343    MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
2344    assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
2345
2346    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2347    store.add(smallCell, memStoreSizing);
2348    store.add(largeCell, memStoreSizing);
2349    flushStore(store, id++);
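    // MyDefaultStoreFlusher below fails the first flush attempt, so HStore retries the flush;
    // HBASE-26488 is about the snapshot's MemStoreLAB leaking across that retry, so after the
    // successful retry it must be fully released, as asserted next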
2350
2351    MemStoreLABImpl memStoreLAB =
2352        (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
2353    assertTrue(memStoreLAB.isClosed());
2354    assertTrue(memStoreLAB.getRefCntValue() == 0);
2355    assertTrue(memStoreLAB.isReclaimed());
2356    assertTrue(memStoreLAB.chunks.isEmpty());
2357    StoreScanner storeScanner = null;
2358    try {
2359      storeScanner =
2360          (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2361      assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
2362      assertTrue(store.memstore.size().getCellsCount() == 0);
2363      assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
2364      assertTrue(storeScanner.currentScanners.size() == 1);
2365      assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
2366
2367      List<Cell> results = new ArrayList<>();
2368      storeScanner.next(results);
2369      assertEquals(2, results.size());
      assertTrue(CellUtil.equals(smallCell, results.get(0)));
      assertTrue(CellUtil.equals(largeCell, results.get(1)));
2372    } finally {
2373      if (storeScanner != null) {
2374        storeScanner.close();
2375      }
2376    }
2377  }
2378
2379
2380  static class MyDefaultMemStore1 extends DefaultMemStore {
2381
2382    private ImmutableSegment snapshotImmutableSegment;
2383
2384    public MyDefaultMemStore1(Configuration conf, CellComparator c,
2385        RegionServicesForStores regionServices) {
2386      super(conf, c, regionServices);
2387    }
2388
2389    @Override
2390    public MemStoreSnapshot snapshot() {
2391      MemStoreSnapshot result = super.snapshot();
2392      this.snapshotImmutableSegment = snapshot;
2393      return result;
2394    }
2395
2396  }
2397
2398  public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
2399    private static final AtomicInteger failCounter = new AtomicInteger(1);
2400    private static final AtomicInteger counter = new AtomicInteger(0);
2401
2402    public MyDefaultStoreFlusher(Configuration conf, HStore store) {
2403      super(conf, store);
2404    }
2405
2406    @Override
2407    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
2408        MonitoredTask status, ThroughputController throughputController,
2409        FlushLifeCycleTracker tracker) throws IOException {
2410      counter.incrementAndGet();
2411      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
2412    }
2413
2414    @Override
2415    protected void performFlush(InternalScanner scanner, final CellSink sink,
2416        ThroughputController throughputController) throws IOException {
2417
2418      final int currentCount = counter.get();
2419      CellSink newCellSink = (cell) -> {
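        // simulate a failing flush: every cell appended during the first flushSnapshot call
        // throws, so the flush has to be retried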
2420        if (currentCount <= failCounter.get()) {
2421          throw new IOException("Simulated exception by tests");
2422        }
2423        sink.append(cell);
2424      };
2425      super.performFlush(scanner, newCellSink, throughputController);
2426    }
2427  }
2428
2429  /**
2430   * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB}
2431   */
2432  @Test
2433  public void testImmutableMemStoreLABRefCnt() throws Exception {
2434    Configuration conf = HBaseConfiguration.create();
2435
2436    byte[] smallValue = new byte[3];
2437    byte[] largeValue = new byte[9];
2438    final long timestamp = EnvironmentEdgeManager.currentTime();
2439    final long seqId = 100;
2440    final Cell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
2441    final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
2442    final Cell smallCell2 = createCell(qf3, timestamp, seqId+1, smallValue);
2443    final Cell largeCell2 = createCell(qf4, timestamp, seqId+1, largeValue);
2444    final Cell smallCell3 = createCell(qf5, timestamp, seqId+2, smallValue);
2445    final Cell largeCell3 = createCell(qf6, timestamp, seqId+2, largeValue);
2446
2447    int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
2448    int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
2449    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2450    int flushByteSize = firstWriteCellByteSize - 2;
2451
2452    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2453    conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
2454    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2455    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2456    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2457
2458    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2459        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2460
2461    final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
2462    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2463    myCompactingMemStore.allowCompaction.set(false);
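    // with compaction disabled, each in-memory flush simply pushes a new immutable segment into
    // the pipeline, so the three pairs of adds below leave three separate segments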
2464
2465    NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2466    store.add(smallCell1, memStoreSizing);
2467    store.add(largeCell1, memStoreSizing);
2468    store.add(smallCell2, memStoreSizing);
2469    store.add(largeCell2, memStoreSizing);
2470    store.add(smallCell3, memStoreSizing);
2471    store.add(largeCell3, memStoreSizing);
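    // flushByteSize is slightly smaller than one small cell plus one large cell, so every
    // small/large pair triggers an in-memory flush; with compaction disabled each flush just
    // appends a segment to the pipeline, so three immutable segments are expected here.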
2472    VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
    assertEquals(3, versionedSegmentsList.getNumOfSegments());
2474    List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
2475    List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size());
2476    for (ImmutableSegment segment : segments) {
2477      memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
2478    }
2479    List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
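    // Each segment's MemStoreLAB is referenced by its owning segment and by the scanners opened
    // above, hence a ref count of 2.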
2480    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
      assertEquals(2, memStoreLAB.getRefCntValue());
2482    }
2483
2484    myCompactingMemStore.allowCompaction.set(true);
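    // With compaction allowed again, the in-memory flush below should merge the three pipeline
    // segments into one immutable segment whose ImmutableMemStoreLAB wraps the original
    // MemStoreLABs.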
2485    myCompactingMemStore.flushInMemory();
2486
2487    versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
    assertEquals(1, versionedSegmentsList.getNumOfSegments());
2489    ImmutableMemStoreLAB immutableMemStoreLAB =
2490        (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
2491    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
      assertEquals(2, memStoreLAB.getRefCntValue());
2493    }
2494
2495    List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
      assertEquals(2, memStoreLAB.getRefCntValue());
    }
    assertEquals(2, immutableMemStoreLAB.getRefCntValue());
2500    for (KeyValueScanner scanner : scanners1) {
2501      scanner.close();
2502    }
2503    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
      assertEquals(1, memStoreLAB.getRefCntValue());
2505    }
2506    for (KeyValueScanner scanner : scanners2) {
2507      scanner.close();
2508    }
    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
      assertEquals(1, memStoreLAB.getRefCntValue());
    }
    assertEquals(1, immutableMemStoreLAB.getRefCntValue());
2513    flushStore(store, id++);
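    // The flush to disk should release the remaining references, closing every MemStoreLAB and
    // reclaiming its chunks.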
    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
      assertEquals(0, memStoreLAB.getRefCntValue());
    }
    assertEquals(0, immutableMemStoreLAB.getRefCntValue());
2518    assertTrue(immutableMemStoreLAB.isClosed());
2519    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2520      assertTrue(memStoreLAB.isClosed());
2521      assertTrue(memStoreLAB.isReclaimed());
2522      assertTrue(memStoreLAB.chunks.isEmpty());
2523    }
2524  }
2525
2526  private HStoreFile mockStoreFileWithLength(long length) {
2527    HStoreFile sf = mock(HStoreFile.class);
2528    StoreFileReader sfr = mock(StoreFileReader.class);
2529    when(sf.isHFile()).thenReturn(true);
2530    when(sf.getReader()).thenReturn(sfr);
2531    when(sfr.length()).thenReturn(length);
2532    return sf;
2533  }
2534
2535  private static class MyThread extends Thread {
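    // Calls trySwitchToStreamRead on the given StoreScanner and records the resulting heap for
    // the test to inspect.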
2536    private StoreScanner scanner;
2537    private KeyValueHeap heap;
2538
2539    public MyThread(StoreScanner scanner) {
2540      this.scanner = scanner;
2541    }
2542
2543    public KeyValueHeap getHeap() {
2544      return this.heap;
2545    }
2546
2547    @Override
2548    public void run() {
2549      scanner.trySwitchToStreamRead();
2550      heap = scanner.heap;
2551    }
2552  }
2553
2554  private static class MyMemStoreCompactor extends MemStoreCompactor {
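    // The first start() call blocks on START_COMPACTOR_LATCH until the test counts it down;
    // subsequent calls delegate to super.start() immediately.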
2555    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2556    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
2557    public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy
2558        compactionPolicy) throws IllegalArgumentIOException {
2559      super(compactingMemStore, compactionPolicy);
2560    }
2561
2562    @Override
2563    public boolean start() throws IOException {
2564      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
2565      if (isFirst) {
2566        try {
2567          START_COMPACTOR_LATCH.await();
2568          return super.start();
2569        } catch (InterruptedException ex) {
2570          throw new RuntimeException(ex);
2571        }
2572      }
2573      return super.start();
2574    }
2575  }
2576
2577  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
2578    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2579    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
2580        HStore store, RegionServicesForStores regionServices,
2581        MemoryCompactionPolicy compactionPolicy) throws IOException {
2582      super(conf, c, store, regionServices, compactionPolicy);
2583    }
2584
2585    @Override
2586    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
2587        throws IllegalArgumentIOException {
2588      return new MyMemStoreCompactor(this, compactionPolicy);
2589    }
2590
2591    @Override
2592    protected boolean setInMemoryCompactionFlag() {
2593      boolean rval = super.setInMemoryCompactionFlag();
2594      if (rval) {
2595        RUNNER_COUNT.incrementAndGet();
2596        if (LOG.isDebugEnabled()) {
2597          LOG.debug("runner count: " + RUNNER_COUNT.get());
2598        }
2599      }
2600      return rval;
2601    }
2602  }
2603
2604  public static class MyCompactingMemStore extends CompactingMemStore {
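    // Once START_TEST is set, the createList call made while building the scanner list releases
    // getScannerLatch and then blocks until pushActiveToPipeline has completed, so the active
    // segment is pushed to the pipeline while the scanner list is being created.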
2605    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
2606    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
2607    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
2608    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c,
2609        HStore store, RegionServicesForStores regionServices,
2610        MemoryCompactionPolicy compactionPolicy) throws IOException {
2611      super(conf, c, store, regionServices, compactionPolicy);
2612    }
2613
2614    @Override
2615    protected List<KeyValueScanner> createList(int capacity) {
2616      if (START_TEST.get()) {
2617        try {
2618          getScannerLatch.countDown();
2619          snapshotLatch.await();
2620        } catch (InterruptedException e) {
2621          throw new RuntimeException(e);
2622        }
2623      }
2624      return new ArrayList<>(capacity);
    }

    @Override
2627    protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
2628      if (START_TEST.get()) {
2629        try {
2630          getScannerLatch.await();
2631        } catch (InterruptedException e) {
2632          throw new RuntimeException(e);
2633        }
2634      }
2635
2636      super.pushActiveToPipeline(active, checkEmpty);
2637      if (START_TEST.get()) {
2638        snapshotLatch.countDown();
2639      }
2640    }
2641  }
2642
2643  interface MyListHook {
2644    void hook(int currentSize);
2645  }
2646
2647  private static class MyList<T> implements List<T> {
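    // Delegates to an ArrayList and invokes the supplied hook with the current size before each
    // single-element add.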
2648    private final List<T> delegatee = new ArrayList<>();
2649    private final MyListHook hookAtAdd;
2650    MyList(final MyListHook hookAtAdd) {
2651      this.hookAtAdd = hookAtAdd;
2652    }
2653    @Override
2654    public int size() {return delegatee.size();}
2655
2656    @Override
2657    public boolean isEmpty() {return delegatee.isEmpty();}
2658
2659    @Override
2660    public boolean contains(Object o) {return delegatee.contains(o);}
2661
2662    @Override
2663    public Iterator<T> iterator() {return delegatee.iterator();}
2664
2665    @Override
2666    public Object[] toArray() {return delegatee.toArray();}
2667
2668    @Override
2669    public <R> R[] toArray(R[] a) {return delegatee.toArray(a);}
2670
2671    @Override
2672    public boolean add(T e) {
2673      hookAtAdd.hook(size());
2674      return delegatee.add(e);
2675    }
2676
2677    @Override
2678    public boolean remove(Object o) {return delegatee.remove(o);}
2679
2680    @Override
2681    public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);}
2682
2683    @Override
2684    public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);}
2685
2686    @Override
2687    public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);}
2688
2689    @Override
2690    public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);}
2691
2692    @Override
2693    public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);}
2694
2695    @Override
2696    public void clear() {delegatee.clear();}
2697
2698    @Override
2699    public T get(int index) {return delegatee.get(index);}
2700
2701    @Override
2702    public T set(int index, T element) {return delegatee.set(index, element);}
2703
2704    @Override
2705    public void add(int index, T element) {delegatee.add(index, element);}
2706
2707    @Override
2708    public T remove(int index) {return delegatee.remove(index);}
2709
2710    @Override
2711    public int indexOf(Object o) {return delegatee.indexOf(o);}
2712
2713    @Override
2714    public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);}
2715
2716    @Override
2717    public ListIterator<T> listIterator() {return delegatee.listIterator();}
2718
2719    @Override
2720    public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);}
2721
    @Override
    public List<T> subList(int fromIndex, int toIndex) {
      return delegatee.subList(fromIndex, toIndex);
    }
2724  }
2725
2726  public static class MyCompactingMemStore2 extends CompactingMemStore {
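    // Interleaves a smallCellThread and a largeCellThread: smallCellThread enters
    // checkAndAddToActiveSize first, largeCellThread then triggers flushInMemory, and only after
    // that flush completes does smallCellThread add its cell, so the active segment flushed by
    // largeCellThread contains no cells.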
2727    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
2728    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
2729    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
2730    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
2731    private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
2732    private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
2733
2734    public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
2735        HStore store, RegionServicesForStores regionServices,
2736        MemoryCompactionPolicy compactionPolicy) throws IOException {
2737      super(conf, cellComparator, store, regionServices, compactionPolicy);
2738    }
2739
2740    @Override
2741    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
2742        MemStoreSizing memstoreSizing) {
2743      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
2744        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
2745        if (currentCount <= 1) {
2746          try {
2747            /**
2748             * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
2749             * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
2750             * largeCellThread invokes flushInMemory.
2751             */
2752            preCyclicBarrier.await();
2753          } catch (Throwable e) {
2754            throw new RuntimeException(e);
2755          }
2756        }
2757      }
2758
2759      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
2760      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
2761        try {
2762          preCyclicBarrier.await();
2763        } catch (Throwable e) {
2764          throw new RuntimeException(e);
2765        }
2766      }
2767      return returnValue;
2768    }
2769
2770    @Override
2771    protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
2772      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
2773        try {
          /**
           * Only after largeCellThread has finished the flushInMemory method can smallCellThread
           * add its cell to currentActive. In other words, when largeCellThread invokes
           * flushInMemory, currentActive contains no cells.
           */
2779          postCyclicBarrier.await();
2780        } catch (Throwable e) {
2781          throw new RuntimeException(e);
2782        }
2783      }
2784      super.doAdd(currentActive, cell, memstoreSizing);
2785    }
2786
2787    @Override
2788    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
2789      super.flushInMemory(currentActiveMutableSegment);
2790      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
2791        if (largeCellPreUpdateCounter.get() <= 1) {
2792          try {
2793            postCyclicBarrier.await();
2794          } catch (Throwable e) {
2795            throw new RuntimeException(e);
2796          }
2797        }
2798      }
    }
  }
2802
2803  public static class MyCompactingMemStore3 extends CompactingMemStore {
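    // Variant of MyCompactingMemStore2 controlled by flushByteSizeLessThanSmallAndLargeCellSize:
    // when true, the largeCellThread and smallCellThread are interleaved through the barriers;
    // when false, only postUpdate synchronizes with the test thread. flushCounter counts
    // in-memory flushes.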
2804    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
2805    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
2806
2807    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
2808    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
2809    private final AtomicInteger flushCounter = new AtomicInteger(0);
2810    private static final int CELL_COUNT = 5;
2811    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;
2812
2813    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
2814        HStore store, RegionServicesForStores regionServices,
2815        MemoryCompactionPolicy compactionPolicy) throws IOException {
2816      super(conf, cellComparator, store, regionServices, compactionPolicy);
2817    }
2818
2819    @Override
2820    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
2821        MemStoreSizing memstoreSizing) {
2822      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
2823        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
2824      }
2825      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
2826        try {
2827          preCyclicBarrier.await();
2828        } catch (Throwable e) {
2829          throw new RuntimeException(e);
2830        }
2831      }
2832
2833      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
2834      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
2835        try {
2836          preCyclicBarrier.await();
2837        } catch (Throwable e) {
2838          throw new RuntimeException(e);
2839        }
2840      }
2841      return returnValue;
2842    }
2843
2844    @Override
2845    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
2846      super.postUpdate(currentActiveMutableSegment);
2847      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
2848        try {
2849          postCyclicBarrier.await();
2850        } catch (Throwable e) {
2851          throw new RuntimeException(e);
2852        }
2853        return;
2854      }
2855
2856      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
2857        try {
2858          postCyclicBarrier.await();
2859        } catch (Throwable e) {
2860          throw new RuntimeException(e);
2861        }
2862      }
2863    }
2864
2865    @Override
2866    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
2867      super.flushInMemory(currentActiveMutableSegment);
2868      flushCounter.incrementAndGet();
2869      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
2870        return;
2871      }
2872
      assertEquals(LARGE_CELL_THREAD_NAME, Thread.currentThread().getName());
2874      try {
2875        postCyclicBarrier.await();
2876      } catch (Throwable e) {
2877        throw new RuntimeException(e);
      }
    }
2881
2882    void disableCompaction() {
2883      allowCompaction.set(false);
2884    }
2885
2886    void enableCompaction() {
2887      allowCompaction.set(true);
    }
  }
2891
2892  public static class MyCompactingMemStore4 extends CompactingMemStore {
2893    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
    /**
     * {@link CompactingMemStore#flattenOneSegment} must execute after
     * {@link CompactingMemStore#getImmutableSegments}.
     */
2898    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only after {@link CompactingMemStore#flattenOneSegment} has completed may
     * {@link CompactingMemStore#swapPipelineWithNull} execute.
     */
2903    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only when the in-memory compaction thread enters
     * {@link CompactingMemStore#flattenOneSegment} does the snapshot thread start
     * {@link CompactingMemStore#snapshot}, because {@link CompactingMemStore#snapshot} would
     * invoke {@link CompactingMemStore#stopCompaction}.
     */
2909    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
    /**
     * Used to wait for {@link CompactingMemStore.InMemoryCompactionRunnable} to stop.
     */
2913    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
2914    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
2915    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
2916    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
2917    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
2918
2919    public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
2920        HStore store, RegionServicesForStores regionServices,
2921        MemoryCompactionPolicy compactionPolicy) throws IOException {
2922      super(conf, cellComparator, store, regionServices, compactionPolicy);
2923    }
2924
2925    @Override
2926    public VersionedSegmentsList getImmutableSegments() {
2927      VersionedSegmentsList result = super.getImmutableSegments();
2928      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
2929        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
2930        if (currentCount <= 1) {
2931          try {
2932            flattenOneSegmentPreCyclicBarrier.await();
2933          } catch (Throwable e) {
2934            throw new RuntimeException(e);
2935          }
2936        }
2937      }
2938      return result;
2939    }
2940
2941    @Override
2942    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
2943      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
2944        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
2945        if (currentCount <= 1) {
2946          try {
2947            flattenOneSegmentPostCyclicBarrier.await();
2948          } catch (Throwable e) {
2949            throw new RuntimeException(e);
2950          }
2951        }
2952      }
2953      boolean result = super.swapPipelineWithNull(segments);
2954      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
2955        int currentCount = swapPipelineWithNullCounter.get();
2956        if (currentCount <= 1) {
          assertFalse(result);
2958        }
2959        if (currentCount == 2) {
2960          assertTrue(result);
2961        }
2962      }
      return result;
    }
2966
2967    @Override
2968    public void flattenOneSegment(long requesterVersion, Action action) {
2969      int currentCount = flattenOneSegmentCounter.incrementAndGet();
2970      if (currentCount <= 1) {
2971        try {
2972          /**
2973           * {@link CompactingMemStore#snapshot} could start.
2974           */
2975          snapShotStartCyclicCyclicBarrier.await();
2976          flattenOneSegmentPreCyclicBarrier.await();
2977        } catch (Throwable e) {
2978          throw new RuntimeException(e);
2979        }
2980      }
2981      super.flattenOneSegment(requesterVersion, action);
2982      if (currentCount <= 1) {
2983        try {
2984          flattenOneSegmentPostCyclicBarrier.await();
2985        } catch (Throwable e) {
2986          throw new RuntimeException(e);
2987        }
2988      }
2989    }
2990
2991    @Override
2992    protected boolean setInMemoryCompactionFlag() {
2993      boolean result = super.setInMemoryCompactionFlag();
2994      assertTrue(result);
2995      setInMemoryCompactionFlagCounter.incrementAndGet();
2996      return result;
2997    }
2998
2999    @Override
3000    void inMemoryCompaction() {
3001      try {
3002        super.inMemoryCompaction();
3003      } finally {
3004        try {
3005          inMemoryCompactionEndCyclicBarrier.await();
3006        } catch (Throwable e) {
3007          throw new RuntimeException(e);
        }
      }
    }
  }
3014
3015  public static class MyCompactingMemStore5 extends CompactingMemStore {
3016    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3017    private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
    /**
     * {@link CompactingMemStore#flattenOneSegment} must execute after
     * {@link CompactingMemStore#getImmutableSegments}.
     */
3022    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only after {@link CompactingMemStore#flattenOneSegment} has completed may
     * {@link CompactingMemStore#swapPipelineWithNull} execute.
     */
3027    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only when the in-memory compaction thread enters
     * {@link CompactingMemStore#flattenOneSegment} does the snapshot thread start
     * {@link CompactingMemStore#snapshot}, because {@link CompactingMemStore#snapshot} would
     * invoke {@link CompactingMemStore#stopCompaction}.
     */
3033    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
    /**
     * Used to wait for {@link CompactingMemStore.InMemoryCompactionRunnable} to stop.
     */
3037    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3038    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3039    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3040    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3041    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
    /**
     * Only when the snapshot thread retries {@link CompactingMemStore#swapPipelineWithNull} may
     * the writeAgain thread start.
     */
3046    private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
    /**
     * Shared by the snapshot thread, the writeAgain thread and the in-memory compaction thread.
     * Only after the writeAgain thread completes does the retried
     * {@link CompactingMemStore#swapPipelineWithNull} execute and the in-memory compaction thread
     * exit, because we expect in-memory compaction to execute only once.
     */
3053    private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
3054
3055    public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
3056        HStore store, RegionServicesForStores regionServices,
3057        MemoryCompactionPolicy compactionPolicy) throws IOException {
3058      super(conf, cellComparator, store, regionServices, compactionPolicy);
3059    }
3060
3061    @Override
3062    public VersionedSegmentsList getImmutableSegments() {
3063      VersionedSegmentsList result = super.getImmutableSegments();
3064      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3065        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3066        if (currentCount <= 1) {
3067          try {
3068            flattenOneSegmentPreCyclicBarrier.await();
3069          } catch (Throwable e) {
3070            throw new RuntimeException(e);
3071          }
        }
      }
      return result;
3077    }
3078
3079    @Override
3080    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3081      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3082        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3083        if (currentCount <= 1) {
3084          try {
3085            flattenOneSegmentPostCyclicBarrier.await();
3086          } catch (Throwable e) {
3087            throw new RuntimeException(e);
3088          }
3089        }
3090
3091        if (currentCount == 2) {
3092          try {
            /**
             * Only when the snapshot thread retries
             * {@link CompactingMemStore#swapPipelineWithNull} may the writeAgain thread start.
             */
3097            writeMemStoreAgainStartCyclicBarrier.await();
            /**
             * Only after the writeAgain thread completes does the retried
             * {@link CompactingMemStore#swapPipelineWithNull} execute.
             */
3102            writeMemStoreAgainEndCyclicBarrier.await();
3103          } catch (Throwable e) {
3104            throw new RuntimeException(e);
3105          }
        }
      }
3109      boolean result = super.swapPipelineWithNull(segments);
3110      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3111        int currentCount = swapPipelineWithNullCounter.get();
3112        if (currentCount <= 1) {
          assertFalse(result);
3114        }
3115        if (currentCount == 2) {
3116          assertTrue(result);
3117        }
3118      }
      return result;
    }
3122
3123    @Override
3124    public void flattenOneSegment(long requesterVersion, Action action) {
3125      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3126      if (currentCount <= 1) {
3127        try {
3128          /**
3129           * {@link CompactingMemStore#snapshot} could start.
3130           */
3131          snapShotStartCyclicCyclicBarrier.await();
3132          flattenOneSegmentPreCyclicBarrier.await();
3133        } catch (Throwable e) {
3134          throw new RuntimeException(e);
3135        }
3136      }
3137      super.flattenOneSegment(requesterVersion, action);
3138      if (currentCount <= 1) {
3139        try {
3140          flattenOneSegmentPostCyclicBarrier.await();
          /**
           * Only after the writeAgain thread completes does the in-memory compaction thread exit,
           * because we expect in-memory compaction to execute only once.
           */
3145          writeMemStoreAgainEndCyclicBarrier.await();
3146        } catch (Throwable e) {
3147          throw new RuntimeException(e);
        }
      }
3151    }
3152
3153    @Override
3154    protected boolean setInMemoryCompactionFlag() {
3155      boolean result = super.setInMemoryCompactionFlag();
3156      int count = setInMemoryCompactionFlagCounter.incrementAndGet();
3157      if (count <= 1) {
3158        assertTrue(result);
3159      }
3160      if (count == 2) {
        assertFalse(result);
3162      }
3163      return result;
3164    }
3165
3166    @Override
3167    void inMemoryCompaction() {
3168      try {
3169        super.inMemoryCompaction();
3170      } finally {
3171        try {
3172          inMemoryCompactionEndCyclicBarrier.await();
3173        } catch (Throwable e) {
3174          throw new RuntimeException(e);
        }
      }
3178    }
3179  }
3180
3181  public static class MyCompactingMemStore6 extends CompactingMemStore {
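    // Only overrides inMemoryCompaction so that the test can wait on
    // inMemoryCompactionEndCyclicBarrier for the in-memory compaction to finish.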
3182    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3183
3184    public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator,
3185        HStore store, RegionServicesForStores regionServices,
3186        MemoryCompactionPolicy compactionPolicy) throws IOException {
3187      super(conf, cellComparator, store, regionServices, compactionPolicy);
3188    }
3189
3190    @Override
3191    void inMemoryCompaction() {
3192      try {
3193        super.inMemoryCompaction();
3194      } finally {
3195        try {
3196          inMemoryCompactionEndCyclicBarrier.await();
3197        } catch (Throwable e) {
3198          throw new RuntimeException(e);
        }
      }
3202    }
3203  }
3204
3205  public static class MyDefaultMemStore extends DefaultMemStore {
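    // Coordinates a getScanner thread and a flush thread around doClearSnapShot; the cyclic
    // barriers below enforce the interleaving described in the field javadocs.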
3206    private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
3207    private static final String FLUSH_THREAD_NAME = "flushMyThread";
    /**
     * Only when the flush thread enters {@link DefaultMemStore#doClearSnapShot} may the
     * getScanner thread start.
     */
3212    private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
    /**
     * Used by the getScanner thread to notify the flush thread that
     * {@link DefaultMemStore#getSnapshotSegments} has completed, so
     * {@link DefaultMemStore#doClearSnapShot} may continue.
     */
3217    private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
    /**
     * Used by the flush thread to notify the getScanner thread that
     * {@link DefaultMemStore#doClearSnapShot} has completed, so
     * {@link DefaultMemStore#getScanners} may continue.
     */
3222    private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3223    private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
3224    private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
3225    private volatile boolean shouldWait = true;
3226    private volatile HStore store = null;
3227
3228    public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
3229        RegionServicesForStores regionServices)
3230        throws IOException {
3231      super(conf, cellComparator, regionServices);
3232    }
3233
3234    @Override
3235    protected List<Segment> getSnapshotSegments() {
3236
3237      List<Segment> result = super.getSnapshotSegments();
3238
3239      if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
3240        int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
3241        if (currentCount == 1) {
3242          if (this.shouldWait) {
3243            try {
              /**
               * Notify the flush thread that {@link DefaultMemStore#getSnapshotSegments} has
               * completed, so {@link DefaultMemStore#doClearSnapShot} may continue.
               */
3248              preClearSnapShotCyclicBarrier.await();
              /**
               * Wait for {@link DefaultMemStore#doClearSnapShot} to complete.
               */
3252              postClearSnapShotCyclicBarrier.await();
3253
3254            } catch (Throwable e) {
3255              throw new RuntimeException(e);
3256            }
3257          }
3258        }
3259      }
3260      return result;
    }

    @Override
3265    protected void doClearSnapShot() {
3266      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3267        int currentCount = clearSnapshotCounter.incrementAndGet();
3268        if (currentCount == 1) {
3269          try {
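            // If this call already holds the store write lock, the getScanner thread is not
            // expected to run concurrently, so the barrier hand-off below is skipped.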
3270            if (((ReentrantReadWriteLock) store.getStoreEngine().getLock())
3271              .isWriteLockedByCurrentThread()) {
3272              shouldWait = false;
3273            }
            /**
             * Only when the flush thread enters {@link DefaultMemStore#doClearSnapShot} may the
             * getScanner thread start.
             */
3278            getScannerCyclicBarrier.await();
3279
3280            if (shouldWait) {
              /**
               * Wait for {@link DefaultMemStore#getSnapshotSegments} to complete.
               */
3284              preClearSnapShotCyclicBarrier.await();
3285            }
3286          } catch (Throwable e) {
3287            throw new RuntimeException(e);
3288          }
3289        }
3290      }
3291      super.doClearSnapShot();
3292
3293      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3294        int currentCount = clearSnapshotCounter.get();
3295        if (currentCount == 1) {
3296          if (shouldWait) {
3297            try {
              /**
               * Notify the getScanner thread that {@link DefaultMemStore#doClearSnapShot} has
               * completed, so {@link DefaultMemStore#getScanners} may continue.
               */
3302              postClearSnapShotCyclicBarrier.await();
3303            } catch (Throwable e) {
3304              throw new RuntimeException(e);
3305            }
3306          }
3307        }
3308      }
3309    }
3310  }
3311}