/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Test class for {@link HStore}.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestHStore {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);

  @Rule
  public TestName name = new TestName();

  HRegion region;
  HStore store;
  byte[] table = Bytes.toBytes("table");
  byte[] family = Bytes.toBytes("family");

  byte[] row = Bytes.toBytes("row");
  byte[] row2 = Bytes.toBytes("row2");
  byte[] qf1 = Bytes.toBytes("qf1");
  byte[] qf2 = Bytes.toBytes("qf2");
  byte[] qf3 = Bytes.toBytes("qf3");
  byte[] qf4 = Bytes.toBytes("qf4");
  byte[] qf5 = Bytes.toBytes("qf5");
  byte[] qf6 = Bytes.toBytes("qf6");

  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);

  List<Cell> expected = new ArrayList<>();
  List<Cell> result = new ArrayList<>();

  long id = System.currentTimeMillis();
  Get get = new Get(row);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();

  /**
   * Setup
   * @throws IOException
   */
  @Before
  public void setUp() throws IOException {
    qualifiers.clear();
    qualifiers.add(qf1);
    qualifiers.add(qf3);
    qualifiers.add(qf5);

    for (byte[] next : qualifiers) {
      expected.add(new KeyValue(row, family, next, 1, (byte[]) null));
      get.addColumn(family, next);
    }
  }

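  // The init(...) overloads below all funnel into the six-argument variant, which builds the
  // HRegion via initHRegion(...) and then constructs the HStore under test.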
  private void init(String methodName) throws IOException {
    init(methodName, TEST_UTIL.getConfiguration());
  }

  private HStore init(String methodName, Configuration conf) throws IOException {
    // some of the tests write 4 versions and then flush
    // (with HBASE-4241, lower versions are collected on flush)
    return init(methodName, conf,
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
  }

  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
      throws IOException {
    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
  }

  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd) throws IOException {
    return init(methodName, conf, builder, hcd, null);
  }

  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
    return init(methodName, conf, builder, hcd, hook, false);
  }

  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
    TableDescriptor htd = builder.setColumnFamily(hcd).build();
    Path basedir = new Path(DIR + methodName);
    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));

    FileSystem fs = FileSystem.get(conf);

    fs.delete(logdir, true);
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    Configuration walConf = new Configuration(conf);
    CommonFSUtils.setRootDir(walConf, basedir);
    WALFactory wals = new WALFactory(walConf, methodName);
    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
        htd, null);
    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
  }

  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
    if (hook == null) {
      store = new HStore(region, hcd, conf, false);
    } else {
      store = new MyStore(region, hcd, conf, hook, switchToPread);
    }
    return store;
  }

  /**
   * Test we do not lose data if we fail a flush and then close.
   * Part of HBASE-10466.
   * @throws Exception
   */
  @Test
  public void testFlushSizeSizing() throws Exception {
    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    // Only retry once.
    conf.setInt("hbase.hstore.flush.retries.number", 1);
    User user = User.createUserForTesting(conf, this.name.getMethodName(),
      new String[]{"foo"});
    // Inject our faulty LocalFileSystem
    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // Make sure it worked (above is sensitive to caching details in hadoop core)
        FileSystem fs = FileSystem.get(conf);
        assertEquals(FaultyFileSystem.class, fs.getClass());
        FaultyFileSystem ffs = (FaultyFileSystem) fs;

        // Initialize region
        init(name.getMethodName(), conf);

        MemStoreSize mss = store.memstore.getFlushableSize();
        assertEquals(0, mss.getDataSize());
        LOG.info("Adding some data");
        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
        // add the heap size of active (mutable) segment
        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
        mss = store.memstore.getFlushableSize();
        assertEquals(kvSize.getMemStoreSize(), mss);
        // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
        try {
          LOG.info("Flushing");
          flushStore(store, id++);
          fail("Didn't bubble up IOE!");
        } catch (IOException ioe) {
          assertTrue(ioe.getMessage().contains("Fault injected"));
        }
        // due to snapshot, change mutable to immutable segment
        kvSize.incMemStoreSize(0,
          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
        mss = store.memstore.getFlushableSize();
        assertEquals(kvSize.getMemStoreSize(), mss);
        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
        // not yet cleared the snapshot -- the above flush failed.
        assertEquals(kvSize.getMemStoreSize(), mss);
        ffs.fault.set(false);
        flushStore(store, id++);
        mss = store.memstore.getFlushableSize();
        // Size should be the foreground kv size.
        assertEquals(kvSize2.getMemStoreSize(), mss);
        flushStore(store, id++);
        mss = store.memstore.getFlushableSize();
        assertEquals(0, mss.getDataSize());
        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
        return null;
      }
    });
  }

  /**
   * Verify that compression and data block encoding are respected by the
   * Store.createWriterInTmp() method, used on store flush.
   */
  @Test
  public void testCreateWriter() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);

    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setCompressionType(Compression.Algorithm.GZ).setDataBlockEncoding(DataBlockEncoding.DIFF)
        .build();
    init(name.getMethodName(), conf, hcd);

    // Test createWriterInTmp()
    StoreFileWriter writer =
        store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false);
    Path path = writer.getPath();
    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
    writer.close();

    // Verify that compression and encoding settings are respected
    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
    reader.close();
  }

  @Test
  public void testDeleteExpiredStoreFiles() throws Exception {
    testDeleteExpiredStoreFiles(0);
    testDeleteExpiredStoreFiles(1);
  }

  /**
   * @param minVersions the MIN_VERSIONS for the column family
   */
  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
    int storeFileNum = 4;
    int ttl = 4;
    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
    EnvironmentEdgeManagerTestHelper.injectEdge(edge);

    Configuration conf = HBaseConfiguration.create();
    // Enable the expired store file deletion
    conf.setBoolean("hbase.store.delete.expired.storefile", true);
    // Set the compaction threshold higher to avoid normal compactions.
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);

    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
        .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());

    long storeTtl = this.store.getScanInfo().getTtl();
    long sleepTime = storeTtl / storeFileNum;
    long timeStamp;
    // There are 4 store files and the max time stamp difference among these
    // store files will be (this.store.ttl / storeFileNum)
    for (int i = 1; i <= storeFileNum; i++) {
      LOG.info("Adding some data for the store file #" + i);
      timeStamp = EnvironmentEdgeManager.currentTime();
      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
      flush(i);
      edge.incrementTime(sleepTime);
    }

    // Verify the total number of store files
    assertEquals(storeFileNum, this.store.getStorefiles().size());

    // Each call will find one expired store file and delete it before compaction happens.
    // There will be no compaction due to threshold above. Last file will not be replaced.
    for (int i = 1; i <= storeFileNum - 1; i++) {
      // verify the expired store file.
      assertFalse(this.store.requestCompaction().isPresent());
      Collection<HStoreFile> sfs = this.store.getStorefiles();
      // Ensure i files are gone.
      if (minVersions == 0) {
        assertEquals(storeFileNum - i, sfs.size());
        // Ensure only non-expired files remain.
        for (HStoreFile sf : sfs) {
          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
        }
      } else {
        assertEquals(storeFileNum, sfs.size());
      }
      // Let the next store file expire.
      edge.incrementTime(sleepTime);
    }
    assertFalse(this.store.requestCompaction().isPresent());

    Collection<HStoreFile> sfs = this.store.getStorefiles();
    // Assert the last expired file is not removed.
    if (minVersions == 0) {
      assertEquals(1, sfs.size());
    }
    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
    assertTrue(ts < (edge.currentTime() - storeTtl));

    for (HStoreFile sf : sfs) {
      sf.closeStoreFile(true);
    }
  }

  @Test
  public void testLowestModificationTime() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // Initialize region
    init(name.getMethodName(), conf);

    int storeFileNum = 4;
    for (int i = 1; i <= storeFileNum; i++) {
      LOG.info("Adding some data for the store file #" + i);
      this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null);
      flush(i);
    }
    // after flush; check the lowest time stamp
    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);

    // after compact; check the lowest time stamp
    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
  }

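  /**
   * Returns the smallest file system modification time among the given store files, or
   * Long.MAX_VALUE if there are no candidates or no file statuses could be listed.
   */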
  private static long getLowestTimeStampFromFS(FileSystem fs,
      final Collection<HStoreFile> candidates) throws IOException {
    long minTs = Long.MAX_VALUE;
    if (candidates.isEmpty()) {
      return minTs;
    }
    Path[] p = new Path[candidates.size()];
    int i = 0;
    for (HStoreFile sf : candidates) {
      p[i] = sf.getPath();
      ++i;
    }

    FileStatus[] stats = fs.listStatus(p);
    if (stats == null || stats.length == 0) {
      return minTs;
    }
    for (FileStatus s : stats) {
      minTs = Math.min(minTs, s.getModificationTime());
    }
    return minTs;
  }

  //////////////////////////////////////////////////////////////////////////////
  // Get tests
  //////////////////////////////////////////////////////////////////////////////

  private static final int BLOCKSIZE_SMALL = 8192;

  /**
   * Test for HBASE-1686.
   * @throws IOException
   */
  @Test
  public void testEmptyStoreFile() throws IOException {
    init(this.name.getMethodName());
    // Write a store file.
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    flush(1);
    // Now put in place an empty store file.  It's a little tricky.  Have to
    // do manually with hacked in sequence id.
    HStoreFile f = this.store.getStorefiles().iterator().next();
    Path storedir = f.getPath().getParent();
    long seqid = f.getMaxSequenceId();
    Configuration c = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(c);
    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
        .withOutputDir(storedir)
        .withFileContext(meta)
        .build();
    w.appendMetadata(seqid + 1, false);
    w.close();
    this.store.close();
    // Reopen it... should pick up two files
    this.store =
        new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
    assertEquals(2, this.store.getStorefilesCount());

    result = HBaseTestingUtility.getFromStoreFile(store,
        get.getRow(),
        qualifiers);
    assertEquals(1, result.size());
  }

  /**
   * Getting data from memstore only
   * @throws IOException
   */
  @Test
  public void testGet_FromMemStoreOnly() throws IOException {
    init(this.name.getMethodName());

    // Put data in memstore
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);

    // Get
    result = HBaseTestingUtility.getFromStoreFile(store,
        get.getRow(), qualifiers);

    // Compare
    assertCheck();
  }

  @Test
  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
  }

  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
    long currentTs = 100;
    long minTs = currentTs;
    // the extra cell won't be flushed to disk,
    // so the min of the time range will be different between memstore and hfile.
    for (int i = 0; i != (maxVersion + 1); ++i) {
      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
      if (i == 1) {
        minTs = currentTs;
      }
    }
    flushStore(store, id++);

    Collection<HStoreFile> files = store.getStorefiles();
    assertEquals(1, files.size());
    HStoreFile f = files.iterator().next();
    f.initReader();
    StoreFileReader reader = f.getReader();
    assertEquals(minTs, reader.timeRange.getMin());
    assertEquals(currentTs, reader.timeRange.getMax());
  }

  /**
   * Getting data from files only
   * @throws IOException
   */
  @Test
  public void testGet_FromFilesOnly() throws IOException {
    init(this.name.getMethodName());

    // Put data in memstore
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    // flush
    flush(1);

    // Add more data
    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
    // flush
    flush(2);

    // Add more data
    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
    // flush
    flush(3);

    // Get
    result = HBaseTestingUtility.getFromStoreFile(store,
        get.getRow(),
        qualifiers);
    // this.store.get(get, qualifiers, result);

    // Need to sort the result since multiple files
    Collections.sort(result, CellComparatorImpl.COMPARATOR);

    // Compare
    assertCheck();
  }

  /**
   * Getting data from memstore and files
   * @throws IOException
   */
  @Test
  public void testGet_FromMemStoreAndFiles() throws IOException {
    init(this.name.getMethodName());

    // Put data in memstore
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    // flush
    flush(1);

    // Add more data
    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
    // flush
    flush(2);

    // Add more data
    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);

    // Get
    result = HBaseTestingUtility.getFromStoreFile(store,
        get.getRow(), qualifiers);

    // Need to sort the result since multiple files
    Collections.sort(result, CellComparatorImpl.COMPARATOR);

    // Compare
    assertCheck();
  }

  private void flush(int storeFilesSize) throws IOException {
    this.store.snapshot();
    flushStore(store, id++);
    assertEquals(storeFilesSize, this.store.getStorefiles().size());
    assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount());
  }

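  /**
   * Asserts that {@code result} contains exactly the cells in {@code expected}, in order.
   */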
  private void assertCheck() {
    assertEquals(expected.size(), result.size());
    for (int i = 0; i < expected.size(); i++) {
      assertEquals(expected.get(i), result.get(i));
    }
  }

  @After
  public void tearDown() throws Exception {
    EnvironmentEdgeManagerTestHelper.reset();
    if (store != null) {
      try {
        store.close();
      } catch (IOException e) {
        // Ignore failures closing the store; we are tearing down anyway.
      }
      store = null;
    }
    if (region != null) {
      region.close();
      region = null;
    }
  }

  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testHandleErrorsInFlush() throws Exception {
    LOG.info("Setting up a faulty file system that cannot write");

    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    User user = User.createUserForTesting(conf,
        "testhandleerrorsinflush", new String[]{"foo"});
    // Inject our faulty LocalFileSystem
    conf.setClass("fs.file.impl", FaultyFileSystem.class,
        FileSystem.class);
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // Make sure it worked (above is sensitive to caching details in hadoop core)
        FileSystem fs = FileSystem.get(conf);
        assertEquals(FaultyFileSystem.class, fs.getClass());

        // Initialize region
        init(name.getMethodName(), conf);

        LOG.info("Adding some data");
        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
        store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
        store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);

        LOG.info("Before flush, we should have no files");

        Collection<StoreFileInfo> files =
          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
        assertEquals(0, files != null ? files.size() : 0);

        // flush
        try {
          LOG.info("Flushing");
          flush(1);
          fail("Didn't bubble up IOE!");
        } catch (IOException ioe) {
          assertTrue(ioe.getMessage().contains("Fault injected"));
        }

        LOG.info("After failed flush, we should still have no files!");
        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
        assertEquals(0, files != null ? files.size() : 0);
        store.getHRegion().getWAL().close();
        return null;
      }
    });
    FileSystem.closeAllForUGI(user.getUGI());
  }

  /**
   * Faulty file system that will fail if you write past its fault position the FIRST TIME
   * only; thereafter it will succeed.  Used by {@link TestHRegion} too.
   */
  static class FaultyFileSystem extends FilterFileSystem {
    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
    private long faultPos = 200;
    AtomicBoolean fault = new AtomicBoolean(true);

    public FaultyFileSystem() {
      super(new LocalFileSystem());
      System.err.println("Creating faulty!");
    }

    @Override
    public FSDataOutputStream create(Path p) throws IOException {
      return new FaultyOutputStream(super.create(p), faultPos, fault);
    }

    @Override
    public FSDataOutputStream create(Path f, FsPermission permission,
        boolean overwrite, int bufferSize, short replication, long blockSize,
        Progressable progress) throws IOException {
      return new FaultyOutputStream(super.create(f, permission,
          overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
    }

    @Override
    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
        int bufferSize, short replication, long blockSize, Progressable progress)
        throws IOException {
      // Fake it.  Call create instead.  The default implementation throws an IOE
      // saying this is not supported.
      return create(f, overwrite, bufferSize, replication, blockSize, progress);
    }
  }

  static class FaultyOutputStream extends FSDataOutputStream {
    volatile long faultPos = Long.MAX_VALUE;
    private final AtomicBoolean fault;

    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
        throws IOException {
      super(out, null);
      this.faultPos = faultPos;
      this.fault = fault;
    }

    @Override
    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
      System.err.println("faulty stream write at pos " + getPos());
      injectFault();
      super.write(buf, offset, length);
    }

    private void injectFault() throws IOException {
      if (this.fault.get() && getPos() >= faultPos) {
        throw new IOException("Fault injected");
      }
    }
  }

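  /**
   * Drives a full flush cycle on the store: snapshot the memstore, write the snapshot out as a
   * new store file, and commit it.
   */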
  private static void flushStore(HStore store, long id) throws IOException {
    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
  }

  /**
   * Generate a list of KeyValues for testing based on given parameters
   * @param timestamps the timestamps to set on the generated KeyValues
   * @param numRows the number of rows to generate
   * @param qualifier the column qualifier to use for every KeyValue
   * @param family the column family to use for every KeyValue
   * @return the rows' key-value list
   */
  List<Cell> getKeyValueSet(long[] timestamps, int numRows,
      byte[] qualifier, byte[] family) {
    List<Cell> kvList = new ArrayList<>();
    for (int i = 1; i <= numRows; i++) {
      byte[] b = Bytes.toBytes(i);
      for (long timestamp : timestamps) {
        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
      }
    }
    return kvList;
  }

  /**
   * Test to ensure correctness when using Stores with multiple timestamps
   * @throws IOException
   */
  @Test
  public void testMultipleTimestamps() throws IOException {
    int numRows = 1;
    long[] timestamps1 = new long[] {1, 5, 10, 20};
    long[] timestamps2 = new long[] {30, 80};

    init(this.name.getMethodName());

    List<Cell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family);
    for (Cell kv : kvList1) {
      this.store.add(kv, null);
    }

    this.store.snapshot();
    flushStore(store, id++);

    List<Cell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family);
    for (Cell kv : kvList2) {
      this.store.add(kv, null);
    }

    List<Cell> result;
    Get get = new Get(Bytes.toBytes(1));
    get.addColumn(family, qf1);

    get.setTimeRange(0, 15);
    result = HBaseTestingUtility.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    get.setTimeRange(40, 90);
    result = HBaseTestingUtility.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    get.setTimeRange(10, 45);
    result = HBaseTestingUtility.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    get.setTimeRange(80, 145);
    result = HBaseTestingUtility.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    get.setTimeRange(1, 2);
    result = HBaseTestingUtility.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    get.setTimeRange(90, 200);
    result = HBaseTestingUtility.getFromStoreFile(store, get);
    assertEquals(0, result.size());
  }

  /**
   * Test for HBASE-3492 - Test split on empty colfam (no store files).
   *
   * @throws IOException When the IO operations fail.
   */
  @Test
  public void testSplitWithEmptyColFam() throws IOException {
    init(this.name.getMethodName());
    assertFalse(store.getSplitPoint().isPresent());
  }

  @Test
  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
    long anyValue = 10;

    // We'll check that it uses correct config and propagates it appropriately by going through
    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
    // a number we pass in is higher than some config value, inside compactionPolicy.
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(CONFIG_KEY, anyValue);
    init(name.getMethodName() + "-xml", conf);
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));

    // HTD overrides XML.
    --anyValue;
    init(name.getMethodName() + "-htd", conf, TableDescriptorBuilder
        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
      ColumnFamilyDescriptorBuilder.of(family));
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));

    // HCD overrides them both.
    --anyValue;
    init(name.getMethodName() + "-hcd", conf,
      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
        Long.toString(anyValue)),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
          .build());
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));
  }

  public static class DummyStoreEngine extends DefaultStoreEngine {
    public static DefaultCompactor lastCreatedCompactor = null;

    @Override
    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
        throws IOException {
      super.createComponents(conf, store, comparator);
      lastCreatedCompactor = this.compactor;
    }
  }

  @Test
  public void testStoreUsesSearchEngineOverride() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
    init(this.name.getMethodName(), conf);
    assertEquals(DummyStoreEngine.lastCreatedCompactor,
      this.store.storeEngine.getCompactor());
  }

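  /**
   * Writes an empty store file directly into the store directory, bypassing the memstore. The
   * file carries a sequence id one above the store's current maximum, so a later
   * refreshStoreFiles() picks it up as if it had appeared from outside this process.
   */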
  private void addStoreFile() throws IOException {
    HStoreFile f = this.store.getStorefiles().iterator().next();
    Path storedir = f.getPath().getParent();
    long seqid = this.store.getMaxSequenceId().orElse(0L);
    Configuration c = TEST_UTIL.getConfiguration();
    FileSystem fs = FileSystem.get(c);
    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
        .withOutputDir(storedir)
        .withFileContext(fileContext)
        .build();
    w.appendMetadata(seqid + 1, false);
    w.close();
    LOG.info("Added store file: " + w.getPath());
  }

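  /**
   * Removes the store file at the given index from the store's active file set.
   */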
  private void archiveStoreFile(int index) throws IOException {
    Collection<HStoreFile> files = this.store.getStorefiles();
    HStoreFile sf = null;
    Iterator<HStoreFile> it = files.iterator();
    for (int i = 0; i <= index; i++) {
      sf = it.next();
    }
    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(),
      Lists.newArrayList(sf));
  }

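  /**
   * Closes the compacted store file at the given index and drops it from the store file manager.
   */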
  private void closeCompactedFile(int index) throws IOException {
    Collection<HStoreFile> files =
        this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    HStoreFile sf = null;
    Iterator<HStoreFile> it = files.iterator();
    for (int i = 0; i <= index; i++) {
      sf = it.next();
    }
    sf.closeStoreFile(true);
    store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
  }

  @Test
  public void testRefreshStoreFiles() throws Exception {
    init(name.getMethodName());

    assertEquals(0, this.store.getStorefilesCount());

    // Test refreshing store files when no store files are there
    store.refreshStoreFiles();
    assertEquals(0, this.store.getStorefilesCount());

    // add some data, flush
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    flush(1);
    assertEquals(1, this.store.getStorefilesCount());

    // add one more file
    addStoreFile();

    assertEquals(1, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(2, this.store.getStorefilesCount());

    // add three more files
    addStoreFile();
    addStoreFile();
    addStoreFile();

    assertEquals(2, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(5, this.store.getStorefilesCount());

    closeCompactedFile(0);
    archiveStoreFile(0);

    assertEquals(5, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(4, this.store.getStorefilesCount());

    archiveStoreFile(0);
    archiveStoreFile(1);
    archiveStoreFile(2);

    assertEquals(4, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(1, this.store.getStorefilesCount());

    archiveStoreFile(0);
    store.refreshStoreFiles();
    assertEquals(0, this.store.getStorefilesCount());
  }

  @Test
  public void testRefreshStoreFilesNotChanged() throws IOException {
    init(name.getMethodName());

    assertEquals(0, this.store.getStorefilesCount());

    // add some data, flush
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    flush(1);
    // add one more file
    addStoreFile();

    HStore spiedStore = spy(store);

    // call first time after files changed
    spiedStore.refreshStoreFiles();
    assertEquals(2, this.store.getStorefilesCount());
    verify(spiedStore, times(1)).replaceStoreFiles(any(), any());

    // call second time
    spiedStore.refreshStoreFiles();

    // Ensure that replaceStoreFiles is not called if files are not refreshed
    verify(spiedStore, times(0)).replaceStoreFiles(null, null);
  }

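  /**
   * Counts how many of the given StoreScanner's current scanners read from the memstore rather
   * than from store files.
   */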
  private long countMemStoreScanner(StoreScanner scanner) {
    if (scanner.currentScanners == null) {
      return 0;
    }
    return scanner.currentScanners.stream()
        .filter(s -> !s.isFileScanner())
        .count();
  }

  @Test
  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
    long seqId = 100;
    long timestamp = System.currentTimeMillis();
    Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
        .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put)
        .setValue(qf1).build();
    PrivateCellUtil.setSequenceId(cell0, seqId);
    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());

    Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
        .setValue(qf1).build();
    PrivateCellUtil.setSequenceId(cell1, seqId);
    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));

    seqId = 101;
    timestamp = System.currentTimeMillis();
    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
        .setValue(qf1).build();
    PrivateCellUtil.setSequenceId(cell2, seqId);
    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
  }

  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
      List<Cell> inputCellsAfterSnapshot) throws IOException {
    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    long seqId = Long.MIN_VALUE;
    for (Cell c : inputCellsBeforeSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    for (Cell c : inputCellsAfterSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
      // snapshot has no data after flush
      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
      boolean more;
      int cellCount = 0;
      do {
        List<Cell> cells = new ArrayList<>();
        more = s.next(cells);
        cellCount += cells.size();
        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
      } while (more);
      assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
          + "; the number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
          inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
      // the current scanners are cleared
      assertEquals(0, countMemStoreScanner(s));
    }
  }

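  /**
   * Creates a Put cell on the default test row; see the overload below for an explicit row.
   */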
  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
      throws IOException {
    return createCell(row, qualifier, ts, sequenceId, value);
  }

  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
      throws IOException {
    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
        .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
        .setValue(value).build();
    PrivateCellUtil.setSequenceId(c, sequenceId);
    return c;
  }

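  // The next three tests flush the store part-way through a call to InternalScanner#next and
  // verify the scan still returns a consistent row: with a pass-through filter, with a filter
  // that skips to the next row, and with a filter that returns a seek hint, respectively.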
  @Test
  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
    final int expectedSize = 3;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGoNextRow.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        return ReturnCode.INCLUDE;
      }
    }, expectedSize);
  }

  @Test
  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
    final int expectedSize = 2;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGoNextRow.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        if (timeToGoNextRow.get()) {
          timeToGoNextRow.set(false);
          return ReturnCode.NEXT_ROW;
        } else {
          return ReturnCode.INCLUDE;
        }
      }
    }, expectedSize);
  }

  @Test
  public void testFlushBeforeCompletingScanWithFilterHint() throws IOException,
      InterruptedException {
    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
    final int expectedSize = 2;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGetHint.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        if (timeToGetHint.get()) {
          timeToGetHint.set(false);
          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
        } else {
          return Filter.ReturnCode.INCLUDE;
        }
      }

      @Override
      public Cell getNextCellHint(Cell currentCell) throws IOException {
        return currentCell;
      }
    }, expectedSize);
  }

  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
      throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    byte[] r0 = Bytes.toBytes("row0");
    byte[] r1 = Bytes.toBytes("row1");
    byte[] r2 = Bytes.toBytes("row2");
    byte[] value0 = Bytes.toBytes("value0");
    byte[] value1 = Bytes.toBytes("value1");
    byte[] value2 = Bytes.toBytes("value2");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
      new MyStoreHook() {
        @Override
        public long getSmallestReadPoint(HStore store) {
          return seqId + 3;
        }
      });
    // The cells with value0 won't be flushed to disk because max versions is 1
    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
    List<Cell> myList = new MyList<>(hook);
    Scan scan = new Scan()
        .withStartRow(r1)
        .setFilter(filter);
    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
      // r1
      scanner.next(myList);
      assertEquals(expectedSize, myList.size());
      for (Cell c : myList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
      }
      List<Cell> normalList = new ArrayList<>(3);
      // r2
      scanner.next(normalList);
      assertEquals(3, normalList.size());
      for (Cell c : normalList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2));
      }
    }
  }

  @Test
  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
    byte[] value = Bytes.toBytes("value");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // older data which shouldn't be "seen" by client
    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    MyCompactingMemStore.START_TEST.set(true);
    Runnable flush = () -> {
      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
      // recreate the active memstore -- phase (4/5)
      storeFlushCtx.prepare();
    };
    ExecutorService service = Executors.newSingleThreadExecutor();
    service.submit(flush);
    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
    // this is blocked until we recreate the active memstore -- phase (3/5)
    // we get scanner from active memstore but it is empty -- phase (5/5)
    InternalScanner scanner = (InternalScanner) store.getScanner(
        new Scan(new Get(row)), quals, seqId + 1);
    service.shutdown();
    service.awaitTermination(20, TimeUnit.SECONDS);
    try {
      try {
        List<Cell> results = new ArrayList<>();
        scanner.next(results);
        assertEquals(3, results.size());
        for (Cell c : results) {
          byte[] actualValue = CellUtil.cloneValue(c);
          assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
            + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
        }
      } finally {
        scanner.close();
      }
    } finally {
      MyCompactingMemStore.START_TEST.set(false);
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
    }
  }

1339  @Test
1340  public void testScanWithDoubleFlush() throws IOException {
1341    Configuration conf = HBaseConfiguration.create();
1342    // Initialize region
1343    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){
1344      @Override
1345      public void getScanners(MyStore store) throws IOException {
1346        final long tmpId = id++;
1347        ExecutorService s = Executors.newSingleThreadExecutor();
1348        s.submit(() -> {
1349          try {
1350            // flush the store before storescanner updates the scanners from store.
1351            // The current data will be flushed into files, and the memstore will
1352            // be clear.
1353            // -- phase (4/4)
1354            flushStore(store, tmpId);
1355          }catch (IOException ex) {
1356            throw new RuntimeException(ex);
1357          }
1358        });
1359        s.shutdown();
1360        try {
1361          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1362          s.awaitTermination(3, TimeUnit.SECONDS);
1363        } catch (InterruptedException ex) {
          // ignored: waiting for the background flush is best-effort
1364        }
1365      }
1366    });
1367    byte[] oldValue = Bytes.toBytes("oldValue");
1368    byte[] currentValue = Bytes.toBytes("currentValue");
1369    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1370    long ts = EnvironmentEdgeManager.currentTime();
1371    long seqId = 100;
1372    // older data which shouldn't be "seen" by client
1373    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1374    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1375    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1376    long snapshotId = id++;
1377    // push older data into snapshot -- phase (1/4)
1378    StoreFlushContext storeFlushCtx =
1379        myStore.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
1380    storeFlushCtx.prepare();
1381
1382    // insert current data into active -- phase (2/4)
1383    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1384    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1385    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1386    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1387    quals.add(qf1);
1388    quals.add(qf2);
1389    quals.add(qf3);
1390    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
1391        new Scan(new Get(row)), quals, seqId + 1)) {
1392      // complete the flush -- phase (3/4)
1393      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1394      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1395
1396      List<Cell> results = new ArrayList<>();
1397      scanner.next(results);
1398      assertEquals(3, results.size());
1399      for (Cell c : results) {
1400        byte[] actualValue = CellUtil.cloneValue(c);
1401        assertTrue("expected:" + Bytes.toStringBinary(currentValue)
1402          + ", actual:" + Bytes.toStringBinary(actualValue),
1403          Bytes.equals(actualValue, currentValue));
1404      }
1405    }
1406  }
1407
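  /**
   * Exercises chunk reclamation during a scan: via the MyList hook, the store is flushed after
   * the first cell is read (so the chunk backing the flushed data may be reclaimed) and new
   * cells are written after the second, while the scanner must keep returning the original
   * values.
   */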
1408  @Test
1409  public void testReclaimChunkWhenScanning() throws IOException {
1410    init("testReclaimChunkWhenScanning");
1411    long ts = EnvironmentEdgeManager.currentTime();
1412    long seqId = 100;
1413    byte[] value = Bytes.toBytes("value");
1414    // put some data; the scan below (read point seqId) should see these cells
1415    store.add(createCell(qf1, ts, seqId, value), null);
1416    store.add(createCell(qf2, ts, seqId, value), null);
1417    store.add(createCell(qf3, ts, seqId, value), null);
1418    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1419    quals.add(qf1);
1420    quals.add(qf2);
1421    quals.add(qf3);
1422    try (InternalScanner scanner = (InternalScanner) store.getScanner(
1423        new Scan(new Get(row)), quals, seqId)) {
1424      List<Cell> results = new MyList<>(size -> {
1425        switch (size) {
1426          // 1) we get the first cell (qf1)
1427          // 2) flush the data to have StoreScanner update inner scanners
1428          // 3) the chunk will be reclaimed after updating
1429          case 1:
1430            try {
1431              flushStore(store, id++);
1432            } catch (IOException e) {
1433              throw new RuntimeException(e);
1434            }
1435            break;
1436          // 1) we get the second cell (qf2)
1437          // 2) add some cells to fill some bytes into the chunk (we have only one chunk)
1438          case 2:
1439            try {
1440              byte[] newValue = Bytes.toBytes("newValue");
1441              // newer data which shouldn't be "seen" by the scanner (its read point is seqId)
1442              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1443              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1444              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1445            } catch (IOException e) {
1446              throw new RuntimeException(e);
1447            }
1448            break;
1449          default:
1450            break;
1451        }
1452      });
1453      scanner.next(results);
1454      assertEquals(3, results.size());
1455      for (Cell c : results) {
1456        byte[] actualValue = CellUtil.cloneValue(c);
1457        assertTrue("expected:" + Bytes.toStringBinary(value)
1458          + ", actual:" + Bytes.toStringBinary(actualValue),
1459          Bytes.equals(actualValue, value));
1460      }
1461    }
1462  }
1463
1464  /**
1465   * If there are two running InMemoryFlushRunnables, the later one may change the
1466   * versionedList, and the first one will then use the changed versionedList to remove
1467   * the corresponding segments. In short, segments which are not part of the merge would
1468   * be removed. This test verifies that a second in-memory flush does not start while
1469   * the first compactor thread is still running, and that one does start once it has
1470   * finished.
1471   */
1472  @Test
1473  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1474    int flushSize = 500;
1475    Configuration conf = HBaseConfiguration.create();
1476    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1477    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1478    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1479    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1480    // Lower the threshold to invoke the "MERGE" policy
1481    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1482    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1483        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1484    byte[] value = Bytes.toBytes("thisisaverylargevalue");
1485    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1486    long ts = EnvironmentEdgeManager.currentTime();
1487    long seqId = 100;
1488    // add enough data to trigger the first in-memory flush
1489    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1490    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1491    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1492    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1493    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1494    storeFlushCtx.prepare();
1495    // This shouldn't invoke another in-memory flush because the first compactor thread
1496    // hasn't finished the in-memory compaction yet.
1497    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1498    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1499    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1500    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1501    // okay, let the compaction complete
1502    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1503    CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore;
1504    while (mem.isMemStoreFlushingInMemory()) {
1505      TimeUnit.SECONDS.sleep(1);
1506    }
1507    // This should invoke another in-memory flush.
1508    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1509    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1510    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1511    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1512    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1513      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1514    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1515    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1516  }
1517
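  /**
   * Store file ages are measured against the injected ManualEnvironmentEdge time. With files
   * created 10, 100, 1000 and 10000 ms before "now", the expected min, max and average ages
   * are 10, 10000 and (10 + 100 + 1000 + 10000) / 4 = 2777.5 ms respectively.
   */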
1518  @Test
1519  public void testAge() throws IOException {
1520    long currentTime = System.currentTimeMillis();
1521    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1522    edge.setValue(currentTime);
1523    EnvironmentEdgeManager.injectEdge(edge);
1524    Configuration conf = TEST_UTIL.getConfiguration();
1525    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1526    initHRegion(name.getMethodName(), conf,
1527      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1528    HStore store = new HStore(region, hcd, conf, false) {
1529
1530      @Override
1531      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1532          CellComparator kvComparator) throws IOException {
1533        List<HStoreFile> storefiles =
1534            Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1535              mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1536        StoreFileManager sfm = mock(StoreFileManager.class);
1537        when(sfm.getStorefiles()).thenReturn(storefiles);
1538        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1539        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1540        return storeEngine;
1541      }
1542    };
1543    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1544    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1545    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1546  }
1547
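  /**
   * Creates a mock HStoreFile whose StoreFileInfo reports the given creation timestamp; only
   * the methods needed by the age calculations above are stubbed.
   */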
1548  private HStoreFile mockStoreFile(long createdTime) {
1549    StoreFileInfo info = mock(StoreFileInfo.class);
1550    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1551    HStoreFile sf = mock(HStoreFile.class);
1552    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1553    when(sf.isHFile()).thenReturn(true);
1554    when(sf.getFileInfo()).thenReturn(info);
1555    return sf;
1556  }
1557
1558  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1559      throws IOException {
1560    return (MyStore) init(methodName, conf,
1561      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1562      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1563  }
1564
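  /**
   * An HStore that runs a MyStoreHook right before building its scanner list, letting tests
   * inject a flush (or other interference) at exactly that point. Note that it forwards to
   * super.getScanners with includeStartRow forced to true and includeStopRow forced to false,
   * regardless of the arguments passed in.
   */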
1565  private static class MyStore extends HStore {
1566    private final MyStoreHook hook;
1567
1568    MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration
1569        confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1570      super(region, family, confParam, false);
1571      this.hook = hook;
1572    }
1573
1574    @Override
1575    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1576        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1577        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1578        boolean includeMemstoreScanner) throws IOException {
1579      hook.getScanners(this);
1580      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1581        stopRow, false, readPt, includeMemstoreScanner);
1582    }
1583
1584    @Override
1585    public long getSmallestReadPoint() {
1586      return hook.getSmallestReadPoint(this);
1587    }
1588  }
1589
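  /**
   * Callback used by MyStore: getScanners runs just before the store builds its scanners, and
   * getSmallestReadPoint defaults to the region's smallest read point.
   */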
1590  private abstract static class MyStoreHook {
1591
1592    void getScanners(MyStore store) throws IOException {
1593    }
1594
1595    long getSmallestReadPoint(HStore store) {
1596      return store.getHRegion().getSmallestReadPoint();
1597    }
1598  }
1599
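  /**
   * Creates several store files, opens a scanner over them, and then replaces the files (as the
   * compacted-files discharger would) while another thread switches the scanner from pread to
   * stream read. The scanner must end up with a fresh KeyValueHeap instead of the one built
   * over the replaced files.
   */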
1600  @Test
1601  public void testSwitchingPreadToStreamInParallelWithCompactionDischarger() throws Exception {
1602    Configuration conf = HBaseConfiguration.create();
1603    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1604    // with pread max bytes set to 0, scanners become eligible to switch to stream read at once
1605    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1606    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
1607    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1608    long ts = System.currentTimeMillis();
1609    long seqID = 1L;
1610    // Add some data to the region and do some flushes
1611    for (int i = 1; i < 10; i++) {
1612      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1613        memStoreSizing);
1614    }
1615    // flush them
1616    flushStore(store, seqID);
1617    for (int i = 11; i < 20; i++) {
1618      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1619        memStoreSizing);
1620    }
1621    // flush them
1622    flushStore(store, seqID);
1623    for (int i = 21; i < 30; i++) {
1624      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1625        memStoreSizing);
1626    }
1627    // flush them
1628    flushStore(store, seqID);
1629
1630    assertEquals(3, store.getStorefilesCount());
1631    Scan scan = new Scan();
1632    scan.addFamily(family);
1633    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1634    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1635    StoreScanner storeScanner =
1636        (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1637    // get the current heap
1638    KeyValueHeap heap = storeScanner.heap;
1639    // create more store files
1640    for (int i = 31; i < 40; i++) {
1641      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1642        memStoreSizing);
1643    }
1644    // flush them
1645    flushStore(store, seqID);
1646
1647    for (int i = 41; i < 50; i++) {
1648      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1649        memStoreSizing);
1650    }
1651    // flush them
1652    flushStore(store, seqID);
1653    storefiles2 = store.getStorefiles();
1654    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1655    actualStorefiles1.removeAll(actualStorefiles);
1656    // Do compaction
1657    MyThread thread = new MyThread(storeScanner);
1658    thread.start();
1659    store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
1660    thread.join();
1661    KeyValueHeap heap2 = thread.getHeap();
1662    assertFalse(heap.equals(heap2));
1663  }
1664
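  /**
   * Walks updateSpaceQuotaAfterFileReplacement through the common transitions. For example, the
   * first replacement removes 1024 + 4096 bytes and adds 2048, so the tracked region size drops
   * from 5120 to 2048; replacing a file with itself leaves the size unchanged.
   */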
1665  @Test
1666  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
1667    final TableName tn = TableName.valueOf(name.getMethodName());
1668    init(name.getMethodName());
1669
1670    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
1671
1672    HStoreFile sf1 = mockStoreFileWithLength(1024L);
1673    HStoreFile sf2 = mockStoreFileWithLength(2048L);
1674    HStoreFile sf3 = mockStoreFileWithLength(4096L);
1675    HStoreFile sf4 = mockStoreFileWithLength(8192L);
1676
1677    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
1678        .setEndKey(Bytes.toBytes("b")).build();
1679
1680    // Compacting two files down to one, reducing size
1681    sizeStore.put(regionInfo, 1024L + 4096L);
1682    store.updateSpaceQuotaAfterFileReplacement(
1683        sizeStore, regionInfo, Arrays.asList(sf1, sf3), Arrays.asList(sf2));
1684
1685    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1686
1687    // The same file length in and out should have no change
1688    store.updateSpaceQuotaAfterFileReplacement(
1689        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf2));
1690
1691    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1692
1693    // Increase the total size used
1694    store.updateSpaceQuotaAfterFileReplacement(
1695        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf3));
1696
1697    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
1698
1699    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
1700        .setEndKey(Bytes.toBytes("c")).build();
1701    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
1702
1703    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
1704  }
1705
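  /**
   * A writer created in the store's tmp directory must record the column family and table name
   * in its HFileContext.
   */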
1706  @Test
1707  public void testHFileContextSetWithCFAndTable() throws Exception {
1708    init(this.name.getMethodName());
1709    StoreFileWriter writer = store.createWriterInTmp(10000L,
1710        Compression.Algorithm.NONE, false, true, false, true);
1711    HFileContext hFileContext = writer.getHFileWriter().getFileContext();
1712    assertArrayEquals(family, hFileContext.getColumnFamily());
1713    assertArrayEquals(table, hFileContext.getTableName());
1714  }
1715
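  /**
   * Creates a mock HStoreFile whose reader reports the given length; used by the space quota
   * test above to model files of known sizes.
   */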
1716  private HStoreFile mockStoreFileWithLength(long length) {
1717    HStoreFile sf = mock(HStoreFile.class);
1718    StoreFileReader sfr = mock(StoreFileReader.class);
1719    when(sf.isHFile()).thenReturn(true);
1720    when(sf.getReader()).thenReturn(sfr);
1721    when(sfr.length()).thenReturn(length);
1722    return sf;
1723  }
1724
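  /**
   * Switches the given StoreScanner from pread to stream read on a separate thread and captures
   * the resulting KeyValueHeap, so the test can compare it against the original heap.
   */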
1725  private static class MyThread extends Thread {
1726    private StoreScanner scanner;
1727    private KeyValueHeap heap;
1728
1729    public MyThread(StoreScanner scanner) {
1730      this.scanner = scanner;
1731    }
1732
1733    public KeyValueHeap getHeap() {
1734      return this.heap;
1735    }
1736
1737    @Override
1738    public void run() {
1739      scanner.trySwitchToStreamRead();
1740      heap = scanner.heap;
1741    }
1742  }
1743
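  /**
   * A MemStoreCompactor whose first run blocks on START_COMPACTOR_LATCH, allowing the test to
   * hold the first in-memory compaction open while verifying that no second one is started.
   */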
1744  private static class MyMemStoreCompactor extends MemStoreCompactor {
1745    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
1746    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
1747    public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy
1748        compactionPolicy) throws IllegalArgumentIOException {
1749      super(compactingMemStore, compactionPolicy);
1750    }
1751
1752    @Override
1753    public boolean start() throws IOException {
1754      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
1755      if (isFirst) {
1756        try {
1757          START_COMPACTOR_LATCH.await();
1758          return super.start();
1759        } catch (InterruptedException ex) {
1760          throw new RuntimeException(ex);
1761        }
1762      }
1763      return super.start();
1764    }
1765  }
1766
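  /**
   * A CompactingMemStore that plugs in MyMemStoreCompactor and counts, via RUNNER_COUNT, how
   * many times the in-memory compaction flag is successfully set.
   */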
1767  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
1768    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
1769    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
1770        HStore store, RegionServicesForStores regionServices,
1771        MemoryCompactionPolicy compactionPolicy) throws IOException {
1772      super(conf, c, store, regionServices, compactionPolicy);
1773    }
1774
1775    @Override
1776    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
1777        throws IllegalArgumentIOException {
1778      return new MyMemStoreCompactor(this, compactionPolicy);
1779    }
1780
1781    @Override
1782    protected boolean setInMemoryCompactionFlag() {
1783      boolean rval = super.setInMemoryCompactionFlag();
1784      if (rval) {
1785        RUNNER_COUNT.incrementAndGet();
1786        if (LOG.isDebugEnabled()) {
1787          LOG.debug("runner count: " + RUNNER_COUNT.get());
1788        }
1789      }
1790      return rval;
1791    }
1792  }
1793
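  /**
   * A CompactingMemStore instrumented with two latches: when START_TEST is set, createList
   * (called on the scan path) releases the flush thread and then waits for the snapshot, while
   * pushActiveToPipeline (called on the flush path) first waits for the scanner. Together they
   * force the scanner-creation/snapshot interleaving exercised by the phased test above.
   */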
1794  public static class MyCompactingMemStore extends CompactingMemStore {
1795    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
1796    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
1797    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
1798    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c,
1799        HStore store, RegionServicesForStores regionServices,
1800        MemoryCompactionPolicy compactionPolicy) throws IOException {
1801      super(conf, c, store, regionServices, compactionPolicy);
1802    }
1803
1804    @Override
1805    protected List<KeyValueScanner> createList(int capacity) {
1806      if (START_TEST.get()) {
1807        try {
1808          getScannerLatch.countDown();
1809          snapshotLatch.await();
1810        } catch (InterruptedException e) {
1811          throw new RuntimeException(e);
1812        }
1813      }
1814      return new ArrayList<>(capacity);
1815    }
1816    @Override
1817    protected void pushActiveToPipeline(MutableSegment active) {
1818      if (START_TEST.get()) {
1819        try {
1820          getScannerLatch.await();
1821        } catch (InterruptedException e) {
1822          throw new RuntimeException(e);
1823        }
1824      }
1825
1826      super.pushActiveToPipeline(active);
1827      if (START_TEST.get()) {
1828        snapshotLatch.countDown();
1829      }
1830    }
1831  }
1832
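  /**
   * Receives the current list size each time a cell is about to be added to a MyList.
   */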
1833  interface MyListHook {
1834    void hook(int currentSize);
1835  }
1836
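  /**
   * A List that delegates to an ArrayList but invokes the hook (with the current size) before
   * every add, letting testReclaimChunkWhenScanning run a flush or extra writes between the
   * cells returned by scanner.next().
   */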
1837  private static class MyList<T> implements List<T> {
1838    private final List<T> delegatee = new ArrayList<>();
1839    private final MyListHook hookAtAdd;
1840    MyList(final MyListHook hookAtAdd) {
1841      this.hookAtAdd = hookAtAdd;
1842    }
1843    @Override
1844    public int size() {return delegatee.size();}
1845
1846    @Override
1847    public boolean isEmpty() {return delegatee.isEmpty();}
1848
1849    @Override
1850    public boolean contains(Object o) {return delegatee.contains(o);}
1851
1852    @Override
1853    public Iterator<T> iterator() {return delegatee.iterator();}
1854
1855    @Override
1856    public Object[] toArray() {return delegatee.toArray();}
1857
1858    @Override
1859    public <R> R[] toArray(R[] a) {return delegatee.toArray(a);}
1860
1861    @Override
1862    public boolean add(T e) {
1863      hookAtAdd.hook(size());
1864      return delegatee.add(e);
1865    }
1866
1867    @Override
1868    public boolean remove(Object o) {return delegatee.remove(o);}
1869
1870    @Override
1871    public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);}
1872
1873    @Override
1874    public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);}
1875
1876    @Override
1877    public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);}
1878
1879    @Override
1880    public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);}
1881
1882    @Override
1883    public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);}
1884
1885    @Override
1886    public void clear() {delegatee.clear();}
1887
1888    @Override
1889    public T get(int index) {return delegatee.get(index);}
1890
1891    @Override
1892    public T set(int index, T element) {return delegatee.set(index, element);}
1893
1894    @Override
1895    public void add(int index, T element) {delegatee.add(index, element);}
1896
1897    @Override
1898    public T remove(int index) {return delegatee.remove(index);}
1899
1900    @Override
1901    public int indexOf(Object o) {return delegatee.indexOf(o);}
1902
1903    @Override
1904    public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);}
1905
1906    @Override
1907    public ListIterator<T> listIterator() {return delegatee.listIterator();}
1908
1909    @Override
1910    public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);}
1911
1912    @Override
1913    public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
1914  }
1915}