001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025import static org.mockito.ArgumentMatchers.any;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.spy;
028import static org.mockito.Mockito.times;
029import static org.mockito.Mockito.verify;
030import static org.mockito.Mockito.when;
031
032import java.io.IOException;
033import java.lang.ref.SoftReference;
034import java.security.PrivilegedExceptionAction;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Collection;
038import java.util.Collections;
039import java.util.Iterator;
040import java.util.List;
041import java.util.ListIterator;
042import java.util.NavigableSet;
043import java.util.TreeSet;
044import java.util.concurrent.ConcurrentSkipListSet;
045import java.util.concurrent.CountDownLatch;
046import java.util.concurrent.ExecutorService;
047import java.util.concurrent.Executors;
048import java.util.concurrent.ThreadPoolExecutor;
049import java.util.concurrent.TimeUnit;
050import java.util.concurrent.atomic.AtomicBoolean;
051import java.util.concurrent.atomic.AtomicInteger;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.fs.FSDataOutputStream;
054import org.apache.hadoop.fs.FileStatus;
055import org.apache.hadoop.fs.FileSystem;
056import org.apache.hadoop.fs.FilterFileSystem;
057import org.apache.hadoop.fs.LocalFileSystem;
058import org.apache.hadoop.fs.Path;
059import org.apache.hadoop.fs.permission.FsPermission;
060import org.apache.hadoop.hbase.Cell;
061import org.apache.hadoop.hbase.CellBuilderFactory;
062import org.apache.hadoop.hbase.CellBuilderType;
063import org.apache.hadoop.hbase.CellComparator;
064import org.apache.hadoop.hbase.CellComparatorImpl;
065import org.apache.hadoop.hbase.CellUtil;
066import org.apache.hadoop.hbase.HBaseClassTestRule;
067import org.apache.hadoop.hbase.HBaseConfiguration;
068import org.apache.hadoop.hbase.HBaseTestingUtility;
069import org.apache.hadoop.hbase.HConstants;
070import org.apache.hadoop.hbase.KeyValue;
071import org.apache.hadoop.hbase.MemoryCompactionPolicy;
072import org.apache.hadoop.hbase.NamespaceDescriptor;
073import org.apache.hadoop.hbase.PrivateCellUtil;
074import org.apache.hadoop.hbase.TableName;
075import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
076import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
077import org.apache.hadoop.hbase.client.Get;
078import org.apache.hadoop.hbase.client.RegionInfo;
079import org.apache.hadoop.hbase.client.RegionInfoBuilder;
080import org.apache.hadoop.hbase.client.Scan;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
083import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
084import org.apache.hadoop.hbase.filter.Filter;
085import org.apache.hadoop.hbase.filter.FilterBase;
086import org.apache.hadoop.hbase.io.compress.Compression;
087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
088import org.apache.hadoop.hbase.io.hfile.CacheConfig;
089import org.apache.hadoop.hbase.io.hfile.HFile;
090import org.apache.hadoop.hbase.io.hfile.HFileContext;
091import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
092import org.apache.hadoop.hbase.monitoring.MonitoredTask;
093import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
094import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
095import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
096import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
097import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
098import org.apache.hadoop.hbase.security.User;
099import org.apache.hadoop.hbase.testclassification.MediumTests;
100import org.apache.hadoop.hbase.testclassification.RegionServerTests;
101import org.apache.hadoop.hbase.util.Bytes;
102import org.apache.hadoop.hbase.util.CommonFSUtils;
103import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
104import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
105import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
106import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
107import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
108import org.apache.hadoop.hbase.wal.WALFactory;
109import org.apache.hadoop.util.Progressable;
110import org.junit.After;
111import org.junit.AfterClass;
112import org.junit.Before;
113import org.junit.ClassRule;
114import org.junit.Rule;
115import org.junit.Test;
116import org.junit.experimental.categories.Category;
117import org.junit.rules.TestName;
118import org.mockito.Mockito;
119import org.slf4j.Logger;
120import org.slf4j.LoggerFactory;
121
122import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
123
124/**
125 * Test class for the HStore
126 */
127@Category({ RegionServerTests.class, MediumTests.class })
128public class TestHStore {
129
  // Class-level JUnit rule built for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
  // Name of the running test method; the tests use it to derive per-test directories and users.
  @Rule
  public TestName name = new TestName();

  // Region and store under test. Both are (re)created by the init(...) helpers and closed in
  // tearDown().
  HRegion region;
  HStore store;
  // Table and column family names used by every descriptor built in this class.
  byte [] table = Bytes.toBytes("table");
  byte [] family = Bytes.toBytes("family");

  // Row keys and column qualifiers shared by the tests.
  byte [] row = Bytes.toBytes("row");
  byte [] row2 = Bytes.toBytes("row2");
  byte [] qf1 = Bytes.toBytes("qf1");
  byte [] qf2 = Bytes.toBytes("qf2");
  byte [] qf3 = Bytes.toBytes("qf3");
  byte [] qf4 = Bytes.toBytes("qf4");
  byte [] qf5 = Bytes.toBytes("qf5");
  byte [] qf6 = Bytes.toBytes("qf6");

  // Qualifiers requested by the "get" style tests; setUp() resets it to {qf1, qf3, qf5}.
  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);

  // expected: cells setUp() builds from the qualifier set; result: cells a test read back.
  // assertCheck() compares the two element by element.
  List<Cell> expected = new ArrayList<>();
  List<Cell> result = new ArrayList<>();

  // Id handed to flushStore(...); callers post-increment it on every flush.
  long id = System.currentTimeMillis();
  // Get against 'row'; setUp() adds one column per entry of 'qualifiers'.
  Get get = new Get(row);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Base path for test data. initHRegion() appends the method name by plain string
  // concatenation (no path separator is inserted).
  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
162
163
164  /**
165   * Setup
166   * @throws IOException
167   */
168  @Before
169  public void setUp() throws IOException {
170    qualifiers.clear();
171    qualifiers.add(qf1);
172    qualifiers.add(qf3);
173    qualifiers.add(qf5);
174
175    Iterator<byte[]> iter = qualifiers.iterator();
176    while(iter.hasNext()){
177      byte [] next = iter.next();
178      expected.add(new KeyValue(row, family, next, 1, (byte[])null));
179      get.addColumn(family, next);
180    }
181  }
182
  /**
   * Initializes the region and store for a test using the configuration of the shared
   * testing utility.
   * @param methodName name of the calling test; used to derive the test's directories
   * @throws IOException if the region or store cannot be created
   */
  private void init(String methodName) throws IOException {
    init(methodName, TEST_UTIL.getConfiguration());
  }
186
187  private HStore init(String methodName, Configuration conf) throws IOException {
188    // some of the tests write 4 versions and then flush
189    // (with HBASE-4241, lower versions are collected on flush)
190    return init(methodName, conf,
191      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
192  }
193
194  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
195      throws IOException {
196    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
197  }
198
  /**
   * Initializes the store without a {@code MyStoreHook}; delegates with a {@code null} hook.
   * @param methodName name of the calling test; used to derive the test's directories
   * @param conf configuration to build the region and store with
   * @param builder table descriptor builder the column family is added to
   * @param hcd column family the store is created for
   * @return the newly created store (also kept in {@link #store})
   * @throws IOException if the region or store cannot be created
   */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd) throws IOException {
    return init(methodName, conf, builder, hcd, null);
  }
203
  /**
   * Initializes the store with an optional hook; delegates with {@code switchToPread}
   * set to {@code false}.
   * @param methodName name of the calling test; used to derive the test's directories
   * @param conf configuration to build the region and store with
   * @param builder table descriptor builder the column family is added to
   * @param hcd column family the store is created for
   * @param hook hook passed through to the store; may be {@code null}
   * @return the newly created store (also kept in {@link #store})
   * @throws IOException if the region or store cannot be created
   */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
    return init(methodName, conf, builder, hcd, hook, false);
  }
208
  /**
   * Creates the {@link #region} used by a test: builds the table descriptor, clears the WAL
   * directory, initializes the ChunkCreator, creates a WAL and an HRegion on top of them, and
   * replaces the region's services-for-stores with a spy that hands out a single-threaded
   * in-memory compaction pool.
   * <p>
   * NOTE(review): {@code hook} and {@code switchToPread} are accepted but not read in this
   * method; only the caller uses them when constructing the store.
   * @param methodName appended to {@code DIR} (plain concatenation) to form the base directory
   * @param conf configuration used for the file system, the WAL and the region
   * @param builder table descriptor builder the column family is added to
   * @param hcd column family to register on the table
   * @param hook unused here
   * @param switchToPread unused here
   * @throws IOException if file system, WAL or region setup fails
   */
  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
    TableDescriptor htd = builder.setColumnFamily(hcd).build();
    Path basedir = new Path(DIR + methodName);
    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));

    FileSystem fs = FileSystem.get(conf);

    // Start from a clean WAL directory; the delete is recursive.
    fs.delete(logdir, true);
    // Initialize the ChunkCreator from the default chunk-size constants before any memstore
    // is created.
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0,
      null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    // The WAL gets its own copy of the configuration with the root dir pointed at basedir,
    // so the caller's conf is left untouched.
    Configuration walConf = new Configuration(conf);
    CommonFSUtils.setRootDir(walConf, basedir);
    WALFactory wals = new WALFactory(walConf, methodName);
    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
        htd, null);
    // Spy on the services object so getInMemoryCompactionPool() can be stubbed below.
    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
  }
232
233  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
234      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
235    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
236    if (hook == null) {
237      store = new HStore(region, hcd, conf, false);
238    } else {
239      store = new MyStore(region, hcd, conf, hook, switchToPread);
240    }
241    return store;
242  }
243
244  /**
245   * Test we do not lose data if we fail a flush and then close.
246   * Part of HBase-10466
247   * @throws Exception
248   */
249  @Test
250  public void testFlushSizeSizing() throws Exception {
251    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
252    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
253    // Only retry once.
254    conf.setInt("hbase.hstore.flush.retries.number", 1);
255    User user = User.createUserForTesting(conf, this.name.getMethodName(),
256      new String[]{"foo"});
257    // Inject our faulty LocalFileSystem
258    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
259    user.runAs(new PrivilegedExceptionAction<Object>() {
260      @Override
261      public Object run() throws Exception {
262        // Make sure it worked (above is sensitive to caching details in hadoop core)
263        FileSystem fs = FileSystem.get(conf);
264        assertEquals(FaultyFileSystem.class, fs.getClass());
265        FaultyFileSystem ffs = (FaultyFileSystem)fs;
266
267        // Initialize region
268        init(name.getMethodName(), conf);
269
270        MemStoreSize mss = store.memstore.getFlushableSize();
271        assertEquals(0, mss.getDataSize());
272        LOG.info("Adding some data");
273        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
274        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
275        // add the heap size of active (mutable) segment
276        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
277        mss = store.memstore.getFlushableSize();
278        assertEquals(kvSize.getMemStoreSize(), mss);
279        // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
280        try {
281          LOG.info("Flushing");
282          flushStore(store, id++);
283          fail("Didn't bubble up IOE!");
284        } catch (IOException ioe) {
285          assertTrue(ioe.getMessage().contains("Fault injected"));
286        }
287        // due to snapshot, change mutable to immutable segment
288        kvSize.incMemStoreSize(0,
289          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
290        mss = store.memstore.getFlushableSize();
291        assertEquals(kvSize.getMemStoreSize(), mss);
292        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
293        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
294        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
295        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
296        // not yet cleared the snapshot -- the above flush failed.
297        assertEquals(kvSize.getMemStoreSize(), mss);
298        ffs.fault.set(false);
299        flushStore(store, id++);
300        mss = store.memstore.getFlushableSize();
301        // Size should be the foreground kv size.
302        assertEquals(kvSize2.getMemStoreSize(), mss);
303        flushStore(store, id++);
304        mss = store.memstore.getFlushableSize();
305        assertEquals(0, mss.getDataSize());
306        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
307        return null;
308      }
309    });
310  }
311
312  /**
313   * Verify that compression and data block encoding are respected by the
314   * Store.createWriterInTmp() method, used on store flush.
315   */
316  @Test
317  public void testCreateWriter() throws Exception {
318    Configuration conf = HBaseConfiguration.create();
319    FileSystem fs = FileSystem.get(conf);
320
321    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
322        .setCompressionType(Compression.Algorithm.GZ).setDataBlockEncoding(DataBlockEncoding.DIFF)
323        .build();
324    init(name.getMethodName(), conf, hcd);
325
326    // Test createWriterInTmp()
327    StoreFileWriter writer =
328        store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false);
329    Path path = writer.getPath();
330    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
331    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
332    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
333    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
334    writer.close();
335
336    // Verify that compression and encoding settings are respected
337    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
338    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
339    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
340    reader.close();
341  }
342
343  @Test
344  public void testDeleteExpiredStoreFiles() throws Exception {
345    testDeleteExpiredStoreFiles(0);
346    testDeleteExpiredStoreFiles(1);
347  }
348
  /**
   * Flushes four store files whose timestamps are spread across the store TTL using an injected
   * clock, then repeatedly requests a compaction and checks how many store files remain.
   * With {@code minVersions == 0} one file is expected to disappear per round until a single
   * file is left; otherwise all files are expected to stay.
   * @param minVersions the MIN_VERSIONS for the column family
   * @throws Exception if store setup, flushing or compaction selection fails
   */
  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
    int storeFileNum = 4;
    int ttl = 4;
    // Replace the environment clock so the test controls time via edge.incrementTime(...).
    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
    EnvironmentEdgeManagerTestHelper.injectEdge(edge);

    Configuration conf = HBaseConfiguration.create();
    // Enable the expired store file deletion
    conf.setBoolean("hbase.store.delete.expired.storefile", true);
    // Set the compaction threshold higher to avoid normal compactions.
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);

    // The minVersions suffix keeps the two invocations from sharing a directory.
    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
        .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());

    long storeTtl = this.store.getScanInfo().getTtl();
    // Amount the clock is advanced after each flush: one storeFileNum-th of the TTL.
    long sleepTime = storeTtl / storeFileNum;
    long timeStamp;
    // There are 4 store files and the max time stamp difference among these
    // store files will be (this.store.ttl / storeFileNum)
    for (int i = 1; i <= storeFileNum; i++) {
      LOG.info("Adding some data for the store file #" + i);
      timeStamp = EnvironmentEdgeManager.currentTime();
      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
      flush(i);
      edge.incrementTime(sleepTime);
    }

    // Verify the total number of store files
    assertEquals(storeFileNum, this.store.getStorefiles().size());

    // Each call will find one expired store file and delete it before compaction happens.
    // There will be no compaction due to threshold above. Last file will not be replaced.
    for (int i = 1; i <= storeFileNum - 1; i++) {
      // verify the expired store file.
      assertFalse(this.store.requestCompaction().isPresent());
      Collection<HStoreFile> sfs = this.store.getStorefiles();
      // Ensure i files are gone.
      if (minVersions == 0) {
        assertEquals(storeFileNum - i, sfs.size());
        // Ensure only non-expired files remain.
        for (HStoreFile sf : sfs) {
          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
        }
      } else {
        // With minVersions != 0 no file is expected to be dropped.
        assertEquals(storeFileNum, sfs.size());
      }
      // Let the next store file expire.
      edge.incrementTime(sleepTime);
    }
    assertFalse(this.store.requestCompaction().isPresent());

    Collection<HStoreFile> sfs = this.store.getStorefiles();
    // Assert the last expired file is not removed.
    if (minVersions == 0) {
      assertEquals(1, sfs.size());
    }
    // The first remaining file must already be past the TTL window at this point.
    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
    assertTrue(ts < (edge.currentTime() - storeTtl));

    // Close the remaining store files explicitly.
    for (HStoreFile sf : sfs) {
      sf.closeStoreFile(true);
    }
  }
418
419  @Test
420  public void testLowestModificationTime() throws Exception {
421    Configuration conf = HBaseConfiguration.create();
422    FileSystem fs = FileSystem.get(conf);
423    // Initialize region
424    init(name.getMethodName(), conf);
425
426    int storeFileNum = 4;
427    for (int i = 1; i <= storeFileNum; i++) {
428      LOG.info("Adding some data for the store file #"+i);
429      this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null);
430      this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null);
431      this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null);
432      flush(i);
433    }
434    // after flush; check the lowest time stamp
435    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
436    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
437    assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
438
439    // after compact; check the lowest time stamp
440    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
441    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
442    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
443    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
444  }
445
446  private static long getLowestTimeStampFromFS(FileSystem fs,
447      final Collection<HStoreFile> candidates) throws IOException {
448    long minTs = Long.MAX_VALUE;
449    if (candidates.isEmpty()) {
450      return minTs;
451    }
452    Path[] p = new Path[candidates.size()];
453    int i = 0;
454    for (HStoreFile sf : candidates) {
455      p[i] = sf.getPath();
456      ++i;
457    }
458
459    FileStatus[] stats = fs.listStatus(p);
460    if (stats == null || stats.length == 0) {
461      return minTs;
462    }
463    for (FileStatus s : stats) {
464      minTs = Math.min(minTs, s.getModificationTime());
465    }
466    return minTs;
467  }
468
469  //////////////////////////////////////////////////////////////////////////////
470  // Get tests
471  //////////////////////////////////////////////////////////////////////////////
472
  // Block size given to the HFileContext of the hand-written empty store file in
  // testEmptyStoreFile().
  private static final int BLOCKSIZE_SMALL = 8192;
  /**
   * Test for hbase-1686.
   * <p>
   * Flushes one store file, places a second store file containing only metadata (no cells)
   * next to it, reopens the store and checks that both files are loaded and a read still
   * returns the one requested cell.
   * @throws IOException if store setup, file writing or reading fails
   */
  @Test
  public void testEmptyStoreFile() throws IOException {
    init(this.name.getMethodName());
    // Write a store file.
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
    flush(1);
    // Now put in place an empty store file.  It's a little tricky.  Have to
    // do manually with hacked in sequence id.
    HStoreFile f = this.store.getStorefiles().iterator().next();
    // The empty file goes into the same directory as the flushed file.
    Path storedir = f.getPath().getParent();
    long seqid = f.getMaxSequenceId();
    Configuration c = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(c);
    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
        fs)
            .withOutputDir(storedir)
            .withFileContext(meta)
            .build();
    // No cells are appended; only metadata with a sequence id one past the flushed file's.
    w.appendMetadata(seqid + 1, false);
    w.close();
    this.store.close();
    // Reopen it... should pick up two files
    this.store =
        new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
    assertEquals(2, this.store.getStorefilesCount());

    // Of the written qualifiers (qf1, qf2) only qf1 is in the requested set {qf1, qf3, qf5},
    // so a single cell is expected (presumably getFromStoreFile filters by 'qualifiers').
    result = HBaseTestingUtility.getFromStoreFile(store,
        get.getRow(),
        qualifiers);
    assertEquals(1, result.size());
  }
511
512  /**
513   * Getting data from memstore only
514   * @throws IOException
515   */
516  @Test
517  public void testGet_FromMemStoreOnly() throws IOException {
518    init(this.name.getMethodName());
519
520    //Put data in memstore
521    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
522    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
523    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
524    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
525    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
526    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
527
528    //Get
529    result = HBaseTestingUtility.getFromStoreFile(store,
530        get.getRow(), qualifiers);
531
532    //Compare
533    assertCheck();
534  }
535
536  @Test
537  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
538    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
539    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
540    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
541  }
542
543  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
544    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
545    ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
546    long currentTs = 100;
547    long minTs = currentTs;
548    // the extra cell won't be flushed to disk,
549    // so the min of timerange will be different between memStore and hfile.
550    for (int i = 0; i != (maxVersion + 1); ++i) {
551      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
552      if (i == 1) {
553        minTs = currentTs;
554      }
555    }
556    flushStore(store, id++);
557
558    Collection<HStoreFile> files = store.getStorefiles();
559    assertEquals(1, files.size());
560    HStoreFile f = files.iterator().next();
561    f.initReader();
562    StoreFileReader reader = f.getReader();
563    assertEquals(minTs, reader.timeRange.getMin());
564    assertEquals(currentTs, reader.timeRange.getMax());
565  }
566
567  /**
568   * Getting data from files only
569   * @throws IOException
570   */
571  @Test
572  public void testGet_FromFilesOnly() throws IOException {
573    init(this.name.getMethodName());
574
575    //Put data in memstore
576    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
577    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
578    //flush
579    flush(1);
580
581    //Add more data
582    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
583    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
584    //flush
585    flush(2);
586
587    //Add more data
588    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
589    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
590    //flush
591    flush(3);
592
593    //Get
594    result = HBaseTestingUtility.getFromStoreFile(store,
595        get.getRow(),
596        qualifiers);
597    //this.store.get(get, qualifiers, result);
598
599    //Need to sort the result since multiple files
600    Collections.sort(result, CellComparatorImpl.COMPARATOR);
601
602    //Compare
603    assertCheck();
604  }
605
606  /**
607   * Getting data from memstore and files
608   * @throws IOException
609   */
610  @Test
611  public void testGet_FromMemStoreAndFiles() throws IOException {
612    init(this.name.getMethodName());
613
614    //Put data in memstore
615    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
616    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
617    //flush
618    flush(1);
619
620    //Add more data
621    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
622    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
623    //flush
624    flush(2);
625
626    //Add more data
627    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
628    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
629
630    //Get
631    result = HBaseTestingUtility.getFromStoreFile(store,
632        get.getRow(), qualifiers);
633
634    //Need to sort the result since multiple files
635    Collections.sort(result, CellComparatorImpl.COMPARATOR);
636
637    //Compare
638    assertCheck();
639  }
640
641  private void flush(int storeFilessize) throws IOException{
642    this.store.snapshot();
643    flushStore(store, id++);
644    assertEquals(storeFilessize, this.store.getStorefiles().size());
645    assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
646  }
647
648  private void assertCheck() {
649    assertEquals(expected.size(), result.size());
650    for(int i=0; i<expected.size(); i++) {
651      assertEquals(expected.get(i), result.get(i));
652    }
653  }
654
655  @After
656  public void tearDown() throws Exception {
657    EnvironmentEdgeManagerTestHelper.reset();
658    if (store != null) {
659      try {
660        store.close();
661      } catch (IOException e) {
662      }
663      store = null;
664    }
665    if (region != null) {
666      region.close();
667      region = null;
668    }
669  }
670
  /**
   * Asks the shared testing utility to clean up its test directory once all tests have run.
   * @throws IOException if the cleanup fails
   */
  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }
675
  /**
   * Verifies that an IOException raised while flushing on a {@link FaultyFileSystem} reaches
   * the caller and that no store file exists for the column family before or after the
   * failed flush.
   * @throws Exception if the test body fails
   */
  @Test
  public void testHandleErrorsInFlush() throws Exception {
    LOG.info("Setting up a faulty file system that cannot write");

    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    User user = User.createUserForTesting(conf,
        "testhandleerrorsinflush", new String[]{"foo"});
    // Inject our faulty LocalFileSystem
    conf.setClass("fs.file.impl", FaultyFileSystem.class,
        FileSystem.class);
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // Make sure it worked (above is sensitive to caching details in hadoop core)
        FileSystem fs = FileSystem.get(conf);
        assertEquals(FaultyFileSystem.class, fs.getClass());

        // Initialize region
        init(name.getMethodName(), conf);

        LOG.info("Adding some data");
        store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
        store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
        store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);

        LOG.info("Before flush, we should have no files");

        // A null listing is treated the same as an empty one.
        Collection<StoreFileInfo> files =
          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
        assertEquals(0, files != null ? files.size() : 0);

        // Flush; the faulty file system is expected to make it fail with the injected fault.
        try {
          LOG.info("Flushing");
          flush(1);
          fail("Didn't bubble up IOE!");
        } catch (IOException ioe) {
          assertTrue(ioe.getMessage().contains("Fault injected"));
        }

        LOG.info("After failed flush, we should still have no files!");
        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
        assertEquals(0, files != null ? files.size() : 0);
        // Close the WAL while still running as the test user.
        store.getHRegion().getWAL().close();
        return null;
      }
    });
    // Close the file systems opened for the test user's UGI.
    FileSystem.closeAllForUGI(user.getUGI());
  }
725
726  /**
727   * Faulty file system that will fail if you write past its fault position the FIRST TIME
728   * only; thereafter it will succeed.  Used by {@link TestHRegion} too.
729   */
730  static class FaultyFileSystem extends FilterFileSystem {
731    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
732    private long faultPos = 200;
733    AtomicBoolean fault = new AtomicBoolean(true);
734
735    public FaultyFileSystem() {
736      super(new LocalFileSystem());
737      System.err.println("Creating faulty!");
738    }
739
740    @Override
741    public FSDataOutputStream create(Path p) throws IOException {
742      return new FaultyOutputStream(super.create(p), faultPos, fault);
743    }
744
745    @Override
746    public FSDataOutputStream create(Path f, FsPermission permission,
747        boolean overwrite, int bufferSize, short replication, long blockSize,
748        Progressable progress) throws IOException {
749      return new FaultyOutputStream(super.create(f, permission,
750          overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
751    }
752
753    @Override
754    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
755        int bufferSize, short replication, long blockSize, Progressable progress)
756    throws IOException {
757      // Fake it.  Call create instead.  The default implementation throws an IOE
758      // that this is not supported.
759      return create(f, overwrite, bufferSize, replication, blockSize, progress);
760    }
761  }
762
763  static class FaultyOutputStream extends FSDataOutputStream {
764    volatile long faultPos = Long.MAX_VALUE;
765    private final AtomicBoolean fault;
766
767    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
768    throws IOException {
769      super(out, null);
770      this.faultPos = faultPos;
771      this.fault = fault;
772    }
773
774    @Override
775    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
776      System.err.println("faulty stream write at pos " + getPos());
777      injectFault();
778      super.write(buf, offset, length);
779    }
780
781    private void injectFault() throws IOException {
782      if (this.fault.get() && getPos() >= faultPos) {
783        throw new IOException("Fault injected");
784      }
785    }
786  }
787
788  private static void flushStore(HStore store, long id) throws IOException {
789    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
790    storeFlushCtx.prepare();
791    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
792    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
793  }
794
795  /**
796   * Generate a list of KeyValues for testing based on given parameters
797   * @param timestamps
798   * @param numRows
799   * @param qualifier
800   * @param family
801   * @return the rows key-value list
802   */
803  List<Cell> getKeyValueSet(long[] timestamps, int numRows,
804      byte[] qualifier, byte[] family) {
805    List<Cell> kvList = new ArrayList<>();
806    for (int i=1;i<=numRows;i++) {
807      byte[] b = Bytes.toBytes(i);
808      for (long timestamp: timestamps) {
809        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
810      }
811    }
812    return kvList;
813  }
814
815  /**
816   * Test to ensure correctness when using Stores with multiple timestamps
817   * @throws IOException
818   */
819  @Test
820  public void testMultipleTimestamps() throws IOException {
821    int numRows = 1;
822    long[] timestamps1 = new long[] {1,5,10,20};
823    long[] timestamps2 = new long[] {30,80};
824
825    init(this.name.getMethodName());
826
827    List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
828    for (Cell kv : kvList1) {
829      this.store.add(kv, null);
830    }
831
832    this.store.snapshot();
833    flushStore(store, id++);
834
835    List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
836    for(Cell kv : kvList2) {
837      this.store.add(kv, null);
838    }
839
840    List<Cell> result;
841    Get get = new Get(Bytes.toBytes(1));
842    get.addColumn(family,qf1);
843
844    get.setTimeRange(0,15);
845    result = HBaseTestingUtility.getFromStoreFile(store, get);
846    assertTrue(result.size()>0);
847
848    get.setTimeRange(40,90);
849    result = HBaseTestingUtility.getFromStoreFile(store, get);
850    assertTrue(result.size()>0);
851
852    get.setTimeRange(10,45);
853    result = HBaseTestingUtility.getFromStoreFile(store, get);
854    assertTrue(result.size()>0);
855
856    get.setTimeRange(80,145);
857    result = HBaseTestingUtility.getFromStoreFile(store, get);
858    assertTrue(result.size()>0);
859
860    get.setTimeRange(1,2);
861    result = HBaseTestingUtility.getFromStoreFile(store, get);
862    assertTrue(result.size()>0);
863
864    get.setTimeRange(90,200);
865    result = HBaseTestingUtility.getFromStoreFile(store, get);
866    assertTrue(result.size()==0);
867  }
868
869  /**
870   * Test for HBASE-3492 - Test split on empty colfam (no store files).
871   *
872   * @throws IOException When the IO operations fail.
873   */
874  @Test
875  public void testSplitWithEmptyColFam() throws IOException {
876    init(this.name.getMethodName());
877    assertFalse(store.getSplitPoint().isPresent());
878  }
879
880  @Test
881  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
882    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
883    long anyValue = 10;
884
885    // We'll check that it uses correct config and propagates it appropriately by going thru
886    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
887    // a number we pass in is higher than some config value, inside compactionPolicy.
888    Configuration conf = HBaseConfiguration.create();
889    conf.setLong(CONFIG_KEY, anyValue);
890    init(name.getMethodName() + "-xml", conf);
891    assertTrue(store.throttleCompaction(anyValue + 1));
892    assertFalse(store.throttleCompaction(anyValue));
893
894    // HTD overrides XML.
895    --anyValue;
896    init(name.getMethodName() + "-htd", conf, TableDescriptorBuilder
897        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
898      ColumnFamilyDescriptorBuilder.of(family));
899    assertTrue(store.throttleCompaction(anyValue + 1));
900    assertFalse(store.throttleCompaction(anyValue));
901
902    // HCD overrides them both.
903    --anyValue;
904    init(name.getMethodName() + "-hcd", conf,
905      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
906        Long.toString(anyValue)),
907      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
908          .build());
909    assertTrue(store.throttleCompaction(anyValue + 1));
910    assertFalse(store.throttleCompaction(anyValue));
911  }
912
913  public static class DummyStoreEngine extends DefaultStoreEngine {
914    public static DefaultCompactor lastCreatedCompactor = null;
915
916    @Override
917    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
918        throws IOException {
919      super.createComponents(conf, store, comparator);
920      lastCreatedCompactor = this.compactor;
921    }
922  }
923
924  @Test
925  public void testStoreUsesSearchEngineOverride() throws Exception {
926    Configuration conf = HBaseConfiguration.create();
927    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
928    init(this.name.getMethodName(), conf);
929    assertEquals(DummyStoreEngine.lastCreatedCompactor,
930      this.store.storeEngine.getCompactor());
931  }
932
933  private void addStoreFile() throws IOException {
934    HStoreFile f = this.store.getStorefiles().iterator().next();
935    Path storedir = f.getPath().getParent();
936    long seqid = this.store.getMaxSequenceId().orElse(0L);
937    Configuration c = TEST_UTIL.getConfiguration();
938    FileSystem fs = FileSystem.get(c);
939    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
940    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
941        fs)
942            .withOutputDir(storedir)
943            .withFileContext(fileContext)
944            .build();
945    w.appendMetadata(seqid + 1, false);
946    w.close();
947    LOG.info("Added store file:" + w.getPath());
948  }
949
950  private void archiveStoreFile(int index) throws IOException {
951    Collection<HStoreFile> files = this.store.getStorefiles();
952    HStoreFile sf = null;
953    Iterator<HStoreFile> it = files.iterator();
954    for (int i = 0; i <= index; i++) {
955      sf = it.next();
956    }
957    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
958  }
959
960  private void closeCompactedFile(int index) throws IOException {
961    Collection<HStoreFile> files =
962        this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
963    HStoreFile sf = null;
964    Iterator<HStoreFile> it = files.iterator();
965    for (int i = 0; i <= index; i++) {
966      sf = it.next();
967    }
968    sf.closeStoreFile(true);
969    store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
970  }
971
972  @Test
973  public void testRefreshStoreFiles() throws Exception {
974    init(name.getMethodName());
975
976    assertEquals(0, this.store.getStorefilesCount());
977
978    // Test refreshing store files when no store files are there
979    store.refreshStoreFiles();
980    assertEquals(0, this.store.getStorefilesCount());
981
982    // add some data, flush
983    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
984    flush(1);
985    assertEquals(1, this.store.getStorefilesCount());
986
987    // add one more file
988    addStoreFile();
989
990    assertEquals(1, this.store.getStorefilesCount());
991    store.refreshStoreFiles();
992    assertEquals(2, this.store.getStorefilesCount());
993
994    // add three more files
995    addStoreFile();
996    addStoreFile();
997    addStoreFile();
998
999    assertEquals(2, this.store.getStorefilesCount());
1000    store.refreshStoreFiles();
1001    assertEquals(5, this.store.getStorefilesCount());
1002
1003    closeCompactedFile(0);
1004    archiveStoreFile(0);
1005
1006    assertEquals(5, this.store.getStorefilesCount());
1007    store.refreshStoreFiles();
1008    assertEquals(4, this.store.getStorefilesCount());
1009
1010    archiveStoreFile(0);
1011    archiveStoreFile(1);
1012    archiveStoreFile(2);
1013
1014    assertEquals(4, this.store.getStorefilesCount());
1015    store.refreshStoreFiles();
1016    assertEquals(1, this.store.getStorefilesCount());
1017
1018    archiveStoreFile(0);
1019    store.refreshStoreFiles();
1020    assertEquals(0, this.store.getStorefilesCount());
1021  }
1022
1023  @Test
1024  public void testRefreshStoreFilesNotChanged() throws IOException {
1025    init(name.getMethodName());
1026
1027    assertEquals(0, this.store.getStorefilesCount());
1028
1029    // add some data, flush
1030    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
1031    flush(1);
1032    // add one more file
1033    addStoreFile();
1034
1035    HStore spiedStore = spy(store);
1036
1037    // call first time after files changed
1038    spiedStore.refreshStoreFiles();
1039    assertEquals(2, this.store.getStorefilesCount());
1040    verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
1041
1042    // call second time
1043    spiedStore.refreshStoreFiles();
1044
1045    //ensure that replaceStoreFiles is not called if files are not refreshed
1046    verify(spiedStore, times(0)).replaceStoreFiles(null, null);
1047  }
1048
1049  private long countMemStoreScanner(StoreScanner scanner) {
1050    if (scanner.currentScanners == null) {
1051      return 0;
1052    }
1053    return scanner.currentScanners.stream()
1054            .filter(s -> !s.isFileScanner())
1055            .count();
1056  }
1057
1058  @Test
1059  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1060    long seqId = 100;
1061    long timestamp = System.currentTimeMillis();
1062    Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1063        .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put)
1064        .setValue(qf1).build();
1065    PrivateCellUtil.setSequenceId(cell0, seqId);
1066    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1067
1068    Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1069        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
1070        .setValue(qf1).build();
1071    PrivateCellUtil.setSequenceId(cell1, seqId);
1072    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1073
1074    seqId = 101;
1075    timestamp = System.currentTimeMillis();
1076    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1077        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
1078        .setValue(qf1).build();
1079    PrivateCellUtil.setSequenceId(cell2, seqId);
1080    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1081  }
1082
1083  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
1084      List<Cell> inputCellsAfterSnapshot) throws IOException {
1085    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
1086    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1087    long seqId = Long.MIN_VALUE;
1088    for (Cell c : inputCellsBeforeSnapshot) {
1089      quals.add(CellUtil.cloneQualifier(c));
1090      seqId = Math.max(seqId, c.getSequenceId());
1091    }
1092    for (Cell c : inputCellsAfterSnapshot) {
1093      quals.add(CellUtil.cloneQualifier(c));
1094      seqId = Math.max(seqId, c.getSequenceId());
1095    }
1096    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
1097    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1098    storeFlushCtx.prepare();
1099    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
1100    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
1101    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
1102      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
1103      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
1104      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1105      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1106      // snapshot has no data after flush
1107      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
1108      boolean more;
1109      int cellCount = 0;
1110      do {
1111        List<Cell> cells = new ArrayList<>();
1112        more = s.next(cells);
1113        cellCount += cells.size();
1114        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
1115      } while (more);
1116      assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
1117          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
1118          inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
1119      // the current scanners is cleared
1120      assertEquals(0, countMemStoreScanner(s));
1121    }
1122  }
1123
1124  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
1125      throws IOException {
1126    return createCell(row, qualifier, ts, sequenceId, value);
1127  }
1128
1129  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
1130      throws IOException {
1131    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1132        .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
1133        .setValue(value).build();
1134    PrivateCellUtil.setSequenceId(c, sequenceId);
1135    return c;
1136  }
1137
1138  @Test
1139  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1140    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1141    final int expectedSize = 3;
1142    testFlushBeforeCompletingScan(new MyListHook() {
1143      @Override
1144      public void hook(int currentSize) {
1145        if (currentSize == expectedSize - 1) {
1146          try {
1147            flushStore(store, id++);
1148            timeToGoNextRow.set(true);
1149          } catch (IOException e) {
1150            throw new RuntimeException(e);
1151          }
1152        }
1153      }
1154    }, new FilterBase() {
1155      @Override
1156      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1157        return ReturnCode.INCLUDE;
1158      }
1159    }, expectedSize);
1160  }
1161
1162  @Test
1163  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
1164    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1165    final int expectedSize = 2;
1166    testFlushBeforeCompletingScan(new MyListHook() {
1167      @Override
1168      public void hook(int currentSize) {
1169        if (currentSize == expectedSize - 1) {
1170          try {
1171            flushStore(store, id++);
1172            timeToGoNextRow.set(true);
1173          } catch (IOException e) {
1174            throw new RuntimeException(e);
1175          }
1176        }
1177      }
1178    }, new FilterBase() {
1179      @Override
1180      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1181        if (timeToGoNextRow.get()) {
1182          timeToGoNextRow.set(false);
1183          return ReturnCode.NEXT_ROW;
1184        } else {
1185          return ReturnCode.INCLUDE;
1186        }
1187      }
1188    }, expectedSize);
1189  }
1190
1191  @Test
1192  public void testFlushBeforeCompletingScanWithFilterHint() throws IOException,
1193      InterruptedException {
1194    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
1195    final int expectedSize = 2;
1196    testFlushBeforeCompletingScan(new MyListHook() {
1197      @Override
1198      public void hook(int currentSize) {
1199        if (currentSize == expectedSize - 1) {
1200          try {
1201            flushStore(store, id++);
1202            timeToGetHint.set(true);
1203          } catch (IOException e) {
1204            throw new RuntimeException(e);
1205          }
1206        }
1207      }
1208    }, new FilterBase() {
1209      @Override
1210      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1211        if (timeToGetHint.get()) {
1212          timeToGetHint.set(false);
1213          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
1214        } else {
1215          return Filter.ReturnCode.INCLUDE;
1216        }
1217      }
1218      @Override
1219      public Cell getNextCellHint(Cell currentCell) throws IOException {
1220        return currentCell;
1221      }
1222    }, expectedSize);
1223  }
1224
1225  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
1226          throws IOException, InterruptedException {
1227    Configuration conf = HBaseConfiguration.create();
1228    byte[] r0 = Bytes.toBytes("row0");
1229    byte[] r1 = Bytes.toBytes("row1");
1230    byte[] r2 = Bytes.toBytes("row2");
1231    byte[] value0 = Bytes.toBytes("value0");
1232    byte[] value1 = Bytes.toBytes("value1");
1233    byte[] value2 = Bytes.toBytes("value2");
1234    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1235    long ts = EnvironmentEdgeManager.currentTime();
1236    long seqId = 100;
1237    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1238      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1239      new MyStoreHook() {
1240        @Override
1241        public long getSmallestReadPoint(HStore store) {
1242          return seqId + 3;
1243        }
1244      });
1245    // The cells having the value0 won't be flushed to disk because the value of max version is 1
1246    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
1247    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
1248    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
1249    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
1250    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
1251    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
1252    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
1253    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
1254    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
1255    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1256    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1257    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1258    List<Cell> myList = new MyList<>(hook);
1259    Scan scan = new Scan()
1260            .withStartRow(r1)
1261            .setFilter(filter);
1262    try (InternalScanner scanner = (InternalScanner) store.getScanner(
1263          scan, null, seqId + 3)){
1264      // r1
1265      scanner.next(myList);
1266      assertEquals(expectedSize, myList.size());
1267      for (Cell c : myList) {
1268        byte[] actualValue = CellUtil.cloneValue(c);
1269        assertTrue("expected:" + Bytes.toStringBinary(value1)
1270          + ", actual:" + Bytes.toStringBinary(actualValue)
1271          , Bytes.equals(actualValue, value1));
1272      }
1273      List<Cell> normalList = new ArrayList<>(3);
1274      // r2
1275      scanner.next(normalList);
1276      assertEquals(3, normalList.size());
1277      for (Cell c : normalList) {
1278        byte[] actualValue = CellUtil.cloneValue(c);
1279        assertTrue("expected:" + Bytes.toStringBinary(value2)
1280          + ", actual:" + Bytes.toStringBinary(actualValue)
1281          , Bytes.equals(actualValue, value2));
1282      }
1283    }
1284  }
1285
1286  @Test
1287  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
1288    Configuration conf = HBaseConfiguration.create();
1289    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
1290    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1291        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1292    byte[] value = Bytes.toBytes("value");
1293    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1294    long ts = EnvironmentEdgeManager.currentTime();
1295    long seqId = 100;
1296    // older data whihc shouldn't be "seen" by client
1297    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1298    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1299    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1300    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1301    quals.add(qf1);
1302    quals.add(qf2);
1303    quals.add(qf3);
1304    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1305    MyCompactingMemStore.START_TEST.set(true);
1306    Runnable flush = () -> {
1307      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
1308      // recreate the active memstore -- phase (4/5)
1309      storeFlushCtx.prepare();
1310    };
1311    ExecutorService service = Executors.newSingleThreadExecutor();
1312    service.submit(flush);
1313    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
1314    // this is blocked until we recreate the active memstore -- phase (3/5)
1315    // we get scanner from active memstore but it is empty -- phase (5/5)
1316    InternalScanner scanner = (InternalScanner) store.getScanner(
1317          new Scan(new Get(row)), quals, seqId + 1);
1318    service.shutdown();
1319    service.awaitTermination(20, TimeUnit.SECONDS);
1320    try {
1321      try {
1322        List<Cell> results = new ArrayList<>();
1323        scanner.next(results);
1324        assertEquals(3, results.size());
1325        for (Cell c : results) {
1326          byte[] actualValue = CellUtil.cloneValue(c);
1327          assertTrue("expected:" + Bytes.toStringBinary(value)
1328            + ", actual:" + Bytes.toStringBinary(actualValue)
1329            , Bytes.equals(actualValue, value));
1330        }
1331      } finally {
1332        scanner.close();
1333      }
1334    } finally {
1335      MyCompactingMemStore.START_TEST.set(false);
1336      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1337      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1338    }
1339  }
1340
1341  @Test
1342  public void testScanWithDoubleFlush() throws IOException {
1343    Configuration conf = HBaseConfiguration.create();
1344    // Initialize region
1345    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){
1346      @Override
1347      public void getScanners(MyStore store) throws IOException {
1348        final long tmpId = id++;
1349        ExecutorService s = Executors.newSingleThreadExecutor();
1350        s.submit(() -> {
1351          try {
1352            // flush the store before storescanner updates the scanners from store.
1353            // The current data will be flushed into files, and the memstore will
1354            // be clear.
1355            // -- phase (4/4)
1356            flushStore(store, tmpId);
1357          }catch (IOException ex) {
1358            throw new RuntimeException(ex);
1359          }
1360        });
1361        s.shutdown();
1362        try {
1363          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1364          s.awaitTermination(3, TimeUnit.SECONDS);
1365        } catch (InterruptedException ex) {
1366        }
1367      }
1368    });
1369    byte[] oldValue = Bytes.toBytes("oldValue");
1370    byte[] currentValue = Bytes.toBytes("currentValue");
1371    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1372    long ts = EnvironmentEdgeManager.currentTime();
1373    long seqId = 100;
1374    // older data whihc shouldn't be "seen" by client
1375    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1376    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1377    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1378    long snapshotId = id++;
1379    // push older data into snapshot -- phase (1/4)
1380    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker
1381        .DUMMY);
1382    storeFlushCtx.prepare();
1383
1384    // insert current data into active -- phase (2/4)
1385    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1386    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1387    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1388    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1389    quals.add(qf1);
1390    quals.add(qf2);
1391    quals.add(qf3);
1392    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
1393        new Scan(new Get(row)), quals, seqId + 1)) {
1394      // complete the flush -- phase (3/4)
1395      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1396      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1397
1398      List<Cell> results = new ArrayList<>();
1399      scanner.next(results);
1400      assertEquals(3, results.size());
1401      for (Cell c : results) {
1402        byte[] actualValue = CellUtil.cloneValue(c);
1403        assertTrue("expected:" + Bytes.toStringBinary(currentValue)
1404          + ", actual:" + Bytes.toStringBinary(actualValue)
1405          , Bytes.equals(actualValue, currentValue));
1406      }
1407    }
1408  }
1409
1410  @Test
1411  public void testReclaimChunkWhenScaning() throws IOException {
1412    init("testReclaimChunkWhenScaning");
1413    long ts = EnvironmentEdgeManager.currentTime();
1414    long seqId = 100;
1415    byte[] value = Bytes.toBytes("value");
1416    // older data whihc shouldn't be "seen" by client
1417    store.add(createCell(qf1, ts, seqId, value), null);
1418    store.add(createCell(qf2, ts, seqId, value), null);
1419    store.add(createCell(qf3, ts, seqId, value), null);
1420    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1421    quals.add(qf1);
1422    quals.add(qf2);
1423    quals.add(qf3);
1424    try (InternalScanner scanner = (InternalScanner) store.getScanner(
1425        new Scan(new Get(row)), quals, seqId)) {
1426      List<Cell> results = new MyList<>(size -> {
1427        switch (size) {
1428          // 1) we get the first cell (qf1)
1429          // 2) flush the data to have StoreScanner update inner scanners
1430          // 3) the chunk will be reclaimed after updaing
1431          case 1:
1432            try {
1433              flushStore(store, id++);
1434            } catch (IOException e) {
1435              throw new RuntimeException(e);
1436            }
1437            break;
1438          // 1) we get the second cell (qf2)
1439          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
1440          case 2:
1441            try {
1442              byte[] newValue = Bytes.toBytes("newValue");
1443              // older data whihc shouldn't be "seen" by client
1444              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1445              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1446              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1447            } catch (IOException e) {
1448              throw new RuntimeException(e);
1449            }
1450            break;
1451          default:
1452            break;
1453        }
1454      });
1455      scanner.next(results);
1456      assertEquals(3, results.size());
1457      for (Cell c : results) {
1458        byte[] actualValue = CellUtil.cloneValue(c);
1459        assertTrue("expected:" + Bytes.toStringBinary(value)
1460          + ", actual:" + Bytes.toStringBinary(actualValue)
1461          , Bytes.equals(actualValue, value));
1462      }
1463    }
1464  }
1465
1466  /**
1467   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable
1468   * may change the versionedList. And the first InMemoryFlushRunnable will use the chagned
1469   * versionedList to remove the corresponding segments.
1470   * In short, there will be some segements which isn't in merge are removed.
1471   * @throws IOException
1472   * @throws InterruptedException
1473   */
1474  @Test
1475  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1476    int flushSize = 500;
1477    Configuration conf = HBaseConfiguration.create();
1478    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1479    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1480    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1481    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1482    // Set the lower threshold to invoke the "MERGE" policy
1483    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1484    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1485        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1486    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1487    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1488    long ts = EnvironmentEdgeManager.currentTime();
1489    long seqId = 100;
1490    // older data whihc shouldn't be "seen" by client
1491    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1492    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1493    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1494    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1495    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1496    storeFlushCtx.prepare();
1497    // This shouldn't invoke another in-memory flush because the first compactor thread
1498    // hasn't accomplished the in-memory compaction.
1499    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1500    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1501    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1502    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1503    //okay. Let the compaction be completed
1504    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1505    CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore;
1506    while (mem.isMemStoreFlushingInMemory()) {
1507      TimeUnit.SECONDS.sleep(1);
1508    }
1509    // This should invoke another in-memory flush.
1510    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1511    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1512    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1513    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1514    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1515      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1516    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1517    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1518  }
1519
1520  @Test
1521  public void testAge() throws IOException {
1522    long currentTime = System.currentTimeMillis();
1523    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1524    edge.setValue(currentTime);
1525    EnvironmentEdgeManager.injectEdge(edge);
1526    Configuration conf = TEST_UTIL.getConfiguration();
1527    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1528    initHRegion(name.getMethodName(), conf,
1529      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1530    HStore store = new HStore(region, hcd, conf, false) {
1531
1532      @Override
1533      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1534          CellComparator kvComparator) throws IOException {
1535        List<HStoreFile> storefiles =
1536            Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1537              mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1538        StoreFileManager sfm = mock(StoreFileManager.class);
1539        when(sfm.getStorefiles()).thenReturn(storefiles);
1540        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1541        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1542        return storeEngine;
1543      }
1544    };
1545    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1546    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1547    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1548  }
1549
1550  private HStoreFile mockStoreFile(long createdTime) {
1551    StoreFileInfo info = mock(StoreFileInfo.class);
1552    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1553    HStoreFile sf = mock(HStoreFile.class);
1554    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1555    when(sf.isHFile()).thenReturn(true);
1556    when(sf.getFileInfo()).thenReturn(info);
1557    return sf;
1558  }
1559
1560  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1561      throws IOException {
1562    return (MyStore) init(methodName, conf,
1563      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1564      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1565  }
1566
1567  private static class MyStore extends HStore {
1568    private final MyStoreHook hook;
1569
1570    MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration
1571        confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1572      super(region, family, confParam, false);
1573      this.hook = hook;
1574    }
1575
1576    @Override
1577    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1578        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1579        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1580        boolean includeMemstoreScanner) throws IOException {
1581      hook.getScanners(this);
1582      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1583        stopRow, false, readPt, includeMemstoreScanner);
1584    }
1585
1586    @Override
1587    public long getSmallestReadPoint() {
1588      return hook.getSmallestReadPoint(this);
1589    }
1590  }
1591
1592  private abstract static class MyStoreHook {
1593
1594    void getScanners(MyStore store) throws IOException {
1595    }
1596
1597    long getSmallestReadPoint(HStore store) {
1598      return store.getHRegion().getSmallestReadPoint();
1599    }
1600  }
1601
1602  @Test
1603  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
1604    Configuration conf = HBaseConfiguration.create();
1605    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1606    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1607    // Set the lower threshold to invoke the "MERGE" policy
1608    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
1609    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1610    long ts = System.currentTimeMillis();
1611    long seqID = 1L;
1612    // Add some data to the region and do some flushes
1613    for (int i = 1; i < 10; i++) {
1614      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1615        memStoreSizing);
1616    }
1617    // flush them
1618    flushStore(store, seqID);
1619    for (int i = 11; i < 20; i++) {
1620      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1621        memStoreSizing);
1622    }
1623    // flush them
1624    flushStore(store, seqID);
1625    for (int i = 21; i < 30; i++) {
1626      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1627        memStoreSizing);
1628    }
1629    // flush them
1630    flushStore(store, seqID);
1631
1632    assertEquals(3, store.getStorefilesCount());
1633    Scan scan = new Scan();
1634    scan.addFamily(family);
1635    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1636    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1637    StoreScanner storeScanner =
1638        (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1639    // get the current heap
1640    KeyValueHeap heap = storeScanner.heap;
1641    // create more store files
1642    for (int i = 31; i < 40; i++) {
1643      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1644        memStoreSizing);
1645    }
1646    // flush them
1647    flushStore(store, seqID);
1648
1649    for (int i = 41; i < 50; i++) {
1650      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1651        memStoreSizing);
1652    }
1653    // flush them
1654    flushStore(store, seqID);
1655    storefiles2 = store.getStorefiles();
1656    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1657    actualStorefiles1.removeAll(actualStorefiles);
1658    // Do compaction
1659    MyThread thread = new MyThread(storeScanner);
1660    thread.start();
1661    store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
1662    thread.join();
1663    KeyValueHeap heap2 = thread.getHeap();
1664    assertFalse(heap.equals(heap2));
1665  }
1666
1667  @Test
1668  public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException {
1669    Configuration conf = HBaseConfiguration.create();
1670    conf.set("hbase.systemtables.compacting.memstore.type", "eager");
1671    init(name.getMethodName(), conf,
1672      TableDescriptorBuilder.newBuilder(
1673        TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())),
1674      ColumnFamilyDescriptorBuilder.newBuilder(family)
1675        .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build());
1676    assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString()
1677      .startsWith("eager".toUpperCase()));
1678  }
1679
1680  @Test
1681  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
1682    final TableName tn = TableName.valueOf(name.getMethodName());
1683    init(name.getMethodName());
1684
1685    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
1686
1687    HStoreFile sf1 = mockStoreFileWithLength(1024L);
1688    HStoreFile sf2 = mockStoreFileWithLength(2048L);
1689    HStoreFile sf3 = mockStoreFileWithLength(4096L);
1690    HStoreFile sf4 = mockStoreFileWithLength(8192L);
1691
1692    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
1693        .setEndKey(Bytes.toBytes("b")).build();
1694
1695    // Compacting two files down to one, reducing size
1696    sizeStore.put(regionInfo, 1024L + 4096L);
1697    store.updateSpaceQuotaAfterFileReplacement(
1698        sizeStore, regionInfo, Arrays.asList(sf1, sf3), Arrays.asList(sf2));
1699
1700    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1701
1702    // The same file length in and out should have no change
1703    store.updateSpaceQuotaAfterFileReplacement(
1704        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf2));
1705
1706    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1707
1708    // Increase the total size used
1709    store.updateSpaceQuotaAfterFileReplacement(
1710        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf3));
1711
1712    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
1713
1714    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
1715        .setEndKey(Bytes.toBytes("c")).build();
1716    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
1717
1718    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
1719  }
1720
1721  @Test
1722  public void testHFileContextSetWithCFAndTable() throws Exception {
1723    init(this.name.getMethodName());
1724    StoreFileWriter writer = store.createWriterInTmp(10000L,
1725        Compression.Algorithm.NONE, false, true, false, true);
1726    HFileContext hFileContext = writer.getHFileWriter().getFileContext();
1727    assertArrayEquals(family, hFileContext.getColumnFamily());
1728    assertArrayEquals(table, hFileContext.getTableName());
1729  }
1730
1731  private HStoreFile mockStoreFileWithLength(long length) {
1732    HStoreFile sf = mock(HStoreFile.class);
1733    StoreFileReader sfr = mock(StoreFileReader.class);
1734    when(sf.isHFile()).thenReturn(true);
1735    when(sf.getReader()).thenReturn(sfr);
1736    when(sfr.length()).thenReturn(length);
1737    return sf;
1738  }
1739
1740  private static class MyThread extends Thread {
1741    private StoreScanner scanner;
1742    private KeyValueHeap heap;
1743
1744    public MyThread(StoreScanner scanner) {
1745      this.scanner = scanner;
1746    }
1747
1748    public KeyValueHeap getHeap() {
1749      return this.heap;
1750    }
1751
1752    @Override
1753    public void run() {
1754      scanner.trySwitchToStreamRead();
1755      heap = scanner.heap;
1756    }
1757  }
1758
1759  private static class MyMemStoreCompactor extends MemStoreCompactor {
1760    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
1761    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
1762    public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy
1763        compactionPolicy) throws IllegalArgumentIOException {
1764      super(compactingMemStore, compactionPolicy);
1765    }
1766
1767    @Override
1768    public boolean start() throws IOException {
1769      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
1770      if (isFirst) {
1771        try {
1772          START_COMPACTOR_LATCH.await();
1773          return super.start();
1774        } catch (InterruptedException ex) {
1775          throw new RuntimeException(ex);
1776        }
1777      }
1778      return super.start();
1779    }
1780  }
1781
1782  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
1783    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
1784    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
1785        HStore store, RegionServicesForStores regionServices,
1786        MemoryCompactionPolicy compactionPolicy) throws IOException {
1787      super(conf, c, store, regionServices, compactionPolicy);
1788    }
1789
1790    @Override
1791    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
1792        throws IllegalArgumentIOException {
1793      return new MyMemStoreCompactor(this, compactionPolicy);
1794    }
1795
1796    @Override
1797    protected boolean setInMemoryCompactionFlag() {
1798      boolean rval = super.setInMemoryCompactionFlag();
1799      if (rval) {
1800        RUNNER_COUNT.incrementAndGet();
1801        if (LOG.isDebugEnabled()) {
1802          LOG.debug("runner count: " + RUNNER_COUNT.get());
1803        }
1804      }
1805      return rval;
1806    }
1807  }
1808
1809  public static class MyCompactingMemStore extends CompactingMemStore {
1810    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
1811    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
1812    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
1813    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c,
1814        HStore store, RegionServicesForStores regionServices,
1815        MemoryCompactionPolicy compactionPolicy) throws IOException {
1816      super(conf, c, store, regionServices, compactionPolicy);
1817    }
1818
1819    @Override
1820    protected List<KeyValueScanner> createList(int capacity) {
1821      if (START_TEST.get()) {
1822        try {
1823          getScannerLatch.countDown();
1824          snapshotLatch.await();
1825        } catch (InterruptedException e) {
1826          throw new RuntimeException(e);
1827        }
1828      }
1829      return new ArrayList<>(capacity);
1830    }
1831    @Override
1832    protected void pushActiveToPipeline(MutableSegment active) {
1833      if (START_TEST.get()) {
1834        try {
1835          getScannerLatch.await();
1836        } catch (InterruptedException e) {
1837          throw new RuntimeException(e);
1838        }
1839      }
1840
1841      super.pushActiveToPipeline(active);
1842      if (START_TEST.get()) {
1843        snapshotLatch.countDown();
1844      }
1845    }
1846  }
1847
/**
 * Callback that receives the current size of a list. Having a single abstract method, it can
 * be implemented with a lambda or method reference.
 */
@FunctionalInterface
interface MyListHook {
  /**
   * @param currentSize the list size reported by the caller
   */
  void hook(int currentSize);
}
1851
1852  private static class MyList<T> implements List<T> {
1853    private final List<T> delegatee = new ArrayList<>();
1854    private final MyListHook hookAtAdd;
1855    MyList(final MyListHook hookAtAdd) {
1856      this.hookAtAdd = hookAtAdd;
1857    }
1858    @Override
1859    public int size() {return delegatee.size();}
1860
1861    @Override
1862    public boolean isEmpty() {return delegatee.isEmpty();}
1863
1864    @Override
1865    public boolean contains(Object o) {return delegatee.contains(o);}
1866
1867    @Override
1868    public Iterator<T> iterator() {return delegatee.iterator();}
1869
1870    @Override
1871    public Object[] toArray() {return delegatee.toArray();}
1872
1873    @Override
1874    public <R> R[] toArray(R[] a) {return delegatee.toArray(a);}
1875
1876    @Override
1877    public boolean add(T e) {
1878      hookAtAdd.hook(size());
1879      return delegatee.add(e);
1880    }
1881
1882    @Override
1883    public boolean remove(Object o) {return delegatee.remove(o);}
1884
1885    @Override
1886    public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);}
1887
1888    @Override
1889    public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);}
1890
1891    @Override
1892    public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);}
1893
1894    @Override
1895    public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);}
1896
1897    @Override
1898    public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);}
1899
1900    @Override
1901    public void clear() {delegatee.clear();}
1902
1903    @Override
1904    public T get(int index) {return delegatee.get(index);}
1905
1906    @Override
1907    public T set(int index, T element) {return delegatee.set(index, element);}
1908
1909    @Override
1910    public void add(int index, T element) {delegatee.add(index, element);}
1911
1912    @Override
1913    public T remove(int index) {return delegatee.remove(index);}
1914
1915    @Override
1916    public int indexOf(Object o) {return delegatee.indexOf(o);}
1917
1918    @Override
1919    public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);}
1920
1921    @Override
1922    public ListIterator<T> listIterator() {return delegatee.listIterator();}
1923
1924    @Override
1925    public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);}
1926
1927    @Override
1928    public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
1929  }
1930}