001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MAX_FILES_KEY;
021import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MIN_FILES_KEY;
022import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
023import static org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNotNull;
027import static org.junit.Assert.assertNull;
028import static org.junit.Assert.assertTrue;
029import static org.mockito.AdditionalMatchers.aryEq;
030import static org.mockito.ArgumentMatchers.any;
031import static org.mockito.ArgumentMatchers.anyBoolean;
032import static org.mockito.ArgumentMatchers.anyInt;
033import static org.mockito.ArgumentMatchers.anyLong;
034import static org.mockito.ArgumentMatchers.argThat;
035import static org.mockito.ArgumentMatchers.eq;
036import static org.mockito.ArgumentMatchers.isNull;
037import static org.mockito.Mockito.mock;
038import static org.mockito.Mockito.only;
039import static org.mockito.Mockito.times;
040import static org.mockito.Mockito.verify;
041import static org.mockito.Mockito.when;
042
043import java.io.IOException;
044import java.util.ArrayList;
045import java.util.Arrays;
046import java.util.Collection;
047import java.util.Iterator;
048import java.util.List;
049import java.util.OptionalLong;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.fs.Path;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.CellComparatorImpl;
054import org.apache.hadoop.hbase.HBaseClassTestRule;
055import org.apache.hadoop.hbase.HBaseConfiguration;
056import org.apache.hadoop.hbase.KeyValue;
057import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
058import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
059import org.apache.hadoop.hbase.client.RegionInfo;
060import org.apache.hadoop.hbase.client.RegionInfoBuilder;
061import org.apache.hadoop.hbase.io.TimeRange;
062import org.apache.hadoop.hbase.io.hfile.HFile;
063import org.apache.hadoop.hbase.regionserver.BloomType;
064import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
065import org.apache.hadoop.hbase.regionserver.HStore;
066import org.apache.hadoop.hbase.regionserver.HStoreFile;
067import org.apache.hadoop.hbase.regionserver.InternalScanner;
068import org.apache.hadoop.hbase.regionserver.ScanInfo;
069import org.apache.hadoop.hbase.regionserver.ScanType;
070import org.apache.hadoop.hbase.regionserver.ScannerContext;
071import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
072import org.apache.hadoop.hbase.regionserver.StoreEngine;
073import org.apache.hadoop.hbase.regionserver.StoreFileReader;
074import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
075import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
076import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
077import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
078import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
079import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
080import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
081import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
082import org.apache.hadoop.hbase.testclassification.MediumTests;
083import org.apache.hadoop.hbase.testclassification.RegionServerTests;
084import org.apache.hadoop.hbase.util.Bytes;
085import org.apache.hadoop.hbase.util.ConcatenatedLists;
086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
087import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
088import org.junit.ClassRule;
089import org.junit.Test;
090import org.junit.experimental.categories.Category;
091import org.junit.runner.RunWith;
092import org.junit.runners.Parameterized;
093import org.junit.runners.Parameterized.Parameter;
094import org.junit.runners.Parameterized.Parameters;
095import org.mockito.ArgumentMatcher;
096
097import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
098import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
099
100@RunWith(Parameterized.class)
101@Category({ RegionServerTests.class, MediumTests.class })
102public class TestStripeCompactionPolicy {
103
104  @ClassRule
105  public static final HBaseClassTestRule CLASS_RULE =
106    HBaseClassTestRule.forClass(TestStripeCompactionPolicy.class);
107
108  private static final byte[] KEY_A = Bytes.toBytes("aaa");
109  private static final byte[] KEY_B = Bytes.toBytes("bbb");
110  private static final byte[] KEY_C = Bytes.toBytes("ccc");
111  private static final byte[] KEY_D = Bytes.toBytes("ddd");
112  private static final byte[] KEY_E = Bytes.toBytes("eee");
113  private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
114  private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
115  private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
116  private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
117  private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);
118
119  private static long defaultSplitSize = 18;
120  private static float defaultSplitCount = 1.8F;
121  private final static int defaultInitialCount = 1;
122  private static long defaultTtl = 1000 * 1000;
123
124  @Parameters(name = "{index}: usePrivateReaders={0}")
125  public static Iterable<Object[]> data() {
126    return Arrays.asList(new Object[] { true }, new Object[] { false });
127  }
128
129  @Parameter
130  public boolean usePrivateReaders;
131
132  @Test
133  public void testNoStripesFromFlush() throws Exception {
134    Configuration conf = HBaseConfiguration.create();
135    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
136    StripeCompactionPolicy policy = createPolicy(conf);
137    StripeInformationProvider si = createStripesL0Only(0, 0);
138
139    KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
140    KeyValue[][] expected = new KeyValue[][] { input };
141    verifyFlush(policy, si, input, expected, null);
142  }
143
144  @Test
145  public void testOldStripesFromFlush() throws Exception {
146    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
147    StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);
148
149    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
150    KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
151      new KeyValue[] { KV_C, KV_C }, new KeyValue[] { KV_D, KV_E } };
152    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
153  }
154
155  @Test
156  public void testNewStripesFromFlush() throws Exception {
157    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
158    StripeInformationProvider si = createStripesL0Only(0, 0);
159    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
160    // Starts with one stripe; unlike flush results, must have metadata
161    KeyValue[][] expected = new KeyValue[][] { input };
162    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
163  }
164
165  @Test
166  public void testSingleStripeCompaction() throws Exception {
167    // Create a special policy that only compacts single stripes, using standard methods.
168    Configuration conf = HBaseConfiguration.create();
169    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
170    conf.unset("hbase.hstore.compaction.min.size");
171    conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F);
172    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3);
173    conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4);
174    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits
175    StoreConfigInformation sci = mock(StoreConfigInformation.class);
176    when(sci.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
177    StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
178    StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) {
179      @Override
180      public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
181        List<HStoreFile> filesCompacting, boolean isOffpeak) throws IOException {
182        if (!filesCompacting.isEmpty()) {
183          return null;
184        }
185        return selectSingleStripeCompaction(si, false, false, isOffpeak);
186      }
187
188      @Override
189      public boolean needsCompactions(StripeInformationProvider si,
190        List<HStoreFile> filesCompacting) {
191        if (!filesCompacting.isEmpty()) {
192          return false;
193        }
194        return needsSingleStripeCompaction(si);
195      }
196    };
197
198    // No compaction due to min files or ratio
199    StripeInformationProvider si =
200      createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L });
201    verifyNoCompaction(policy, si);
202    // No compaction due to min files or ratio - will report needed, but not do any.
203    si = createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L },
204      new Long[] { 5L, 1L, 1L });
205    assertNull(policy.selectCompaction(si, al(), false));
206    assertTrue(policy.needsCompactions(si, al()));
207    // One stripe has possible compaction
208    si = createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L },
209      new Long[] { 5L, 4L, 3L });
210    verifySingleStripeCompaction(policy, si, 2, null);
211    // Several stripes have possible compactions; choose best quality (removes most files)
212    si = createStripesWithSizes(0, 0, new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L },
213      new Long[] { 3L, 2L, 2L, 1L });
214    verifySingleStripeCompaction(policy, si, 2, null);
215    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L },
216      new Long[] { 3L, 2L, 2L });
217    verifySingleStripeCompaction(policy, si, 1, null);
218    // Or with smallest files, if the count is the same
219    si = createStripesWithSizes(0, 0, new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L },
220      new Long[] { 3L, 2L, 2L });
221    verifySingleStripeCompaction(policy, si, 1, null);
222    // Verify max count is respected.
223    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L });
224    List<HStoreFile> sfs = si.getStripes().get(1).subList(1, 5);
225    verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
226    // Verify ratio is applied.
227    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L });
228    sfs = si.getStripes().get(1).subList(1, 5);
229    verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
230  }
231
232  @Test
233  public void testWithParallelCompaction() throws Exception {
234    // TODO: currently only one compaction at a time per store is allowed. If this changes,
235    // the appropriate file exclusion testing would need to be done in respective tests.
236    assertNull(createPolicy(HBaseConfiguration.create())
237      .selectCompaction(mock(StripeInformationProvider.class), al(createFile()), false));
238  }
239
240  @Test
241  public void testWithReferences() throws Exception {
242    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
243    StripeCompactor sc = mock(StripeCompactor.class);
244    HStoreFile ref = createFile();
245    when(ref.isReference()).thenReturn(true);
246    StripeInformationProvider si = mock(StripeInformationProvider.class);
247    Collection<HStoreFile> sfs = al(ref, createFile());
248    when(si.getStorefiles()).thenReturn(sfs);
249
250    assertTrue(policy.needsCompactions(si, al()));
251    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
252    // UnmodifiableCollection does not implement equals so we need to change it here to a
253    // collection that implements it.
254    assertEquals(si.getStorefiles(), new ArrayList<>(scr.getRequest().getFiles()));
255    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
256    verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
257      aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), any(), any());
258  }
259
260  @Test
261  public void testInitialCountFromL0() throws Exception {
262    Configuration conf = HBaseConfiguration.create();
263    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
264    StripeCompactionPolicy policy =
265      createPolicy(conf, defaultSplitSize, defaultSplitCount, 2, false);
266    StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8);
267    verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true);
268    si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts.
269    verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true);
270    policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false);
271    verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true);
272  }
273
274  @Test
275  public void testSelectL0Compaction() throws Exception {
276    // test select ALL L0 files when L0 files count > MIN_FILES_L0_KEY
277    Configuration conf = HBaseConfiguration.create();
278    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
279    StripeCompactionPolicy policy = createPolicy(conf);
280    StripeCompactionPolicy.StripeInformationProvider si = createStripesWithSizes(10, 10L,
281      new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, new Long[] { 3L, 2L, 2L });
282    StripeCompactionPolicy.StripeCompactionRequest cr = policy.selectCompaction(si, al(), false);
283    assertNotNull(cr);
284    assertEquals(10, cr.getRequest().getFiles().size());
285    verifyCollectionsEqual(si.getLevel0Files(), cr.getRequest().getFiles());
286
287    // test select partial L0 files when size of L0 files > HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY
288    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 100L);
289    policy = createPolicy(conf);
290    si = createStripesWithSizes(5, 50L, new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L },
291      new Long[] { 3L, 2L, 2L });
292    cr = policy.selectCompaction(si, al(), false);
293    assertNotNull(cr);
294    assertEquals(2, cr.getRequest().getFiles().size());
295    verifyCollectionsEqual(si.getLevel0Files().subList(0, 2), cr.getRequest().getFiles());
296
297    // test select partial L0 files when count of L0 files > MAX_FILES_KEY
298    conf.setInt(MAX_FILES_KEY, 6);
299    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 1000L);
300    policy = createPolicy(conf);
301    si = createStripesWithSizes(10, 10L, new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L },
302      new Long[] { 3L, 2L, 2L });
303    cr = policy.selectCompaction(si, al(), false);
304    assertNotNull(cr);
305    assertEquals(6, cr.getRequest().getFiles().size());
306    verifyCollectionsEqual(si.getLevel0Files().subList(0, 6), cr.getRequest().getFiles());
307  }
308
309  @Test
310  public void testExistingStripesFromL0() throws Exception {
311    Configuration conf = HBaseConfiguration.create();
312    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3);
313    StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A);
314    verifyCompaction(createPolicy(conf), si, si.getLevel0Files(), null, null,
315      si.getStripeBoundaries());
316  }
317
318  @Test
319  public void testNothingToCompactFromL0() throws Exception {
320    Configuration conf = HBaseConfiguration.create();
321    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
322    StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10);
323    StripeCompactionPolicy policy = createPolicy(conf);
324    verifyNoCompaction(policy, si);
325
326    si = createStripes(3, KEY_A);
327    verifyNoCompaction(policy, si);
328  }
329
330  @Test
331  public void testCheckExpiredStripeCompaction() throws Exception {
332    Configuration conf = HBaseConfiguration.create();
333    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 5);
334    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
335
336    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
337    long now = defaultTtl + 2;
338    edge.setValue(now);
339    EnvironmentEdgeManager.injectEdge(edge);
340    HStoreFile expiredFile = createFile(10), notExpiredFile = createFile(10);
341    when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
342    when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
343    List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
344    List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
345
346    StripeCompactionPolicy policy =
347      createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
348    // Merge expired if there are eligible stripes.
349    StripeCompactionPolicy.StripeInformationProvider si =
350      createStripesWithFiles(mixed, mixed, mixed);
351    assertFalse(policy.needsCompactions(si, al()));
352
353    si = createStripesWithFiles(mixed, mixed, mixed, expired);
354    assertFalse(policy.needsSingleStripeCompaction(si));
355    assertTrue(policy.hasExpiredStripes(si));
356    assertTrue(policy.needsCompactions(si, al()));
357  }
358
359  @Test
360  public void testSplitOffStripe() throws Exception {
361    Configuration conf = HBaseConfiguration.create();
362    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
363    conf.unset("hbase.hstore.compaction.min.size");
364    // First test everything with default split count of 2, then split into more.
365    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
366    Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L };
367    Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L };
368    long splitTargetSize = (long) (defaultSplitSize / defaultSplitCount);
369    // Don't split if not eligible for compaction.
370    StripeCompactionPolicy.StripeInformationProvider si =
371      createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L });
372    assertNull(createPolicy(conf).selectCompaction(si, al(), false));
373    // Make sure everything is eligible.
374    conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f);
375    StripeCompactionPolicy policy = createPolicy(conf);
376    verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize);
377    // Add some extra stripes...
378    si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit);
379    verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize);
380    // In the middle.
381    si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit);
382    verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize);
383    // No split-off with different config (larger split size).
384    // However, in this case some eligible stripe will just be compacted alone.
385    StripeCompactionPolicy specPolicy =
386      createPolicy(conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false);
387    verifySingleStripeCompaction(specPolicy, si, 1, null);
388  }
389
390  @Test
391  public void testSplitOffStripeOffPeak() throws Exception {
392    // for HBASE-11439
393    Configuration conf = HBaseConfiguration.create();
394
395    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
396    conf.unset("hbase.hstore.compaction.min.size");
397
398    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
399    // Select the last 2 files.
400    StripeCompactionPolicy.StripeInformationProvider si =
401      createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L });
402    assertEquals(2,
403      createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles().size());
404    // Make sure everything is eligible in offpeak.
405    conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f);
406    assertEquals(3,
407      createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles().size());
408  }
409
410  @Test
411  public void testSplitOffStripeDropDeletes() throws Exception {
412    Configuration conf = HBaseConfiguration.create();
413    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
414    StripeCompactionPolicy policy = createPolicy(conf);
415    Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 };
416    Long[] noSplit = new Long[] { 1L };
417    long splitTargetSize = (long) (defaultSplitSize / defaultSplitCount);
418
419    // Verify the deletes can be dropped if there are no L0 files.
420    StripeCompactionPolicy.StripeInformationProvider si =
421      createStripesWithSizes(0, 0, noSplit, toSplit);
422    verifyWholeStripesCompaction(policy, si, 1, 1, true, null, splitTargetSize);
423    // But cannot be dropped if there are.
424    si = createStripesWithSizes(2, 2, noSplit, toSplit);
425    verifyWholeStripesCompaction(policy, si, 1, 1, false, null, splitTargetSize);
426  }
427
428  @SuppressWarnings("unchecked")
429  @Test
430  public void testMergeExpiredFiles() throws Exception {
431    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
432    long now = defaultTtl + 2;
433    edge.setValue(now);
434    EnvironmentEdgeManager.injectEdge(edge);
435    try {
436      HStoreFile expiredFile = createFile(), notExpiredFile = createFile();
437      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
438      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
439      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
440      List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
441      List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
442
443      StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(), defaultSplitSize,
444        defaultSplitCount, defaultInitialCount, true);
445      // Merge expired if there are eligible stripes.
446      StripeCompactionPolicy.StripeInformationProvider si =
447        createStripesWithFiles(expired, expired, expired);
448      verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false);
449      // Don't merge if nothing expired.
450      si = createStripesWithFiles(notExpired, notExpired, notExpired);
451      assertNull(policy.selectCompaction(si, al(), false));
452      // Merge one expired stripe with next.
453      si = createStripesWithFiles(notExpired, expired, notExpired);
454      verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false);
455      // Merge the biggest run out of multiple options.
456      // Merge one expired stripe with next.
457      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
458      verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false);
459      // Stripe with a subset of expired files is not merged.
460      si = createStripesWithFiles(expired, expired, notExpired, expired, mixed);
461      verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false);
462    } finally {
463      EnvironmentEdgeManager.reset();
464    }
465  }
466
467  @SuppressWarnings("unchecked")
468  @Test
469  public void testMergeExpiredStripes() throws Exception {
470    // HBASE-11397
471    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
472    long now = defaultTtl + 2;
473    edge.setValue(now);
474    EnvironmentEdgeManager.injectEdge(edge);
475    try {
476      HStoreFile expiredFile = createFile(), notExpiredFile = createFile();
477      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
478      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
479      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
480      List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
481
482      StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(), defaultSplitSize,
483        defaultSplitCount, defaultInitialCount, true);
484
485      // Merge all three expired stripes into one.
486      StripeCompactionPolicy.StripeInformationProvider si =
487        createStripesWithFiles(expired, expired, expired);
488      verifyMergeCompatcion(policy, si, 0, 2);
489
490      // Merge two adjacent expired stripes into one.
491      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
492      verifyMergeCompatcion(policy, si, 3, 4);
493    } finally {
494      EnvironmentEdgeManager.reset();
495    }
496  }
497
498  @SuppressWarnings("unchecked")
499  private static StripeCompactionPolicy.StripeInformationProvider
500    createStripesWithFiles(List<HStoreFile>... stripeFiles) throws Exception {
501    return createStripesWithFiles(createBoundaries(stripeFiles.length),
502      Lists.newArrayList(stripeFiles), new ArrayList<>());
503  }
504
505  @Test
506  public void testSingleStripeDropDeletes() throws Exception {
507    Configuration conf = HBaseConfiguration.create();
508    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
509    conf.unset("hbase.hstore.compaction.min.size");
510    StripeCompactionPolicy policy = createPolicy(conf);
511    // Verify the deletes can be dropped if there are no L0 files.
512    Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } };
513    StripeInformationProvider si = createStripesWithSizes(0, 0, stripes);
514    verifySingleStripeCompaction(policy, si, 0, true);
515    // But cannot be dropped if there are.
516    si = createStripesWithSizes(2, 2, stripes);
517    verifySingleStripeCompaction(policy, si, 0, false);
518    // Unless there are enough to cause L0 compaction.
519    si = createStripesWithSizes(6, 2, stripes);
520    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
521    sfs.addSublist(si.getLevel0Files());
522    sfs.addSublist(si.getStripes().get(0));
523    verifyCompaction(policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries());
524    // If we cannot actually compact all files in some stripe, L0 is chosen.
525    si = createStripesWithSizes(6, 2,
526      new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } });
527    verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
528    // even if L0 has no file
529    // if all files of stripe aren't selected, delete must not be dropped.
530    stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } };
531    si = createStripesWithSizes(0, 0, stripes);
532    List<HStoreFile> compactFile = new ArrayList<>();
533    Iterator<HStoreFile> iter = si.getStripes().get(0).listIterator(1);
534    while (iter.hasNext()) {
535      compactFile.add(iter.next());
536    }
537    verifyCompaction(policy, si, compactFile, false, 1, null, si.getStartRow(0), si.getEndRow(0),
538      true);
539  }
540
541  @Test
542  public void testCheckExpiredL0Compaction() throws Exception {
543    Configuration conf = HBaseConfiguration.create();
544    int minL0 = 100;
545    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, minL0);
546    conf.setInt(MIN_FILES_KEY, 4);
547
548    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
549    long now = defaultTtl + 2;
550    edge.setValue(now);
551    EnvironmentEdgeManager.injectEdge(edge);
552    HStoreFile expiredFile = createFile(10), notExpiredFile = createFile(10);
553    when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
554    when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
555    List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
556    List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
557
558    StripeCompactionPolicy policy =
559      createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
560    // Merge expired if there are eligible stripes.
561    StripeCompactionPolicy.StripeInformationProvider si =
562      createStripesWithFiles(null, new ArrayList<>(), mixed);
563    assertFalse(policy.needsCompactions(si, al()));
564
565    List<HStoreFile> largeMixed = new ArrayList<>();
566    for (int i = 0; i < minL0 - 1; i++) {
567      largeMixed.add(i % 2 == 0 ? notExpiredFile : expiredFile);
568    }
569    si = createStripesWithFiles(null, new ArrayList<>(), largeMixed);
570    assertFalse(policy.needsCompactions(si, al()));
571
572    si = createStripesWithFiles(null, new ArrayList<>(), expired);
573    assertFalse(policy.needsSingleStripeCompaction(si));
574    assertFalse(policy.hasExpiredStripes(si));
575    assertTrue(policy.allL0FilesExpired(si));
576    assertTrue(policy.needsCompactions(si, al()));
577  }
578
579  /********* HELPER METHODS ************/
580  private static StripeCompactionPolicy createPolicy(Configuration conf) throws Exception {
581    return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false);
582  }
583
584  private static StripeCompactionPolicy createPolicy(Configuration conf, long splitSize,
585    float splitCount, int initialCount, boolean hasTtl) throws Exception {
586    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize);
587    conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount);
588    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount);
589    StoreConfigInformation sci = mock(StoreConfigInformation.class);
590    when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE);
591    when(sci.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
592    StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
593    return new StripeCompactionPolicy(conf, sci, ssc);
594  }
595
596  private static ArrayList<HStoreFile> al(HStoreFile... sfs) {
597    return new ArrayList<>(Arrays.asList(sfs));
598  }
599
600  private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si,
601    int from, int to) throws Exception {
602    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
603    Collection<HStoreFile> sfs = getAllFiles(si, from, to);
604    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
605
606    // All the Stripes are expired, so the Compactor will not create any Writers. We need to create
607    // an empty file to preserve metadata
608    StripeCompactor sc = createCompactor();
609    List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
610    assertEquals(1, paths.size());
611  }
612
613  /**
614   * Verify the compaction that includes several entire stripes.
615   * @param policy      Policy to test.
616   * @param si          Stripe information pre-set with stripes to test.
617   * @param from        Starting stripe.
618   * @param to          Ending stripe (inclusive).
619   * @param dropDeletes Whether to drop deletes from compaction range.
620   * @param count       Expected # of resulting stripes, null if not checked.
621   * @param size        Expected target stripe size, null if not checked.
622   */
623  private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
624    StripeInformationProvider si, int from, int to, Boolean dropDeletes, Integer count, Long size,
625    boolean needsCompaction) throws IOException {
626    verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes, count, size,
627      si.getStartRow(from), si.getEndRow(to), needsCompaction);
628  }
629
630  private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
631    StripeInformationProvider si, int from, int to, Boolean dropDeletes, Integer count, Long size)
632    throws IOException {
633    verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true);
634  }
635
636  private void verifySingleStripeCompaction(StripeCompactionPolicy policy,
637    StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException {
638    verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true);
639  }
640
641  /**
642   * Verify no compaction is needed or selected.
643   * @param policy Policy to test.
644   * @param si     Stripe information pre-set with stripes to test.
645   */
646  private void verifyNoCompaction(StripeCompactionPolicy policy, StripeInformationProvider si)
647    throws IOException {
648    assertNull(policy.selectCompaction(si, al(), false));
649    assertFalse(policy.needsCompactions(si, al()));
650  }
651
652  /**
653   * Verify arbitrary compaction.
654   * @param policy          Policy to test.
655   * @param si              Stripe information pre-set with stripes to test.
656   * @param sfs             Files that should be compacted.
657   * @param dropDeletesFrom Row from which to drop deletes.
658   * @param dropDeletesTo   Row to which to drop deletes.
659   * @param boundaries      Expected target stripe boundaries.
660   */
661  private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
662    Collection<HStoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo,
663    final List<byte[]> boundaries) throws Exception {
664    StripeCompactor sc = mock(StripeCompactor.class);
665    assertTrue(policy.needsCompactions(si, al()));
666    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
667    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
668    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
669    verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
670      @Override
671      public boolean matches(List<byte[]> argument) {
672        List<byte[]> other = argument;
673        if (other.size() != boundaries.size()) {
674          return false;
675        }
676        for (int i = 0; i < other.size(); ++i) {
677          if (!Bytes.equals(other.get(i), boundaries.get(i))) {
678            return false;
679          }
680        }
681        return true;
682      }
683    }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
684      dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), any(), any());
685  }
686
687  /**
688   * Verify arbitrary compaction.
689   * @param policy      Policy to test.
690   * @param si          Stripe information pre-set with stripes to test.
691   * @param sfs         Files that should be compacted.
692   * @param dropDeletes Whether to drop deletes from compaction range.
693   * @param count       Expected # of resulting stripes, null if not checked.
694   * @param size        Expected target stripe size, null if not checked.
695   * @param start       Left boundary of the compaction.
696   * @param end         Right boundary of the compaction.
697   */
698  private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
699    Collection<HStoreFile> sfs, Boolean dropDeletes, Integer count, Long size, byte[] start,
700    byte[] end, boolean needsCompaction) throws IOException {
701    StripeCompactor sc = mock(StripeCompactor.class);
702    assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
703    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
704    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
705    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
706    verify(sc, times(1)).compact(eq(scr.getRequest()),
707      count == null ? anyInt() : eq(count.intValue()),
708      size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
709      dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), any(), any());
710  }
711
712  /** Verify arbitrary flush. */
713  protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
714    KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
715    StoreFileWritersCapture writers = new StoreFileWritersCapture();
716    StripeStoreFlusher.StripeFlushRequest req =
717      policy.selectFlush(CellComparatorImpl.COMPARATOR, si, input.length);
718    StripeMultiFileWriter mw = req.createWriter();
719    mw.init(null, writers);
720    for (KeyValue kv : input) {
721      mw.append(kv);
722    }
723    boolean hasMetadata = boundaries != null;
724    mw.commitWriters(0, false);
725    writers.verifyKvs(expected, true, hasMetadata);
726    if (hasMetadata) {
727      writers.verifyBoundaries(boundaries);
728    }
729  }
730
731  private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
732    return dropDeletes == null
733      ? any()
734      : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));
735  }
736
737  private void verifyCollectionsEqual(Collection<HStoreFile> sfs, Collection<HStoreFile> scr) {
738    // Dumb.
739    assertEquals(sfs.size(), scr.size());
740    assertTrue(scr.containsAll(sfs));
741  }
742
743  private static List<HStoreFile> getAllFiles(StripeInformationProvider si, int fromStripe,
744    int toStripe) {
745    ArrayList<HStoreFile> expected = new ArrayList<>();
746    for (int i = fromStripe; i <= toStripe; ++i) {
747      expected.addAll(si.getStripes().get(i));
748    }
749    return expected;
750  }
751
752  /**
753   * @param l0Count    Number of L0 files.
754   * @param boundaries Target boundaries.
755   * @return Mock stripes.
756   */
757  private static StripeInformationProvider createStripes(int l0Count, byte[]... boundaries)
758    throws Exception {
759    List<Long> l0Sizes = new ArrayList<>();
760    for (int i = 0; i < l0Count; ++i) {
761      l0Sizes.add(5L);
762    }
763    List<List<Long>> sizes = new ArrayList<>();
764    for (int i = 0; i <= boundaries.length; ++i) {
765      sizes.add(Arrays.asList(Long.valueOf(5)));
766    }
767    return createStripes(Arrays.asList(boundaries), sizes, l0Sizes);
768  }
769
770  /**
771   * @param l0Count Number of L0 files.
772   * @param l0Size  Size of each file.
773   * @return Mock stripes.
774   */
775  private static StripeInformationProvider createStripesL0Only(int l0Count, long l0Size)
776    throws Exception {
777    List<Long> l0Sizes = new ArrayList<>();
778    for (int i = 0; i < l0Count; ++i) {
779      l0Sizes.add(l0Size);
780    }
781    return createStripes(null, new ArrayList<>(), l0Sizes);
782  }
783
784  /**
785   * @param l0Count Number of L0 files.
786   * @param l0Size  Size of each file.
787   * @param sizes   Sizes of the files; each sub-array representing a stripe.
788   * @return Mock stripes.
789   */
790  private static StripeInformationProvider createStripesWithSizes(int l0Count, long l0Size,
791    Long[]... sizes) throws Exception {
792    ArrayList<List<Long>> sizeList = new ArrayList<>(sizes.length);
793    for (Long[] size : sizes) {
794      sizeList.add(Arrays.asList(size));
795    }
796    return createStripesWithSizes(l0Count, l0Size, sizeList);
797  }
798
799  private static StripeInformationProvider createStripesWithSizes(int l0Count, long l0Size,
800    List<List<Long>> sizes) throws Exception {
801    List<byte[]> boundaries = createBoundaries(sizes.size());
802    List<Long> l0Sizes = new ArrayList<>();
803    for (int i = 0; i < l0Count; ++i) {
804      l0Sizes.add(l0Size);
805    }
806    return createStripes(boundaries, sizes, l0Sizes);
807  }
808
809  private static List<byte[]> createBoundaries(int stripeCount) {
810    byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E };
811    assert stripeCount <= keys.length + 1;
812    List<byte[]> boundaries = new ArrayList<>();
813    boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1));
814    return boundaries;
815  }
816
817  private static StripeInformationProvider createStripes(List<byte[]> boundaries,
818    List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception {
819    List<List<HStoreFile>> stripeFiles = new ArrayList<>(stripeSizes.size());
820    for (List<Long> sizes : stripeSizes) {
821      List<HStoreFile> sfs = new ArrayList<>(sizes.size());
822      for (Long size : sizes) {
823        sfs.add(createFile(size));
824      }
825      stripeFiles.add(sfs);
826    }
827    List<HStoreFile> l0Files = new ArrayList<>();
828    for (Long size : l0Sizes) {
829      l0Files.add(createFile(size));
830    }
831    return createStripesWithFiles(boundaries, stripeFiles, l0Files);
832  }
833
834  /**
835   * This method actually does all the work.
836   */
837  private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries,
838    List<List<HStoreFile>> stripeFiles, List<HStoreFile> l0Files) throws Exception {
839    ArrayList<ImmutableList<HStoreFile>> stripes = new ArrayList<>();
840    ArrayList<byte[]> boundariesList = new ArrayList<>();
841    StripeInformationProvider si = mock(StripeInformationProvider.class);
842    if (!stripeFiles.isEmpty()) {
843      assert stripeFiles.size() == (boundaries.size() + 1);
844      boundariesList.add(OPEN_KEY);
845      for (int i = 0; i <= boundaries.size(); ++i) {
846        byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1));
847        byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i));
848        boundariesList.add(endKey);
849        for (HStoreFile sf : stripeFiles.get(i)) {
850          setFileStripe(sf, startKey, endKey);
851        }
852        stripes.add(ImmutableList.copyOf(stripeFiles.get(i)));
853        when(si.getStartRow(eq(i))).thenReturn(startKey);
854        when(si.getEndRow(eq(i))).thenReturn(endKey);
855      }
856    }
857    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
858    sfs.addAllSublists(stripes);
859    sfs.addSublist(l0Files);
860    when(si.getStorefiles()).thenReturn(sfs);
861    when(si.getStripes()).thenReturn(stripes);
862    when(si.getStripeBoundaries()).thenReturn(boundariesList);
863    when(si.getStripeCount()).thenReturn(stripes.size());
864    when(si.getLevel0Files()).thenReturn(l0Files);
865    return si;
866  }
867
868  private static HStoreFile createFile(long size) throws Exception {
869    HStoreFile sf = mock(HStoreFile.class);
870    when(sf.getPath()).thenReturn(new Path("moo"));
871    StoreFileReader r = mock(StoreFileReader.class);
872    when(r.getEntries()).thenReturn(size);
873    when(r.length()).thenReturn(size);
874    when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
875    when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
876    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(),
877      anyBoolean())).thenReturn(mock(StoreFileScanner.class));
878    when(sf.getReader()).thenReturn(r);
879    when(sf.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty());
880    when(r.getMaxTimestamp()).thenReturn(TimeRange.INITIAL_MAX_TIMESTAMP);
881    return sf;
882  }
883
884  private static HStoreFile createFile() throws Exception {
885    return createFile(0);
886  }
887
888  private static void setFileStripe(HStoreFile sf, byte[] startKey, byte[] endKey) {
889    when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
890    when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
891  }
892
893  private StripeCompactor createCompactor() throws Exception {
894    ColumnFamilyDescriptor familyDescriptor =
895      ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("foo"));
896    StoreFileWritersCapture writers = new StoreFileWritersCapture();
897    HStore store = mock(HStore.class);
898    RegionInfo info = mock(RegionInfo.class);
899    when(info.getRegionNameAsString()).thenReturn("testRegion");
900    when(store.getColumnFamilyDescriptor()).thenReturn(familyDescriptor);
901    when(store.getRegionInfo()).thenReturn(info);
902    StoreEngine storeEngine = mock(StoreEngine.class);
903    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
904    when(store.getStoreEngine()).thenReturn(storeEngine);
905
906    Configuration conf = HBaseConfiguration.create();
907    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
908    final Scanner scanner = new Scanner();
909    return new StripeCompactor(conf, store) {
910      @Override
911      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
912        List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
913        byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
914        return scanner;
915      }
916
917      @Override
918      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
919        List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
920        long earliestPutTs) throws IOException {
921        return scanner;
922      }
923    };
924  }
925
926  private static class Scanner implements InternalScanner {
927    private final ArrayList<KeyValue> kvs;
928
929    public Scanner(KeyValue... kvs) {
930      this.kvs = new ArrayList<>(Arrays.asList(kvs));
931    }
932
933    @Override
934    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
935      if (kvs.isEmpty()) {
936        return false;
937      }
938      result.add(kvs.remove(0));
939      return !kvs.isEmpty();
940    }
941
942    @Override
943    public void close() throws IOException {
944    }
945  }
946}