001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025import static org.mockito.ArgumentMatchers.any;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.spy;
028import static org.mockito.Mockito.times;
029import static org.mockito.Mockito.verify;
030import static org.mockito.Mockito.when;
031
032import java.io.IOException;
033import java.lang.ref.SoftReference;
034import java.security.PrivilegedExceptionAction;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Collection;
038import java.util.Collections;
039import java.util.Iterator;
040import java.util.List;
041import java.util.ListIterator;
042import java.util.NavigableSet;
043import java.util.TreeSet;
044import java.util.concurrent.ConcurrentSkipListSet;
045import java.util.concurrent.CountDownLatch;
046import java.util.concurrent.ExecutorService;
047import java.util.concurrent.Executors;
048import java.util.concurrent.ThreadPoolExecutor;
049import java.util.concurrent.TimeUnit;
050import java.util.concurrent.atomic.AtomicBoolean;
051import java.util.concurrent.atomic.AtomicInteger;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.fs.FSDataOutputStream;
054import org.apache.hadoop.fs.FileStatus;
055import org.apache.hadoop.fs.FileSystem;
056import org.apache.hadoop.fs.FilterFileSystem;
057import org.apache.hadoop.fs.LocalFileSystem;
058import org.apache.hadoop.fs.Path;
059import org.apache.hadoop.fs.permission.FsPermission;
060import org.apache.hadoop.hbase.Cell;
061import org.apache.hadoop.hbase.CellBuilderFactory;
062import org.apache.hadoop.hbase.CellBuilderType;
063import org.apache.hadoop.hbase.CellComparator;
064import org.apache.hadoop.hbase.CellComparatorImpl;
065import org.apache.hadoop.hbase.CellUtil;
066import org.apache.hadoop.hbase.HBaseClassTestRule;
067import org.apache.hadoop.hbase.HBaseConfiguration;
068import org.apache.hadoop.hbase.HBaseTestingUtility;
069import org.apache.hadoop.hbase.HConstants;
070import org.apache.hadoop.hbase.KeyValue;
071import org.apache.hadoop.hbase.MemoryCompactionPolicy;
072import org.apache.hadoop.hbase.PrivateCellUtil;
073import org.apache.hadoop.hbase.TableName;
074import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
075import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
076import org.apache.hadoop.hbase.client.Get;
077import org.apache.hadoop.hbase.client.RegionInfo;
078import org.apache.hadoop.hbase.client.RegionInfoBuilder;
079import org.apache.hadoop.hbase.client.Scan;
080import org.apache.hadoop.hbase.client.TableDescriptor;
081import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
082import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
083import org.apache.hadoop.hbase.filter.Filter;
084import org.apache.hadoop.hbase.filter.FilterBase;
085import org.apache.hadoop.hbase.io.compress.Compression;
086import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
087import org.apache.hadoop.hbase.io.hfile.CacheConfig;
088import org.apache.hadoop.hbase.io.hfile.HFile;
089import org.apache.hadoop.hbase.io.hfile.HFileContext;
090import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
091import org.apache.hadoop.hbase.monitoring.MonitoredTask;
092import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
093import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
094import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
095import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
096import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
097import org.apache.hadoop.hbase.security.User;
098import org.apache.hadoop.hbase.testclassification.MediumTests;
099import org.apache.hadoop.hbase.testclassification.RegionServerTests;
100import org.apache.hadoop.hbase.util.Bytes;
101import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
102import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
103import org.apache.hadoop.hbase.util.FSUtils;
104import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
105import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
106import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
107import org.apache.hadoop.hbase.wal.WALFactory;
108import org.apache.hadoop.util.Progressable;
109import org.junit.After;
110import org.junit.AfterClass;
111import org.junit.Before;
112import org.junit.ClassRule;
113import org.junit.Rule;
114import org.junit.Test;
115import org.junit.experimental.categories.Category;
116import org.junit.rules.TestName;
117import org.mockito.Mockito;
118import org.slf4j.Logger;
119import org.slf4j.LoggerFactory;
120
121import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
122
123/**
124 * Test class for the HStore
125 */
126@Category({ RegionServerTests.class, MediumTests.class })
127public class TestHStore {
128
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
  // Current test method name; used to build per-test data directories.
  @Rule
  public TestName name = new TestName();

  // Region and store under test; (re)built by the init(...) overloads for each test.
  HRegion region;
  HStore store;
  // Fixed table/family fixtures shared by most tests.
  byte [] table = Bytes.toBytes("table");
  byte [] family = Bytes.toBytes("family");

  // Row keys and column qualifiers reused across the Get/flush tests.
  byte [] row = Bytes.toBytes("row");
  byte [] row2 = Bytes.toBytes("row2");
  byte [] qf1 = Bytes.toBytes("qf1");
  byte [] qf2 = Bytes.toBytes("qf2");
  byte [] qf3 = Bytes.toBytes("qf3");
  byte [] qf4 = Bytes.toBytes("qf4");
  byte [] qf5 = Bytes.toBytes("qf5");
  byte [] qf6 = Bytes.toBytes("qf6");

  // Qualifier set populated in setUp(); kept sorted so expected ordering is stable.
  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);

  // Cells a test expects to read back (built in setUp()) vs. what it actually read.
  List<Cell> expected = new ArrayList<>();
  List<Cell> result = new ArrayList<>();

  // Monotonically increasing flush sequence-id seed.
  long id = System.currentTimeMillis();
  Get get = new Get(row);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Base directory for per-test data; each init() appends the test method name.
  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
161
162
163  /**
164   * Setup
165   * @throws IOException
166   */
167  @Before
168  public void setUp() throws IOException {
169    qualifiers.clear();
170    qualifiers.add(qf1);
171    qualifiers.add(qf3);
172    qualifiers.add(qf5);
173
174    Iterator<byte[]> iter = qualifiers.iterator();
175    while(iter.hasNext()){
176      byte [] next = iter.next();
177      expected.add(new KeyValue(row, family, next, 1, (byte[])null));
178      get.addColumn(family, next);
179    }
180  }
181
  /**
   * Initializes {@link #region} and {@link #store} using the default test configuration.
   * @param methodName used to name this test's data directory
   */
  private void init(String methodName) throws IOException {
    init(methodName, TEST_UTIL.getConfiguration());
  }
185
  /**
   * Initializes the store with a column family allowing 4 versions.
   * @return the freshly created store (also assigned to {@link #store})
   */
  private HStore init(String methodName, Configuration conf) throws IOException {
    // some of the tests write 4 versions and then flush
    // (with HBASE-4241, lower versions are collected on flush)
    return init(methodName, conf,
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
  }
192
  /**
   * Initializes the store with the given column family descriptor and a default
   * table descriptor for {@link #table}.
   */
  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
      throws IOException {
    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
  }
197
  /**
   * Initializes the store with a caller-supplied table descriptor builder and no store hook.
   */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd) throws IOException {
    return init(methodName, conf, builder, hcd, null);
  }
202
  /**
   * Initializes the store with an optional {@code MyStoreHook} and scan type defaulting
   * to non-pread.
   */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
    return init(methodName, conf, builder, hcd, hook, false);
  }
207
  /**
   * Creates the {@link HRegion} (filesystem layout, WAL, chunk creator) that the store
   * under test will belong to, and stubs its in-memory compaction pool with a
   * single-thread executor so tests control compaction scheduling.
   * NOTE(review): {@code hook} and {@code switchToPread} are accepted for signature
   * symmetry with init(...) but are not used in this method.
   */
  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
    TableDescriptor htd = builder.setColumnFamily(hcd).build();
    Path basedir = new Path(DIR + methodName);
    Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));

    FileSystem fs = FileSystem.get(conf);

    // Start every test method from a clean WAL directory.
    fs.delete(logdir, true);
    // Initialize the MemStoreLAB chunk pool used by the memstore.
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    Configuration walConf = new Configuration(conf);
    FSUtils.setRootDir(walConf, basedir);
    WALFactory wals = new WALFactory(walConf, methodName);
    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
        htd, null);
    // Spy the region services so the compaction pool getter below can be stubbed.
    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
  }
230
231  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
232      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
233    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
234    if (hook == null) {
235      store = new HStore(region, hcd, conf, false);
236    } else {
237      store = new MyStore(region, hcd, conf, hook, switchToPread);
238    }
239    return store;
240  }
241
242  /**
243   * Test we do not lose data if we fail a flush and then close.
244   * Part of HBase-10466
245   * @throws Exception
246   */
247  @Test
248  public void testFlushSizeSizing() throws Exception {
249    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
250    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
251    // Only retry once.
252    conf.setInt("hbase.hstore.flush.retries.number", 1);
253    User user = User.createUserForTesting(conf, this.name.getMethodName(),
254      new String[]{"foo"});
255    // Inject our faulty LocalFileSystem
256    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
257    user.runAs(new PrivilegedExceptionAction<Object>() {
258      @Override
259      public Object run() throws Exception {
260        // Make sure it worked (above is sensitive to caching details in hadoop core)
261        FileSystem fs = FileSystem.get(conf);
262        assertEquals(FaultyFileSystem.class, fs.getClass());
263        FaultyFileSystem ffs = (FaultyFileSystem)fs;
264
265        // Initialize region
266        init(name.getMethodName(), conf);
267
268        MemStoreSize mss = store.memstore.getFlushableSize();
269        assertEquals(0, mss.getDataSize());
270        LOG.info("Adding some data");
271        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
272        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
273        // add the heap size of active (mutable) segment
274        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
275        mss = store.memstore.getFlushableSize();
276        assertEquals(kvSize.getMemStoreSize(), mss);
277        // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
278        try {
279          LOG.info("Flushing");
280          flushStore(store, id++);
281          fail("Didn't bubble up IOE!");
282        } catch (IOException ioe) {
283          assertTrue(ioe.getMessage().contains("Fault injected"));
284        }
285        // due to snapshot, change mutable to immutable segment
286        kvSize.incMemStoreSize(0,
287          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
288        mss = store.memstore.getFlushableSize();
289        assertEquals(kvSize.getMemStoreSize(), mss);
290        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
291        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
292        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
293        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
294        // not yet cleared the snapshot -- the above flush failed.
295        assertEquals(kvSize.getMemStoreSize(), mss);
296        ffs.fault.set(false);
297        flushStore(store, id++);
298        mss = store.memstore.getFlushableSize();
299        // Size should be the foreground kv size.
300        assertEquals(kvSize2.getMemStoreSize(), mss);
301        flushStore(store, id++);
302        mss = store.memstore.getFlushableSize();
303        assertEquals(0, mss.getDataSize());
304        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
305        return null;
306      }
307    });
308  }
309
310  /**
311   * Verify that compression and data block encoding are respected by the
312   * Store.createWriterInTmp() method, used on store flush.
313   */
314  @Test
315  public void testCreateWriter() throws Exception {
316    Configuration conf = HBaseConfiguration.create();
317    FileSystem fs = FileSystem.get(conf);
318
319    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
320        .setCompressionType(Compression.Algorithm.GZ).setDataBlockEncoding(DataBlockEncoding.DIFF)
321        .build();
322    init(name.getMethodName(), conf, hcd);
323
324    // Test createWriterInTmp()
325    StoreFileWriter writer =
326        store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false);
327    Path path = writer.getPath();
328    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
329    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
330    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
331    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
332    writer.close();
333
334    // Verify that compression and encoding settings are respected
335    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
336    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
337    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
338    reader.close();
339  }
340
341  @Test
342  public void testDeleteExpiredStoreFiles() throws Exception {
343    testDeleteExpiredStoreFiles(0);
344    testDeleteExpiredStoreFiles(1);
345  }
346
347  /*
348   * @param minVersions the MIN_VERSIONS for the column family
349   */
350  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
351    int storeFileNum = 4;
352    int ttl = 4;
353    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
354    EnvironmentEdgeManagerTestHelper.injectEdge(edge);
355
356    Configuration conf = HBaseConfiguration.create();
357    // Enable the expired store file deletion
358    conf.setBoolean("hbase.store.delete.expired.storefile", true);
359    // Set the compaction threshold higher to avoid normal compactions.
360    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
361
362    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
363        .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());
364
365    long storeTtl = this.store.getScanInfo().getTtl();
366    long sleepTime = storeTtl / storeFileNum;
367    long timeStamp;
368    // There are 4 store files and the max time stamp difference among these
369    // store files will be (this.store.ttl / storeFileNum)
370    for (int i = 1; i <= storeFileNum; i++) {
371      LOG.info("Adding some data for the store file #" + i);
372      timeStamp = EnvironmentEdgeManager.currentTime();
373      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
374      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
375      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
376      flush(i);
377      edge.incrementTime(sleepTime);
378    }
379
380    // Verify the total number of store files
381    assertEquals(storeFileNum, this.store.getStorefiles().size());
382
383     // Each call will find one expired store file and delete it before compaction happens.
384     // There will be no compaction due to threshold above. Last file will not be replaced.
385    for (int i = 1; i <= storeFileNum - 1; i++) {
386      // verify the expired store file.
387      assertFalse(this.store.requestCompaction().isPresent());
388      Collection<HStoreFile> sfs = this.store.getStorefiles();
389      // Ensure i files are gone.
390      if (minVersions == 0) {
391        assertEquals(storeFileNum - i, sfs.size());
392        // Ensure only non-expired files remain.
393        for (HStoreFile sf : sfs) {
394          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
395        }
396      } else {
397        assertEquals(storeFileNum, sfs.size());
398      }
399      // Let the next store file expired.
400      edge.incrementTime(sleepTime);
401    }
402    assertFalse(this.store.requestCompaction().isPresent());
403
404    Collection<HStoreFile> sfs = this.store.getStorefiles();
405    // Assert the last expired file is not removed.
406    if (minVersions == 0) {
407      assertEquals(1, sfs.size());
408    }
409    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
410    assertTrue(ts < (edge.currentTime() - storeTtl));
411
412    for (HStoreFile sf : sfs) {
413      sf.closeStoreFile(true);
414    }
415  }
416
  /**
   * Verifies that {@code StoreUtils.getLowestTimestamp} agrees with the filesystem's
   * modification times for the store's files, both after a series of flushes and after
   * a compaction rewrites them.
   */
  @Test
  public void testLowestModificationTime() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // Initialize region
    init(name.getMethodName(), conf);

    // Create four store files, one per flush.
    int storeFileNum = 4;
    for (int i = 1; i <= storeFileNum; i++) {
      LOG.info("Adding some data for the store file #"+i);
      this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null);
      this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null);
      this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null);
      flush(i);
    }
    // after flush; check the lowest time stamp
    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
    assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);

    // after compact; check the lowest time stamp
    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
  }
443
444  private static long getLowestTimeStampFromFS(FileSystem fs,
445      final Collection<HStoreFile> candidates) throws IOException {
446    long minTs = Long.MAX_VALUE;
447    if (candidates.isEmpty()) {
448      return minTs;
449    }
450    Path[] p = new Path[candidates.size()];
451    int i = 0;
452    for (HStoreFile sf : candidates) {
453      p[i] = sf.getPath();
454      ++i;
455    }
456
457    FileStatus[] stats = fs.listStatus(p);
458    if (stats == null || stats.length == 0) {
459      return minTs;
460    }
461    for (FileStatus s : stats) {
462      minTs = Math.min(minTs, s.getModificationTime());
463    }
464    return minTs;
465  }
466
467  //////////////////////////////////////////////////////////////////////////////
468  // Get tests
469  //////////////////////////////////////////////////////////////////////////////
470
471  private static final int BLOCKSIZE_SMALL = 8192;
472  /**
473   * Test for hbase-1686.
474   * @throws IOException
475   */
476  @Test
477  public void testEmptyStoreFile() throws IOException {
478    init(this.name.getMethodName());
479    // Write a store file.
480    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
481    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
482    flush(1);
483    // Now put in place an empty store file.  Its a little tricky.  Have to
484    // do manually with hacked in sequence id.
485    HStoreFile f = this.store.getStorefiles().iterator().next();
486    Path storedir = f.getPath().getParent();
487    long seqid = f.getMaxSequenceId();
488    Configuration c = HBaseConfiguration.create();
489    FileSystem fs = FileSystem.get(c);
490    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
491    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
492        fs)
493            .withOutputDir(storedir)
494            .withFileContext(meta)
495            .build();
496    w.appendMetadata(seqid + 1, false);
497    w.close();
498    this.store.close();
499    // Reopen it... should pick up two files
500    this.store =
501        new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
502    assertEquals(2, this.store.getStorefilesCount());
503
504    result = HBaseTestingUtility.getFromStoreFile(store,
505        get.getRow(),
506        qualifiers);
507    assertEquals(1, result.size());
508  }
509
510  /**
511   * Getting data from memstore only
512   * @throws IOException
513   */
514  @Test
515  public void testGet_FromMemStoreOnly() throws IOException {
516    init(this.name.getMethodName());
517
518    //Put data in memstore
519    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
520    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
521    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
522    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
523    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
524    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
525
526    //Get
527    result = HBaseTestingUtility.getFromStoreFile(store,
528        get.getRow(), qualifiers);
529
530    //Compare
531    assertCheck();
532  }
533
534  @Test
535  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
536    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
537    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
538    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
539  }
540
  /**
   * Writes {@code maxVersion + 1} versions of one cell so the oldest version is dropped
   * at flush time, then checks the resulting hfile's time range covers only the
   * versions that were actually flushed.
   */
  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
    ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
    long currentTs = 100;
    long minTs = currentTs;
    // the extra cell won't be flushed to disk,
    // so the min of timerange will be different between memStore and hfile.
    for (int i = 0; i != (maxVersion + 1); ++i) {
      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
      if (i == 1) {
        // Second-oldest write: the oldest surviving version after the flush drop.
        minTs = currentTs;
      }
    }
    flushStore(store, id++);

    Collection<HStoreFile> files = store.getStorefiles();
    assertEquals(1, files.size());
    HStoreFile f = files.iterator().next();
    f.initReader();
    StoreFileReader reader = f.getReader();
    // Min must exclude the dropped oldest version; max is the newest write.
    assertEquals(minTs, reader.timeRange.getMin());
    assertEquals(currentTs, reader.timeRange.getMax());
  }
564
565  /**
566   * Getting data from files only
567   * @throws IOException
568   */
569  @Test
570  public void testGet_FromFilesOnly() throws IOException {
571    init(this.name.getMethodName());
572
573    //Put data in memstore
574    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
575    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
576    //flush
577    flush(1);
578
579    //Add more data
580    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
581    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
582    //flush
583    flush(2);
584
585    //Add more data
586    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
587    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
588    //flush
589    flush(3);
590
591    //Get
592    result = HBaseTestingUtility.getFromStoreFile(store,
593        get.getRow(),
594        qualifiers);
595    //this.store.get(get, qualifiers, result);
596
597    //Need to sort the result since multiple files
598    Collections.sort(result, CellComparatorImpl.COMPARATOR);
599
600    //Compare
601    assertCheck();
602  }
603
604  /**
605   * Getting data from memstore and files
606   * @throws IOException
607   */
608  @Test
609  public void testGet_FromMemStoreAndFiles() throws IOException {
610    init(this.name.getMethodName());
611
612    //Put data in memstore
613    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
614    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
615    //flush
616    flush(1);
617
618    //Add more data
619    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
620    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
621    //flush
622    flush(2);
623
624    //Add more data
625    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
626    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
627
628    //Get
629    result = HBaseTestingUtility.getFromStoreFile(store,
630        get.getRow(), qualifiers);
631
632    //Need to sort the result since multiple files
633    Collections.sort(result, CellComparatorImpl.COMPARATOR);
634
635    //Compare
636    assertCheck();
637  }
638
  /**
   * Snapshots and flushes the memstore, then asserts the expected number of store
   * files exists and the active segment is empty.
   * @param storeFilessize expected store-file count after the flush
   */
  private void flush(int storeFilessize) throws IOException{
    this.store.snapshot();
    flushStore(store, id++);
    assertEquals(storeFilessize, this.store.getStorefiles().size());
    assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
  }
645
646  private void assertCheck() {
647    assertEquals(expected.size(), result.size());
648    for(int i=0; i<expected.size(); i++) {
649      assertEquals(expected.get(i), result.get(i));
650    }
651  }
652
  /**
   * Per-test cleanup: restores the real clock and closes the store and region.
   */
  @After
  public void tearDown() throws Exception {
    // Undo any fake EnvironmentEdge a test may have injected.
    EnvironmentEdgeManagerTestHelper.reset();
    if (store != null) {
      try {
        store.close();
      } catch (IOException e) {
        // Best-effort close: a failing close must not mask the test's own result.
      }
      store = null;
    }
    if (region != null) {
      region.close();
      region = null;
    }
  }
668
  /**
   * Removes the shared on-disk test data directory after all tests have run.
   */
  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }
673
  /**
   * Verifies a flush that fails with an injected filesystem fault leaves no partial
   * store files behind and bubbles the IOException to the caller.
   */
  @Test
  public void testHandleErrorsInFlush() throws Exception {
    LOG.info("Setting up a faulty file system that cannot write");

    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    User user = User.createUserForTesting(conf,
        "testhandleerrorsinflush", new String[]{"foo"});
    // Inject our faulty LocalFileSystem
    conf.setClass("fs.file.impl", FaultyFileSystem.class,
        FileSystem.class);
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // Make sure it worked (above is sensitive to caching details in hadoop core)
        FileSystem fs = FileSystem.get(conf);
        assertEquals(FaultyFileSystem.class, fs.getClass());

        // Initialize region
        init(name.getMethodName(), conf);

        LOG.info("Adding some data");
        store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
        store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
        store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);

        LOG.info("Before flush, we should have no files");

        Collection<StoreFileInfo> files =
          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
        assertEquals(0, files != null ? files.size() : 0);

        //flush -- must fail because of the injected fault
        try {
          LOG.info("Flushing");
          flush(1);
          fail("Didn't bubble up IOE!");
        } catch (IOException ioe) {
          assertTrue(ioe.getMessage().contains("Fault injected"));
        }

        LOG.info("After failed flush, we should still have no files!");
        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
        assertEquals(0, files != null ? files.size() : 0);
        store.getHRegion().getWAL().close();
        return null;
      }
    });
    // Drop cached FileSystem instances so the faulty FS does not leak into other tests.
    FileSystem.closeAllForUGI(user.getUGI());
  }
723
724  /**
725   * Faulty file system that will fail if you write past its fault position the FIRST TIME
726   * only; thereafter it will succeed.  Used by {@link TestHRegion} too.
727   */
728  static class FaultyFileSystem extends FilterFileSystem {
729    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
730    private long faultPos = 200;
731    AtomicBoolean fault = new AtomicBoolean(true);
732
733    public FaultyFileSystem() {
734      super(new LocalFileSystem());
735      System.err.println("Creating faulty!");
736    }
737
738    @Override
739    public FSDataOutputStream create(Path p) throws IOException {
740      return new FaultyOutputStream(super.create(p), faultPos, fault);
741    }
742
743    @Override
744    public FSDataOutputStream create(Path f, FsPermission permission,
745        boolean overwrite, int bufferSize, short replication, long blockSize,
746        Progressable progress) throws IOException {
747      return new FaultyOutputStream(super.create(f, permission,
748          overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
749    }
750
751    @Override
752    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
753        int bufferSize, short replication, long blockSize, Progressable progress)
754    throws IOException {
755      // Fake it.  Call create instead.  The default implementation throws an IOE
756      // that this is not supported.
757      return create(f, overwrite, bufferSize, replication, blockSize, progress);
758    }
759  }
760
  /**
   * Output stream wrapper that throws "Fault injected" when a bulk write occurs at or
   * past {@code faultPos} while the shared {@code fault} flag is set.
   * Note: only the (byte[], int, int) write overload is intercepted here; other write
   * overloads pass through to the wrapped stream unchecked.
   */
  static class FaultyOutputStream extends FSDataOutputStream {
    volatile long faultPos = Long.MAX_VALUE;
    // Shared with FaultyFileSystem so tests can disable faults mid-run.
    private final AtomicBoolean fault;

    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
    throws IOException {
      super(out, null);
      this.faultPos = faultPos;
      this.fault = fault;
    }

    @Override
    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
      System.err.println("faulty stream write at pos " + getPos());
      // Check BEFORE delegating so a faulted write leaves nothing appended.
      injectFault();
      super.write(buf, offset, length);
    }

    // Throws once the stream position reaches faultPos while faults are enabled.
    private void injectFault() throws IOException {
      if (this.fault.get() && getPos() >= faultPos) {
        throw new IOException("Fault injected");
      }
    }
  }
785
  /**
   * Drive a complete flush cycle (prepare, flushCache, commit) against the given store.
   * @param store the store to flush
   * @param id the flush sequence id
   * @throws IOException if the flush fails
   */
  private static void flushStore(HStore store, long id) throws IOException {
    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
  }
792
793  /**
794   * Generate a list of KeyValues for testing based on given parameters
795   * @param timestamps
796   * @param numRows
797   * @param qualifier
798   * @param family
799   * @return the rows key-value list
800   */
801  List<Cell> getKeyValueSet(long[] timestamps, int numRows,
802      byte[] qualifier, byte[] family) {
803    List<Cell> kvList = new ArrayList<>();
804    for (int i=1;i<=numRows;i++) {
805      byte[] b = Bytes.toBytes(i);
806      for (long timestamp: timestamps) {
807        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
808      }
809    }
810    return kvList;
811  }
812
813  /**
814   * Test to ensure correctness when using Stores with multiple timestamps
815   * @throws IOException
816   */
817  @Test
818  public void testMultipleTimestamps() throws IOException {
819    int numRows = 1;
820    long[] timestamps1 = new long[] {1,5,10,20};
821    long[] timestamps2 = new long[] {30,80};
822
823    init(this.name.getMethodName());
824
825    List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
826    for (Cell kv : kvList1) {
827      this.store.add(kv, null);
828    }
829
830    this.store.snapshot();
831    flushStore(store, id++);
832
833    List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
834    for(Cell kv : kvList2) {
835      this.store.add(kv, null);
836    }
837
838    List<Cell> result;
839    Get get = new Get(Bytes.toBytes(1));
840    get.addColumn(family,qf1);
841
842    get.setTimeRange(0,15);
843    result = HBaseTestingUtility.getFromStoreFile(store, get);
844    assertTrue(result.size()>0);
845
846    get.setTimeRange(40,90);
847    result = HBaseTestingUtility.getFromStoreFile(store, get);
848    assertTrue(result.size()>0);
849
850    get.setTimeRange(10,45);
851    result = HBaseTestingUtility.getFromStoreFile(store, get);
852    assertTrue(result.size()>0);
853
854    get.setTimeRange(80,145);
855    result = HBaseTestingUtility.getFromStoreFile(store, get);
856    assertTrue(result.size()>0);
857
858    get.setTimeRange(1,2);
859    result = HBaseTestingUtility.getFromStoreFile(store, get);
860    assertTrue(result.size()>0);
861
862    get.setTimeRange(90,200);
863    result = HBaseTestingUtility.getFromStoreFile(store, get);
864    assertTrue(result.size()==0);
865  }
866
867  /**
868   * Test for HBASE-3492 - Test split on empty colfam (no store files).
869   *
870   * @throws IOException When the IO operations fail.
871   */
872  @Test
873  public void testSplitWithEmptyColFam() throws IOException {
874    init(this.name.getMethodName());
875    assertFalse(store.getSplitPoint().isPresent());
876    store.getHRegion().forceSplit(null);
877    assertFalse(store.getSplitPoint().isPresent());
878    store.getHRegion().clearSplit();
879  }
880
881  @Test
882  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
883    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
884    long anyValue = 10;
885
886    // We'll check that it uses correct config and propagates it appropriately by going thru
887    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
888    // a number we pass in is higher than some config value, inside compactionPolicy.
889    Configuration conf = HBaseConfiguration.create();
890    conf.setLong(CONFIG_KEY, anyValue);
891    init(name.getMethodName() + "-xml", conf);
892    assertTrue(store.throttleCompaction(anyValue + 1));
893    assertFalse(store.throttleCompaction(anyValue));
894
895    // HTD overrides XML.
896    --anyValue;
897    init(name.getMethodName() + "-htd", conf, TableDescriptorBuilder
898        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
899      ColumnFamilyDescriptorBuilder.of(family));
900    assertTrue(store.throttleCompaction(anyValue + 1));
901    assertFalse(store.throttleCompaction(anyValue));
902
903    // HCD overrides them both.
904    --anyValue;
905    init(name.getMethodName() + "-hcd", conf,
906      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
907        Long.toString(anyValue)),
908      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
909          .build());
910    assertTrue(store.throttleCompaction(anyValue + 1));
911    assertFalse(store.throttleCompaction(anyValue));
912  }
913
  /**
   * Store engine that records the compactor it creates so tests can assert that a
   * configured engine override actually took effect.
   */
  public static class DummyStoreEngine extends DefaultStoreEngine {
    public static DefaultCompactor lastCreatedCompactor = null;

    @Override
    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
        throws IOException {
      super.createComponents(conf, store, comparator);
      // Remember the compactor built by the default engine for later inspection.
      lastCreatedCompactor = this.compactor;
    }
  }
924
925  @Test
926  public void testStoreUsesSearchEngineOverride() throws Exception {
927    Configuration conf = HBaseConfiguration.create();
928    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
929    init(this.name.getMethodName(), conf);
930    assertEquals(DummyStoreEngine.lastCreatedCompactor,
931      this.store.storeEngine.getCompactor());
932  }
933
934  private void addStoreFile() throws IOException {
935    HStoreFile f = this.store.getStorefiles().iterator().next();
936    Path storedir = f.getPath().getParent();
937    long seqid = this.store.getMaxSequenceId().orElse(0L);
938    Configuration c = TEST_UTIL.getConfiguration();
939    FileSystem fs = FileSystem.get(c);
940    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
941    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
942        fs)
943            .withOutputDir(storedir)
944            .withFileContext(fileContext)
945            .build();
946    w.appendMetadata(seqid + 1, false);
947    w.close();
948    LOG.info("Added store file:" + w.getPath());
949  }
950
951  private void archiveStoreFile(int index) throws IOException {
952    Collection<HStoreFile> files = this.store.getStorefiles();
953    HStoreFile sf = null;
954    Iterator<HStoreFile> it = files.iterator();
955    for (int i = 0; i <= index; i++) {
956      sf = it.next();
957    }
958    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
959  }
960
961  private void closeCompactedFile(int index) throws IOException {
962    Collection<HStoreFile> files =
963        this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
964    HStoreFile sf = null;
965    Iterator<HStoreFile> it = files.iterator();
966    for (int i = 0; i <= index; i++) {
967      sf = it.next();
968    }
969    sf.closeStoreFile(true);
970    store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
971  }
972
  /**
   * Verifies that files added to or removed from the store directory behind the store's
   * back are only reflected in the store's file count after refreshStoreFiles().
   */
  @Test
  public void testRefreshStoreFiles() throws Exception {
    init(name.getMethodName());

    assertEquals(0, this.store.getStorefilesCount());

    // Test refreshing store files when no store files are there
    store.refreshStoreFiles();
    assertEquals(0, this.store.getStorefilesCount());

    // add some data, flush
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
    flush(1);
    assertEquals(1, this.store.getStorefilesCount());

    // add one more file
    addStoreFile();

    // The new file is invisible until a refresh.
    assertEquals(1, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(2, this.store.getStorefilesCount());

    // add three more files
    addStoreFile();
    addStoreFile();
    addStoreFile();

    assertEquals(2, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(5, this.store.getStorefilesCount());

    // Removals behind the store's back are likewise only visible after a refresh.
    closeCompactedFile(0);
    archiveStoreFile(0);

    assertEquals(5, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(4, this.store.getStorefilesCount());

    archiveStoreFile(0);
    archiveStoreFile(1);
    archiveStoreFile(2);

    assertEquals(4, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(1, this.store.getStorefilesCount());

    // Down to zero files again.
    archiveStoreFile(0);
    store.refreshStoreFiles();
    assertEquals(0, this.store.getStorefilesCount());
  }
1023
1024  @Test
1025  public void testRefreshStoreFilesNotChanged() throws IOException {
1026    init(name.getMethodName());
1027
1028    assertEquals(0, this.store.getStorefilesCount());
1029
1030    // add some data, flush
1031    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
1032    flush(1);
1033    // add one more file
1034    addStoreFile();
1035
1036    HStore spiedStore = spy(store);
1037
1038    // call first time after files changed
1039    spiedStore.refreshStoreFiles();
1040    assertEquals(2, this.store.getStorefilesCount());
1041    verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
1042
1043    // call second time
1044    spiedStore.refreshStoreFiles();
1045
1046    //ensure that replaceStoreFiles is not called if files are not refreshed
1047    verify(spiedStore, times(0)).replaceStoreFiles(null, null);
1048  }
1049
1050  private long countMemStoreScanner(StoreScanner scanner) {
1051    if (scanner.currentScanners == null) {
1052      return 0;
1053    }
1054    return scanner.currentScanners.stream()
1055            .filter(s -> !s.isFileScanner())
1056            .count();
1057  }
1058
1059  @Test
1060  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1061    long seqId = 100;
1062    long timestamp = System.currentTimeMillis();
1063    Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1064        .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put)
1065        .setValue(qf1).build();
1066    PrivateCellUtil.setSequenceId(cell0, seqId);
1067    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1068
1069    Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1070        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
1071        .setValue(qf1).build();
1072    PrivateCellUtil.setSequenceId(cell1, seqId);
1073    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1074
1075    seqId = 101;
1076    timestamp = System.currentTimeMillis();
1077    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1078        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
1079        .setValue(qf1).build();
1080    PrivateCellUtil.setSequenceId(cell2, seqId);
1081    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1082  }
1083
  /**
   * Adds the first batch of cells, snapshots the memstore, adds the second batch, then
   * checks the scanner's memstore-scanner count before and after the flush commits, and
   * that no cell from either batch is lost by the concurrent flush.
   * @param inputCellsBeforeSnapshot cells added before the snapshot is taken
   * @param inputCellsAfterSnapshot cells added to the new active segment after the snapshot
   */
  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
      List<Cell> inputCellsAfterSnapshot) throws IOException {
    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    long seqId = Long.MIN_VALUE;
    // Collect every qualifier to scan and the largest sequence id to use as the read point.
    for (Cell c : inputCellsBeforeSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    for (Cell c : inputCellsAfterSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
    // prepare() snapshots the active segment; cells added after this land in a new one.
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
      // snapshot has no data after flush
      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
      boolean more;
      int cellCount = 0;
      do {
        List<Cell> cells = new ArrayList<>();
        more = s.next(cells);
        cellCount += cells.size();
        // While the scan continues, only the active-segment scanner (if any) should remain.
        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
      } while (more);
      assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
          inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
      // the current scanners is cleared
      assertEquals(0, countMemStoreScanner(s));
    }
  }
1124
  /**
   * Shorthand for {@link #createCell(byte[], byte[], long, long, byte[])} using the
   * test's default {@code row}.
   */
  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
      throws IOException {
    return createCell(row, qualifier, ts, sequenceId, value);
  }
1129
1130  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
1131      throws IOException {
1132    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1133        .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
1134        .setValue(value).build();
1135    PrivateCellUtil.setSequenceId(c, sequenceId);
1136    return c;
1137  }
1138
1139  @Test
1140  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1141    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1142    final int expectedSize = 3;
1143    testFlushBeforeCompletingScan(new MyListHook() {
1144      @Override
1145      public void hook(int currentSize) {
1146        if (currentSize == expectedSize - 1) {
1147          try {
1148            flushStore(store, id++);
1149            timeToGoNextRow.set(true);
1150          } catch (IOException e) {
1151            throw new RuntimeException(e);
1152          }
1153        }
1154      }
1155    }, new FilterBase() {
1156      @Override
1157      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1158        return ReturnCode.INCLUDE;
1159      }
1160    }, expectedSize);
1161  }
1162
1163  @Test
1164  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
1165    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1166    final int expectedSize = 2;
1167    testFlushBeforeCompletingScan(new MyListHook() {
1168      @Override
1169      public void hook(int currentSize) {
1170        if (currentSize == expectedSize - 1) {
1171          try {
1172            flushStore(store, id++);
1173            timeToGoNextRow.set(true);
1174          } catch (IOException e) {
1175            throw new RuntimeException(e);
1176          }
1177        }
1178      }
1179    }, new FilterBase() {
1180      @Override
1181      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1182        if (timeToGoNextRow.get()) {
1183          timeToGoNextRow.set(false);
1184          return ReturnCode.NEXT_ROW;
1185        } else {
1186          return ReturnCode.INCLUDE;
1187        }
1188      }
1189    }, expectedSize);
1190  }
1191
1192  @Test
1193  public void testFlushBeforeCompletingScanWithFilterHint() throws IOException,
1194      InterruptedException {
1195    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
1196    final int expectedSize = 2;
1197    testFlushBeforeCompletingScan(new MyListHook() {
1198      @Override
1199      public void hook(int currentSize) {
1200        if (currentSize == expectedSize - 1) {
1201          try {
1202            flushStore(store, id++);
1203            timeToGetHint.set(true);
1204          } catch (IOException e) {
1205            throw new RuntimeException(e);
1206          }
1207        }
1208      }
1209    }, new FilterBase() {
1210      @Override
1211      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1212        if (timeToGetHint.get()) {
1213          timeToGetHint.set(false);
1214          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
1215        } else {
1216          return Filter.ReturnCode.INCLUDE;
1217        }
1218      }
1219      @Override
1220      public Cell getNextCellHint(Cell currentCell) throws IOException {
1221        return currentCell;
1222      }
1223    }, expectedSize);
1224  }
1225
  /**
   * Loads rows r0 (seqId), r1 (seqId+1), r2 (seqId+2) and a newer r1 version (seqId+3),
   * then scans from r1 while {@code hook} is invoked as the result list grows (the hooks
   * used by callers flush the store mid-row). Verifies r1 yields {@code expectedSize}
   * cells carrying value1 and r2 yields its full three cells carrying value2.
   * @param hook invoked with the result list's size each time a cell is added
   * @param filter filter applied to the scan
   * @param expectedSize number of cells expected from the first (r1) row
   */
  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
          throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    byte[] r0 = Bytes.toBytes("row0");
    byte[] r1 = Bytes.toBytes("row1");
    byte[] r2 = Bytes.toBytes("row2");
    byte[] value0 = Bytes.toBytes("value0");
    byte[] value1 = Bytes.toBytes("value1");
    byte[] value2 = Bytes.toBytes("value2");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // The hook keeps the smallest read point above every cell's seqId (seqId + 3) so the
    // flush can proceed while the scan is open.
    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
      new MyStoreHook() {
        @Override
        public long getSmallestReadPoint(HStore store) {
          return seqId + 3;
        }
      });
    // The cells having the value0 won't be flushed to disk because the value of max version is 1
    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
    // MyList invokes the hook on every add, letting callers flush mid-row.
    List<Cell> myList = new MyList<>(hook);
    Scan scan = new Scan()
            .withStartRow(r1)
            .setFilter(filter);
    try (InternalScanner scanner = (InternalScanner) store.getScanner(
          scan, null, seqId + 3)){
      // r1
      scanner.next(myList);
      assertEquals(expectedSize, myList.size());
      for (Cell c : myList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value1)
          + ", actual:" + Bytes.toStringBinary(actualValue)
          , Bytes.equals(actualValue, value1));
      }
      List<Cell> normalList = new ArrayList<>(3);
      // r2
      scanner.next(normalList);
      assertEquals(3, normalList.size());
      for (Cell c : normalList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value2)
          + ", actual:" + Bytes.toStringBinary(actualValue)
          , Bytes.equals(actualValue, value2));
      }
    }
  }
1286
  /**
   * Races scanner creation against CompactingMemStore snapshot creation using the phased
   * MyCompactingMemStore, and verifies the scanner still returns all three cells.
   */
  @Test
  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
    byte[] value = Bytes.toBytes("value");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // the data the scan below is expected to return (the read point below is seqId + 1)
    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    MyCompactingMemStore.START_TEST.set(true);
    Runnable flush = () -> {
      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
      // recreate the active memstore -- phase (4/5)
      storeFlushCtx.prepare();
    };
    ExecutorService service = Executors.newSingleThreadExecutor();
    service.submit(flush);
    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
    // this is blocked until we recreate the active memstore -- phase (3/5)
    // we get scanner from active memstore but it is empty -- phase (5/5)
    InternalScanner scanner = (InternalScanner) store.getScanner(
          new Scan(new Get(row)), quals, seqId + 1);
    service.shutdown();
    service.awaitTermination(20, TimeUnit.SECONDS);
    try {
      try {
        List<Cell> results = new ArrayList<>();
        scanner.next(results);
        // All three cells survive the race between snapshot and scanner creation.
        assertEquals(3, results.size());
        for (Cell c : results) {
          byte[] actualValue = CellUtil.cloneValue(c);
          assertTrue("expected:" + Bytes.toStringBinary(value)
            + ", actual:" + Bytes.toStringBinary(actualValue)
            , Bytes.equals(actualValue, value));
        }
      } finally {
        scanner.close();
      }
    } finally {
      // Disarm the phased memstore and let the flush complete so teardown is clean.
      MyCompactingMemStore.START_TEST.set(false);
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
    }
  }
1341
  /**
   * Triggers a second flush from inside the store's getScanners path (via MyStoreHook)
   * while a first flush is mid-commit, and verifies the scanner still sees only the
   * current values despite the memstore being emptied underneath it.
   */
  @Test
  public void testScanWithDoubleFlush() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Initialize region
    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){
      @Override
      public void getScanners(MyStore store) throws IOException {
        final long tmpId = id++;
        ExecutorService s = Executors.newSingleThreadExecutor();
        s.submit(() -> {
          try {
            // flush the store before storescanner updates the scanners from store.
            // The current data will be flushed into files, and the memstore will
            // be clear.
            // -- phase (4/4)
            flushStore(store, tmpId);
          }catch (IOException ex) {
            throw new RuntimeException(ex);
          }
        });
        s.shutdown();
        try {
          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
          s.awaitTermination(3, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
        }
      }
    });
    byte[] oldValue = Bytes.toBytes("oldValue");
    byte[] currentValue = Bytes.toBytes("currentValue");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // older data which shouldn't be "seen" by client
    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
    long snapshotId = id++;
    // push older data into snapshot -- phase (1/4)
    // NOTE(review): this uses the 'store' field rather than 'myStore'; initMyStore appears
    // to leave both referring to the same store -- confirm before relying on it.
    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker
        .DUMMY);
    storeFlushCtx.prepare();

    // insert current data into active -- phase (2/4)
    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
        new Scan(new Get(row)), quals, seqId + 1)) {
      // complete the flush -- phase (3/4)
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));

      // Only the newer versions should be returned, even though the in-scan flush in the
      // hook (phase 4/4) emptied the memstore while the scanner was being set up.
      List<Cell> results = new ArrayList<>();
      scanner.next(results);
      assertEquals(3, results.size());
      for (Cell c : results) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(currentValue)
          + ", actual:" + Bytes.toStringBinary(actualValue)
          , Bytes.equals(actualValue, currentValue));
      }
    }
  }
1410
1411  @Test
1412  public void testReclaimChunkWhenScaning() throws IOException {
1413    init("testReclaimChunkWhenScaning");
1414    long ts = EnvironmentEdgeManager.currentTime();
1415    long seqId = 100;
1416    byte[] value = Bytes.toBytes("value");
1417    // older data whihc shouldn't be "seen" by client
1418    store.add(createCell(qf1, ts, seqId, value), null);
1419    store.add(createCell(qf2, ts, seqId, value), null);
1420    store.add(createCell(qf3, ts, seqId, value), null);
1421    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1422    quals.add(qf1);
1423    quals.add(qf2);
1424    quals.add(qf3);
1425    try (InternalScanner scanner = (InternalScanner) store.getScanner(
1426        new Scan(new Get(row)), quals, seqId)) {
1427      List<Cell> results = new MyList<>(size -> {
1428        switch (size) {
1429          // 1) we get the first cell (qf1)
1430          // 2) flush the data to have StoreScanner update inner scanners
1431          // 3) the chunk will be reclaimed after updaing
1432          case 1:
1433            try {
1434              flushStore(store, id++);
1435            } catch (IOException e) {
1436              throw new RuntimeException(e);
1437            }
1438            break;
1439          // 1) we get the second cell (qf2)
1440          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
1441          case 2:
1442            try {
1443              byte[] newValue = Bytes.toBytes("newValue");
1444              // older data whihc shouldn't be "seen" by client
1445              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1446              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1447              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1448            } catch (IOException e) {
1449              throw new RuntimeException(e);
1450            }
1451            break;
1452          default:
1453            break;
1454        }
1455      });
1456      scanner.next(results);
1457      assertEquals(3, results.size());
1458      for (Cell c : results) {
1459        byte[] actualValue = CellUtil.cloneValue(c);
1460        assertTrue("expected:" + Bytes.toStringBinary(value)
1461          + ", actual:" + Bytes.toStringBinary(actualValue)
1462          , Bytes.equals(actualValue, value));
1463      }
1464    }
1465  }
1466
1467  /**
1468   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable
1469   * may change the versionedList. And the first InMemoryFlushRunnable will use the chagned
1470   * versionedList to remove the corresponding segments.
1471   * In short, there will be some segements which isn't in merge are removed.
1472   * @throws IOException
1473   * @throws InterruptedException
1474   */
1475  @Test
1476  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1477    int flushSize = 500;
1478    Configuration conf = HBaseConfiguration.create();
1479    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1480    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1481    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1482    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1483    // Set the lower threshold to invoke the "MERGE" policy
1484    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1485    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1486        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1487    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1488    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1489    long ts = EnvironmentEdgeManager.currentTime();
1490    long seqId = 100;
1491    // older data whihc shouldn't be "seen" by client
1492    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1493    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1494    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1495    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1496    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1497    storeFlushCtx.prepare();
1498    // This shouldn't invoke another in-memory flush because the first compactor thread
1499    // hasn't accomplished the in-memory compaction.
1500    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1501    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1502    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1503    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1504    //okay. Let the compaction be completed
1505    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1506    CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore;
1507    while (mem.isMemStoreFlushingInMemory()) {
1508      TimeUnit.SECONDS.sleep(1);
1509    }
1510    // This should invoke another in-memory flush.
1511    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1512    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1513    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1514    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1515    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1516      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1517    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1518    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1519  }
1520
1521  @Test
1522  public void testAge() throws IOException {
1523    long currentTime = System.currentTimeMillis();
1524    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1525    edge.setValue(currentTime);
1526    EnvironmentEdgeManager.injectEdge(edge);
1527    Configuration conf = TEST_UTIL.getConfiguration();
1528    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1529    initHRegion(name.getMethodName(), conf,
1530      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1531    HStore store = new HStore(region, hcd, conf, false) {
1532
1533      @Override
1534      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1535          CellComparator kvComparator) throws IOException {
1536        List<HStoreFile> storefiles =
1537            Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1538              mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1539        StoreFileManager sfm = mock(StoreFileManager.class);
1540        when(sfm.getStorefiles()).thenReturn(storefiles);
1541        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1542        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1543        return storeEngine;
1544      }
1545    };
1546    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1547    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1548    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1549  }
1550
1551  private HStoreFile mockStoreFile(long createdTime) {
1552    StoreFileInfo info = mock(StoreFileInfo.class);
1553    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1554    HStoreFile sf = mock(HStoreFile.class);
1555    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1556    when(sf.isHFile()).thenReturn(true);
1557    when(sf.getFileInfo()).thenReturn(info);
1558    return sf;
1559  }
1560
1561  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1562      throws IOException {
1563    return (MyStore) init(methodName, conf,
1564      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1565      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1566  }
1567
  /**
   * {@link HStore} variant that routes scanner creation and the smallest-read-point lookup
   * through a {@link MyStoreHook}, so tests can observe or stall those calls.
   */
  private static class MyStore extends HStore {
    private final MyStoreHook hook;

    MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration
        confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
      // NOTE(review): switchToPread is accepted but never used, and the fourth super argument
      // is hard-coded to false — confirm this is intentional.
      super(region, family, confParam, false);
      this.hook = hook;
    }

    @Override
    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
        boolean includeMemstoreScanner) throws IOException {
      // Let the test hook run (it may block or count) before delegating.
      hook.getScanners(this);
      // NOTE(review): includeStartRow/includeStopRow are overridden with hard-coded
      // true/false when delegating to super — verify no caller relies on the passed values.
      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
        stopRow, false, readPt, includeMemstoreScanner);
    }

    @Override
    public long getSmallestReadPoint() {
      // Fully delegated to the hook; the default hook asks the owning region.
      return hook.getSmallestReadPoint(this);
    }
  }
1592
  /**
   * Test hook injected into {@link MyStore}; subclasses override to observe or alter scanner
   * creation and the smallest read point.
   */
  private abstract static class MyStoreHook {

    /** Invoked at the start of {@link MyStore#getScanners}; no-op by default. */
    void getScanners(MyStore store) throws IOException {
    }

    /** Default implementation delegates to the owning region's smallest read point. */
    long getSmallestReadPoint(HStore store) {
      return store.getHRegion().getSmallestReadPoint();
    }
  }
1602
  /**
   * While a scan is open, new store files are flushed and the originals replaced (as a
   * compaction discharger would do); switching the scanner from pread to stream in parallel
   * must rebuild the scanner heap rather than reuse the stale one.
   */
  @Test
  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
    // A pread byte limit of 0 makes any scan eligible for an immediate pread -> stream switch.
    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = System.currentTimeMillis();
    long seqID = 1L;
    // Add some data to the region and do some flushes
    for (int i = 1; i < 10; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);
    for (int i = 11; i < 20; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);
    for (int i = 21; i < 30; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);

    assertEquals(3, store.getStorefilesCount());
    Scan scan = new Scan();
    scan.addFamily(family);
    // Remember the three files backing the scanner we are about to open.
    Collection<HStoreFile> storefiles2 = store.getStorefiles();
    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
    StoreScanner storeScanner =
        (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
    // get the current heap
    KeyValueHeap heap = storeScanner.heap;
    // create more store files
    for (int i = 31; i < 40; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);

    for (int i = 41; i < 50; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);
    // Isolate just the two files flushed after the scanner was opened.
    storefiles2 = store.getStorefiles();
    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
    actualStorefiles1.removeAll(actualStorefiles);
    // Switch to stream read on another thread while this thread swaps the store files,
    // mimicking a compaction discharger racing with the scanner.
    MyThread thread = new MyThread(storeScanner);
    thread.start();
    store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
    thread.join();
    KeyValueHeap heap2 = thread.getHeap();
    // The switch must have produced a brand-new heap, not reused the pre-replacement one.
    assertFalse(heap.equals(heap2));
  }
1667
1668  @Test
1669  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
1670    final TableName tn = TableName.valueOf(name.getMethodName());
1671    init(name.getMethodName());
1672
1673    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
1674
1675    HStoreFile sf1 = mockStoreFileWithLength(1024L);
1676    HStoreFile sf2 = mockStoreFileWithLength(2048L);
1677    HStoreFile sf3 = mockStoreFileWithLength(4096L);
1678    HStoreFile sf4 = mockStoreFileWithLength(8192L);
1679
1680    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
1681        .setEndKey(Bytes.toBytes("b")).build();
1682
1683    // Compacting two files down to one, reducing size
1684    sizeStore.put(regionInfo, 1024L + 4096L);
1685    store.updateSpaceQuotaAfterFileReplacement(
1686        sizeStore, regionInfo, Arrays.asList(sf1, sf3), Arrays.asList(sf2));
1687
1688    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1689
1690    // The same file length in and out should have no change
1691    store.updateSpaceQuotaAfterFileReplacement(
1692        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf2));
1693
1694    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1695
1696    // Increase the total size used
1697    store.updateSpaceQuotaAfterFileReplacement(
1698        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf3));
1699
1700    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
1701
1702    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
1703        .setEndKey(Bytes.toBytes("c")).build();
1704    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
1705
1706    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
1707  }
1708
1709  @Test
1710  public void testHFileContextSetWithCFAndTable() throws Exception {
1711    init(this.name.getMethodName());
1712    StoreFileWriter writer = store.createWriterInTmp(10000L,
1713        Compression.Algorithm.NONE, false, true, false, true);
1714    HFileContext hFileContext = writer.getHFileWriter().getFileContext();
1715    assertArrayEquals(family, hFileContext.getColumnFamily());
1716    assertArrayEquals(table, hFileContext.getTableName());
1717  }
1718
1719  private HStoreFile mockStoreFileWithLength(long length) {
1720    HStoreFile sf = mock(HStoreFile.class);
1721    StoreFileReader sfr = mock(StoreFileReader.class);
1722    when(sf.isHFile()).thenReturn(true);
1723    when(sf.getReader()).thenReturn(sfr);
1724    when(sfr.length()).thenReturn(length);
1725    return sf;
1726  }
1727
1728  private static class MyThread extends Thread {
1729    private StoreScanner scanner;
1730    private KeyValueHeap heap;
1731
1732    public MyThread(StoreScanner scanner) {
1733      this.scanner = scanner;
1734    }
1735
1736    public KeyValueHeap getHeap() {
1737      return this.heap;
1738    }
1739
1740    @Override
1741    public void run() {
1742      scanner.trySwitchToStreamRead();
1743      heap = scanner.heap;
1744    }
1745  }
1746
1747  private static class MyMemStoreCompactor extends MemStoreCompactor {
1748    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
1749    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
1750    public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy
1751        compactionPolicy) throws IllegalArgumentIOException {
1752      super(compactingMemStore, compactionPolicy);
1753    }
1754
1755    @Override
1756    public boolean start() throws IOException {
1757      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
1758      if (isFirst) {
1759        try {
1760          START_COMPACTOR_LATCH.await();
1761          return super.start();
1762        } catch (InterruptedException ex) {
1763          throw new RuntimeException(ex);
1764        }
1765      }
1766      return super.start();
1767    }
1768  }
1769
  /**
   * {@link CompactingMemStore} that plugs in {@link MyMemStoreCompactor} and counts how many
   * times the in-memory compaction flag is successfully set, i.e. how many in-memory flushes
   * actually start.
   */
  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
    // Successful setInMemoryCompactionFlag() transitions; tests reset this before use.
    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
        HStore store, RegionServicesForStores regionServices,
        MemoryCompactionPolicy compactionPolicy) throws IOException {
      super(conf, c, store, regionServices, compactionPolicy);
    }

    @Override
    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
        throws IllegalArgumentIOException {
      return new MyMemStoreCompactor(this, compactionPolicy);
    }

    @Override
    protected boolean setInMemoryCompactionFlag() {
      boolean rval = super.setInMemoryCompactionFlag();
      // Count only when the flag transition actually succeeded; a false return means another
      // in-memory flush is already in progress.
      if (rval) {
        RUNNER_COUNT.incrementAndGet();
        if (LOG.isDebugEnabled()) {
          LOG.debug("runner count: " + RUNNER_COUNT.get());
        }
      }
      return rval;
    }
  }
1796
1797  public static class MyCompactingMemStore extends CompactingMemStore {
1798    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
1799    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
1800    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
1801    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c,
1802        HStore store, RegionServicesForStores regionServices,
1803        MemoryCompactionPolicy compactionPolicy) throws IOException {
1804      super(conf, c, store, regionServices, compactionPolicy);
1805    }
1806
1807    @Override
1808    protected List<KeyValueScanner> createList(int capacity) {
1809      if (START_TEST.get()) {
1810        try {
1811          getScannerLatch.countDown();
1812          snapshotLatch.await();
1813        } catch (InterruptedException e) {
1814          throw new RuntimeException(e);
1815        }
1816      }
1817      return new ArrayList<>(capacity);
1818    }
1819    @Override
1820    protected void pushActiveToPipeline(MutableSegment active) {
1821      if (START_TEST.get()) {
1822        try {
1823          getScannerLatch.await();
1824        } catch (InterruptedException e) {
1825          throw new RuntimeException(e);
1826        }
1827      }
1828
1829      super.pushActiveToPipeline(active);
1830      if (START_TEST.get()) {
1831        snapshotLatch.countDown();
1832      }
1833    }
1834  }
1835
1836  interface MyListHook {
1837    void hook(int currentSize);
1838  }
1839
1840  private static class MyList<T> implements List<T> {
1841    private final List<T> delegatee = new ArrayList<>();
1842    private final MyListHook hookAtAdd;
1843    MyList(final MyListHook hookAtAdd) {
1844      this.hookAtAdd = hookAtAdd;
1845    }
1846    @Override
1847    public int size() {return delegatee.size();}
1848
1849    @Override
1850    public boolean isEmpty() {return delegatee.isEmpty();}
1851
1852    @Override
1853    public boolean contains(Object o) {return delegatee.contains(o);}
1854
1855    @Override
1856    public Iterator<T> iterator() {return delegatee.iterator();}
1857
1858    @Override
1859    public Object[] toArray() {return delegatee.toArray();}
1860
1861    @Override
1862    public <R> R[] toArray(R[] a) {return delegatee.toArray(a);}
1863
1864    @Override
1865    public boolean add(T e) {
1866      hookAtAdd.hook(size());
1867      return delegatee.add(e);
1868    }
1869
1870    @Override
1871    public boolean remove(Object o) {return delegatee.remove(o);}
1872
1873    @Override
1874    public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);}
1875
1876    @Override
1877    public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);}
1878
1879    @Override
1880    public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);}
1881
1882    @Override
1883    public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);}
1884
1885    @Override
1886    public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);}
1887
1888    @Override
1889    public void clear() {delegatee.clear();}
1890
1891    @Override
1892    public T get(int index) {return delegatee.get(index);}
1893
1894    @Override
1895    public T set(int index, T element) {return delegatee.set(index, element);}
1896
1897    @Override
1898    public void add(int index, T element) {delegatee.add(index, element);}
1899
1900    @Override
1901    public T remove(int index) {return delegatee.remove(index);}
1902
1903    @Override
1904    public int indexOf(Object o) {return delegatee.indexOf(o);}
1905
1906    @Override
1907    public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);}
1908
1909    @Override
1910    public ListIterator<T> listIterator() {return delegatee.listIterator();}
1911
1912    @Override
1913    public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);}
1914
1915    @Override
1916    public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
1917  }
1918}