001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MAX_FILES_KEY;
021import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MIN_FILES_KEY;
022import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
023import static org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNotNull;
027import static org.junit.Assert.assertNull;
028import static org.junit.Assert.assertTrue;
029import static org.mockito.AdditionalMatchers.aryEq;
030import static org.mockito.ArgumentMatchers.any;
031import static org.mockito.ArgumentMatchers.anyBoolean;
032import static org.mockito.ArgumentMatchers.anyInt;
033import static org.mockito.ArgumentMatchers.anyLong;
034import static org.mockito.ArgumentMatchers.argThat;
035import static org.mockito.ArgumentMatchers.eq;
036import static org.mockito.ArgumentMatchers.isNull;
037import static org.mockito.Mockito.mock;
038import static org.mockito.Mockito.only;
039import static org.mockito.Mockito.times;
040import static org.mockito.Mockito.verify;
041import static org.mockito.Mockito.when;
042
043import java.io.IOException;
044import java.util.ArrayList;
045import java.util.Arrays;
046import java.util.Collection;
047import java.util.Iterator;
048import java.util.List;
049import java.util.OptionalLong;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.fs.Path;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.CellComparatorImpl;
054import org.apache.hadoop.hbase.HBaseClassTestRule;
055import org.apache.hadoop.hbase.HBaseConfiguration;
056import org.apache.hadoop.hbase.HColumnDescriptor;
057import org.apache.hadoop.hbase.HRegionInfo;
058import org.apache.hadoop.hbase.KeyValue;
059import org.apache.hadoop.hbase.io.TimeRange;
060import org.apache.hadoop.hbase.io.hfile.HFile;
061import org.apache.hadoop.hbase.regionserver.BloomType;
062import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
063import org.apache.hadoop.hbase.regionserver.HStore;
064import org.apache.hadoop.hbase.regionserver.HStoreFile;
065import org.apache.hadoop.hbase.regionserver.InternalScanner;
066import org.apache.hadoop.hbase.regionserver.ScanInfo;
067import org.apache.hadoop.hbase.regionserver.ScanType;
068import org.apache.hadoop.hbase.regionserver.ScannerContext;
069import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
070import org.apache.hadoop.hbase.regionserver.StoreEngine;
071import org.apache.hadoop.hbase.regionserver.StoreFileReader;
072import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
073import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
074import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
075import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
076import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
077import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
078import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
079import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
080import org.apache.hadoop.hbase.testclassification.MediumTests;
081import org.apache.hadoop.hbase.testclassification.RegionServerTests;
082import org.apache.hadoop.hbase.util.Bytes;
083import org.apache.hadoop.hbase.util.ConcatenatedLists;
084import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
085import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
086import org.junit.ClassRule;
087import org.junit.Test;
088import org.junit.experimental.categories.Category;
089import org.junit.runner.RunWith;
090import org.junit.runners.Parameterized;
091import org.junit.runners.Parameterized.Parameter;
092import org.junit.runners.Parameterized.Parameters;
093import org.mockito.ArgumentMatcher;
094
095import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
096import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
097
098@RunWith(Parameterized.class)
099@Category({ RegionServerTests.class, MediumTests.class })
100public class TestStripeCompactionPolicy {
101
102  @ClassRule
103  public static final HBaseClassTestRule CLASS_RULE =
104    HBaseClassTestRule.forClass(TestStripeCompactionPolicy.class);
105
106  private static final byte[] KEY_A = Bytes.toBytes("aaa");
107  private static final byte[] KEY_B = Bytes.toBytes("bbb");
108  private static final byte[] KEY_C = Bytes.toBytes("ccc");
109  private static final byte[] KEY_D = Bytes.toBytes("ddd");
110  private static final byte[] KEY_E = Bytes.toBytes("eee");
111  private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
112  private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
113  private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
114  private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
115  private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);
116
117  private static long defaultSplitSize = 18;
118  private static float defaultSplitCount = 1.8F;
119  private final static int defaultInitialCount = 1;
120  private static long defaultTtl = 1000 * 1000;
121
122  @Parameters(name = "{index}: usePrivateReaders={0}")
123  public static Iterable<Object[]> data() {
124    return Arrays.asList(new Object[] { true }, new Object[] { false });
125  }
126
127  @Parameter
128  public boolean usePrivateReaders;
129
130  @Test
131  public void testNoStripesFromFlush() throws Exception {
132    Configuration conf = HBaseConfiguration.create();
133    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
134    StripeCompactionPolicy policy = createPolicy(conf);
135    StripeInformationProvider si = createStripesL0Only(0, 0);
136
137    KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
138    KeyValue[][] expected = new KeyValue[][] { input };
139    verifyFlush(policy, si, input, expected, null);
140  }
141
142  @Test
143  public void testOldStripesFromFlush() throws Exception {
144    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
145    StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);
146
147    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
148    KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
149      new KeyValue[] { KV_C, KV_C }, new KeyValue[] { KV_D, KV_E } };
150    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
151  }
152
153  @Test
154  public void testNewStripesFromFlush() throws Exception {
155    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
156    StripeInformationProvider si = createStripesL0Only(0, 0);
157    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
158    // Starts with one stripe; unlike flush results, must have metadata
159    KeyValue[][] expected = new KeyValue[][] { input };
160    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
161  }
162
163  @Test
164  public void testSingleStripeCompaction() throws Exception {
165    // Create a special policy that only compacts single stripes, using standard methods.
166    Configuration conf = HBaseConfiguration.create();
167    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
168    conf.unset("hbase.hstore.compaction.min.size");
169    conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F);
170    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3);
171    conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4);
172    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits
173    StoreConfigInformation sci = mock(StoreConfigInformation.class);
174    when(sci.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
175    StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
176    StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) {
177      @Override
178      public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
179        List<HStoreFile> filesCompacting, boolean isOffpeak) throws IOException {
180        if (!filesCompacting.isEmpty()) {
181          return null;
182        }
183        return selectSingleStripeCompaction(si, false, false, isOffpeak);
184      }
185
186      @Override
187      public boolean needsCompactions(StripeInformationProvider si,
188        List<HStoreFile> filesCompacting) {
189        if (!filesCompacting.isEmpty()) {
190          return false;
191        }
192        return needsSingleStripeCompaction(si);
193      }
194    };
195
196    // No compaction due to min files or ratio
197    StripeInformationProvider si =
198      createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L });
199    verifyNoCompaction(policy, si);
200    // No compaction due to min files or ratio - will report needed, but not do any.
201    si = createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L },
202      new Long[] { 5L, 1L, 1L });
203    assertNull(policy.selectCompaction(si, al(), false));
204    assertTrue(policy.needsCompactions(si, al()));
205    // One stripe has possible compaction
206    si = createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L },
207      new Long[] { 5L, 4L, 3L });
208    verifySingleStripeCompaction(policy, si, 2, null);
209    // Several stripes have possible compactions; choose best quality (removes most files)
210    si = createStripesWithSizes(0, 0, new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L },
211      new Long[] { 3L, 2L, 2L, 1L });
212    verifySingleStripeCompaction(policy, si, 2, null);
213    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L },
214      new Long[] { 3L, 2L, 2L });
215    verifySingleStripeCompaction(policy, si, 1, null);
216    // Or with smallest files, if the count is the same
217    si = createStripesWithSizes(0, 0, new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L },
218      new Long[] { 3L, 2L, 2L });
219    verifySingleStripeCompaction(policy, si, 1, null);
220    // Verify max count is respected.
221    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L });
222    List<HStoreFile> sfs = si.getStripes().get(1).subList(1, 5);
223    verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
224    // Verify ratio is applied.
225    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L });
226    sfs = si.getStripes().get(1).subList(1, 5);
227    verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
228  }
229
230  @Test
231  public void testWithParallelCompaction() throws Exception {
232    // TODO: currently only one compaction at a time per store is allowed. If this changes,
233    // the appropriate file exclusion testing would need to be done in respective tests.
234    assertNull(createPolicy(HBaseConfiguration.create())
235      .selectCompaction(mock(StripeInformationProvider.class), al(createFile()), false));
236  }
237
238  @Test
239  public void testWithReferences() throws Exception {
240    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
241    StripeCompactor sc = mock(StripeCompactor.class);
242    HStoreFile ref = createFile();
243    when(ref.isReference()).thenReturn(true);
244    StripeInformationProvider si = mock(StripeInformationProvider.class);
245    Collection<HStoreFile> sfs = al(ref, createFile());
246    when(si.getStorefiles()).thenReturn(sfs);
247
248    assertTrue(policy.needsCompactions(si, al()));
249    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
250    // UnmodifiableCollection does not implement equals so we need to change it here to a
251    // collection that implements it.
252    assertEquals(si.getStorefiles(), new ArrayList<>(scr.getRequest().getFiles()));
253    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
254    verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
255      aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), any(), any());
256  }
257
258  @Test
259  public void testInitialCountFromL0() throws Exception {
260    Configuration conf = HBaseConfiguration.create();
261    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
262    StripeCompactionPolicy policy =
263      createPolicy(conf, defaultSplitSize, defaultSplitCount, 2, false);
264    StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8);
265    verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true);
266    si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts.
267    verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true);
268    policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false);
269    verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true);
270  }
271
272  @Test
273  public void testSelectL0Compaction() throws Exception {
274    // test select ALL L0 files when L0 files count > MIN_FILES_L0_KEY
275    Configuration conf = HBaseConfiguration.create();
276    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
277    StripeCompactionPolicy policy = createPolicy(conf);
278    StripeCompactionPolicy.StripeInformationProvider si = createStripesWithSizes(10, 10L,
279      new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, new Long[] { 3L, 2L, 2L });
280    StripeCompactionPolicy.StripeCompactionRequest cr = policy.selectCompaction(si, al(), false);
281    assertNotNull(cr);
282    assertEquals(10, cr.getRequest().getFiles().size());
283    verifyCollectionsEqual(si.getLevel0Files(), cr.getRequest().getFiles());
284
285    // test select partial L0 files when size of L0 files > HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY
286    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 100L);
287    policy = createPolicy(conf);
288    si = createStripesWithSizes(5, 50L, new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L },
289      new Long[] { 3L, 2L, 2L });
290    cr = policy.selectCompaction(si, al(), false);
291    assertNotNull(cr);
292    assertEquals(2, cr.getRequest().getFiles().size());
293    verifyCollectionsEqual(si.getLevel0Files().subList(0, 2), cr.getRequest().getFiles());
294
295    // test select partial L0 files when count of L0 files > MAX_FILES_KEY
296    conf.setInt(MAX_FILES_KEY, 6);
297    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 1000L);
298    policy = createPolicy(conf);
299    si = createStripesWithSizes(10, 10L, new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L },
300      new Long[] { 3L, 2L, 2L });
301    cr = policy.selectCompaction(si, al(), false);
302    assertNotNull(cr);
303    assertEquals(6, cr.getRequest().getFiles().size());
304    verifyCollectionsEqual(si.getLevel0Files().subList(0, 6), cr.getRequest().getFiles());
305  }
306
307  @Test
308  public void testExistingStripesFromL0() throws Exception {
309    Configuration conf = HBaseConfiguration.create();
310    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3);
311    StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A);
312    verifyCompaction(createPolicy(conf), si, si.getLevel0Files(), null, null,
313      si.getStripeBoundaries());
314  }
315
316  @Test
317  public void testNothingToCompactFromL0() throws Exception {
318    Configuration conf = HBaseConfiguration.create();
319    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
320    StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10);
321    StripeCompactionPolicy policy = createPolicy(conf);
322    verifyNoCompaction(policy, si);
323
324    si = createStripes(3, KEY_A);
325    verifyNoCompaction(policy, si);
326  }
327
328  @Test
329  public void testCheckExpiredStripeCompaction() throws Exception {
330    Configuration conf = HBaseConfiguration.create();
331    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 5);
332    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
333
334    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
335    long now = defaultTtl + 2;
336    edge.setValue(now);
337    EnvironmentEdgeManager.injectEdge(edge);
338    HStoreFile expiredFile = createFile(10), notExpiredFile = createFile(10);
339    when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
340    when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
341    List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
342    List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
343
344    StripeCompactionPolicy policy =
345      createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
346    // Merge expired if there are eligible stripes.
347    StripeCompactionPolicy.StripeInformationProvider si =
348      createStripesWithFiles(mixed, mixed, mixed);
349    assertFalse(policy.needsCompactions(si, al()));
350
351    si = createStripesWithFiles(mixed, mixed, mixed, expired);
352    assertFalse(policy.needsSingleStripeCompaction(si));
353    assertTrue(policy.hasExpiredStripes(si));
354    assertTrue(policy.needsCompactions(si, al()));
355  }
356
357  @Test
358  public void testSplitOffStripe() throws Exception {
359    Configuration conf = HBaseConfiguration.create();
360    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
361    conf.unset("hbase.hstore.compaction.min.size");
362    // First test everything with default split count of 2, then split into more.
363    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
364    Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L };
365    Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L };
366    long splitTargetSize = (long) (defaultSplitSize / defaultSplitCount);
367    // Don't split if not eligible for compaction.
368    StripeCompactionPolicy.StripeInformationProvider si =
369      createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L });
370    assertNull(createPolicy(conf).selectCompaction(si, al(), false));
371    // Make sure everything is eligible.
372    conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f);
373    StripeCompactionPolicy policy = createPolicy(conf);
374    verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize);
375    // Add some extra stripes...
376    si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit);
377    verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize);
378    // In the middle.
379    si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit);
380    verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize);
381    // No split-off with different config (larger split size).
382    // However, in this case some eligible stripe will just be compacted alone.
383    StripeCompactionPolicy specPolicy =
384      createPolicy(conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false);
385    verifySingleStripeCompaction(specPolicy, si, 1, null);
386  }
387
388  @Test
389  public void testSplitOffStripeOffPeak() throws Exception {
390    // for HBASE-11439
391    Configuration conf = HBaseConfiguration.create();
392
393    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
394    conf.unset("hbase.hstore.compaction.min.size");
395
396    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
397    // Select the last 2 files.
398    StripeCompactionPolicy.StripeInformationProvider si =
399      createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L });
400    assertEquals(2,
401      createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles().size());
402    // Make sure everything is eligible in offpeak.
403    conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f);
404    assertEquals(3,
405      createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles().size());
406  }
407
408  @Test
409  public void testSplitOffStripeDropDeletes() throws Exception {
410    Configuration conf = HBaseConfiguration.create();
411    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
412    StripeCompactionPolicy policy = createPolicy(conf);
413    Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 };
414    Long[] noSplit = new Long[] { 1L };
415    long splitTargetSize = (long) (defaultSplitSize / defaultSplitCount);
416
417    // Verify the deletes can be dropped if there are no L0 files.
418    StripeCompactionPolicy.StripeInformationProvider si =
419      createStripesWithSizes(0, 0, noSplit, toSplit);
420    verifyWholeStripesCompaction(policy, si, 1, 1, true, null, splitTargetSize);
421    // But cannot be dropped if there are.
422    si = createStripesWithSizes(2, 2, noSplit, toSplit);
423    verifyWholeStripesCompaction(policy, si, 1, 1, false, null, splitTargetSize);
424  }
425
426  @SuppressWarnings("unchecked")
427  @Test
428  public void testMergeExpiredFiles() throws Exception {
429    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
430    long now = defaultTtl + 2;
431    edge.setValue(now);
432    EnvironmentEdgeManager.injectEdge(edge);
433    try {
434      HStoreFile expiredFile = createFile(), notExpiredFile = createFile();
435      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
436      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
437      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
438      List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
439      List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
440
441      StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(), defaultSplitSize,
442        defaultSplitCount, defaultInitialCount, true);
443      // Merge expired if there are eligible stripes.
444      StripeCompactionPolicy.StripeInformationProvider si =
445        createStripesWithFiles(expired, expired, expired);
446      verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false);
447      // Don't merge if nothing expired.
448      si = createStripesWithFiles(notExpired, notExpired, notExpired);
449      assertNull(policy.selectCompaction(si, al(), false));
450      // Merge one expired stripe with next.
451      si = createStripesWithFiles(notExpired, expired, notExpired);
452      verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false);
453      // Merge the biggest run out of multiple options.
454      // Merge one expired stripe with next.
455      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
456      verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false);
457      // Stripe with a subset of expired files is not merged.
458      si = createStripesWithFiles(expired, expired, notExpired, expired, mixed);
459      verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false);
460    } finally {
461      EnvironmentEdgeManager.reset();
462    }
463  }
464
465  @SuppressWarnings("unchecked")
466  @Test
467  public void testMergeExpiredStripes() throws Exception {
468    // HBASE-11397
469    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
470    long now = defaultTtl + 2;
471    edge.setValue(now);
472    EnvironmentEdgeManager.injectEdge(edge);
473    try {
474      HStoreFile expiredFile = createFile(), notExpiredFile = createFile();
475      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
476      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
477      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
478      List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
479
480      StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(), defaultSplitSize,
481        defaultSplitCount, defaultInitialCount, true);
482
483      // Merge all three expired stripes into one.
484      StripeCompactionPolicy.StripeInformationProvider si =
485        createStripesWithFiles(expired, expired, expired);
486      verifyMergeCompatcion(policy, si, 0, 2);
487
488      // Merge two adjacent expired stripes into one.
489      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
490      verifyMergeCompatcion(policy, si, 3, 4);
491    } finally {
492      EnvironmentEdgeManager.reset();
493    }
494  }
495
496  @SuppressWarnings("unchecked")
497  private static StripeCompactionPolicy.StripeInformationProvider
498    createStripesWithFiles(List<HStoreFile>... stripeFiles) throws Exception {
499    return createStripesWithFiles(createBoundaries(stripeFiles.length),
500      Lists.newArrayList(stripeFiles), new ArrayList<>());
501  }
502
503  @Test
504  public void testSingleStripeDropDeletes() throws Exception {
505    Configuration conf = HBaseConfiguration.create();
506    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
507    conf.unset("hbase.hstore.compaction.min.size");
508    StripeCompactionPolicy policy = createPolicy(conf);
509    // Verify the deletes can be dropped if there are no L0 files.
510    Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } };
511    StripeInformationProvider si = createStripesWithSizes(0, 0, stripes);
512    verifySingleStripeCompaction(policy, si, 0, true);
513    // But cannot be dropped if there are.
514    si = createStripesWithSizes(2, 2, stripes);
515    verifySingleStripeCompaction(policy, si, 0, false);
516    // Unless there are enough to cause L0 compaction.
517    si = createStripesWithSizes(6, 2, stripes);
518    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
519    sfs.addSublist(si.getLevel0Files());
520    sfs.addSublist(si.getStripes().get(0));
521    verifyCompaction(policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries());
522    // If we cannot actually compact all files in some stripe, L0 is chosen.
523    si = createStripesWithSizes(6, 2,
524      new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } });
525    verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
526    // even if L0 has no file
527    // if all files of stripe aren't selected, delete must not be dropped.
528    stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } };
529    si = createStripesWithSizes(0, 0, stripes);
530    List<HStoreFile> compactFile = new ArrayList<>();
531    Iterator<HStoreFile> iter = si.getStripes().get(0).listIterator(1);
532    while (iter.hasNext()) {
533      compactFile.add(iter.next());
534    }
535    verifyCompaction(policy, si, compactFile, false, 1, null, si.getStartRow(0), si.getEndRow(0),
536      true);
537  }
538
539  @Test
540  public void testCheckExpiredL0Compaction() throws Exception {
541    Configuration conf = HBaseConfiguration.create();
542    int minL0 = 100;
543    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, minL0);
544    conf.setInt(MIN_FILES_KEY, 4);
545
546    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
547    long now = defaultTtl + 2;
548    edge.setValue(now);
549    EnvironmentEdgeManager.injectEdge(edge);
550    HStoreFile expiredFile = createFile(10), notExpiredFile = createFile(10);
551    when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
552    when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
553    List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
554    List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
555
556    StripeCompactionPolicy policy =
557      createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
558    // Merge expired if there are eligible stripes.
559    StripeCompactionPolicy.StripeInformationProvider si =
560      createStripesWithFiles(null, new ArrayList<>(), mixed);
561    assertFalse(policy.needsCompactions(si, al()));
562
563    List<HStoreFile> largeMixed = new ArrayList<>();
564    for (int i = 0; i < minL0 - 1; i++) {
565      largeMixed.add(i % 2 == 0 ? notExpiredFile : expiredFile);
566    }
567    si = createStripesWithFiles(null, new ArrayList<>(), largeMixed);
568    assertFalse(policy.needsCompactions(si, al()));
569
570    si = createStripesWithFiles(null, new ArrayList<>(), expired);
571    assertFalse(policy.needsSingleStripeCompaction(si));
572    assertFalse(policy.hasExpiredStripes(si));
573    assertTrue(policy.allL0FilesExpired(si));
574    assertTrue(policy.needsCompactions(si, al()));
575  }
576
577  /********* HELPER METHODS ************/
578  private static StripeCompactionPolicy createPolicy(Configuration conf) throws Exception {
579    return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false);
580  }
581
582  private static StripeCompactionPolicy createPolicy(Configuration conf, long splitSize,
583    float splitCount, int initialCount, boolean hasTtl) throws Exception {
584    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize);
585    conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount);
586    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount);
587    StoreConfigInformation sci = mock(StoreConfigInformation.class);
588    when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE);
589    when(sci.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
590    StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
591    return new StripeCompactionPolicy(conf, sci, ssc);
592  }
593
594  private static ArrayList<HStoreFile> al(HStoreFile... sfs) {
595    return new ArrayList<>(Arrays.asList(sfs));
596  }
597
598  private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si,
599    int from, int to) throws Exception {
600    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
601    Collection<HStoreFile> sfs = getAllFiles(si, from, to);
602    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
603
604    // All the Stripes are expired, so the Compactor will not create any Writers. We need to create
605    // an empty file to preserve metadata
606    StripeCompactor sc = createCompactor();
607    List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
608    assertEquals(1, paths.size());
609  }
610
611  /**
612   * Verify the compaction that includes several entire stripes.
613   * @param policy      Policy to test.
614   * @param si          Stripe information pre-set with stripes to test.
615   * @param from        Starting stripe.
616   * @param to          Ending stripe (inclusive).
617   * @param dropDeletes Whether to drop deletes from compaction range.
618   * @param count       Expected # of resulting stripes, null if not checked.
619   * @param size        Expected target stripe size, null if not checked.
620   */
621  private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
622    StripeInformationProvider si, int from, int to, Boolean dropDeletes, Integer count, Long size,
623    boolean needsCompaction) throws IOException {
624    verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes, count, size,
625      si.getStartRow(from), si.getEndRow(to), needsCompaction);
626  }
627
628  private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
629    StripeInformationProvider si, int from, int to, Boolean dropDeletes, Integer count, Long size)
630    throws IOException {
631    verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true);
632  }
633
634  private void verifySingleStripeCompaction(StripeCompactionPolicy policy,
635    StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException {
636    verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true);
637  }
638
639  /**
640   * Verify no compaction is needed or selected.
641   * @param policy Policy to test.
642   * @param si     Stripe information pre-set with stripes to test.
643   */
644  private void verifyNoCompaction(StripeCompactionPolicy policy, StripeInformationProvider si)
645    throws IOException {
646    assertNull(policy.selectCompaction(si, al(), false));
647    assertFalse(policy.needsCompactions(si, al()));
648  }
649
650  /**
651   * Verify arbitrary compaction.
652   * @param policy          Policy to test.
653   * @param si              Stripe information pre-set with stripes to test.
654   * @param sfs             Files that should be compacted.
655   * @param dropDeletesFrom Row from which to drop deletes.
656   * @param dropDeletesTo   Row to which to drop deletes.
657   * @param boundaries      Expected target stripe boundaries.
658   */
659  private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
660    Collection<HStoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo,
661    final List<byte[]> boundaries) throws Exception {
662    StripeCompactor sc = mock(StripeCompactor.class);
663    assertTrue(policy.needsCompactions(si, al()));
664    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
665    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
666    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
667    verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
668      @Override
669      public boolean matches(List<byte[]> argument) {
670        List<byte[]> other = argument;
671        if (other.size() != boundaries.size()) {
672          return false;
673        }
674        for (int i = 0; i < other.size(); ++i) {
675          if (!Bytes.equals(other.get(i), boundaries.get(i))) {
676            return false;
677          }
678        }
679        return true;
680      }
681    }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
682      dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), any(), any());
683  }
684
685  /**
686   * Verify arbitrary compaction.
687   * @param policy      Policy to test.
688   * @param si          Stripe information pre-set with stripes to test.
689   * @param sfs         Files that should be compacted.
690   * @param dropDeletes Whether to drop deletes from compaction range.
691   * @param count       Expected # of resulting stripes, null if not checked.
692   * @param size        Expected target stripe size, null if not checked.
693   * @param start       Left boundary of the compaction.
694   * @param end         Right boundary of the compaction.
695   */
696  private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
697    Collection<HStoreFile> sfs, Boolean dropDeletes, Integer count, Long size, byte[] start,
698    byte[] end, boolean needsCompaction) throws IOException {
699    StripeCompactor sc = mock(StripeCompactor.class);
700    assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
701    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
702    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
703    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
704    verify(sc, times(1)).compact(eq(scr.getRequest()),
705      count == null ? anyInt() : eq(count.intValue()),
706      size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
707      dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), any(), any());
708  }
709
710  /** Verify arbitrary flush. */
711  protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
712    KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
713    StoreFileWritersCapture writers = new StoreFileWritersCapture();
714    StripeStoreFlusher.StripeFlushRequest req =
715      policy.selectFlush(CellComparatorImpl.COMPARATOR, si, input.length);
716    StripeMultiFileWriter mw = req.createWriter();
717    mw.init(null, writers);
718    for (KeyValue kv : input) {
719      mw.append(kv);
720    }
721    boolean hasMetadata = boundaries != null;
722    mw.commitWriters(0, false);
723    writers.verifyKvs(expected, true, hasMetadata);
724    if (hasMetadata) {
725      writers.verifyBoundaries(boundaries);
726    }
727  }
728
729  private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
730    return dropDeletes == null
731      ? any()
732      : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));
733  }
734
735  private void verifyCollectionsEqual(Collection<HStoreFile> sfs, Collection<HStoreFile> scr) {
736    // Dumb.
737    assertEquals(sfs.size(), scr.size());
738    assertTrue(scr.containsAll(sfs));
739  }
740
741  private static List<HStoreFile> getAllFiles(StripeInformationProvider si, int fromStripe,
742    int toStripe) {
743    ArrayList<HStoreFile> expected = new ArrayList<>();
744    for (int i = fromStripe; i <= toStripe; ++i) {
745      expected.addAll(si.getStripes().get(i));
746    }
747    return expected;
748  }
749
750  /**
751   * @param l0Count    Number of L0 files.
752   * @param boundaries Target boundaries.
753   * @return Mock stripes.
754   */
755  private static StripeInformationProvider createStripes(int l0Count, byte[]... boundaries)
756    throws Exception {
757    List<Long> l0Sizes = new ArrayList<>();
758    for (int i = 0; i < l0Count; ++i) {
759      l0Sizes.add(5L);
760    }
761    List<List<Long>> sizes = new ArrayList<>();
762    for (int i = 0; i <= boundaries.length; ++i) {
763      sizes.add(Arrays.asList(Long.valueOf(5)));
764    }
765    return createStripes(Arrays.asList(boundaries), sizes, l0Sizes);
766  }
767
768  /**
769   * @param l0Count Number of L0 files.
770   * @param l0Size  Size of each file.
771   * @return Mock stripes.
772   */
773  private static StripeInformationProvider createStripesL0Only(int l0Count, long l0Size)
774    throws Exception {
775    List<Long> l0Sizes = new ArrayList<>();
776    for (int i = 0; i < l0Count; ++i) {
777      l0Sizes.add(l0Size);
778    }
779    return createStripes(null, new ArrayList<>(), l0Sizes);
780  }
781
782  /**
783   * @param l0Count Number of L0 files.
784   * @param l0Size  Size of each file.
785   * @param sizes   Sizes of the files; each sub-array representing a stripe.
786   * @return Mock stripes.
787   */
788  private static StripeInformationProvider createStripesWithSizes(int l0Count, long l0Size,
789    Long[]... sizes) throws Exception {
790    ArrayList<List<Long>> sizeList = new ArrayList<>(sizes.length);
791    for (Long[] size : sizes) {
792      sizeList.add(Arrays.asList(size));
793    }
794    return createStripesWithSizes(l0Count, l0Size, sizeList);
795  }
796
797  private static StripeInformationProvider createStripesWithSizes(int l0Count, long l0Size,
798    List<List<Long>> sizes) throws Exception {
799    List<byte[]> boundaries = createBoundaries(sizes.size());
800    List<Long> l0Sizes = new ArrayList<>();
801    for (int i = 0; i < l0Count; ++i) {
802      l0Sizes.add(l0Size);
803    }
804    return createStripes(boundaries, sizes, l0Sizes);
805  }
806
807  private static List<byte[]> createBoundaries(int stripeCount) {
808    byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E };
809    assert stripeCount <= keys.length + 1;
810    List<byte[]> boundaries = new ArrayList<>();
811    boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1));
812    return boundaries;
813  }
814
815  private static StripeInformationProvider createStripes(List<byte[]> boundaries,
816    List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception {
817    List<List<HStoreFile>> stripeFiles = new ArrayList<>(stripeSizes.size());
818    for (List<Long> sizes : stripeSizes) {
819      List<HStoreFile> sfs = new ArrayList<>(sizes.size());
820      for (Long size : sizes) {
821        sfs.add(createFile(size));
822      }
823      stripeFiles.add(sfs);
824    }
825    List<HStoreFile> l0Files = new ArrayList<>();
826    for (Long size : l0Sizes) {
827      l0Files.add(createFile(size));
828    }
829    return createStripesWithFiles(boundaries, stripeFiles, l0Files);
830  }
831
832  /**
833   * This method actually does all the work.
834   */
835  private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries,
836    List<List<HStoreFile>> stripeFiles, List<HStoreFile> l0Files) throws Exception {
837    ArrayList<ImmutableList<HStoreFile>> stripes = new ArrayList<>();
838    ArrayList<byte[]> boundariesList = new ArrayList<>();
839    StripeInformationProvider si = mock(StripeInformationProvider.class);
840    if (!stripeFiles.isEmpty()) {
841      assert stripeFiles.size() == (boundaries.size() + 1);
842      boundariesList.add(OPEN_KEY);
843      for (int i = 0; i <= boundaries.size(); ++i) {
844        byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1));
845        byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i));
846        boundariesList.add(endKey);
847        for (HStoreFile sf : stripeFiles.get(i)) {
848          setFileStripe(sf, startKey, endKey);
849        }
850        stripes.add(ImmutableList.copyOf(stripeFiles.get(i)));
851        when(si.getStartRow(eq(i))).thenReturn(startKey);
852        when(si.getEndRow(eq(i))).thenReturn(endKey);
853      }
854    }
855    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
856    sfs.addAllSublists(stripes);
857    sfs.addSublist(l0Files);
858    when(si.getStorefiles()).thenReturn(sfs);
859    when(si.getStripes()).thenReturn(stripes);
860    when(si.getStripeBoundaries()).thenReturn(boundariesList);
861    when(si.getStripeCount()).thenReturn(stripes.size());
862    when(si.getLevel0Files()).thenReturn(l0Files);
863    return si;
864  }
865
866  private static HStoreFile createFile(long size) throws Exception {
867    HStoreFile sf = mock(HStoreFile.class);
868    when(sf.getPath()).thenReturn(new Path("moo"));
869    StoreFileReader r = mock(StoreFileReader.class);
870    when(r.getEntries()).thenReturn(size);
871    when(r.length()).thenReturn(size);
872    when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
873    when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
874    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(),
875      anyBoolean())).thenReturn(mock(StoreFileScanner.class));
876    when(sf.getReader()).thenReturn(r);
877    when(sf.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty());
878    when(r.getMaxTimestamp()).thenReturn(TimeRange.INITIAL_MAX_TIMESTAMP);
879    return sf;
880  }
881
882  private static HStoreFile createFile() throws Exception {
883    return createFile(0);
884  }
885
886  private static void setFileStripe(HStoreFile sf, byte[] startKey, byte[] endKey) {
887    when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
888    when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
889  }
890
891  private StripeCompactor createCompactor() throws Exception {
892    HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
893    StoreFileWritersCapture writers = new StoreFileWritersCapture();
894    HStore store = mock(HStore.class);
895    HRegionInfo info = mock(HRegionInfo.class);
896    when(info.getRegionNameAsString()).thenReturn("testRegion");
897    when(store.getColumnFamilyDescriptor()).thenReturn(col);
898    when(store.getRegionInfo()).thenReturn(info);
899    StoreEngine storeEngine = mock(StoreEngine.class);
900    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
901    when(store.getStoreEngine()).thenReturn(storeEngine);
902
903    Configuration conf = HBaseConfiguration.create();
904    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
905    final Scanner scanner = new Scanner();
906    return new StripeCompactor(conf, store) {
907      @Override
908      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
909        List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
910        byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
911        return scanner;
912      }
913
914      @Override
915      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
916        List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
917        long earliestPutTs) throws IOException {
918        return scanner;
919      }
920    };
921  }
922
923  private static class Scanner implements InternalScanner {
924    private final ArrayList<KeyValue> kvs;
925
926    public Scanner(KeyValue... kvs) {
927      this.kvs = new ArrayList<>(Arrays.asList(kvs));
928    }
929
930    @Override
931    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
932      if (kvs.isEmpty()) {
933        return false;
934      }
935      result.add(kvs.remove(0));
936      return !kvs.isEmpty();
937    }
938
939    @Override
940    public void close() throws IOException {
941    }
942  }
943}