/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.IntBinaryOperator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Test class for the HStore
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestHStore {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
  @Rule
  public TestName name = new TestName();

  HRegion region;
  HStore store;
  byte[] table = Bytes.toBytes("table");
  byte[] family = Bytes.toBytes("family");

  byte[] row = Bytes.toBytes("row");
  byte[] row2 = Bytes.toBytes("row2");
  byte[] qf1 = Bytes.toBytes("qf1");
  byte[] qf2 = Bytes.toBytes("qf2");
  byte[] qf3 = Bytes.toBytes("qf3");
  byte[] qf4 = Bytes.toBytes("qf4");
  byte[] qf5 = Bytes.toBytes("qf5");
  byte[] qf6 = Bytes.toBytes("qf6");

  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);

  List<Cell> expected = new ArrayList<>();
  List<Cell> result = new ArrayList<>();

  long id = EnvironmentEdgeManager.currentTime();
  Get get = new Get(row);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();

  @Before
  public void setUp() throws IOException {
    qualifiers.clear();
    qualifiers.add(qf1);
    qualifiers.add(qf3);
    qualifiers.add(qf5);

    Iterator<byte[]> iter = qualifiers.iterator();
    while (iter.hasNext()) {
      byte[] next = iter.next();
      expected.add(new KeyValue(row, family, next, 1, (byte[]) null));
      get.addColumn(family, next);
    }
  }

  private void init(String methodName) throws IOException {
    init(methodName, TEST_UTIL.getConfiguration());
  }

  private HStore init(String methodName, Configuration conf) throws IOException {
    // some of the tests write 4 versions and then flush
    // (with HBASE-4241, lower versions are collected on flush)
    return init(methodName, conf,
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
  }

  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
    throws IOException {
    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
  }

  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd) throws IOException {
    return init(methodName, conf, builder, hcd, null);
  }

  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
    return init(methodName, conf, builder, hcd, hook, false);
  }

  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
    TableDescriptor htd = builder.setColumnFamily(hcd).build();
    Path basedir = new Path(DIR + methodName);
    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));

    FileSystem fs = FileSystem.get(conf);

    fs.delete(logdir, true);
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    Configuration walConf = new Configuration(conf);
    CommonFSUtils.setRootDir(walConf, basedir);
    WALFactory wals = new WALFactory(walConf, methodName);
    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
      htd, null);
    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
  }

  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
    if (hook == null) {
      store = new HStore(region, hcd, conf, false);
    } else {
      store = new MyStore(region, hcd, conf, hook, switchToPread);
    }
    region.stores.put(store.getColumnFamilyDescriptor().getName(), store);
    return store;
  }

  /**
   * Test we do not lose data if we fail a flush and then close. Part of HBase-10466
   */
  @Test
  public void testFlushSizeSizing() throws Exception {
    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    // Only retry once.
    conf.setInt("hbase.hstore.flush.retries.number", 1);
    User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" });
    // Inject our faulty LocalFileSystem
    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // Make sure it worked (above is sensitive to caching details in hadoop core)
        FileSystem fs = FileSystem.get(conf);
        assertEquals(FaultyFileSystem.class, fs.getClass());
        FaultyFileSystem ffs = (FaultyFileSystem) fs;

        // Initialize region
        init(name.getMethodName(), conf);

        MemStoreSize mss = store.memstore.getFlushableSize();
        assertEquals(0, mss.getDataSize());
        LOG.info("Adding some data");
        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
        // add the heap size of active (mutable) segment
        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
        mss = store.memstore.getFlushableSize();
        assertEquals(kvSize.getMemStoreSize(), mss);
        // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
        try {
          LOG.info("Flushing");
          flushStore(store, id++);
          fail("Didn't bubble up IOE!");
        } catch (IOException ioe) {
          assertTrue(ioe.getMessage().contains("Fault injected"));
        }
        // due to snapshot, change mutable to immutable segment
        kvSize.incMemStoreSize(0,
          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
        mss = store.memstore.getFlushableSize();
        assertEquals(kvSize.getMemStoreSize(), mss);
        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
        // not yet cleared the snapshot -- the above flush failed.
        assertEquals(kvSize.getMemStoreSize(), mss);
        ffs.fault.set(false);
        flushStore(store, id++);
        mss = store.memstore.getFlushableSize();
        // Size should be the foreground kv size.
        assertEquals(kvSize2.getMemStoreSize(), mss);
        flushStore(store, id++);
        mss = store.memstore.getFlushableSize();
        assertEquals(0, mss.getDataSize());
        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
        return null;
      }
    });
  }

  /**
   * Verify that compression and data block encoding are respected by the createWriter method, used
   * on store flush.
   */
  @Test
  public void testCreateWriter() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);

    ColumnFamilyDescriptor hcd =
      ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ)
        .setDataBlockEncoding(DataBlockEncoding.DIFF).build();
    init(name.getMethodName(), conf, hcd);

    // Test createWriter
    StoreFileWriter writer = store.getStoreEngine()
      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
        .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
        .includesTag(false).shouldDropBehind(false));
    Path path = writer.getPath();
    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
    writer.close();

    // Verify that compression and encoding settings are respected
    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
    reader.close();
  }

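  /**
   * Run {@link #testDeleteExpiredStoreFiles(int)} with MIN_VERSIONS set to both 0 and 1.
   */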
  @Test
  public void testDeleteExpiredStoreFiles() throws Exception {
    testDeleteExpiredStoreFiles(0);
    testDeleteExpiredStoreFiles(1);
  }

  /**
   * @param minVersions the MIN_VERSIONS for the column family
   */
  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
    int storeFileNum = 4;
    int ttl = 4;
    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
    EnvironmentEdgeManagerTestHelper.injectEdge(edge);

    Configuration conf = HBaseConfiguration.create();
    // Enable the expired store file deletion
    conf.setBoolean("hbase.store.delete.expired.storefile", true);
    // Set the compaction threshold higher to avoid normal compactions.
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);

    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
      .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());

    long storeTtl = this.store.getScanInfo().getTtl();
    long sleepTime = storeTtl / storeFileNum;
    long timeStamp;
    // There are 4 store files and the max time stamp difference among these
    // store files will be (this.store.ttl / storeFileNum)
    for (int i = 1; i <= storeFileNum; i++) {
      LOG.info("Adding some data for the store file #" + i);
      timeStamp = EnvironmentEdgeManager.currentTime();
      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
      flush(i);
      edge.incrementTime(sleepTime);
    }

    // Verify the total number of store files
    assertEquals(storeFileNum, this.store.getStorefiles().size());

    // Each call will find one expired store file and delete it before compaction happens.
    // There will be no compaction due to threshold above. Last file will not be replaced.
    for (int i = 1; i <= storeFileNum - 1; i++) {
      // verify the expired store file.
      assertFalse(this.store.requestCompaction().isPresent());
      Collection<HStoreFile> sfs = this.store.getStorefiles();
      // Ensure i files are gone.
      if (minVersions == 0) {
        assertEquals(storeFileNum - i, sfs.size());
        // Ensure only non-expired files remain.
        for (HStoreFile sf : sfs) {
          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
        }
      } else {
        assertEquals(storeFileNum, sfs.size());
      }
      // Let the next store file expire.
      edge.incrementTime(sleepTime);
    }
    assertFalse(this.store.requestCompaction().isPresent());

    Collection<HStoreFile> sfs = this.store.getStorefiles();
    // Assert the last expired file is not removed.
    if (minVersions == 0) {
      assertEquals(1, sfs.size());
    }
    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
    assertTrue(ts < (edge.currentTime() - storeTtl));

    for (HStoreFile sf : sfs) {
      sf.closeStoreFile(true);
    }
  }

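  /**
   * Verify that the lowest timestamp reported by {@link StoreUtils#getLowestTimestamp} matches the
   * lowest file modification time on the file system, both after flushes and after a compaction.
   */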
  @Test
  public void testLowestModificationTime() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // Initialize region
    init(name.getMethodName(), conf);

    int storeFileNum = 4;
    for (int i = 1; i <= storeFileNum; i++) {
      LOG.info("Adding some data for the store file #" + i);
      this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null);
      flush(i);
    }
    // after flush; check the lowest time stamp
    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);

    // after compact; check the lowest time stamp
    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
  }

  private static long getLowestTimeStampFromFS(FileSystem fs,
    final Collection<HStoreFile> candidates) throws IOException {
    long minTs = Long.MAX_VALUE;
    if (candidates.isEmpty()) {
      return minTs;
    }
    Path[] p = new Path[candidates.size()];
    int i = 0;
    for (HStoreFile sf : candidates) {
      p[i] = sf.getPath();
      ++i;
    }

    FileStatus[] stats = fs.listStatus(p);
    if (stats == null || stats.length == 0) {
      return minTs;
    }
    for (FileStatus s : stats) {
      minTs = Math.min(minTs, s.getModificationTime());
    }
    return minTs;
  }

  //////////////////////////////////////////////////////////////////////////////
  // Get tests
  //////////////////////////////////////////////////////////////////////////////

  private static final int BLOCKSIZE_SMALL = 8192;

  /**
   * Test for hbase-1686.
   */
  @Test
  public void testEmptyStoreFile() throws IOException {
    init(this.name.getMethodName());
    // Write a store file.
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    flush(1);
    // Now put in place an empty store file. It's a little tricky. We have to
    // do it manually with a hacked-in sequence id.
    HStoreFile f = this.store.getStorefiles().iterator().next();
    Path storedir = f.getPath().getParent();
    long seqid = f.getMaxSequenceId();
    Configuration c = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(c);
    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
      .withOutputDir(storedir).withFileContext(meta).build();
    w.appendMetadata(seqid + 1, false);
    w.close();
    this.store.close();
    // Reopen it... should pick up two files
    this.store =
      new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
    assertEquals(2, this.store.getStorefilesCount());

    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
    assertEquals(1, result.size());
  }

  /**
   * Getting data from memstore only
   */
  @Test
  public void testGet_FromMemStoreOnly() throws IOException {
    init(this.name.getMethodName());

    // Put data in memstore
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);

    // Get
    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);

    // Compare
    assertCheck();
  }

  @Test
  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
  }

  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
    long currentTs = 100;
    long minTs = currentTs;
    // the extra cell won't be flushed to disk,
    // so the min of timerange will be different between memStore and hfile.
    for (int i = 0; i != (maxVersion + 1); ++i) {
      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
      if (i == 1) {
        minTs = currentTs;
      }
    }
    flushStore(store, id++);

    Collection<HStoreFile> files = store.getStorefiles();
    assertEquals(1, files.size());
    HStoreFile f = files.iterator().next();
    f.initReader();
    StoreFileReader reader = f.getReader();
    assertEquals(minTs, reader.timeRange.getMin());
    assertEquals(currentTs, reader.timeRange.getMax());
  }

  /**
   * Getting data from files only
   */
  @Test
  public void testGet_FromFilesOnly() throws IOException {
    init(this.name.getMethodName());

    // Put data in memstore
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    // flush
    flush(1);

    // Add more data
    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
    // flush
    flush(2);

    // Add more data
    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
    // flush
    flush(3);

    // Get
    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
    // this.store.get(get, qualifiers, result);

    // Need to sort the result since multiple files
    Collections.sort(result, CellComparatorImpl.COMPARATOR);

    // Compare
    assertCheck();
  }

  /**
   * Getting data from memstore and files
   */
  @Test
  public void testGet_FromMemStoreAndFiles() throws IOException {
    init(this.name.getMethodName());

    // Put data in memstore
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    // flush
    flush(1);

    // Add more data
    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
    // flush
    flush(2);

    // Add more data
    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);

    // Get
    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);

    // Need to sort the result since multiple files
    Collections.sort(result, CellComparatorImpl.COMPARATOR);

    // Compare
    assertCheck();
  }

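  /**
   * Flush the store, then assert the expected number of store files and that the active memstore
   * segment is empty.
   */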
  private void flush(int storeFilessize) throws IOException {
    flushStore(store, id++);
    assertEquals(storeFilessize, this.store.getStorefiles().size());
    assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount());
  }

  private void assertCheck() {
    assertEquals(expected.size(), result.size());
    for (int i = 0; i < expected.size(); i++) {
      assertEquals(expected.get(i), result.get(i));
    }
  }

  @After
  public void tearDown() throws Exception {
    EnvironmentEdgeManagerTestHelper.reset();
    if (store != null) {
      try {
        store.close();
      } catch (IOException e) {
      }
      store = null;
    }
    if (region != null) {
      region.close();
      region = null;
    }
  }

  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testHandleErrorsInFlush() throws Exception {
    LOG.info("Setting up a faulty file system that cannot write");

    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" });
    // Inject our faulty LocalFileSystem
    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // Make sure it worked (above is sensitive to caching details in hadoop core)
        FileSystem fs = FileSystem.get(conf);
        assertEquals(FaultyFileSystem.class, fs.getClass());

        // Initialize region
        init(name.getMethodName(), conf);

        LOG.info("Adding some data");
        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
        store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
        store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);

        LOG.info("Before flush, we should have no files");

        Collection<StoreFileInfo> files =
          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
        assertEquals(0, files != null ? files.size() : 0);

        // flush
        try {
          LOG.info("Flushing");
          flush(1);
          fail("Didn't bubble up IOE!");
        } catch (IOException ioe) {
          assertTrue(ioe.getMessage().contains("Fault injected"));
        }

        LOG.info("After failed flush, we should still have no files!");
        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
        assertEquals(0, files != null ? files.size() : 0);
        store.getHRegion().getWAL().close();
        return null;
      }
    });
    FileSystem.closeAllForUGI(user.getUGI());
  }

  /**
   * Faulty file system that will fail if you write past its fault position the FIRST TIME only;
   * thereafter it will succeed. Used by {@link TestHRegion} too.
   */
  static class FaultyFileSystem extends FilterFileSystem {
    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
    private long faultPos = 200;
    AtomicBoolean fault = new AtomicBoolean(true);

    public FaultyFileSystem() {
      super(new LocalFileSystem());
      LOG.info("Creating faulty!");
    }

    @Override
    public FSDataOutputStream create(Path p) throws IOException {
      return new FaultyOutputStream(super.create(p), faultPos, fault);
    }

    @Override
    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
      return new FaultyOutputStream(
        super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress),
        faultPos, fault);
    }

    @Override
    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
      short replication, long blockSize, Progressable progress) throws IOException {
      // Fake it. Call create instead. The default implementation throws an IOE
      // that this is not supported.
      return create(f, overwrite, bufferSize, replication, blockSize, progress);
    }
  }

  static class FaultyOutputStream extends FSDataOutputStream {
    volatile long faultPos = Long.MAX_VALUE;
    private final AtomicBoolean fault;

    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
      throws IOException {
      super(out, null);
      this.faultPos = faultPos;
      this.fault = fault;
    }

    @Override
    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
      LOG.info("faulty stream write at pos " + getPos());
      injectFault();
      super.write(buf, offset, length);
    }

    private void injectFault() throws IOException {
      if (this.fault.get() && getPos() >= faultPos) {
        throw new IOException("Fault injected");
      }
    }
  }

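  /**
   * Run a full flush cycle (prepare, flushCache, commit) against the given store.
   */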
  private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
    return storeFlushCtx;
  }

  /**
   * Generate a list of KeyValues for testing based on given parameters
   * @return the rows key-value list
   */
  private List<Cell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier,
    byte[] family) {
    List<Cell> kvList = new ArrayList<>();
    for (int i = 1; i <= numRows; i++) {
      byte[] b = Bytes.toBytes(i);
      for (long timestamp : timestamps) {
        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
      }
    }
    return kvList;
  }

  /**
   * Test to ensure correctness when using Stores with multiple timestamps
   */
  @Test
  public void testMultipleTimestamps() throws IOException {
    int numRows = 1;
    long[] timestamps1 = new long[] { 1, 5, 10, 20 };
    long[] timestamps2 = new long[] { 30, 80 };

    init(this.name.getMethodName());

    List<Cell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family);
    for (Cell kv : kvList1) {
      this.store.add(kv, null);
    }

    flushStore(store, id++);

    List<Cell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family);
    for (Cell kv : kvList2) {
      this.store.add(kv, null);
    }

    List<Cell> result;
    Get get = new Get(Bytes.toBytes(1));
    get.addColumn(family, qf1);

    get.setTimeRange(0, 15);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    get.setTimeRange(40, 90);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    get.setTimeRange(10, 45);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    get.setTimeRange(80, 145);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    get.setTimeRange(1, 2);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    get.setTimeRange(90, 200);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertTrue(result.size() == 0);
  }

  /**
   * Test for HBASE-3492 - Test split on empty colfam (no store files).
   * @throws IOException When the IO operations fail.
   */
  @Test
  public void testSplitWithEmptyColFam() throws IOException {
    init(this.name.getMethodName());
    assertFalse(store.getSplitPoint().isPresent());
  }

  @Test
  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
    long anyValue = 10;

    // We'll check that it uses correct config and propagates it appropriately by going thru
    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
    // a number we pass in is higher than some config value, inside compactionPolicy.
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(CONFIG_KEY, anyValue);
    init(name.getMethodName() + "-xml", conf);
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));

    // HTD overrides XML.
    --anyValue;
    init(
      name.getMethodName() + "-htd", conf, TableDescriptorBuilder
        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
      ColumnFamilyDescriptorBuilder.of(family));
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));

    // HCD overrides them both.
    --anyValue;
    init(name.getMethodName() + "-hcd", conf,
      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
        Long.toString(anyValue)),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
        .build());
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));
  }

  public static class DummyStoreEngine extends DefaultStoreEngine {
    public static DefaultCompactor lastCreatedCompactor = null;

    @Override
    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
      throws IOException {
      super.createComponents(conf, store, comparator);
      lastCreatedCompactor = this.compactor;
    }
  }

  @Test
  public void testStoreUsesSearchEngineOverride() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
    init(this.name.getMethodName(), conf);
    assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
  }

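  /**
   * Write an extra store file (metadata only, with a bumped sequence id) directly into the store
   * directory, bypassing the store itself, so that refreshStoreFiles() has something to discover.
   */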
  private void addStoreFile() throws IOException {
    HStoreFile f = this.store.getStorefiles().iterator().next();
    Path storedir = f.getPath().getParent();
    long seqid = this.store.getMaxSequenceId().orElse(0L);
    Configuration c = TEST_UTIL.getConfiguration();
    FileSystem fs = FileSystem.get(c);
    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
      .withOutputDir(storedir).withFileContext(fileContext).build();
    w.appendMetadata(seqid + 1, false);
    w.close();
    LOG.info("Added store file:" + w.getPath());
  }

  private void archiveStoreFile(int index) throws IOException {
    Collection<HStoreFile> files = this.store.getStorefiles();
    HStoreFile sf = null;
    Iterator<HStoreFile> it = files.iterator();
    for (int i = 0; i <= index; i++) {
      sf = it.next();
    }
    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(),
      Lists.newArrayList(sf));
  }

  private void closeCompactedFile(int index) throws IOException {
    Collection<HStoreFile> files =
      this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    if (files.size() > 0) {
      HStoreFile sf = null;
      Iterator<HStoreFile> it = files.iterator();
      for (int i = 0; i <= index; i++) {
        sf = it.next();
      }
      sf.closeStoreFile(true);
      store.getStoreEngine().getStoreFileManager()
        .removeCompactedFiles(Collections.singletonList(sf));
    }
  }

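  /**
   * Verify that refreshStoreFiles() picks up store files that were added or removed behind the
   * store's back.
   */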
  @Test
  public void testRefreshStoreFiles() throws Exception {
    init(name.getMethodName());

    assertEquals(0, this.store.getStorefilesCount());

    // Test refreshing store files when no store files are there
    store.refreshStoreFiles();
    assertEquals(0, this.store.getStorefilesCount());

    // add some data, flush
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    flush(1);
    assertEquals(1, this.store.getStorefilesCount());

    // add one more file
    addStoreFile();

    assertEquals(1, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(2, this.store.getStorefilesCount());

    // add three more files
    addStoreFile();
    addStoreFile();
    addStoreFile();

    assertEquals(2, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(5, this.store.getStorefilesCount());

    closeCompactedFile(0);
    archiveStoreFile(0);

    assertEquals(5, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(4, this.store.getStorefilesCount());

    archiveStoreFile(0);
    archiveStoreFile(1);
    archiveStoreFile(2);

    assertEquals(4, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(1, this.store.getStorefilesCount());

    archiveStoreFile(0);
    store.refreshStoreFiles();
    assertEquals(0, this.store.getStorefilesCount());
  }

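  /**
   * Verify that refreshing store files only calls replaceStoreFiles() when the set of store files
   * has actually changed.
   */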
  @Test
  public void testRefreshStoreFilesNotChanged() throws IOException {
    init(name.getMethodName());

    assertEquals(0, this.store.getStorefilesCount());

    // add some data, flush
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    flush(1);
    // add one more file
    addStoreFile();

    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());

    // call first time after files changed
    spiedStoreEngine.refreshStoreFiles();
    assertEquals(2, this.store.getStorefilesCount());
    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());

    // call second time
    spiedStoreEngine.refreshStoreFiles();

    // Ensure that replaceStoreFiles is not called again, i.e. the invocation count stays at one,
    // because the set of store files has not changed.
    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
  }

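  /**
   * Count the scanners of a StoreScanner that read from the memstore rather than from store files.
   */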
  private long countMemStoreScanner(StoreScanner scanner) {
    if (scanner.currentScanners == null) {
      return 0;
    }
    return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count();
  }

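  /**
   * Verify how many memstore scanners a StoreScanner holds before and after a flush, for cells
   * added before and after the snapshot is taken.
   */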
  @Test
  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
    long seqId = 100;
    long timestamp = EnvironmentEdgeManager.currentTime();
    Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
      .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
    PrivateCellUtil.setSequenceId(cell0, seqId);
    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());

    Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
      .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
    PrivateCellUtil.setSequenceId(cell1, seqId);
    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));

    seqId = 101;
    timestamp = EnvironmentEdgeManager.currentTime();
    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
      .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
    PrivateCellUtil.setSequenceId(cell2, seqId);
    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
  }

  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
    List<Cell> inputCellsAfterSnapshot) throws IOException {
    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    long seqId = Long.MIN_VALUE;
    for (Cell c : inputCellsBeforeSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    for (Cell c : inputCellsAfterSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
      // snapshot has no data after flush
      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
      boolean more;
      int cellCount = 0;
      do {
        List<Cell> cells = new ArrayList<>();
        more = s.next(cells);
        cellCount += cells.size();
        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
      } while (more);
      assertEquals(
        "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
        inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
      // the current scanners is cleared
      assertEquals(0, countMemStoreScanner(s));
    }
  }

  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
    throws IOException {
    return createCell(row, qualifier, ts, sequenceId, value);
  }

  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
    throws IOException {
    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
      .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put).setValue(value).build();
    PrivateCellUtil.setSequenceId(c, sequenceId);
    return c;
  }

  @Test
  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
    final int expectedSize = 3;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGoNextRow.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        return ReturnCode.INCLUDE;
      }
    }, expectedSize);
  }

  @Test
  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
    final int expectedSize = 2;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGoNextRow.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        if (timeToGoNextRow.get()) {
          timeToGoNextRow.set(false);
          return ReturnCode.NEXT_ROW;
        } else {
          return ReturnCode.INCLUDE;
        }
      }
    }, expectedSize);
  }

  @Test
  public void testFlushBeforeCompletingScanWithFilterHint()
    throws IOException, InterruptedException {
    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
    final int expectedSize = 2;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGetHint.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        if (timeToGetHint.get()) {
          timeToGetHint.set(false);
          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
        } else {
          return Filter.ReturnCode.INCLUDE;
        }
      }

      @Override
      public Cell getNextCellHint(Cell currentCell) throws IOException {
        return currentCell;
      }
    }, expectedSize);
  }

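  /**
   * Scan row1 while the MyList hook flushes the store partway through, then assert that the scan
   * still returns the expected cells for row1 followed by the cells of row2.
   */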
  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
    throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    byte[] r0 = Bytes.toBytes("row0");
    byte[] r1 = Bytes.toBytes("row1");
    byte[] r2 = Bytes.toBytes("row2");
    byte[] value0 = Bytes.toBytes("value0");
    byte[] value1 = Bytes.toBytes("value1");
    byte[] value2 = Bytes.toBytes("value2");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
      new MyStoreHook() {
        @Override
        public long getSmallestReadPoint(HStore store) {
          return seqId + 3;
        }
      });
    // The cells having the value0 won't be flushed to disk because the value of max version is 1
    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
    List<Cell> myList = new MyList<>(hook);
    Scan scan = new Scan().withStartRow(r1).setFilter(filter);
    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
      // r1
      scanner.next(myList);
      assertEquals(expectedSize, myList.size());
      for (Cell c : myList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
      }
      List<Cell> normalList = new ArrayList<>(3);
      // r2
      scanner.next(normalList);
      assertEquals(3, normalList.size());
      for (Cell c : normalList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2));
      }
    }
  }

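  /**
   * Verify that scanning a row where most cells have expired by TTL does not loop endlessly: a
   * single heartbeat-limited next() call may return nothing, but iterating next() returns exactly
   * the non-expired cells.
   */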
  @Test
  public void testPreventLoopRead() throws Exception {
    init(this.name.getMethodName());
    Configuration conf = HBaseConfiguration.create();
    // use a small limit of cells scanned per heartbeat check
    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(edge);
    byte[] r0 = Bytes.toBytes("row0");
    byte[] value0 = Bytes.toBytes("value0");
    byte[] value1 = Bytes.toBytes("value1");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(10).build(),
      new MyStoreHook() {
        @Override
        public long getSmallestReadPoint(HStore store) {
          return seqId + 3;
        }
      });
    // The cells having the value0 will be expired
    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf4, ts + 10000 + 1, seqId, value1), memStoreSizing);
    store.add(createCell(r0, qf5, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf6, ts + 10000 + 1, seqId, value1), memStoreSizing);

    List<Cell> myList = new ArrayList<>();
    Scan scan = new Scan().withStartRow(r0);
    ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(false);
    // test normal scan, should return all the cells
    ScannerContext scannerContext = contextBuilder.build();
    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
      scanner.next(myList, scannerContext);
      assertEquals(6, myList.size());
    }

    // test that skipping TTL-expired cells can return an empty result; default loop prevention is on
    edge.incrementTime(10 * 1000);
    scannerContext = contextBuilder.build();
    myList.clear();
    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
      // r0
      scanner.next(myList, scannerContext);
      assertEquals(0, myList.size());
    }

    // iterating next() should return all cells that have not expired via TTL
    int resultCells = 0;
    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
      boolean hasMore = true;
      while (hasMore) {
        myList.clear();
        hasMore = scanner.next(myList, scannerContext);
        assertTrue(myList.size() < 6);
        resultCells += myList.size();
      }
      for (Cell c : myList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
      }
    }
    assertEquals(2, resultCells);
  }

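  /**
   * Verify that creating a scanner while a concurrent in-memory flush (snapshot) is being prepared
   * still returns all cells; MyCompactingMemStore coordinates the interleaving of the two threads.
   */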
  @Test
  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
    byte[] value = Bytes.toBytes("value");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
1340    // data which should still be "seen" by the scanner created below (read point is seqId + 1)
1341    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1342    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1343    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1344    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1345    quals.add(qf1);
1346    quals.add(qf2);
1347    quals.add(qf3);
1348    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1349    MyCompactingMemStore.START_TEST.set(true);
1350    Runnable flush = () -> {
1351      // this is blocked until we create the first scanner from pipeline and snapshot -- phase (1/5)
1352      // recreate the active memstore -- phase (4/5)
1353      storeFlushCtx.prepare();
1354    };
1355    ExecutorService service = Executors.newSingleThreadExecutor();
1356    service.submit(flush);
1357    // we get scanners from the pipeline and snapshot but they are empty. -- phase (2/5)
1358    // this is blocked until we recreate the active memstore -- phase (3/5)
1359    // we get a scanner from the active memstore but it is empty -- phase (5/5)
1360    InternalScanner scanner =
1361      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
1362    service.shutdown();
1363    service.awaitTermination(20, TimeUnit.SECONDS);
1364    try {
1365      try {
1366        List<Cell> results = new ArrayList<>();
1367        scanner.next(results);
1368        assertEquals(3, results.size());
1369        for (Cell c : results) {
1370          byte[] actualValue = CellUtil.cloneValue(c);
1371          assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
1372            + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
1373        }
1374      } finally {
1375        scanner.close();
1376      }
1377    } finally {
1378      MyCompactingMemStore.START_TEST.set(false);
1379      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1380      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1381    }
1382  }
1383
1384  @Test
1385  public void testScanWithDoubleFlush() throws IOException {
1386    Configuration conf = HBaseConfiguration.create();
1387    // Initialize region
1388    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1389      @Override
1390      public void getScanners(MyStore store) throws IOException {
1391        final long tmpId = id++;
1392        ExecutorService s = Executors.newSingleThreadExecutor();
1393        s.submit(() -> {
1394          try {
1395            // flush the store before the StoreScanner updates its scanners from the store.
1396            // The current data will be flushed into files, and the memstore will
1397            // be cleared.
1398            // -- phase (4/4)
1399            flushStore(store, tmpId);
1400          } catch (IOException ex) {
1401            throw new RuntimeException(ex);
1402          }
1403        });
1404        s.shutdown();
1405        try {
1406          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1407          s.awaitTermination(3, TimeUnit.SECONDS);
1408        } catch (InterruptedException ex) {
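              // Ignored on purpose: the test simply proceeds even if the flush has not finished
              // within the timeout.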
1409        }
1410      }
1411    });
1412    byte[] oldValue = Bytes.toBytes("oldValue");
1413    byte[] currentValue = Bytes.toBytes("currentValue");
1414    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1415    long ts = EnvironmentEdgeManager.currentTime();
1416    long seqId = 100;
1417    // older data which shouldn't be "seen" by the client
1418    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1419    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1420    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1421    long snapshotId = id++;
1422    // push older data into snapshot -- phase (1/4)
1423    StoreFlushContext storeFlushCtx =
1424      store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
1425    storeFlushCtx.prepare();
1426
1427    // insert current data into active -- phase (2/4)
1428    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1429    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1430    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1431    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1432    quals.add(qf1);
1433    quals.add(qf2);
1434    quals.add(qf3);
1435    try (InternalScanner scanner =
1436      (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) {
1437      // complete the flush -- phase (3/4)
1438      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1439      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1440
1441      List<Cell> results = new ArrayList<>();
1442      scanner.next(results);
1443      assertEquals(3, results.size());
1444      for (Cell c : results) {
1445        byte[] actualValue = CellUtil.cloneValue(c);
1446        assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:"
1447          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue));
1448      }
1449    }
1450  }
1451
1452  @Test
1453  public void testReclaimChunkWhenScaning() throws IOException {
1454    init("testReclaimChunkWhenScaning");
1455    long ts = EnvironmentEdgeManager.currentTime();
1456    long seqId = 100;
1457    byte[] value = Bytes.toBytes("value");
1458    // data which should be "seen" by the scanner below (read point is seqId)
1459    store.add(createCell(qf1, ts, seqId, value), null);
1460    store.add(createCell(qf2, ts, seqId, value), null);
1461    store.add(createCell(qf3, ts, seqId, value), null);
1462    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1463    quals.add(qf1);
1464    quals.add(qf2);
1465    quals.add(qf3);
1466    try (InternalScanner scanner =
1467      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) {
1468      List<Cell> results = new MyList<>(size -> {
1469        switch (size) {
1470          // 1) we get the first cell (qf1)
1471          // 2) flush the data to have StoreScanner update inner scanners
1472          // 3) the chunk will be reclaimed after updating
1473          case 1:
1474            try {
1475              flushStore(store, id++);
1476            } catch (IOException e) {
1477              throw new RuntimeException(e);
1478            }
1479            break;
1480          // 1) we get the second cell (qf2)
1481          // 2) add some cells to fill some bytes into the chunk (we have only one chunk)
1482          case 2:
1483            try {
1484              byte[] newValue = Bytes.toBytes("newValue");
1485              // newer data which shouldn't be "seen" by the scanner (written beyond the read point)
1486              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1487              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1488              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1489            } catch (IOException e) {
1490              throw new RuntimeException(e);
1491            }
1492            break;
1493          default:
1494            break;
1495        }
1496      });
1497      scanner.next(results);
1498      assertEquals(3, results.size());
1499      for (Cell c : results) {
1500        byte[] actualValue = CellUtil.cloneValue(c);
1501        assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
1502          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
1503      }
1504    }
1505  }
1506
1507  /**
1508   * If two InMemoryFlushRunnables are running, the later one may change the versionedList, and
1509   * the first one will then use the changed versionedList to remove the corresponding segments.
1510   * In short, some segments that are not part of the merge may be removed.
1512   */
1513  @Test
1514  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1515    int flushSize = 500;
1516    Configuration conf = HBaseConfiguration.create();
1517    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1518    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1519    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1520    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1521    // Set the lower threshold to invoke the "MERGE" policy
1522    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1523    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1524      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1525    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1526    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1527    long ts = EnvironmentEdgeManager.currentTime();
1528    long seqId = 100;
1529    // adding these cells should trigger the first in-memory flush and start the first compactor
1530    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1531    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1532    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1533    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1534    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1535    storeFlushCtx.prepare();
1536    // This shouldn't invoke another in-memory flush because the first compactor thread
1537    // hasn't completed the in-memory compaction yet.
1538    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1539    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1540    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1541    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1542    // okay, now let the compaction complete
1543    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1544    CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore;
1545    while (mem.isMemStoreFlushingInMemory()) {
1546      TimeUnit.SECONDS.sleep(1);
1547    }
1548    // This should invoke another in-memory flush.
1549    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1550    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1551    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1552    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
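        // Restore the default memstore flush size, presumably so that the tiny test value does not
        // affect the remaining flushCache/commit calls below.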
1553    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1554      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1555    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1556    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1557  }
1558
1559  @Test
1560  public void testAge() throws IOException {
1561    long currentTime = EnvironmentEdgeManager.currentTime();
1562    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1563    edge.setValue(currentTime);
1564    EnvironmentEdgeManager.injectEdge(edge);
1565    Configuration conf = TEST_UTIL.getConfiguration();
1566    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1567    initHRegion(name.getMethodName(), conf,
1568      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1569    HStore store = new HStore(region, hcd, conf, false) {
1570
1571      @Override
1572      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1573        CellComparator kvComparator) throws IOException {
1574        List<HStoreFile> storefiles =
1575          Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1576            mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1577        StoreFileManager sfm = mock(StoreFileManager.class);
1578        when(sfm.getStorefiles()).thenReturn(storefiles);
1579        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1580        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1581        return storeEngine;
1582      }
1583    };
1584    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1585    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1586    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1587  }
1588
1589  private HStoreFile mockStoreFile(long createdTime) {
1590    StoreFileInfo info = mock(StoreFileInfo.class);
1591    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1592    HStoreFile sf = mock(HStoreFile.class);
1593    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1594    when(sf.isHFile()).thenReturn(true);
1595    when(sf.getFileInfo()).thenReturn(info);
1596    return sf;
1597  }
1598
1599  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1600    throws IOException {
1601    return (MyStore) init(methodName, conf,
1602      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1603      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1604  }
1605
1606  private static class MyStore extends HStore {
1607    private final MyStoreHook hook;
1608
1609    MyStore(final HRegion region, final ColumnFamilyDescriptor family,
1610      final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1611      super(region, family, confParam, false);
1612      this.hook = hook;
1613    }
1614
1615    @Override
1616    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1617      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1618      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1619      boolean includeMemstoreScanner) throws IOException {
1620      hook.getScanners(this);
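          // Note that this override always passes includeStartRow=true and includeStopRow=false to
          // the parent, regardless of the arguments it received.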
1621      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1622        stopRow, false, readPt, includeMemstoreScanner);
1623    }
1624
1625    @Override
1626    public long getSmallestReadPoint() {
1627      return hook.getSmallestReadPoint(this);
1628    }
1629  }
1630
1631  private abstract static class MyStoreHook {
1632
1633    void getScanners(MyStore store) throws IOException {
1634    }
1635
1636    long getSmallestReadPoint(HStore store) {
1637      return store.getHRegion().getSmallestReadPoint();
1638    }
1639  }
1640
1641  @Test
1642  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
1643    Configuration conf = HBaseConfiguration.create();
1644    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1645    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1646    // With pread max bytes set to 0, the scanner should switch from pread to stream
1647    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1648    });
1649    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1650    long ts = EnvironmentEdgeManager.currentTime();
1651    long seqID = 1L;
1652    // Add some data to the region and do some flushes
1653    for (int i = 1; i < 10; i++) {
1654      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1655        memStoreSizing);
1656    }
1657    // flush them
1658    flushStore(store, seqID);
1659    for (int i = 11; i < 20; i++) {
1660      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1661        memStoreSizing);
1662    }
1663    // flush them
1664    flushStore(store, seqID);
1665    for (int i = 21; i < 30; i++) {
1666      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1667        memStoreSizing);
1668    }
1669    // flush them
1670    flushStore(store, seqID);
1671
1672    assertEquals(3, store.getStorefilesCount());
1673    Scan scan = new Scan();
1674    scan.addFamily(family);
1675    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1676    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1677    StoreScanner storeScanner =
1678      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1679    // get the current heap
1680    KeyValueHeap heap = storeScanner.heap;
1681    // create more store files
1682    for (int i = 31; i < 40; i++) {
1683      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1684        memStoreSizing);
1685    }
1686    // flush them
1687    flushStore(store, seqID);
1688
1689    for (int i = 41; i < 50; i++) {
1690      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1691        memStoreSizing);
1692    }
1693    // flush them
1694    flushStore(store, seqID);
1695    storefiles2 = store.getStorefiles();
1696    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1697    actualStorefiles1.removeAll(actualStorefiles);
1698    // Do compaction
1699    MyThread thread = new MyThread(storeScanner);
1700    thread.start();
1701    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
1702    thread.join();
1703    KeyValueHeap heap2 = thread.getHeap();
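        // The scanner heap should have been re-created when the store files were replaced, so it
        // must differ from the heap captured before the replacement.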
1704    assertFalse(heap.equals(heap2));
1705  }
1706
1707  @Test
1708  public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
1709    Configuration conf = HBaseConfiguration.create();
1710    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1711    // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
1712    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
1713    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1714    });
1715    Scan scan = new Scan();
1716    scan.addFamily(family);
1717    // ReadType on Scan is still DEFAULT only.
1718    assertEquals(ReadType.DEFAULT, scan.getReadType());
1719    StoreScanner storeScanner =
1720      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1721    assertFalse(storeScanner.isScanUsePread());
1722  }
1723
1724  @Test
1725  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
1726    final TableName tn = TableName.valueOf(name.getMethodName());
1727    init(name.getMethodName());
1728
1729    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
1730
1731    HStoreFile sf1 = mockStoreFileWithLength(1024L);
1732    HStoreFile sf2 = mockStoreFileWithLength(2048L);
1733    HStoreFile sf3 = mockStoreFileWithLength(4096L);
1734    HStoreFile sf4 = mockStoreFileWithLength(8192L);
1735
1736    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
1737      .setEndKey(Bytes.toBytes("b")).build();
1738
1739    // Compacting two files down to one, reducing size
1740    sizeStore.put(regionInfo, 1024L + 4096L);
1741    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3),
1742      Arrays.asList(sf2));
1743
1744    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1745
1746    // The same file length in and out should have no change
1747    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
1748      Arrays.asList(sf2));
1749
1750    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1751
1752    // Increase the total size used
1753    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
1754      Arrays.asList(sf3));
1755
1756    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
1757
1758    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
1759      .setEndKey(Bytes.toBytes("c")).build();
1760    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
1761
1762    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
1763  }
1764
1765  @Test
1766  public void testHFileContextSetWithCFAndTable() throws Exception {
1767    init(this.name.getMethodName());
1768    StoreFileWriter writer = store.getStoreEngine()
1769      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
1770        .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
1771        .includesTag(false).shouldDropBehind(true));
1772    HFileContext hFileContext = writer.getHFileWriter().getFileContext();
1773    assertArrayEquals(family, hFileContext.getColumnFamily());
1774    assertArrayEquals(table, hFileContext.getTableName());
1775  }
1776
1777  // This test is for HBASE-26026: HBase writes may get stuck when the active segment has no cell
1778  // but its dataSize exceeds inmemoryFlushSize.
1779  @Test
1780  public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
1781    throws IOException, InterruptedException {
1782    Configuration conf = HBaseConfiguration.create();
1783
1784    byte[] smallValue = new byte[3];
1785    byte[] largeValue = new byte[9];
1786    final long timestamp = EnvironmentEdgeManager.currentTime();
1787    final long seqId = 100;
1788    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
1789    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
1790    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
1791    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
1792    int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
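        // Choose an in-memory flush threshold just below the combined size of the two cells, so
        // adding both of them exceeds it and triggers an in-memory flush.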
1793
1794    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
1795    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
1796    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
1797    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
1798
1799    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1800      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1801
1802    MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
1803    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
1804    myCompactingMemStore.smallCellPreUpdateCounter.set(0);
1805    myCompactingMemStore.largeCellPreUpdateCounter.set(0);
1806
1807    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
1808    Thread smallCellThread = new Thread(() -> {
1809      try {
1810        store.add(smallCell, new NonThreadSafeMemStoreSizing());
1811      } catch (Throwable exception) {
1812        exceptionRef.set(exception);
1813      }
1814    });
1815    smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
1816    smallCellThread.start();
1817
1818    String oldThreadName = Thread.currentThread().getName();
1819    try {
1820      /**
1821       * 1. smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
1822       * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
1823       * largeCellThread invokes flushInMemory.
1824       * <p/>
1825       * 2. After largeCellThread has finished the CompactingMemStore.flushInMemory method,
1826       * smallCellThread can add its cell to the current active segment. That is, when
1827       * largeCellThread calls flushInMemory, CompactingMemStore.active holds no cell.
1828       */
1829      Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
1830      store.add(largeCell, new NonThreadSafeMemStoreSizing());
1831      smallCellThread.join();
1832
1833      for (int i = 0; i < 100; i++) {
1834        long currentTimestamp = timestamp + 100 + i;
1835        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
1836        store.add(cell, new NonThreadSafeMemStoreSizing());
1837      }
1838    } finally {
1839      Thread.currentThread().setName(oldThreadName);
1840    }
1841
1842    assertTrue(exceptionRef.get() == null);
1843
1844  }
1845
1846  // This test is for HBASE-26210: HBase writes may get stuck when there is a cell whose size
1847  // exceeds inmemoryFlushSize.
1848  @Test(timeout = 60000)
1849  public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception {
1850    Configuration conf = HBaseConfiguration.create();
1851    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
1852
1853    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1854      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1855
1856    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
1857
1858    int size = (int) (myCompactingMemStore.getInmemoryFlushSize());
1859    byte[] value = new byte[size + 1];
1860
1861    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1862    long timestamp = EnvironmentEdgeManager.currentTime();
1863    long seqId = 100;
1864    Cell cell = createCell(qf1, timestamp, seqId, value);
1865    int cellByteSize = MutableSegment.getCellLength(cell);
1866    store.add(cell, memStoreSizing);
1867    assertTrue(memStoreSizing.getCellsCount() == 1);
1868    assertTrue(memStoreSizing.getDataSize() == cellByteSize);
1869    // Wait for the in-memory compaction to complete, see HBASE-26438
1870    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
1871  }
1872
1873  // This test is also for HBASE-26210: write a large cell and a small cell concurrently when
1874  // inmemoryFlushSize is smaller than, equal to, and larger than the cell sizes.
1875  @Test
1876  public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
1877    throws IOException, InterruptedException {
1878    doWriteTestLargeCellAndSmallCellConcurrently(
1879      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
1880    doWriteTestLargeCellAndSmallCellConcurrently(
1881      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
1882    doWriteTestLargeCellAndSmallCellConcurrently(
1883      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1);
1884    doWriteTestLargeCellAndSmallCellConcurrently(
1885      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize);
1886    doWriteTestLargeCellAndSmallCellConcurrently(
1887      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1);
1888  }
1889
1890  private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize)
1891    throws IOException, InterruptedException {
1892
1893    Configuration conf = HBaseConfiguration.create();
1894
1895    byte[] smallValue = new byte[3];
1896    byte[] largeValue = new byte[100];
1897    final long timestamp = EnvironmentEdgeManager.currentTime();
1898    final long seqId = 100;
1899    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
1900    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
1901    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
1902    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
1903    int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize);
1904    boolean flushByteSizeLessThanSmallAndLargeCellSize =
1905      flushByteSize < (smallCellByteSize + largeCellByteSize);
1906
1907    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName());
1908    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
1909    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
1910
1911    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1912      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1913
1914    MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore);
1915    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
1916    myCompactingMemStore.disableCompaction();
1917    if (flushByteSizeLessThanSmallAndLargeCellSize) {
1918      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true;
1919    } else {
1920      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false;
1921    }
1922
1923    final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
1924    final AtomicLong totalCellByteSize = new AtomicLong(0);
1925    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
1926    Thread smallCellThread = new Thread(() -> {
1927      try {
1928        for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
1929          long currentTimestamp = timestamp + i;
1930          Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
1931          totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
1932          store.add(cell, memStoreSizing);
1933        }
1934      } catch (Throwable exception) {
1935        exceptionRef.set(exception);
1936
1937      }
1938    });
1939    smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
1940    smallCellThread.start();
1941
1942    String oldThreadName = Thread.currentThread().getName();
1943    try {
1944      /**
1945       * When flushByteSizeLessThanSmallAndLargeCellSize is true:
1946       * <p/>
1947       * 1. smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then
1948       * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then
1949       * largeCellThread invokes flushInMemory.
1950       * <p/>
1951       * 2. After largeCellThread has finished the CompactingMemStore.flushInMemory method,
1952       * smallCellThread can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
1953       * <p/>
1954       * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and
1955       * largeCellThread concurrently write one cell each and wait for each other, then write
1956       * another cell, and so on.
1957       */
1958      Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
1959      for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
1960        long currentTimestamp = timestamp + i;
1961        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
1962        totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
1963        store.add(cell, memStoreSizing);
1964      }
1965      smallCellThread.join();
1966
1967      assertTrue(exceptionRef.get() == null);
1968      assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2));
1969      assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
1970      if (flushByteSizeLessThanSmallAndLargeCellSize) {
1971        assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT);
1972      } else {
1973        assertTrue(
1974          myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1));
1975      }
1976    } finally {
1977      Thread.currentThread().setName(oldThreadName);
1978    }
1979  }
1980
1981  /**
1982   * <pre>
1983   * This test is for HBASE-26384: it makes {@link CompactingMemStore#flattenOneSegment} and
1984   * {@link CompactingMemStore#snapshot()} execute concurrently.
1985   * The thread sequence before HBASE-26384 was (the bug only exists for branch-2, but UTs are
1986   * added for both branch-2 and master):
1987   * 1. The {@link CompactingMemStore} size exceeds
1988   *    {@link CompactingMemStore#getInmemoryFlushSize()}, the write thread adds a new
1989   *    {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline}, and starts an
1990   *    in-memory compaction thread to execute {@link CompactingMemStore#inMemoryCompaction}.
1991   * 2. The in-memory compaction thread starts and then stops just before
1992   *    {@link CompactingMemStore#flattenOneSegment}.
1993   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently; after the
1994   *    snapshot thread executes {@link CompactingMemStore#getImmutableSegments}, the in-memory
1995   *    compaction thread continues.
1996   *    Assume the {@link VersionedSegmentsList#version} returned from
1997   *    {@link CompactingMemStore#getImmutableSegments} is v.
1998   * 4. The snapshot thread stops just before {@link CompactingMemStore#swapPipelineWithNull}.
1999   * 5. The in-memory compaction thread completes {@link CompactingMemStore#flattenOneSegment};
2000   *    {@link CompactionPipeline#version} is still v.
2001   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2002   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2003   *    thinks it succeeded and continues flushing, but the {@link ImmutableSegment} in
2004   *    {@link CompactionPipeline} has changed because of
2005   *    {@link CompactingMemStore#flattenOneSegment}, so the {@link ImmutableSegment} is in fact
2006   *    not removed and still remains in {@link CompactionPipeline}.
2007   *
2008   * After HBASE-26384, steps 5-6 change to the following, which is the expected behavior:
2009   * 5. The in-memory compaction thread completes {@link CompactingMemStore#flattenOneSegment};
2010   *    {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version} to
2011   *    v+1.
2012   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2013   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2014   *    fails and retries the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
2015   *    again; because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
2016   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
2017   * </pre>
2018   */
2020  @Test
2021  public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
2022    Configuration conf = HBaseConfiguration.create();
2023
2024    byte[] smallValue = new byte[3];
2025    byte[] largeValue = new byte[9];
2026    final long timestamp = EnvironmentEdgeManager.currentTime();
2027    final long seqId = 100;
2028    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2029    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2030    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2031    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2032    int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
2033    int flushByteSize = totalCellByteSize - 2;
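        // The in-memory flush threshold is just below the combined size of the two cells, so
        // adding both of them pushes a segment into the pipeline and starts the in-memory
        // compaction that this test races against the snapshot.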
2034
2035    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2036    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
2037    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2038    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2039
2040    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2041      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2042
2043    MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
2044    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2045
2046    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2047    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2048
2049    String oldThreadName = Thread.currentThread().getName();
2050    try {
2051      Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
2052      /**
2053       * {@link CompactingMemStore#snapshot} must wait until the in-memory compaction thread has
2054       * entered {@link CompactingMemStore#flattenOneSegment}, because
2055       * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
2056       */
2057      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2058
2059      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2060      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2061
2062      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2063      assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
2064      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2065      assertTrue(segments.getNumOfSegments() == 0);
2066      assertTrue(segments.getNumOfCells() == 0);
2067      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
2068      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2069    } finally {
2070      Thread.currentThread().setName(oldThreadName);
2071    }
2072  }
2073
2074  /**
2075   * <pre>
2076   * This test is also for HBASE-26384: it makes {@link CompactingMemStore#flattenOneSegment},
2077   * {@link CompactingMemStore#snapshot()} and a memstore write execute concurrently.
2078   * The thread sequence before HBASE-26384 was (the bug only exists for branch-2, but UTs are
2079   * added for both branch-2 and master):
2080   * 1. The {@link CompactingMemStore} size exceeds
2081   *    {@link CompactingMemStore#getInmemoryFlushSize()}, the write thread adds a new
2082   *    {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline}, and starts an
2083   *    in-memory compaction thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2084   * 2. The in-memory compaction thread starts and then stops just before
2085   *    {@link CompactingMemStore#flattenOneSegment}.
2086   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently; after the
2087   *    snapshot thread executes {@link CompactingMemStore#getImmutableSegments}, the in-memory
2088   *    compaction thread continues.
2089   *    Assume the {@link VersionedSegmentsList#version} returned from
2090   *    {@link CompactingMemStore#getImmutableSegments} is v.
2091   * 4. The snapshot thread stops just before {@link CompactingMemStore#swapPipelineWithNull}.
2092   * 5. The in-memory compaction thread completes {@link CompactingMemStore#flattenOneSegment};
2093   *    {@link CompactionPipeline#version} is still v.
2094   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2095   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2096   *    thinks it succeeded and continues flushing, but the {@link ImmutableSegment} in
2097   *    {@link CompactionPipeline} has changed because of
2098   *    {@link CompactingMemStore#flattenOneSegment}, so the {@link ImmutableSegment} is in fact
2099   *    not removed and still remains in {@link CompactionPipeline}.
2100   *
2101   * After HBASE-26384, steps 5-6 change to the following, which is the expected behavior, and
2102   * steps 7-8 are added to test that a new segment is added before the retry.
2103   * 5. The in-memory compaction thread completes {@link CompactingMemStore#flattenOneSegment};
2104   *    {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version} to
2105   *    v+1.
2106   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2107   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2108   *    fails and retries; the {@link VersionedSegmentsList#version} returned from
2109   *    {@link CompactingMemStore#getImmutableSegments} is now v+1.
2110   * 7. The write thread continues writing to {@link CompactingMemStore} and the
2111   *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()};
2112   *    {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
2113   *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline};
2114   *    {@link CompactionPipeline#version} is still v+1.
2115   * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2116   *    {@link CompactionPipeline#version} is still v+1,
2117   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds. The new {@link ImmutableSegment}
2118   *    remains at the head of {@link CompactingMemStore#pipeline}; the old one is removed by
2119   *    {@link CompactingMemStore#swapPipelineWithNull}.
2120   * </pre>
2121   */
2123  @Test
2124  public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
2125    Configuration conf = HBaseConfiguration.create();
2126
2127    byte[] smallValue = new byte[3];
2128    byte[] largeValue = new byte[9];
2129    final long timestamp = EnvironmentEdgeManager.currentTime();
2130    final long seqId = 100;
2131    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2132    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2133    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2134    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2135    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2136    int flushByteSize = firstWriteCellByteSize - 2;
2137
2138    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2139    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
2140    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2141    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2142
2143    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2144      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2145
2146    final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
2147    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2148
2149    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2150    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2151
2152    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2153    final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
2154    final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2155    final int writeAgainCellByteSize =
2156      MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2);
2157    final Thread writeAgainThread = new Thread(() -> {
2158      try {
2159        myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
2160
2161        store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
2162        store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
2163
2164        myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
2165      } catch (Throwable exception) {
2166        exceptionRef.set(exception);
2167      }
2168    });
2169    writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
2170    writeAgainThread.start();
2171
2172    String oldThreadName = Thread.currentThread().getName();
2173    try {
2174      Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
2175      /**
2176       * {@link CompactingMemStore#snapshot} must wait until the in-memory compaction thread has
2177       * entered {@link CompactingMemStore#flattenOneSegment}, because
2178       * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
2179       */
2180      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2181      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2182      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2183      writeAgainThread.join();
2184
2185      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2186      assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
2187      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2188      assertTrue(segments.getNumOfSegments() == 1);
2189      assertTrue(
2190        ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
2191      assertTrue(segments.getNumOfCells() == 2);
2192      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
2193      assertTrue(exceptionRef.get() == null);
2194      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2195    } finally {
2196      Thread.currentThread().setName(oldThreadName);
2197    }
2198  }
2199
2200  /**
2201   * <pre>
2202   * This test is for HBASE-26465: it makes {@link DefaultMemStore#clearSnapshot} and
2203   * {@link DefaultMemStore#getScanners} execute concurrently.
2204   * The thread sequence before HBASE-26465 was:
2205   * 1. The flush thread starts flushing the {@link DefaultMemStore} after some cells have been
2206   *    added to the {@link DefaultMemStore}.
2207   * 2. The flush thread stops just before {@link DefaultMemStore#clearSnapshot} in
2208   *    {@link HStore#updateStorefiles} after it has finished flushing the memstore to an hfile.
2209   * 3. The scan thread starts and stops after {@link DefaultMemStore#getSnapshotSegments} in
2210   *    {@link DefaultMemStore#getScanners}; here the scan thread gets the
2211   *    {@link DefaultMemStore#snapshot} which was created by the flush thread.
2212   * 4. The flush thread continues {@link DefaultMemStore#clearSnapshot} and closes
2213   *    {@link DefaultMemStore#snapshot}; because the reference count of the corresponding
2214   *    {@link MemStoreLABImpl} is 0, the {@link Chunk}s in the corresponding
2215   *    {@link MemStoreLABImpl} are recycled.
2216   * 5. The scan thread continues {@link DefaultMemStore#getScanners}, creates a
2217   *    {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increases the
2218   *    reference count of the corresponding {@link MemStoreLABImpl}, but the {@link Chunk}s in
2219   *    the corresponding {@link MemStoreLABImpl} were already recycled in step 4 and may be
2220   *    overwritten by other write threads, which may cause serious problems.
2221   * After HBASE-26465, {@link DefaultMemStore#getScanners} and
2222   * {@link DefaultMemStore#clearSnapshot} can no longer execute concurrently.
2223   * </pre>
2224   */
2225  @Test
2226  public void testClearSnapshotGetScannerConcurrently() throws Exception {
2227    Configuration conf = HBaseConfiguration.create();
2228
2229    byte[] smallValue = new byte[3];
2230    byte[] largeValue = new byte[9];
2231    final long timestamp = EnvironmentEdgeManager.currentTime();
2232    final long seqId = 100;
2233    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2234    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2235    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2236    quals.add(qf1);
2237    quals.add(qf2);
2238
2239    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
2240    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2241
2242    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2243    MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore);
2244    myDefaultMemStore.store = store;
2245
2246    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2247    store.add(smallCell, memStoreSizing);
2248    store.add(largeCell, memStoreSizing);
2249
2250    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2251    final Thread flushThread = new Thread(() -> {
2252      try {
2253        flushStore(store, id++);
2254      } catch (Throwable exception) {
2255        exceptionRef.set(exception);
2256      }
2257    });
2258    flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
2259    flushThread.start();
2260
2261    String oldThreadName = Thread.currentThread().getName();
2262    StoreScanner storeScanner = null;
2263    try {
2264      Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
2265
2266      /**
2267       * Wait for the flush thread to stop just before {@link DefaultMemStore#doClearSnapshot}.
2268       */
2269      myDefaultMemStore.getScannerCyclicBarrier.await();
2270
2271      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2272      flushThread.join();
2273
2274      if (myDefaultMemStore.shouldWait) {
2275        SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2276        MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2277        assertTrue(memStoreLAB.isClosed());
2278        assertTrue(!memStoreLAB.chunks.isEmpty());
2279        assertTrue(!memStoreLAB.isReclaimed());
2280
2281        Cell cell1 = segmentScanner.next();
2282        assertTrue(CellUtil.equals(smallCell, cell1));
2283        Cell cell2 = segmentScanner.next();
2284        assertTrue(CellUtil.equals(largeCell, cell2));
2285        assertNull(segmentScanner.next());
2286      } else {
2287        List<Cell> results = new ArrayList<>();
2288        storeScanner.next(results);
2289        assertEquals(2, results.size());
2290        assertTrue(CellUtil.equals(smallCell, results.get(0)));
2291        assertTrue(CellUtil.equals(largeCell, results.get(1)));
2292      }
2293      assertTrue(exceptionRef.get() == null);
2294    } finally {
2295      if (storeScanner != null) {
2296        storeScanner.close();
2297      }
2298      Thread.currentThread().setName(oldThreadName);
2299    }
2300  }
2301
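      /**
       * Returns the single scanner of the given type from the StoreScanner's current scanners,
       * asserting that exactly one such scanner exists.
       */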
2302  @SuppressWarnings("unchecked")
2303  private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
2304    List<T> resultScanners = new ArrayList<T>();
2305    for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
2306      if (keyValueScannerClass.isInstance(keyValueScanner)) {
2307        resultScanners.add((T) keyValueScanner);
2308      }
2309    }
2310    assertTrue(resultScanners.size() == 1);
2311    return resultScanners.get(0);
2312  }
2313
2314  @Test
2315  public void testOnConfigurationChange() throws IOException {
2316    final int COMMON_MAX_FILES_TO_COMPACT = 10;
2317    final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8;
2318    final int STORE_MAX_FILES_TO_COMPACT = 6;
2319
2320    // Build a table whose max files to compact differs from the common configuration.
2321    Configuration conf = HBaseConfiguration.create();
2322    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2323      COMMON_MAX_FILES_TO_COMPACT);
2324    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
2325      .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2326        String.valueOf(STORE_MAX_FILES_TO_COMPACT))
2327      .build();
2328    init(this.name.getMethodName(), conf, hcd);
2329
2330    // After updating common configuration, the conf in HStore itself must not be changed.
2331    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2332      NEW_COMMON_MAX_FILES_TO_COMPACT);
2333    this.store.onConfigurationChange(conf);
2334    assertEquals(STORE_MAX_FILES_TO_COMPACT,
2335      store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact());
2336  }
2337
2338  /**
2339   * This test is for HBASE-26476
2340   */
2341  @Test
2342  public void testExtendsDefaultMemStore() throws Exception {
2343    Configuration conf = HBaseConfiguration.create();
2344    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2345
2346    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2347    assertTrue(this.store.memstore.getClass() == DefaultMemStore.class);
2348    tearDown();
2349
2350    conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName());
2351    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2352    assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class);
2353  }
2354
2355  static class CustomDefaultMemStore extends DefaultMemStore {
2356
2357    public CustomDefaultMemStore(Configuration conf, CellComparator c,
2358      RegionServicesForStores regionServices) {
2359      super(conf, c, regionServices);
2360    }
2361
2362  }
2363
2364  /**
2365   * This test is for HBASE-26488
2366   */
2367  @Test
2368  public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
2369
2370    Configuration conf = HBaseConfiguration.create();
2371
2372    byte[] smallValue = new byte[3];
2373    byte[] largeValue = new byte[9];
2374    final long timestamp = EnvironmentEdgeManager.currentTime();
2375    final long seqId = 100;
2376    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2377    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2378    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2379    quals.add(qf1);
2380    quals.add(qf2);
2381
2382    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
2383    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2384    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
2385      MyDefaultStoreFlusher.class.getName());
2386
2387    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2388    MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
2389    assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
2390
2391    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2392    store.add(smallCell, memStoreSizing);
2393    store.add(largeCell, memStoreSizing);
2394    flushStore(store, id++);
2395
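        // The first flush attempt fails in MyDefaultStoreFlusher and the flush is retried; after
        // the successful retry the snapshot segment's MSLAB must be fully released (closed,
        // reclaimed, and with no chunks left), otherwise the memory would leak (HBASE-26488).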
2396    MemStoreLABImpl memStoreLAB =
2397      (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
2398    assertTrue(memStoreLAB.isClosed());
2399    assertTrue(memStoreLAB.getRefCntValue() == 0);
2400    assertTrue(memStoreLAB.isReclaimed());
2401    assertTrue(memStoreLAB.chunks.isEmpty());
2402    StoreScanner storeScanner = null;
2403    try {
2404      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2405      assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
2406      assertTrue(store.memstore.size().getCellsCount() == 0);
2407      assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
2408      assertTrue(storeScanner.currentScanners.size() == 1);
2409      assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
2410
2411      List<Cell> results = new ArrayList<>();
2412      storeScanner.next(results);
2413      assertEquals(2, results.size());
2414      assertTrue(CellUtil.equals(smallCell, results.get(0)));
2415      assertTrue(CellUtil.equals(largeCell, results.get(1)));
2416    } finally {
2417      if (storeScanner != null) {
2418        storeScanner.close();
2419      }
2420    }
2421  }
2422
2423  static class MyDefaultMemStore1 extends DefaultMemStore {
2424
2425    private ImmutableSegment snapshotImmutableSegment;
2426
2427    public MyDefaultMemStore1(Configuration conf, CellComparator c,
2428      RegionServicesForStores regionServices) {
2429      super(conf, c, regionServices);
2430    }
2431
2432    @Override
2433    public MemStoreSnapshot snapshot() {
2434      MemStoreSnapshot result = super.snapshot();
2435      this.snapshotImmutableSegment = snapshot;
2436      return result;
2437    }
2438
2439  }
2440
2441  public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
2442    private static final AtomicInteger failCounter = new AtomicInteger(1);
2443    private static final AtomicInteger counter = new AtomicInteger(0);
2444
2445    public MyDefaultStoreFlusher(Configuration conf, HStore store) {
2446      super(conf, store);
2447    }
2448
2449    @Override
2450    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
2451      MonitoredTask status, ThroughputController throughputController,
2452      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
2453      counter.incrementAndGet();
2454      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
2455        writerCreationTracker);
2456    }
2457
2458    @Override
2459    protected void performFlush(InternalScanner scanner, final CellSink sink,
2460      ThroughputController throughputController) throws IOException {
2461
2462      final int currentCount = counter.get();
2463      CellSink newCellSink = (cell) -> {
2464        if (currentCount <= failCounter.get()) {
2465          throw new IOException("Simulated exception by tests");
2466        }
2467        sink.append(cell);
2468      };
2469      super.performFlush(scanner, newCellSink, throughputController);
2470    }
2471  }
2472
2473  /**
2474   * This test is for HBASE-26494; it tests the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB}.
2475   */
2476  @Test
2477  public void testImmutableMemStoreLABRefCnt() throws Exception {
2478    Configuration conf = HBaseConfiguration.create();
2479
2480    byte[] smallValue = new byte[3];
2481    byte[] largeValue = new byte[9];
2482    final long timestamp = EnvironmentEdgeManager.currentTime();
2483    final long seqId = 100;
2484    final Cell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
2485    final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
2486    final Cell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue);
2487    final Cell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2488    final Cell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue);
2489    final Cell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue);
2490
2491    int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
2492    int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
2493    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2494    int flushByteSize = firstWriteCellByteSize - 2;
2495
2496    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2497    conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
2498    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2499    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2500    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2501
2502    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2503      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2504
2505    final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
2506    assertEquals(flushByteSize, (int) myCompactingMemStore.getInmemoryFlushSize());
2507    myCompactingMemStore.allowCompaction.set(false);
2508
2509    NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2510    store.add(smallCell1, memStoreSizing);
2511    store.add(largeCell1, memStoreSizing);
2512    store.add(smallCell2, memStoreSizing);
2513    store.add(largeCell2, memStoreSizing);
2514    store.add(smallCell3, memStoreSizing);
2515    store.add(largeCell3, memStoreSizing);
2516    VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2517    assertEquals(3, versionedSegmentsList.getNumOfSegments());
2518    List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
2519    List<MemStoreLABImpl> memStoreLABs = new ArrayList<>(segments.size());
2520    for (ImmutableSegment segment : segments) {
2521      memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
2522    }
2523    List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2524    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2525      assertEquals(2, memStoreLAB.getRefCntValue());
2526    }
2527
2528    myCompactingMemStore.allowCompaction.set(true);
2529    myCompactingMemStore.flushInMemory();
2530
2531    versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2532    assertEquals(1, versionedSegmentsList.getNumOfSegments());
2533    ImmutableMemStoreLAB immutableMemStoreLAB =
2534      (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
2535    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2536      assertEquals(2, memStoreLAB.getRefCntValue());
2537    }
2538
2539    List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2540    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2541      assertEquals(2, memStoreLAB.getRefCntValue());
2542    }
2543    assertEquals(2, immutableMemStoreLAB.getRefCntValue());
2544    for (KeyValueScanner scanner : scanners1) {
2545      scanner.close();
2546    }
2547    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2548      assertEquals(1, memStoreLAB.getRefCntValue());
2549    }
2550    for (KeyValueScanner scanner : scanners2) {
2551      scanner.close();
2552    }
2553    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2554      assertEquals(1, memStoreLAB.getRefCntValue());
2555    }
2556    assertEquals(1, immutableMemStoreLAB.getRefCntValue());
2557    flushStore(store, id++);
2558    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2559      assertEquals(0, memStoreLAB.getRefCntValue());
2560    }
2561    assertEquals(0, immutableMemStoreLAB.getRefCntValue());
2562    assertTrue(immutableMemStoreLAB.isClosed());
2563    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2564      assertTrue(memStoreLAB.isClosed());
2565      assertTrue(memStoreLAB.isReclaimed());
2566      assertTrue(memStoreLAB.chunks.isEmpty());
2567    }
2568  }
2569
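  /** Returns a mocked {@link HStoreFile} whose mocked reader reports the given file length. */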
2570  private HStoreFile mockStoreFileWithLength(long length) {
2571    HStoreFile sf = mock(HStoreFile.class);
2572    StoreFileReader sfr = mock(StoreFileReader.class);
2573    when(sf.isHFile()).thenReturn(true);
2574    when(sf.getReader()).thenReturn(sfr);
2575    when(sfr.length()).thenReturn(length);
2576    return sf;
2577  }
2578
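  /**
   * A thread that calls {@link StoreScanner#trySwitchToStreamRead()} on the given scanner and then
   * captures the scanner's {@link KeyValueHeap} for later inspection via {@link #getHeap()}.
   */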
2579  private static class MyThread extends Thread {
2580    private StoreScanner scanner;
2581    private KeyValueHeap heap;
2582
2583    public MyThread(StoreScanner scanner) {
2584      this.scanner = scanner;
2585    }
2586
2587    public KeyValueHeap getHeap() {
2588      return this.heap;
2589    }
2590
2591    @Override
2592    public void run() {
2593      scanner.trySwitchToStreamRead();
2594      heap = scanner.heap;
2595    }
2596  }
2597
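  /**
   * A {@link MemStoreCompactor} whose first {@link #start()} call blocks on
   * {@code START_COMPACTOR_LATCH} until a test releases the latch, so the test controls when the
   * first in-memory compaction actually begins.
   */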
2598  private static class MyMemStoreCompactor extends MemStoreCompactor {
2599    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2600    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
2601
2602    public MyMemStoreCompactor(CompactingMemStore compactingMemStore,
2603      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
2604      super(compactingMemStore, compactionPolicy);
2605    }
2606
2607    @Override
2608    public boolean start() throws IOException {
2609      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
2610      if (isFirst) {
2611        try {
2612          START_COMPACTOR_LATCH.await();
2613          return super.start();
2614        } catch (InterruptedException ex) {
2615          throw new RuntimeException(ex);
2616        }
2617      }
2618      return super.start();
2619    }
2620  }
2621
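  /**
   * A {@link CompactingMemStore} that plugs in {@link MyMemStoreCompactor} and counts how many
   * in-memory compaction runners get scheduled, i.e. how many times
   * {@code setInMemoryCompactionFlag} succeeds.
   */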
2622  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
2623    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2624
2625    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
2626      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2627      throws IOException {
2628      super(conf, c, store, regionServices, compactionPolicy);
2629    }
2630
2631    @Override
2632    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
2633      throws IllegalArgumentIOException {
2634      return new MyMemStoreCompactor(this, compactionPolicy);
2635    }
2636
2637    @Override
2638    protected boolean setInMemoryCompactionFlag() {
2639      boolean rval = super.setInMemoryCompactionFlag();
2640      if (rval) {
2641        RUNNER_COUNT.incrementAndGet();
2642        if (LOG.isDebugEnabled()) {
2643          LOG.debug("runner count: " + RUNNER_COUNT.get());
2644        }
2645      }
2646      return rval;
2647    }
2648  }
2649
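  /**
   * A {@link CompactingMemStore} that, once {@code START_TEST} is set, uses two latches to force a
   * concurrent scanner creation ({@link #createList(int)}) to start before
   * {@code pushActiveToPipeline} and to finish only after the active segment has been pushed to
   * the pipeline.
   */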
2650  public static class MyCompactingMemStore extends CompactingMemStore {
2651    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
2652    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
2653    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
2654
2655    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store,
2656      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2657      throws IOException {
2658      super(conf, c, store, regionServices, compactionPolicy);
2659    }
2660
2661    @Override
2662    protected List<KeyValueScanner> createList(int capacity) {
2663      if (START_TEST.get()) {
2664        try {
2665          getScannerLatch.countDown();
2666          snapshotLatch.await();
2667        } catch (InterruptedException e) {
2668          throw new RuntimeException(e);
2669        }
2670      }
2671      return new ArrayList<>(capacity);
2672    }
2673
2674    @Override
2675    protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
2676      if (START_TEST.get()) {
2677        try {
2678          getScannerLatch.await();
2679        } catch (InterruptedException e) {
2680          throw new RuntimeException(e);
2681        }
2682      }
2683
2684      super.pushActiveToPipeline(active, checkEmpty);
2685      if (START_TEST.get()) {
2686        snapshotLatch.countDown();
2687      }
2688    }
2689  }
2690
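  /**
   * Callback invoked by {@link MyList#add(Object)} with the list's current size, just before the
   * element is added.
   */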
2691  interface MyListHook {
2692    void hook(int currentSize);
2693  }
2694
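  /**
   * A {@link List} that delegates everything to an {@link ArrayList} and invokes a
   * {@link MyListHook} before each single-element {@link #add(Object)}.
   */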
2695  private static class MyList<T> implements List<T> {
2696    private final List<T> delegatee = new ArrayList<>();
2697    private final MyListHook hookAtAdd;
2698
2699    MyList(final MyListHook hookAtAdd) {
2700      this.hookAtAdd = hookAtAdd;
2701    }
2702
2703    @Override
2704    public int size() {
2705      return delegatee.size();
2706    }
2707
2708    @Override
2709    public boolean isEmpty() {
2710      return delegatee.isEmpty();
2711    }
2712
2713    @Override
2714    public boolean contains(Object o) {
2715      return delegatee.contains(o);
2716    }
2717
2718    @Override
2719    public Iterator<T> iterator() {
2720      return delegatee.iterator();
2721    }
2722
2723    @Override
2724    public Object[] toArray() {
2725      return delegatee.toArray();
2726    }
2727
2728    @Override
2729    public <R> R[] toArray(R[] a) {
2730      return delegatee.toArray(a);
2731    }
2732
2733    @Override
2734    public boolean add(T e) {
2735      hookAtAdd.hook(size());
2736      return delegatee.add(e);
2737    }
2738
2739    @Override
2740    public boolean remove(Object o) {
2741      return delegatee.remove(o);
2742    }
2743
2744    @Override
2745    public boolean containsAll(Collection<?> c) {
2746      return delegatee.containsAll(c);
2747    }
2748
2749    @Override
2750    public boolean addAll(Collection<? extends T> c) {
2751      return delegatee.addAll(c);
2752    }
2753
2754    @Override
2755    public boolean addAll(int index, Collection<? extends T> c) {
2756      return delegatee.addAll(index, c);
2757    }
2758
2759    @Override
2760    public boolean removeAll(Collection<?> c) {
2761      return delegatee.removeAll(c);
2762    }
2763
2764    @Override
2765    public boolean retainAll(Collection<?> c) {
2766      return delegatee.retainAll(c);
2767    }
2768
2769    @Override
2770    public void clear() {
2771      delegatee.clear();
2772    }
2773
2774    @Override
2775    public T get(int index) {
2776      return delegatee.get(index);
2777    }
2778
2779    @Override
2780    public T set(int index, T element) {
2781      return delegatee.set(index, element);
2782    }
2783
2784    @Override
2785    public void add(int index, T element) {
2786      delegatee.add(index, element);
2787    }
2788
2789    @Override
2790    public T remove(int index) {
2791      return delegatee.remove(index);
2792    }
2793
2794    @Override
2795    public int indexOf(Object o) {
2796      return delegatee.indexOf(o);
2797    }
2798
2799    @Override
2800    public int lastIndexOf(Object o) {
2801      return delegatee.lastIndexOf(o);
2802    }
2803
2804    @Override
2805    public ListIterator<T> listIterator() {
2806      return delegatee.listIterator();
2807    }
2808
2809    @Override
2810    public ListIterator<T> listIterator(int index) {
2811      return delegatee.listIterator(index);
2812    }
2813
2814    @Override
2815    public List<T> subList(int fromIndex, int toIndex) {
2816      return delegatee.subList(fromIndex, toIndex);
2817    }
2818  }
2819
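  /**
   * A {@link CompactingMemStore} that uses two {@link CyclicBarrier}s to interleave a
   * "smallCellThread" and a "largeCellThread": smallCellThread passes
   * {@code checkAndAddToActiveSize} first, largeCellThread then runs {@code flushInMemory}, and
   * only afterwards does smallCellThread add its cell to the new active segment.
   */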
2820  public static class MyCompactingMemStore2 extends CompactingMemStore {
2821    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
2822    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
2823    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
2824    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
2825    private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
2826    private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
2827
2828    public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
2829      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2830      throws IOException {
2831      super(conf, cellComparator, store, regionServices, compactionPolicy);
2832    }
2833
2834    @Override
2835    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
2836      MemStoreSizing memstoreSizing) {
2837      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
2838        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
2839        if (currentCount <= 1) {
2840          try {
2841            /**
2842             * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
2843             * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
2844             * largeCellThread invokes flushInMemory.
2845             */
2846            preCyclicBarrier.await();
2847          } catch (Throwable e) {
2848            throw new RuntimeException(e);
2849          }
2850        }
2851      }
2852
2853      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
2854      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
2855        try {
2856          preCyclicBarrier.await();
2857        } catch (Throwable e) {
2858          throw new RuntimeException(e);
2859        }
2860      }
2861      return returnValue;
2862    }
2863
2864    @Override
2865    protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
2866      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
2867        try {
2868          /**
2869           * Only after largeCellThread has finished the flushInMemory method can smallCellThread add
2870           * its cell to currentActive. In other words, when largeCellThread calls flushInMemory,
2871           * currentActive contains no cells.
2872           */
2873          postCyclicBarrier.await();
2874        } catch (Throwable e) {
2875          throw new RuntimeException(e);
2876        }
2877      }
2878      super.doAdd(currentActive, cell, memstoreSizing);
2879    }
2880
2881    @Override
2882    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
2883      super.flushInMemory(currentActiveMutableSegment);
2884      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
2885        if (largeCellPreUpdateCounter.get() <= 1) {
2886          try {
2887            postCyclicBarrier.await();
2888          } catch (Throwable e) {
2889            throw new RuntimeException(e);
2890          }
2891        }
2892      }
2893    }
2894
2895  }
2896
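  /**
   * A {@link CompactingMemStore} that coordinates a "smallCellThread" and a "largeCellThread" with
   * {@link CyclicBarrier}s, similar to {@link MyCompactingMemStore2}, but it also counts in-memory
   * flushes, can switch behaviour via {@code flushByteSizeLessThanSmallAndLargeCellSize}, and
   * exposes {@link #disableCompaction()} / {@link #enableCompaction()}.
   */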
2897  public static class MyCompactingMemStore3 extends CompactingMemStore {
2898    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
2899    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
2900
2901    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
2902    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
2903    private final AtomicInteger flushCounter = new AtomicInteger(0);
2904    private static final int CELL_COUNT = 5;
2905    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;
2906
2907    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
2908      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2909      throws IOException {
2910      super(conf, cellComparator, store, regionServices, compactionPolicy);
2911    }
2912
2913    @Override
2914    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
2915      MemStoreSizing memstoreSizing) {
2916      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
2917        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
2918      }
2919      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
2920        try {
2921          preCyclicBarrier.await();
2922        } catch (Throwable e) {
2923          throw new RuntimeException(e);
2924        }
2925      }
2926
2927      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
2928      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
2929        try {
2930          preCyclicBarrier.await();
2931        } catch (Throwable e) {
2932          throw new RuntimeException(e);
2933        }
2934      }
2935      return returnValue;
2936    }
2937
2938    @Override
2939    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
2940      super.postUpdate(currentActiveMutableSegment);
2941      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
2942        try {
2943          postCyclicBarrier.await();
2944        } catch (Throwable e) {
2945          throw new RuntimeException(e);
2946        }
2947        return;
2948      }
2949
2950      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
2951        try {
2952          postCyclicBarrier.await();
2953        } catch (Throwable e) {
2954          throw new RuntimeException(e);
2955        }
2956      }
2957    }
2958
2959    @Override
2960    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
2961      super.flushInMemory(currentActiveMutableSegment);
2962      flushCounter.incrementAndGet();
2963      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
2964        return;
2965      }
2966
2967      assertEquals(LARGE_CELL_THREAD_NAME, Thread.currentThread().getName());
2968      try {
2969        postCyclicBarrier.await();
2970      } catch (Throwable e) {
2971        throw new RuntimeException(e);
2972      }
2973
2974    }
2975
2976    void disableCompaction() {
2977      allowCompaction.set(false);
2978    }
2979
2980    void enableCompaction() {
2981      allowCompaction.set(true);
2982    }
2983
2984  }
2985
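  /**
   * A {@link CompactingMemStore} that uses {@link CyclicBarrier}s to interleave an in-memory
   * compaction ({@code flattenOneSegment}) with a concurrent {@code snapshot} taken by a
   * "takeSnapShotThread", checking that {@code swapPipelineWithNull} fails on the first attempt
   * and succeeds on the retry.
   */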
2986  public static class MyCompactingMemStore4 extends CompactingMemStore {
2987    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
2988    /**
2989     * {@link CompactingMemStore#flattenOneSegment} must execute after
2990     * {@link CompactingMemStore#getImmutableSegments}
2991     */
2992    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
2993    /**
2994     * Only after {@link CompactingMemStore#flattenOneSegment} has completed can
2995     * {@link CompactingMemStore#swapPipelineWithNull} execute.
2996     */
2997    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
2998    /**
2999     * Only after the in-memory compaction thread enters {@link CompactingMemStore#flattenOneSegment}
3000     * does the snapshot thread start {@link CompactingMemStore#snapshot}, because
3001     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3002     */
3003    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3004    /**
3005     * Used to wait for {@link CompactingMemStore.InMemoryCompactionRunnable} to finish.
3006     */
3007    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3008    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3009    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3010    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3011    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3012
3013    public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
3014      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3015      throws IOException {
3016      super(conf, cellComparator, store, regionServices, compactionPolicy);
3017    }
3018
3019    @Override
3020    public VersionedSegmentsList getImmutableSegments() {
3021      VersionedSegmentsList result = super.getImmutableSegments();
3022      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3023        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3024        if (currentCount <= 1) {
3025          try {
3026            flattenOneSegmentPreCyclicBarrier.await();
3027          } catch (Throwable e) {
3028            throw new RuntimeException(e);
3029          }
3030        }
3031      }
3032      return result;
3033    }
3034
3035    @Override
3036    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3037      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3038        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3039        if (currentCount <= 1) {
3040          try {
3041            flattenOneSegmentPostCyclicBarrier.await();
3042          } catch (Throwable e) {
3043            throw new RuntimeException(e);
3044          }
3045        }
3046      }
3047      boolean result = super.swapPipelineWithNull(segments);
3048      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3049        int currentCount = swapPipelineWithNullCounter.get();
3050        if (currentCount <= 1) {
3051          assertFalse(result);
3052        }
3053        if (currentCount == 2) {
3054          assertTrue(result);
3055        }
3056      }
3057      return result;
3058
3059    }
3060
3061    @Override
3062    public void flattenOneSegment(long requesterVersion, Action action) {
3063      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3064      if (currentCount <= 1) {
3065        try {
3066          /**
3067           * {@link CompactingMemStore#snapshot} could start.
3068           */
3069          snapShotStartCyclicCyclicBarrier.await();
3070          flattenOneSegmentPreCyclicBarrier.await();
3071        } catch (Throwable e) {
3072          throw new RuntimeException(e);
3073        }
3074      }
3075      super.flattenOneSegment(requesterVersion, action);
3076      if (currentCount <= 1) {
3077        try {
3078          flattenOneSegmentPostCyclicBarrier.await();
3079        } catch (Throwable e) {
3080          throw new RuntimeException(e);
3081        }
3082      }
3083    }
3084
3085    @Override
3086    protected boolean setInMemoryCompactionFlag() {
3087      boolean result = super.setInMemoryCompactionFlag();
3088      assertTrue(result);
3089      setInMemoryCompactionFlagCounter.incrementAndGet();
3090      return result;
3091    }
3092
3093    @Override
3094    void inMemoryCompaction() {
3095      try {
3096        super.inMemoryCompaction();
3097      } finally {
3098        try {
3099          inMemoryCompactionEndCyclicBarrier.await();
3100        } catch (Throwable e) {
3101          throw new RuntimeException(e);
3102        }
3103
3104      }
3105    }
3106
3107  }
3108
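  /**
   * Like {@link MyCompactingMemStore4}, but it additionally coordinates a "writeAgainThread" that
   * writes to the memstore between the snapshot thread's failed {@code swapPipelineWithNull} and
   * its retry.
   */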
3109  public static class MyCompactingMemStore5 extends CompactingMemStore {
3110    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3111    private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
3112    /**
3113     * {@link CompactingMemStore#flattenOneSegment} must execute after
3114     * {@link CompactingMemStore#getImmutableSegments}
3115     */
3116    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3117    /**
3118     * Only after {@link CompactingMemStore#flattenOneSegment} has completed can
3119     * {@link CompactingMemStore#swapPipelineWithNull} execute.
3120     */
3121    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3122    /**
3123     * Only after the in-memory compaction thread enters {@link CompactingMemStore#flattenOneSegment}
3124     * does the snapshot thread start {@link CompactingMemStore#snapshot}, because
3125     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3126     */
3127    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3128    /**
3129     * Used to wait for {@link CompactingMemStore.InMemoryCompactionRunnable} to finish.
3130     */
3131    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3132    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3133    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3134    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3135    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3136    /**
3137     * Only after the snapshot thread retries {@link CompactingMemStore#swapPipelineWithNull} can the
3138     * writeAgain thread start.
3139     */
3140    private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
3141    /**
3142     * This is used by the snapshot thread, the writeAgain thread and the in-memory compaction
3143     * thread. Only after the writeAgain thread completes does the retried
3144     * {@link CompactingMemStore#swapPipelineWithNull} execute and the in-memory compaction thread
3145     * exit, because we expect the in-memory compaction to execute only once.
3146     */
3147    private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
3148
3149    public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
3150      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3151      throws IOException {
3152      super(conf, cellComparator, store, regionServices, compactionPolicy);
3153    }
3154
3155    @Override
3156    public VersionedSegmentsList getImmutableSegments() {
3157      VersionedSegmentsList result = super.getImmutableSegments();
3158      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3159        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3160        if (currentCount <= 1) {
3161          try {
3162            flattenOneSegmentPreCyclicBarrier.await();
3163          } catch (Throwable e) {
3164            throw new RuntimeException(e);
3165          }
3166        }
3167
3168      }
3169
3170      return result;
3171    }
3172
3173    @Override
3174    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3175      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3176        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3177        if (currentCount <= 1) {
3178          try {
3179            flattenOneSegmentPostCyclicBarrier.await();
3180          } catch (Throwable e) {
3181            throw new RuntimeException(e);
3182          }
3183        }
3184
3185        if (currentCount == 2) {
3186          try {
3187            /**
3188             * Only after the snapshot thread retries {@link CompactingMemStore#swapPipelineWithNull}
3189             * can the writeAgain thread start.
3190             */
3191            writeMemStoreAgainStartCyclicBarrier.await();
3192            /**
3193             * Only after the writeAgain thread completes does the retried
3194             * {@link CompactingMemStore#swapPipelineWithNull} execute.
3195             */
3196            writeMemStoreAgainEndCyclicBarrier.await();
3197          } catch (Throwable e) {
3198            throw new RuntimeException(e);
3199          }
3200        }
3201
3202      }
3203      boolean result = super.swapPipelineWithNull(segments);
3204      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3205        int currentCount = swapPipelineWithNullCounter.get();
3206        if (currentCount <= 1) {
3207          assertFalse(result);
3208        }
3209        if (currentCount == 2) {
3210          assertTrue(result);
3211        }
3212      }
3213      return result;
3214
3215    }
3216
3217    @Override
3218    public void flattenOneSegment(long requesterVersion, Action action) {
3219      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3220      if (currentCount <= 1) {
3221        try {
3222          /**
3223           * {@link CompactingMemStore#snapshot} could start.
3224           */
3225          snapShotStartCyclicCyclicBarrier.await();
3226          flattenOneSegmentPreCyclicBarrier.await();
3227        } catch (Throwable e) {
3228          throw new RuntimeException(e);
3229        }
3230      }
3231      super.flattenOneSegment(requesterVersion, action);
3232      if (currentCount <= 1) {
3233        try {
3234          flattenOneSegmentPostCyclicBarrier.await();
3235          /**
3236           * Only after the writeAgain thread completes does the in-memory compaction thread exit,
3237           * because we expect the in-memory compaction to execute only once.
3238           */
3239          writeMemStoreAgainEndCyclicBarrier.await();
3240        } catch (Throwable e) {
3241          throw new RuntimeException(e);
3242        }
3243
3244      }
3245    }
3246
3247    @Override
3248    protected boolean setInMemoryCompactionFlag() {
3249      boolean result = super.setInMemoryCompactionFlag();
3250      int count = setInMemoryCompactionFlagCounter.incrementAndGet();
3251      if (count <= 1) {
3252        assertTrue(result);
3253      }
3254      if (count == 2) {
3255        assertFalse(result);
3256      }
3257      return result;
3258    }
3259
3260    @Override
3261    void inMemoryCompaction() {
3262      try {
3263        super.inMemoryCompaction();
3264      } finally {
3265        try {
3266          inMemoryCompactionEndCyclicBarrier.await();
3267        } catch (Throwable e) {
3268          throw new RuntimeException(e);
3269        }
3270
3271      }
3272    }
3273  }
3274
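  /**
   * A {@link CompactingMemStore} that only exposes a {@link CyclicBarrier} a test can use to wait
   * until an in-memory compaction attempt has finished.
   */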
3275  public static class MyCompactingMemStore6 extends CompactingMemStore {
3276    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3277
3278    public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator,
3279      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3280      throws IOException {
3281      super(conf, cellComparator, store, regionServices, compactionPolicy);
3282    }
3283
3284    @Override
3285    void inMemoryCompaction() {
3286      try {
3287        super.inMemoryCompaction();
3288      } finally {
3289        try {
3290          inMemoryCompactionEndCyclicBarrier.await();
3291        } catch (Throwable e) {
3292          throw new RuntimeException(e);
3293        }
3294
3295      }
3296    }
3297  }
3298
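  /**
   * A {@link DefaultMemStore} that uses {@link CyclicBarrier}s to interleave a flush thread
   * ("flushMyThread") clearing the snapshot with a getScanner thread ("getScannerMyThread") reading
   * the snapshot segments; the coordination is skipped when the flush thread already holds the
   * store's write lock.
   */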
3299  public static class MyDefaultMemStore extends DefaultMemStore {
3300    private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
3301    private static final String FLUSH_THREAD_NAME = "flushMyThread";
3302    /**
3303     * Only after the flush thread enters {@link DefaultMemStore#doClearSnapShot} can the getScanner
3304     * thread start.
3305     */
3306    private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
3307    /**
3308     * Used by the getScanner thread to notify the flush thread that {@link DefaultMemStore#getSnapshotSegments}
3309     * has completed, so {@link DefaultMemStore#doClearSnapShot} can continue.
3310     */
3311    private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3312    /**
3313     * Used by the flush thread to notify the getScanner thread that {@link DefaultMemStore#doClearSnapShot}
3314     * has completed, so {@link DefaultMemStore#getScanners} can continue.
3315     */
3316    private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3317    private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
3318    private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
3319    private volatile boolean shouldWait = true;
3320    private volatile HStore store = null;
3321
3322    public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
3323      RegionServicesForStores regionServices) throws IOException {
3324      super(conf, cellComparator, regionServices);
3325    }
3326
3327    @Override
3328    protected List<Segment> getSnapshotSegments() {
3329
3330      List<Segment> result = super.getSnapshotSegments();
3331
3332      if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
3333        int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
3334        if (currentCount == 1) {
3335          if (this.shouldWait) {
3336            try {
3337              /**
3338               * Notify the flush thread that {@link DefaultMemStore#getSnapshotSegments} has
3339               * completed, so {@link DefaultMemStore#doClearSnapShot} can continue.
3340               */
3341              preClearSnapShotCyclicBarrier.await();
3342              /**
3343               * Wait for {@link DefaultMemStore#doClearSnapShot} to complete.
3344               */
3345              postClearSnapShotCyclicBarrier.await();
3346
3347            } catch (Throwable e) {
3348              throw new RuntimeException(e);
3349            }
3350          }
3351        }
3352      }
3353      return result;
3354    }
3355
3356    @Override
3357    protected void doClearSnapShot() {
3358      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3359        int currentCount = clearSnapshotCounter.incrementAndGet();
3360        if (currentCount == 1) {
3361          try {
3362            if (
3363              ((ReentrantReadWriteLock) store.getStoreEngine().getLock())
3364                .isWriteLockedByCurrentThread()
3365            ) {
3366              shouldWait = false;
3367            }
3368            /**
3369             * Only after the flush thread enters {@link DefaultMemStore#doClearSnapShot} can the
3370             * getScanner thread start.
3371             */
3372            getScannerCyclicBarrier.await();
3373
3374            if (shouldWait) {
3375              /**
3376               * Wait for {@link DefaultMemStore#getSnapshotSegments} to complete.
3377               */
3378              preClearSnapShotCyclicBarrier.await();
3379            }
3380          } catch (Throwable e) {
3381            throw new RuntimeException(e);
3382          }
3383        }
3384      }
3385      super.doClearSnapShot();
3386
3387      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3388        int currentCount = clearSnapshotCounter.get();
3389        if (currentCount == 1) {
3390          if (shouldWait) {
3391            try {
3392              /**
3393               * Notify the getScanner thread that {@link DefaultMemStore#doClearSnapShot} has
3394               * completed, so {@link DefaultMemStore#getScanners} can continue.
3395               */
3396              postClearSnapShotCyclicBarrier.await();
3397            } catch (Throwable e) {
3398              throw new RuntimeException(e);
3399            }
3400          }
3401        }
3402      }
3403    }
3404  }
3405}