001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.spy;
027import static org.mockito.Mockito.times;
028import static org.mockito.Mockito.verify;
029import static org.mockito.Mockito.when;
030
031import java.io.IOException;
032import java.lang.ref.SoftReference;
033import java.security.PrivilegedExceptionAction;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.Iterator;
039import java.util.List;
040import java.util.ListIterator;
041import java.util.NavigableSet;
042import java.util.TreeSet;
043import java.util.concurrent.ConcurrentSkipListSet;
044import java.util.concurrent.CountDownLatch;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.ThreadPoolExecutor;
048import java.util.concurrent.TimeUnit;
049import java.util.concurrent.atomic.AtomicBoolean;
050import java.util.concurrent.atomic.AtomicInteger;
051import org.apache.hadoop.conf.Configuration;
052import org.apache.hadoop.fs.FSDataOutputStream;
053import org.apache.hadoop.fs.FileStatus;
054import org.apache.hadoop.fs.FileSystem;
055import org.apache.hadoop.fs.FilterFileSystem;
056import org.apache.hadoop.fs.LocalFileSystem;
057import org.apache.hadoop.fs.Path;
058import org.apache.hadoop.fs.permission.FsPermission;
059import org.apache.hadoop.hbase.Cell;
060import org.apache.hadoop.hbase.CellBuilderFactory;
061import org.apache.hadoop.hbase.CellBuilderType;
062import org.apache.hadoop.hbase.CellComparator;
063import org.apache.hadoop.hbase.CellComparatorImpl;
064import org.apache.hadoop.hbase.CellUtil;
065import org.apache.hadoop.hbase.HBaseClassTestRule;
066import org.apache.hadoop.hbase.HBaseConfiguration;
067import org.apache.hadoop.hbase.HBaseTestingUtility;
068import org.apache.hadoop.hbase.HConstants;
069import org.apache.hadoop.hbase.KeyValue;
070import org.apache.hadoop.hbase.MemoryCompactionPolicy;
071import org.apache.hadoop.hbase.NamespaceDescriptor;
072import org.apache.hadoop.hbase.PrivateCellUtil;
073import org.apache.hadoop.hbase.TableName;
074import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
075import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
076import org.apache.hadoop.hbase.client.Get;
077import org.apache.hadoop.hbase.client.RegionInfo;
078import org.apache.hadoop.hbase.client.RegionInfoBuilder;
079import org.apache.hadoop.hbase.client.Scan;
080import org.apache.hadoop.hbase.client.TableDescriptor;
081import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
082import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
083import org.apache.hadoop.hbase.filter.Filter;
084import org.apache.hadoop.hbase.filter.FilterBase;
085import org.apache.hadoop.hbase.io.compress.Compression;
086import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
087import org.apache.hadoop.hbase.io.hfile.CacheConfig;
088import org.apache.hadoop.hbase.io.hfile.HFile;
089import org.apache.hadoop.hbase.io.hfile.HFileContext;
090import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
091import org.apache.hadoop.hbase.monitoring.MonitoredTask;
092import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
093import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
094import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
095import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
096import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
097import org.apache.hadoop.hbase.security.User;
098import org.apache.hadoop.hbase.testclassification.MediumTests;
099import org.apache.hadoop.hbase.testclassification.RegionServerTests;
100import org.apache.hadoop.hbase.util.Bytes;
101import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
102import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
103import org.apache.hadoop.hbase.util.FSUtils;
104import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
105import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
106import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
107import org.apache.hadoop.hbase.wal.WALFactory;
108import org.apache.hadoop.util.Progressable;
109import org.junit.After;
110import org.junit.AfterClass;
111import org.junit.Before;
112import org.junit.ClassRule;
113import org.junit.Rule;
114import org.junit.Test;
115import org.junit.experimental.categories.Category;
116import org.junit.rules.TestName;
117import org.mockito.Mockito;
118import org.slf4j.Logger;
119import org.slf4j.LoggerFactory;
120
121import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
122
123/**
124 * Test class for the HStore
125 */
126@Category({ RegionServerTests.class, MediumTests.class })
127public class TestHStore {
128
129  @ClassRule
130  public static final HBaseClassTestRule CLASS_RULE =
131      HBaseClassTestRule.forClass(TestHStore.class);
132
133  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
134  @Rule
135  public TestName name = new TestName();
136
137  HRegion region;
138  HStore store;
139  byte [] table = Bytes.toBytes("table");
140  byte [] family = Bytes.toBytes("family");
141
142  byte [] row = Bytes.toBytes("row");
143  byte [] row2 = Bytes.toBytes("row2");
144  byte [] qf1 = Bytes.toBytes("qf1");
145  byte [] qf2 = Bytes.toBytes("qf2");
146  byte [] qf3 = Bytes.toBytes("qf3");
147  byte [] qf4 = Bytes.toBytes("qf4");
148  byte [] qf5 = Bytes.toBytes("qf5");
149  byte [] qf6 = Bytes.toBytes("qf6");
150
151  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);
152
153  List<Cell> expected = new ArrayList<>();
154  List<Cell> result = new ArrayList<>();
155
156  long id = System.currentTimeMillis();
157  Get get = new Get(row);
158
159  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
160  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
161
162
163  /**
164   * Setup
165   * @throws IOException
166   */
167  @Before
168  public void setUp() throws IOException {
169    qualifiers.add(qf1);
170    qualifiers.add(qf3);
171    qualifiers.add(qf5);
172
173    Iterator<byte[]> iter = qualifiers.iterator();
174    while(iter.hasNext()){
175      byte [] next = iter.next();
176      expected.add(new KeyValue(row, family, next, 1, (byte[])null));
177      get.addColumn(family, next);
178    }
179  }
180
181  private void init(String methodName) throws IOException {
182    init(methodName, TEST_UTIL.getConfiguration());
183  }
184
185  private HStore init(String methodName, Configuration conf) throws IOException {
186    // some of the tests write 4 versions and then flush
187    // (with HBASE-4241, lower versions are collected on flush)
188    return init(methodName, conf,
189      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
190  }
191
192  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
193      throws IOException {
194    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
195  }
196
197  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
198      ColumnFamilyDescriptor hcd) throws IOException {
199    return init(methodName, conf, builder, hcd, null);
200  }
201
202  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
203      ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
204    return init(methodName, conf, builder, hcd, hook, false);
205  }
206
207  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
208      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
209    TableDescriptor htd = builder.setColumnFamily(hcd).build();
210    Path basedir = new Path(DIR + methodName);
211    Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
212    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
213
214    FileSystem fs = FileSystem.get(conf);
215
216    fs.delete(logdir, true);
217    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
218      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
219    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
220    Configuration walConf = new Configuration(conf);
221    FSUtils.setRootDir(walConf, basedir);
222    WALFactory wals = new WALFactory(walConf, methodName);
223    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
224        htd, null);
225    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
226    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
227    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
228  }
229
230  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
231      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
232    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
233    if (hook == null) {
234      store = new HStore(region, hcd, conf, false);
235    } else {
236      store = new MyStore(region, hcd, conf, hook, switchToPread);
237    }
238    return store;
239  }
240
241  /**
242   * Test we do not lose data if we fail a flush and then close.
243   * Part of HBase-10466
244   * @throws Exception
245   */
246  @Test
247  public void testFlushSizeSizing() throws Exception {
248    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
249    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
250    // Only retry once.
251    conf.setInt("hbase.hstore.flush.retries.number", 1);
252    User user = User.createUserForTesting(conf, this.name.getMethodName(),
253      new String[]{"foo"});
254    // Inject our faulty LocalFileSystem
255    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
256    user.runAs(new PrivilegedExceptionAction<Object>() {
257      @Override
258      public Object run() throws Exception {
259        // Make sure it worked (above is sensitive to caching details in hadoop core)
260        FileSystem fs = FileSystem.get(conf);
261        assertEquals(FaultyFileSystem.class, fs.getClass());
262        FaultyFileSystem ffs = (FaultyFileSystem)fs;
263
264        // Initialize region
265        init(name.getMethodName(), conf);
266
267        MemStoreSize mss = store.memstore.getFlushableSize();
268        assertEquals(0, mss.getDataSize());
269        LOG.info("Adding some data");
270        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
271        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
272        // add the heap size of active (mutable) segment
273        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
274        mss = store.memstore.getFlushableSize();
275        assertEquals(kvSize.getMemStoreSize(), mss);
276        // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
277        try {
278          LOG.info("Flushing");
279          flushStore(store, id++);
280          fail("Didn't bubble up IOE!");
281        } catch (IOException ioe) {
282          assertTrue(ioe.getMessage().contains("Fault injected"));
283        }
284        // due to snapshot, change mutable to immutable segment
285        kvSize.incMemStoreSize(0,
286          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
287        mss = store.memstore.getFlushableSize();
288        assertEquals(kvSize.getMemStoreSize(), mss);
289        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
290        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
291        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
292        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
293        // not yet cleared the snapshot -- the above flush failed.
294        assertEquals(kvSize.getMemStoreSize(), mss);
295        ffs.fault.set(false);
296        flushStore(store, id++);
297        mss = store.memstore.getFlushableSize();
298        // Size should be the foreground kv size.
299        assertEquals(kvSize2.getMemStoreSize(), mss);
300        flushStore(store, id++);
301        mss = store.memstore.getFlushableSize();
302        assertEquals(0, mss.getDataSize());
303        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
304        return null;
305      }
306    });
307  }
308
309  /**
310   * Verify that compression and data block encoding are respected by the
311   * Store.createWriterInTmp() method, used on store flush.
312   */
313  @Test
314  public void testCreateWriter() throws Exception {
315    Configuration conf = HBaseConfiguration.create();
316    FileSystem fs = FileSystem.get(conf);
317
318    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
319        .setCompressionType(Compression.Algorithm.GZ).setDataBlockEncoding(DataBlockEncoding.DIFF)
320        .build();
321    init(name.getMethodName(), conf, hcd);
322
323    // Test createWriterInTmp()
324    StoreFileWriter writer =
325        store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false);
326    Path path = writer.getPath();
327    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
328    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
329    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
330    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
331    writer.close();
332
333    // Verify that compression and encoding settings are respected
334    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
335    assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
336    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
337    reader.close();
338  }
339
340  @Test
341  public void testDeleteExpiredStoreFiles() throws Exception {
342    testDeleteExpiredStoreFiles(0);
343    testDeleteExpiredStoreFiles(1);
344  }
345
346  /*
347   * @param minVersions the MIN_VERSIONS for the column family
348   */
349  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
350    int storeFileNum = 4;
351    int ttl = 4;
352    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
353    EnvironmentEdgeManagerTestHelper.injectEdge(edge);
354
355    Configuration conf = HBaseConfiguration.create();
356    // Enable the expired store file deletion
357    conf.setBoolean("hbase.store.delete.expired.storefile", true);
358    // Set the compaction threshold higher to avoid normal compactions.
359    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
360
361    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
362        .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());
363
364    long storeTtl = this.store.getScanInfo().getTtl();
365    long sleepTime = storeTtl / storeFileNum;
366    long timeStamp;
367    // There are 4 store files and the max time stamp difference among these
368    // store files will be (this.store.ttl / storeFileNum)
369    for (int i = 1; i <= storeFileNum; i++) {
370      LOG.info("Adding some data for the store file #" + i);
371      timeStamp = EnvironmentEdgeManager.currentTime();
372      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
373      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
374      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
375      flush(i);
376      edge.incrementTime(sleepTime);
377    }
378
379    // Verify the total number of store files
380    assertEquals(storeFileNum, this.store.getStorefiles().size());
381
382     // Each call will find one expired store file and delete it before compaction happens.
383     // There will be no compaction due to threshold above. Last file will not be replaced.
384    for (int i = 1; i <= storeFileNum - 1; i++) {
385      // verify the expired store file.
386      assertFalse(this.store.requestCompaction().isPresent());
387      Collection<HStoreFile> sfs = this.store.getStorefiles();
388      // Ensure i files are gone.
389      if (minVersions == 0) {
390        assertEquals(storeFileNum - i, sfs.size());
391        // Ensure only non-expired files remain.
392        for (HStoreFile sf : sfs) {
393          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
394        }
395      } else {
396        assertEquals(storeFileNum, sfs.size());
397      }
398      // Let the next store file expired.
399      edge.incrementTime(sleepTime);
400    }
401    assertFalse(this.store.requestCompaction().isPresent());
402
403    Collection<HStoreFile> sfs = this.store.getStorefiles();
404    // Assert the last expired file is not removed.
405    if (minVersions == 0) {
406      assertEquals(1, sfs.size());
407    }
408    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
409    assertTrue(ts < (edge.currentTime() - storeTtl));
410
411    for (HStoreFile sf : sfs) {
412      sf.closeStoreFile(true);
413    }
414  }
415
416  @Test
417  public void testLowestModificationTime() throws Exception {
418    Configuration conf = HBaseConfiguration.create();
419    FileSystem fs = FileSystem.get(conf);
420    // Initialize region
421    init(name.getMethodName(), conf);
422
423    int storeFileNum = 4;
424    for (int i = 1; i <= storeFileNum; i++) {
425      LOG.info("Adding some data for the store file #"+i);
426      this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null);
427      this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null);
428      this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null);
429      flush(i);
430    }
431    // after flush; check the lowest time stamp
432    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
433    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
434    assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
435
436    // after compact; check the lowest time stamp
437    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
438    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
439    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
440    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
441  }
442
443  private static long getLowestTimeStampFromFS(FileSystem fs,
444      final Collection<HStoreFile> candidates) throws IOException {
445    long minTs = Long.MAX_VALUE;
446    if (candidates.isEmpty()) {
447      return minTs;
448    }
449    Path[] p = new Path[candidates.size()];
450    int i = 0;
451    for (HStoreFile sf : candidates) {
452      p[i] = sf.getPath();
453      ++i;
454    }
455
456    FileStatus[] stats = fs.listStatus(p);
457    if (stats == null || stats.length == 0) {
458      return minTs;
459    }
460    for (FileStatus s : stats) {
461      minTs = Math.min(minTs, s.getModificationTime());
462    }
463    return minTs;
464  }
465
466  //////////////////////////////////////////////////////////////////////////////
467  // Get tests
468  //////////////////////////////////////////////////////////////////////////////
469
470  private static final int BLOCKSIZE_SMALL = 8192;
471  /**
472   * Test for hbase-1686.
473   * @throws IOException
474   */
475  @Test
476  public void testEmptyStoreFile() throws IOException {
477    init(this.name.getMethodName());
478    // Write a store file.
479    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
480    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
481    flush(1);
482    // Now put in place an empty store file.  Its a little tricky.  Have to
483    // do manually with hacked in sequence id.
484    HStoreFile f = this.store.getStorefiles().iterator().next();
485    Path storedir = f.getPath().getParent();
486    long seqid = f.getMaxSequenceId();
487    Configuration c = HBaseConfiguration.create();
488    FileSystem fs = FileSystem.get(c);
489    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
490    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
491        fs)
492            .withOutputDir(storedir)
493            .withFileContext(meta)
494            .build();
495    w.appendMetadata(seqid + 1, false);
496    w.close();
497    this.store.close();
498    // Reopen it... should pick up two files
499    this.store =
500        new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
501    assertEquals(2, this.store.getStorefilesCount());
502
503    result = HBaseTestingUtility.getFromStoreFile(store,
504        get.getRow(),
505        qualifiers);
506    assertEquals(1, result.size());
507  }
508
509  /**
510   * Getting data from memstore only
511   * @throws IOException
512   */
513  @Test
514  public void testGet_FromMemStoreOnly() throws IOException {
515    init(this.name.getMethodName());
516
517    //Put data in memstore
518    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
519    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
520    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
521    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
522    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
523    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
524
525    //Get
526    result = HBaseTestingUtility.getFromStoreFile(store,
527        get.getRow(), qualifiers);
528
529    //Compare
530    assertCheck();
531  }
532
533  @Test
534  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
535    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
536    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
537    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
538  }
539
540  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
541    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
542    ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
543    long currentTs = 100;
544    long minTs = currentTs;
545    // the extra cell won't be flushed to disk,
546    // so the min of timerange will be different between memStore and hfile.
547    for (int i = 0; i != (maxVersion + 1); ++i) {
548      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
549      if (i == 1) {
550        minTs = currentTs;
551      }
552    }
553    flushStore(store, id++);
554
555    Collection<HStoreFile> files = store.getStorefiles();
556    assertEquals(1, files.size());
557    HStoreFile f = files.iterator().next();
558    f.initReader();
559    StoreFileReader reader = f.getReader();
560    assertEquals(minTs, reader.timeRange.getMin());
561    assertEquals(currentTs, reader.timeRange.getMax());
562  }
563
564  /**
565   * Getting data from files only
566   * @throws IOException
567   */
568  @Test
569  public void testGet_FromFilesOnly() throws IOException {
570    init(this.name.getMethodName());
571
572    //Put data in memstore
573    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
574    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
575    //flush
576    flush(1);
577
578    //Add more data
579    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
580    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
581    //flush
582    flush(2);
583
584    //Add more data
585    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
586    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
587    //flush
588    flush(3);
589
590    //Get
591    result = HBaseTestingUtility.getFromStoreFile(store,
592        get.getRow(),
593        qualifiers);
594    //this.store.get(get, qualifiers, result);
595
596    //Need to sort the result since multiple files
597    Collections.sort(result, CellComparatorImpl.COMPARATOR);
598
599    //Compare
600    assertCheck();
601  }
602
603  /**
604   * Getting data from memstore and files
605   * @throws IOException
606   */
607  @Test
608  public void testGet_FromMemStoreAndFiles() throws IOException {
609    init(this.name.getMethodName());
610
611    //Put data in memstore
612    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
613    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
614    //flush
615    flush(1);
616
617    //Add more data
618    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
619    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
620    //flush
621    flush(2);
622
623    //Add more data
624    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
625    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
626
627    //Get
628    result = HBaseTestingUtility.getFromStoreFile(store,
629        get.getRow(), qualifiers);
630
631    //Need to sort the result since multiple files
632    Collections.sort(result, CellComparatorImpl.COMPARATOR);
633
634    //Compare
635    assertCheck();
636  }
637
638  private void flush(int storeFilessize) throws IOException{
639    this.store.snapshot();
640    flushStore(store, id++);
641    assertEquals(storeFilessize, this.store.getStorefiles().size());
642    assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
643  }
644
645  private void assertCheck() {
646    assertEquals(expected.size(), result.size());
647    for(int i=0; i<expected.size(); i++) {
648      assertEquals(expected.get(i), result.get(i));
649    }
650  }
651
652  @After
653  public void tearDown() throws Exception {
654    EnvironmentEdgeManagerTestHelper.reset();
655    if (store != null) {
656      try {
657        store.close();
658      } catch (IOException e) {
659      }
660      store = null;
661    }
662    if (region != null) {
663      region.close();
664      region = null;
665    }
666  }
667
668  @AfterClass
669  public static void tearDownAfterClass() throws IOException {
670    TEST_UTIL.cleanupTestDir();
671  }
672
673  @Test
674  public void testHandleErrorsInFlush() throws Exception {
675    LOG.info("Setting up a faulty file system that cannot write");
676
677    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
678    User user = User.createUserForTesting(conf,
679        "testhandleerrorsinflush", new String[]{"foo"});
680    // Inject our faulty LocalFileSystem
681    conf.setClass("fs.file.impl", FaultyFileSystem.class,
682        FileSystem.class);
683    user.runAs(new PrivilegedExceptionAction<Object>() {
684      @Override
685      public Object run() throws Exception {
686        // Make sure it worked (above is sensitive to caching details in hadoop core)
687        FileSystem fs = FileSystem.get(conf);
688        assertEquals(FaultyFileSystem.class, fs.getClass());
689
690        // Initialize region
691        init(name.getMethodName(), conf);
692
693        LOG.info("Adding some data");
694        store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
695        store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
696        store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
697
698        LOG.info("Before flush, we should have no files");
699
700        Collection<StoreFileInfo> files =
701          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
702        assertEquals(0, files != null ? files.size() : 0);
703
704        //flush
705        try {
706          LOG.info("Flushing");
707          flush(1);
708          fail("Didn't bubble up IOE!");
709        } catch (IOException ioe) {
710          assertTrue(ioe.getMessage().contains("Fault injected"));
711        }
712
713        LOG.info("After failed flush, we should still have no files!");
714        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
715        assertEquals(0, files != null ? files.size() : 0);
716        store.getHRegion().getWAL().close();
717        return null;
718      }
719    });
720    FileSystem.closeAllForUGI(user.getUGI());
721  }
722
723  /**
724   * Faulty file system that will fail if you write past its fault position the FIRST TIME
725   * only; thereafter it will succeed.  Used by {@link TestHRegion} too.
726   */
727  static class FaultyFileSystem extends FilterFileSystem {
728    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
729    private long faultPos = 200;
730    AtomicBoolean fault = new AtomicBoolean(true);
731
732    public FaultyFileSystem() {
733      super(new LocalFileSystem());
734      System.err.println("Creating faulty!");
735    }
736
737    @Override
738    public FSDataOutputStream create(Path p) throws IOException {
739      return new FaultyOutputStream(super.create(p), faultPos, fault);
740    }
741
742    @Override
743    public FSDataOutputStream create(Path f, FsPermission permission,
744        boolean overwrite, int bufferSize, short replication, long blockSize,
745        Progressable progress) throws IOException {
746      return new FaultyOutputStream(super.create(f, permission,
747          overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
748    }
749
750    @Override
751    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
752        int bufferSize, short replication, long blockSize, Progressable progress)
753    throws IOException {
754      // Fake it.  Call create instead.  The default implementation throws an IOE
755      // that this is not supported.
756      return create(f, overwrite, bufferSize, replication, blockSize, progress);
757    }
758  }
759
760  static class FaultyOutputStream extends FSDataOutputStream {
761    volatile long faultPos = Long.MAX_VALUE;
762    private final AtomicBoolean fault;
763
764    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
765    throws IOException {
766      super(out, null);
767      this.faultPos = faultPos;
768      this.fault = fault;
769    }
770
771    @Override
772    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
773      System.err.println("faulty stream write at pos " + getPos());
774      injectFault();
775      super.write(buf, offset, length);
776    }
777
778    private void injectFault() throws IOException {
779      if (this.fault.get() && getPos() >= faultPos) {
780        throw new IOException("Fault injected");
781      }
782    }
783  }
784
785  private static void flushStore(HStore store, long id) throws IOException {
786    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
787    storeFlushCtx.prepare();
788    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
789    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
790  }
791
792  /**
793   * Generate a list of KeyValues for testing based on given parameters
794   * @param timestamps
795   * @param numRows
796   * @param qualifier
797   * @param family
798   * @return the rows key-value list
799   */
800  List<Cell> getKeyValueSet(long[] timestamps, int numRows,
801      byte[] qualifier, byte[] family) {
802    List<Cell> kvList = new ArrayList<>();
803    for (int i=1;i<=numRows;i++) {
804      byte[] b = Bytes.toBytes(i);
805      for (long timestamp: timestamps) {
806        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
807      }
808    }
809    return kvList;
810  }
811
812  /**
813   * Test to ensure correctness when using Stores with multiple timestamps
814   * @throws IOException
815   */
816  @Test
817  public void testMultipleTimestamps() throws IOException {
818    int numRows = 1;
819    long[] timestamps1 = new long[] {1,5,10,20};
820    long[] timestamps2 = new long[] {30,80};
821
822    init(this.name.getMethodName());
823
824    List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
825    for (Cell kv : kvList1) {
826      this.store.add(kv, null);
827    }
828
829    this.store.snapshot();
830    flushStore(store, id++);
831
832    List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
833    for(Cell kv : kvList2) {
834      this.store.add(kv, null);
835    }
836
837    List<Cell> result;
838    Get get = new Get(Bytes.toBytes(1));
839    get.addColumn(family,qf1);
840
841    get.setTimeRange(0,15);
842    result = HBaseTestingUtility.getFromStoreFile(store, get);
843    assertTrue(result.size()>0);
844
845    get.setTimeRange(40,90);
846    result = HBaseTestingUtility.getFromStoreFile(store, get);
847    assertTrue(result.size()>0);
848
849    get.setTimeRange(10,45);
850    result = HBaseTestingUtility.getFromStoreFile(store, get);
851    assertTrue(result.size()>0);
852
853    get.setTimeRange(80,145);
854    result = HBaseTestingUtility.getFromStoreFile(store, get);
855    assertTrue(result.size()>0);
856
857    get.setTimeRange(1,2);
858    result = HBaseTestingUtility.getFromStoreFile(store, get);
859    assertTrue(result.size()>0);
860
861    get.setTimeRange(90,200);
862    result = HBaseTestingUtility.getFromStoreFile(store, get);
863    assertTrue(result.size()==0);
864  }
865
866  /**
867   * Test for HBASE-3492 - Test split on empty colfam (no store files).
868   *
869   * @throws IOException When the IO operations fail.
870   */
871  @Test
872  public void testSplitWithEmptyColFam() throws IOException {
873    init(this.name.getMethodName());
874    assertFalse(store.getSplitPoint().isPresent());
875    store.getHRegion().forceSplit(null);
876    assertFalse(store.getSplitPoint().isPresent());
877    store.getHRegion().clearSplit();
878  }
879
880  @Test
881  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
882    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
883    long anyValue = 10;
884
885    // We'll check that it uses correct config and propagates it appropriately by going thru
886    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
887    // a number we pass in is higher than some config value, inside compactionPolicy.
888    Configuration conf = HBaseConfiguration.create();
889    conf.setLong(CONFIG_KEY, anyValue);
890    init(name.getMethodName() + "-xml", conf);
891    assertTrue(store.throttleCompaction(anyValue + 1));
892    assertFalse(store.throttleCompaction(anyValue));
893
894    // HTD overrides XML.
895    --anyValue;
896    init(name.getMethodName() + "-htd", conf, TableDescriptorBuilder
897        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
898      ColumnFamilyDescriptorBuilder.of(family));
899    assertTrue(store.throttleCompaction(anyValue + 1));
900    assertFalse(store.throttleCompaction(anyValue));
901
902    // HCD overrides them both.
903    --anyValue;
904    init(name.getMethodName() + "-hcd", conf,
905      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
906        Long.toString(anyValue)),
907      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
908          .build());
909    assertTrue(store.throttleCompaction(anyValue + 1));
910    assertFalse(store.throttleCompaction(anyValue));
911  }
912
913  public static class DummyStoreEngine extends DefaultStoreEngine {
914    public static DefaultCompactor lastCreatedCompactor = null;
915
916    @Override
917    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
918        throws IOException {
919      super.createComponents(conf, store, comparator);
920      lastCreatedCompactor = this.compactor;
921    }
922  }
923
924  @Test
925  public void testStoreUsesSearchEngineOverride() throws Exception {
926    Configuration conf = HBaseConfiguration.create();
927    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
928    init(this.name.getMethodName(), conf);
929    assertEquals(DummyStoreEngine.lastCreatedCompactor,
930      this.store.storeEngine.getCompactor());
931  }
932
933  private void addStoreFile() throws IOException {
934    HStoreFile f = this.store.getStorefiles().iterator().next();
935    Path storedir = f.getPath().getParent();
936    long seqid = this.store.getMaxSequenceId().orElse(0L);
937    Configuration c = TEST_UTIL.getConfiguration();
938    FileSystem fs = FileSystem.get(c);
939    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
940    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
941        fs)
942            .withOutputDir(storedir)
943            .withFileContext(fileContext)
944            .build();
945    w.appendMetadata(seqid + 1, false);
946    w.close();
947    LOG.info("Added store file:" + w.getPath());
948  }
949
950  private void archiveStoreFile(int index) throws IOException {
951    Collection<HStoreFile> files = this.store.getStorefiles();
952    HStoreFile sf = null;
953    Iterator<HStoreFile> it = files.iterator();
954    for (int i = 0; i <= index; i++) {
955      sf = it.next();
956    }
957    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
958  }
959
960  private void closeCompactedFile(int index) throws IOException {
961    Collection<HStoreFile> files =
962        this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
963    HStoreFile sf = null;
964    Iterator<HStoreFile> it = files.iterator();
965    for (int i = 0; i <= index; i++) {
966      sf = it.next();
967    }
968    sf.closeStoreFile(true);
969    store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
970  }
971
972  @Test
973  public void testRefreshStoreFiles() throws Exception {
974    init(name.getMethodName());
975
976    assertEquals(0, this.store.getStorefilesCount());
977
978    // Test refreshing store files when no store files are there
979    store.refreshStoreFiles();
980    assertEquals(0, this.store.getStorefilesCount());
981
982    // add some data, flush
983    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
984    flush(1);
985    assertEquals(1, this.store.getStorefilesCount());
986
987    // add one more file
988    addStoreFile();
989
990    assertEquals(1, this.store.getStorefilesCount());
991    store.refreshStoreFiles();
992    assertEquals(2, this.store.getStorefilesCount());
993
994    // add three more files
995    addStoreFile();
996    addStoreFile();
997    addStoreFile();
998
999    assertEquals(2, this.store.getStorefilesCount());
1000    store.refreshStoreFiles();
1001    assertEquals(5, this.store.getStorefilesCount());
1002
1003    closeCompactedFile(0);
1004    archiveStoreFile(0);
1005
1006    assertEquals(5, this.store.getStorefilesCount());
1007    store.refreshStoreFiles();
1008    assertEquals(4, this.store.getStorefilesCount());
1009
1010    archiveStoreFile(0);
1011    archiveStoreFile(1);
1012    archiveStoreFile(2);
1013
1014    assertEquals(4, this.store.getStorefilesCount());
1015    store.refreshStoreFiles();
1016    assertEquals(1, this.store.getStorefilesCount());
1017
1018    archiveStoreFile(0);
1019    store.refreshStoreFiles();
1020    assertEquals(0, this.store.getStorefilesCount());
1021  }
1022
1023  @Test
1024  public void testRefreshStoreFilesNotChanged() throws IOException {
1025    init(name.getMethodName());
1026
1027    assertEquals(0, this.store.getStorefilesCount());
1028
1029    // add some data, flush
1030    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
1031    flush(1);
1032    // add one more file
1033    addStoreFile();
1034
1035    HStore spiedStore = spy(store);
1036
1037    // call first time after files changed
1038    spiedStore.refreshStoreFiles();
1039    assertEquals(2, this.store.getStorefilesCount());
1040    verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
1041
1042    // call second time
1043    spiedStore.refreshStoreFiles();
1044
1045    //ensure that replaceStoreFiles is not called if files are not refreshed
1046    verify(spiedStore, times(0)).replaceStoreFiles(null, null);
1047  }
1048
1049  private long countMemStoreScanner(StoreScanner scanner) {
1050    if (scanner.currentScanners == null) {
1051      return 0;
1052    }
1053    return scanner.currentScanners.stream()
1054            .filter(s -> !s.isFileScanner())
1055            .count();
1056  }
1057
1058  @Test
1059  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1060    long seqId = 100;
1061    long timestamp = System.currentTimeMillis();
1062    Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1063        .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put)
1064        .setValue(qf1).build();
1065    PrivateCellUtil.setSequenceId(cell0, seqId);
1066    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1067
1068    Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1069        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
1070        .setValue(qf1).build();
1071    PrivateCellUtil.setSequenceId(cell1, seqId);
1072    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1073
1074    seqId = 101;
1075    timestamp = System.currentTimeMillis();
1076    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1077        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
1078        .setValue(qf1).build();
1079    PrivateCellUtil.setSequenceId(cell2, seqId);
1080    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1081  }
1082
1083  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
1084      List<Cell> inputCellsAfterSnapshot) throws IOException {
1085    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
1086    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1087    long seqId = Long.MIN_VALUE;
1088    for (Cell c : inputCellsBeforeSnapshot) {
1089      quals.add(CellUtil.cloneQualifier(c));
1090      seqId = Math.max(seqId, c.getSequenceId());
1091    }
1092    for (Cell c : inputCellsAfterSnapshot) {
1093      quals.add(CellUtil.cloneQualifier(c));
1094      seqId = Math.max(seqId, c.getSequenceId());
1095    }
1096    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
1097    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1098    storeFlushCtx.prepare();
1099    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
1100    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
1101    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
1102      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
1103      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
1104      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1105      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1106      // snapshot has no data after flush
1107      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
1108      boolean more;
1109      int cellCount = 0;
1110      do {
1111        List<Cell> cells = new ArrayList<>();
1112        more = s.next(cells);
1113        cellCount += cells.size();
1114        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
1115      } while (more);
1116      assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
1117          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
1118          inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
1119      // the current scanners is cleared
1120      assertEquals(0, countMemStoreScanner(s));
1121    }
1122  }
1123
1124  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
1125      throws IOException {
1126    return createCell(row, qualifier, ts, sequenceId, value);
1127  }
1128
1129  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
1130      throws IOException {
1131    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1132        .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
1133        .setValue(value).build();
1134    PrivateCellUtil.setSequenceId(c, sequenceId);
1135    return c;
1136  }
1137
1138  @Test
1139  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1140    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1141    final int expectedSize = 3;
1142    testFlushBeforeCompletingScan(new MyListHook() {
1143      @Override
1144      public void hook(int currentSize) {
1145        if (currentSize == expectedSize - 1) {
1146          try {
1147            flushStore(store, id++);
1148            timeToGoNextRow.set(true);
1149          } catch (IOException e) {
1150            throw new RuntimeException(e);
1151          }
1152        }
1153      }
1154    }, new FilterBase() {
1155      @Override
1156      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1157        return ReturnCode.INCLUDE;
1158      }
1159    }, expectedSize);
1160  }
1161
1162  @Test
1163  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
1164    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1165    final int expectedSize = 2;
1166    testFlushBeforeCompletingScan(new MyListHook() {
1167      @Override
1168      public void hook(int currentSize) {
1169        if (currentSize == expectedSize - 1) {
1170          try {
1171            flushStore(store, id++);
1172            timeToGoNextRow.set(true);
1173          } catch (IOException e) {
1174            throw new RuntimeException(e);
1175          }
1176        }
1177      }
1178    }, new FilterBase() {
1179      @Override
1180      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1181        if (timeToGoNextRow.get()) {
1182          timeToGoNextRow.set(false);
1183          return ReturnCode.NEXT_ROW;
1184        } else {
1185          return ReturnCode.INCLUDE;
1186        }
1187      }
1188    }, expectedSize);
1189  }
1190
1191  @Test
1192  public void testFlushBeforeCompletingScanWithFilterHint() throws IOException,
1193      InterruptedException {
1194    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
1195    final int expectedSize = 2;
1196    testFlushBeforeCompletingScan(new MyListHook() {
1197      @Override
1198      public void hook(int currentSize) {
1199        if (currentSize == expectedSize - 1) {
1200          try {
1201            flushStore(store, id++);
1202            timeToGetHint.set(true);
1203          } catch (IOException e) {
1204            throw new RuntimeException(e);
1205          }
1206        }
1207      }
1208    }, new FilterBase() {
1209      @Override
1210      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1211        if (timeToGetHint.get()) {
1212          timeToGetHint.set(false);
1213          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
1214        } else {
1215          return Filter.ReturnCode.INCLUDE;
1216        }
1217      }
1218      @Override
1219      public Cell getNextCellHint(Cell currentCell) throws IOException {
1220        return currentCell;
1221      }
1222    }, expectedSize);
1223  }
1224
1225  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
1226          throws IOException, InterruptedException {
1227    Configuration conf = HBaseConfiguration.create();
1228    byte[] r0 = Bytes.toBytes("row0");
1229    byte[] r1 = Bytes.toBytes("row1");
1230    byte[] r2 = Bytes.toBytes("row2");
1231    byte[] value0 = Bytes.toBytes("value0");
1232    byte[] value1 = Bytes.toBytes("value1");
1233    byte[] value2 = Bytes.toBytes("value2");
1234    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1235    long ts = EnvironmentEdgeManager.currentTime();
1236    long seqId = 100;
1237    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1238      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1239      new MyStoreHook() {
1240        @Override
1241        public long getSmallestReadPoint(HStore store) {
1242          return seqId + 3;
1243        }
1244      });
1245    // The cells having the value0 won't be flushed to disk because the value of max version is 1
1246    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
1247    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
1248    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
1249    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
1250    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
1251    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
1252    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
1253    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
1254    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
1255    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1256    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1257    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1258    List<Cell> myList = new MyList<>(hook);
1259    Scan scan = new Scan()
1260            .withStartRow(r1)
1261            .setFilter(filter);
1262    try (InternalScanner scanner = (InternalScanner) store.getScanner(
1263          scan, null, seqId + 3)){
1264      // r1
1265      scanner.next(myList);
1266      assertEquals(expectedSize, myList.size());
1267      for (Cell c : myList) {
1268        byte[] actualValue = CellUtil.cloneValue(c);
1269        assertTrue("expected:" + Bytes.toStringBinary(value1)
1270          + ", actual:" + Bytes.toStringBinary(actualValue)
1271          , Bytes.equals(actualValue, value1));
1272      }
1273      List<Cell> normalList = new ArrayList<>(3);
1274      // r2
1275      scanner.next(normalList);
1276      assertEquals(3, normalList.size());
1277      for (Cell c : normalList) {
1278        byte[] actualValue = CellUtil.cloneValue(c);
1279        assertTrue("expected:" + Bytes.toStringBinary(value2)
1280          + ", actual:" + Bytes.toStringBinary(actualValue)
1281          , Bytes.equals(actualValue, value2));
1282      }
1283    }
1284  }
1285
1286  @Test
1287  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
1288    Configuration conf = HBaseConfiguration.create();
1289    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
1290    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1291        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1292    byte[] value = Bytes.toBytes("value");
1293    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1294    long ts = EnvironmentEdgeManager.currentTime();
1295    long seqId = 100;
1296    // older data whihc shouldn't be "seen" by client
1297    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1298    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1299    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1300    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1301    quals.add(qf1);
1302    quals.add(qf2);
1303    quals.add(qf3);
1304    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1305    MyCompactingMemStore.START_TEST.set(true);
1306    Runnable flush = () -> {
1307      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
1308      // recreate the active memstore -- phase (4/5)
1309      storeFlushCtx.prepare();
1310    };
1311    ExecutorService service = Executors.newSingleThreadExecutor();
1312    service.submit(flush);
1313    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
1314    // this is blocked until we recreate the active memstore -- phase (3/5)
1315    // we get scanner from active memstore but it is empty -- phase (5/5)
1316    InternalScanner scanner = (InternalScanner) store.getScanner(
1317          new Scan(new Get(row)), quals, seqId + 1);
1318    service.shutdown();
1319    service.awaitTermination(20, TimeUnit.SECONDS);
1320    try {
1321      try {
1322        List<Cell> results = new ArrayList<>();
1323        scanner.next(results);
1324        assertEquals(3, results.size());
1325        for (Cell c : results) {
1326          byte[] actualValue = CellUtil.cloneValue(c);
1327          assertTrue("expected:" + Bytes.toStringBinary(value)
1328            + ", actual:" + Bytes.toStringBinary(actualValue)
1329            , Bytes.equals(actualValue, value));
1330        }
1331      } finally {
1332        scanner.close();
1333      }
1334    } finally {
1335      MyCompactingMemStore.START_TEST.set(false);
1336      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1337      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1338    }
1339  }
1340
1341  @Test
1342  public void testScanWithDoubleFlush() throws IOException {
1343    Configuration conf = HBaseConfiguration.create();
1344    // Initialize region
1345    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){
1346      @Override
1347      public void getScanners(MyStore store) throws IOException {
1348        final long tmpId = id++;
1349        ExecutorService s = Executors.newSingleThreadExecutor();
1350        s.submit(() -> {
1351          try {
1352            // flush the store before storescanner updates the scanners from store.
1353            // The current data will be flushed into files, and the memstore will
1354            // be clear.
1355            // -- phase (4/4)
1356            flushStore(store, tmpId);
1357          }catch (IOException ex) {
1358            throw new RuntimeException(ex);
1359          }
1360        });
1361        s.shutdown();
1362        try {
1363          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1364          s.awaitTermination(3, TimeUnit.SECONDS);
1365        } catch (InterruptedException ex) {
1366        }
1367      }
1368    });
1369    byte[] oldValue = Bytes.toBytes("oldValue");
1370    byte[] currentValue = Bytes.toBytes("currentValue");
1371    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1372    long ts = EnvironmentEdgeManager.currentTime();
1373    long seqId = 100;
1374    // older data whihc shouldn't be "seen" by client
1375    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1376    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1377    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1378    long snapshotId = id++;
1379    // push older data into snapshot -- phase (1/4)
1380    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker
1381        .DUMMY);
1382    storeFlushCtx.prepare();
1383
1384    // insert current data into active -- phase (2/4)
1385    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1386    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1387    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1388    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1389    quals.add(qf1);
1390    quals.add(qf2);
1391    quals.add(qf3);
1392    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
1393        new Scan(new Get(row)), quals, seqId + 1)) {
1394      // complete the flush -- phase (3/4)
1395      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1396      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1397
1398      List<Cell> results = new ArrayList<>();
1399      scanner.next(results);
1400      assertEquals(3, results.size());
1401      for (Cell c : results) {
1402        byte[] actualValue = CellUtil.cloneValue(c);
1403        assertTrue("expected:" + Bytes.toStringBinary(currentValue)
1404          + ", actual:" + Bytes.toStringBinary(actualValue)
1405          , Bytes.equals(actualValue, currentValue));
1406      }
1407    }
1408  }
1409
1410  @Test
1411  public void testReclaimChunkWhenScaning() throws IOException {
1412    init("testReclaimChunkWhenScaning");
1413    long ts = EnvironmentEdgeManager.currentTime();
1414    long seqId = 100;
1415    byte[] value = Bytes.toBytes("value");
1416    // older data whihc shouldn't be "seen" by client
1417    store.add(createCell(qf1, ts, seqId, value), null);
1418    store.add(createCell(qf2, ts, seqId, value), null);
1419    store.add(createCell(qf3, ts, seqId, value), null);
1420    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1421    quals.add(qf1);
1422    quals.add(qf2);
1423    quals.add(qf3);
1424    try (InternalScanner scanner = (InternalScanner) store.getScanner(
1425        new Scan(new Get(row)), quals, seqId)) {
1426      List<Cell> results = new MyList<>(size -> {
1427        switch (size) {
1428          // 1) we get the first cell (qf1)
1429          // 2) flush the data to have StoreScanner update inner scanners
1430          // 3) the chunk will be reclaimed after updaing
1431          case 1:
1432            try {
1433              flushStore(store, id++);
1434            } catch (IOException e) {
1435              throw new RuntimeException(e);
1436            }
1437            break;
1438          // 1) we get the second cell (qf2)
1439          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
1440          case 2:
1441            try {
1442              byte[] newValue = Bytes.toBytes("newValue");
1443              // older data whihc shouldn't be "seen" by client
1444              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1445              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1446              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1447            } catch (IOException e) {
1448              throw new RuntimeException(e);
1449            }
1450            break;
1451          default:
1452            break;
1453        }
1454      });
1455      scanner.next(results);
1456      assertEquals(3, results.size());
1457      for (Cell c : results) {
1458        byte[] actualValue = CellUtil.cloneValue(c);
1459        assertTrue("expected:" + Bytes.toStringBinary(value)
1460          + ", actual:" + Bytes.toStringBinary(actualValue)
1461          , Bytes.equals(actualValue, value));
1462      }
1463    }
1464  }
1465
1466  /**
1467   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable
1468   * may change the versionedList. And the first InMemoryFlushRunnable will use the chagned
1469   * versionedList to remove the corresponding segments.
1470   * In short, there will be some segements which isn't in merge are removed.
1471   * @throws IOException
1472   * @throws InterruptedException
1473   */
1474  @Test
1475  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1476    int flushSize = 500;
1477    Configuration conf = HBaseConfiguration.create();
1478    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1479    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1480    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1481    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1482    // Set the lower threshold to invoke the "MERGE" policy
1483    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1484    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1485        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1486    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1487    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1488    long ts = EnvironmentEdgeManager.currentTime();
1489    long seqId = 100;
1490    // older data whihc shouldn't be "seen" by client
1491    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1492    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1493    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1494    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1495    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1496    storeFlushCtx.prepare();
1497    // This shouldn't invoke another in-memory flush because the first compactor thread
1498    // hasn't accomplished the in-memory compaction.
1499    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1500    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1501    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1502    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1503    //okay. Let the compaction be completed
1504    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1505    CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore;
1506    while (mem.isMemStoreFlushingInMemory()) {
1507      TimeUnit.SECONDS.sleep(1);
1508    }
1509    // This should invoke another in-memory flush.
1510    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1511    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1512    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1513    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1514    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1515      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1516    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1517    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1518  }
1519
1520  @Test
1521  public void testAge() throws IOException {
1522    long currentTime = System.currentTimeMillis();
1523    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1524    edge.setValue(currentTime);
1525    EnvironmentEdgeManager.injectEdge(edge);
1526    Configuration conf = TEST_UTIL.getConfiguration();
1527    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1528    initHRegion(name.getMethodName(), conf,
1529      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1530    HStore store = new HStore(region, hcd, conf, false) {
1531
1532      @Override
1533      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1534          CellComparator kvComparator) throws IOException {
1535        List<HStoreFile> storefiles =
1536            Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1537              mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1538        StoreFileManager sfm = mock(StoreFileManager.class);
1539        when(sfm.getStorefiles()).thenReturn(storefiles);
1540        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1541        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1542        return storeEngine;
1543      }
1544    };
1545    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1546    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1547    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1548  }
1549
1550  private HStoreFile mockStoreFile(long createdTime) {
1551    StoreFileInfo info = mock(StoreFileInfo.class);
1552    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1553    HStoreFile sf = mock(HStoreFile.class);
1554    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1555    when(sf.isHFile()).thenReturn(true);
1556    when(sf.getFileInfo()).thenReturn(info);
1557    return sf;
1558  }
1559
1560  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1561      throws IOException {
1562    return (MyStore) init(methodName, conf,
1563      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1564      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1565  }
1566
1567  private static class MyStore extends HStore {
1568    private final MyStoreHook hook;
1569
1570    MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration
1571        confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1572      super(region, family, confParam, false);
1573      this.hook = hook;
1574    }
1575
1576    @Override
1577    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1578        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1579        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1580        boolean includeMemstoreScanner) throws IOException {
1581      hook.getScanners(this);
1582      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1583        stopRow, false, readPt, includeMemstoreScanner);
1584    }
1585
1586    @Override
1587    public long getSmallestReadPoint() {
1588      return hook.getSmallestReadPoint(this);
1589    }
1590  }
1591
1592  private abstract static class MyStoreHook {
1593
1594    void getScanners(MyStore store) throws IOException {
1595    }
1596
1597    long getSmallestReadPoint(HStore store) {
1598      return store.getHRegion().getSmallestReadPoint();
1599    }
1600  }
1601
1602  @Test
1603  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
1604    Configuration conf = HBaseConfiguration.create();
1605    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1606    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1607    // Set the lower threshold to invoke the "MERGE" policy
1608    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
1609    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1610    long ts = System.currentTimeMillis();
1611    long seqID = 1L;
1612    // Add some data to the region and do some flushes
1613    for (int i = 1; i < 10; i++) {
1614      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1615        memStoreSizing);
1616    }
1617    // flush them
1618    flushStore(store, seqID);
1619    for (int i = 11; i < 20; i++) {
1620      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1621        memStoreSizing);
1622    }
1623    // flush them
1624    flushStore(store, seqID);
1625    for (int i = 21; i < 30; i++) {
1626      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1627        memStoreSizing);
1628    }
1629    // flush them
1630    flushStore(store, seqID);
1631
1632    assertEquals(3, store.getStorefilesCount());
1633    Scan scan = new Scan();
1634    scan.addFamily(family);
1635    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1636    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1637    StoreScanner storeScanner =
1638        (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1639    // get the current heap
1640    KeyValueHeap heap = storeScanner.heap;
1641    // create more store files
1642    for (int i = 31; i < 40; i++) {
1643      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1644        memStoreSizing);
1645    }
1646    // flush them
1647    flushStore(store, seqID);
1648
1649    for (int i = 41; i < 50; i++) {
1650      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1651        memStoreSizing);
1652    }
1653    // flush them
1654    flushStore(store, seqID);
1655    storefiles2 = store.getStorefiles();
1656    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1657    actualStorefiles1.removeAll(actualStorefiles);
1658    // Do compaction
1659    MyThread thread = new MyThread(storeScanner);
1660    thread.start();
1661    store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
1662    thread.join();
1663    KeyValueHeap heap2 = thread.getHeap();
1664    assertFalse(heap.equals(heap2));
1665  }
1666
1667  @Test
1668  public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException {
1669    Configuration conf = HBaseConfiguration.create();
1670    conf.set("hbase.systemtables.compacting.memstore.type", "eager");
1671    init(name.getMethodName(), conf,
1672      TableDescriptorBuilder.newBuilder(
1673        TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())),
1674      ColumnFamilyDescriptorBuilder.newBuilder(family)
1675        .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build());
1676    assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString()
1677      .startsWith("eager".toUpperCase()));
1678  }
1679
1680  @Test
1681  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
1682    final TableName tn = TableName.valueOf(name.getMethodName());
1683    init(name.getMethodName());
1684
1685    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
1686
1687    HStoreFile sf1 = mockStoreFileWithLength(1024L);
1688    HStoreFile sf2 = mockStoreFileWithLength(2048L);
1689    HStoreFile sf3 = mockStoreFileWithLength(4096L);
1690    HStoreFile sf4 = mockStoreFileWithLength(8192L);
1691
1692    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
1693        .setEndKey(Bytes.toBytes("b")).build();
1694
1695    // Compacting two files down to one, reducing size
1696    sizeStore.put(regionInfo, 1024L + 4096L);
1697    store.updateSpaceQuotaAfterFileReplacement(
1698        sizeStore, regionInfo, Arrays.asList(sf1, sf3), Arrays.asList(sf2));
1699
1700    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1701
1702    // The same file length in and out should have no change
1703    store.updateSpaceQuotaAfterFileReplacement(
1704        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf2));
1705
1706    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1707
1708    // Increase the total size used
1709    store.updateSpaceQuotaAfterFileReplacement(
1710        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf3));
1711
1712    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
1713
1714    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
1715        .setEndKey(Bytes.toBytes("c")).build();
1716    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
1717
1718    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
1719  }
1720
1721  private HStoreFile mockStoreFileWithLength(long length) {
1722    HStoreFile sf = mock(HStoreFile.class);
1723    StoreFileReader sfr = mock(StoreFileReader.class);
1724    when(sf.isHFile()).thenReturn(true);
1725    when(sf.getReader()).thenReturn(sfr);
1726    when(sfr.length()).thenReturn(length);
1727    return sf;
1728  }
1729
1730  private static class MyThread extends Thread {
1731    private StoreScanner scanner;
1732    private KeyValueHeap heap;
1733
1734    public MyThread(StoreScanner scanner) {
1735      this.scanner = scanner;
1736    }
1737
1738    public KeyValueHeap getHeap() {
1739      return this.heap;
1740    }
1741
1742    @Override
1743    public void run() {
1744      scanner.trySwitchToStreamRead();
1745      heap = scanner.heap;
1746    }
1747  }
1748
1749  private static class MyMemStoreCompactor extends MemStoreCompactor {
1750    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
1751    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
1752    public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy
1753        compactionPolicy) throws IllegalArgumentIOException {
1754      super(compactingMemStore, compactionPolicy);
1755    }
1756
1757    @Override
1758    public boolean start() throws IOException {
1759      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
1760      if (isFirst) {
1761        try {
1762          START_COMPACTOR_LATCH.await();
1763          return super.start();
1764        } catch (InterruptedException ex) {
1765          throw new RuntimeException(ex);
1766        }
1767      }
1768      return super.start();
1769    }
1770  }
1771
1772  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
1773    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
1774    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
1775        HStore store, RegionServicesForStores regionServices,
1776        MemoryCompactionPolicy compactionPolicy) throws IOException {
1777      super(conf, c, store, regionServices, compactionPolicy);
1778    }
1779
1780    @Override
1781    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
1782        throws IllegalArgumentIOException {
1783      return new MyMemStoreCompactor(this, compactionPolicy);
1784    }
1785
1786    @Override
1787    protected boolean setInMemoryCompactionFlag() {
1788      boolean rval = super.setInMemoryCompactionFlag();
1789      if (rval) {
1790        RUNNER_COUNT.incrementAndGet();
1791        if (LOG.isDebugEnabled()) {
1792          LOG.debug("runner count: " + RUNNER_COUNT.get());
1793        }
1794      }
1795      return rval;
1796    }
1797  }
1798
1799  public static class MyCompactingMemStore extends CompactingMemStore {
1800    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
1801    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
1802    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
1803    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c,
1804        HStore store, RegionServicesForStores regionServices,
1805        MemoryCompactionPolicy compactionPolicy) throws IOException {
1806      super(conf, c, store, regionServices, compactionPolicy);
1807    }
1808
1809    @Override
1810    protected List<KeyValueScanner> createList(int capacity) {
1811      if (START_TEST.get()) {
1812        try {
1813          getScannerLatch.countDown();
1814          snapshotLatch.await();
1815        } catch (InterruptedException e) {
1816          throw new RuntimeException(e);
1817        }
1818      }
1819      return new ArrayList<>(capacity);
1820    }
1821    @Override
1822    protected void pushActiveToPipeline(MutableSegment active) {
1823      if (START_TEST.get()) {
1824        try {
1825          getScannerLatch.await();
1826        } catch (InterruptedException e) {
1827          throw new RuntimeException(e);
1828        }
1829      }
1830
1831      super.pushActiveToPipeline(active);
1832      if (START_TEST.get()) {
1833        snapshotLatch.countDown();
1834      }
1835    }
1836  }
1837
1838  interface MyListHook {
1839    void hook(int currentSize);
1840  }
1841
1842  private static class MyList<T> implements List<T> {
1843    private final List<T> delegatee = new ArrayList<>();
1844    private final MyListHook hookAtAdd;
1845    MyList(final MyListHook hookAtAdd) {
1846      this.hookAtAdd = hookAtAdd;
1847    }
1848    @Override
1849    public int size() {return delegatee.size();}
1850
1851    @Override
1852    public boolean isEmpty() {return delegatee.isEmpty();}
1853
1854    @Override
1855    public boolean contains(Object o) {return delegatee.contains(o);}
1856
1857    @Override
1858    public Iterator<T> iterator() {return delegatee.iterator();}
1859
1860    @Override
1861    public Object[] toArray() {return delegatee.toArray();}
1862
1863    @Override
1864    public <R> R[] toArray(R[] a) {return delegatee.toArray(a);}
1865
1866    @Override
1867    public boolean add(T e) {
1868      hookAtAdd.hook(size());
1869      return delegatee.add(e);
1870    }
1871
1872    @Override
1873    public boolean remove(Object o) {return delegatee.remove(o);}
1874
1875    @Override
1876    public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);}
1877
1878    @Override
1879    public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);}
1880
1881    @Override
1882    public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);}
1883
1884    @Override
1885    public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);}
1886
1887    @Override
1888    public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);}
1889
1890    @Override
1891    public void clear() {delegatee.clear();}
1892
1893    @Override
1894    public T get(int index) {return delegatee.get(index);}
1895
1896    @Override
1897    public T set(int index, T element) {return delegatee.set(index, element);}
1898
1899    @Override
1900    public void add(int index, T element) {delegatee.add(index, element);}
1901
1902    @Override
1903    public T remove(int index) {return delegatee.remove(index);}
1904
1905    @Override
1906    public int indexOf(Object o) {return delegatee.indexOf(o);}
1907
1908    @Override
1909    public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);}
1910
1911    @Override
1912    public ListIterator<T> listIterator() {return delegatee.listIterator();}
1913
1914    @Override
1915    public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);}
1916
1917    @Override
1918    public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
1919  }
1920}