001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MAX_FILES_KEY;
021import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MIN_FILES_KEY;
022import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
023import static org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025import static org.junit.jupiter.api.Assertions.assertFalse;
026import static org.junit.jupiter.api.Assertions.assertNotNull;
027import static org.junit.jupiter.api.Assertions.assertNull;
028import static org.junit.jupiter.api.Assertions.assertTrue;
029import static org.mockito.AdditionalMatchers.aryEq;
030import static org.mockito.ArgumentMatchers.any;
031import static org.mockito.ArgumentMatchers.anyBoolean;
032import static org.mockito.ArgumentMatchers.anyInt;
033import static org.mockito.ArgumentMatchers.anyLong;
034import static org.mockito.ArgumentMatchers.argThat;
035import static org.mockito.ArgumentMatchers.eq;
036import static org.mockito.ArgumentMatchers.isNull;
037import static org.mockito.Mockito.mock;
038import static org.mockito.Mockito.only;
039import static org.mockito.Mockito.times;
040import static org.mockito.Mockito.verify;
041import static org.mockito.Mockito.when;
042
043import java.io.IOException;
044import java.util.ArrayList;
045import java.util.Arrays;
046import java.util.Collection;
047import java.util.Iterator;
048import java.util.List;
049import java.util.OptionalLong;
050import java.util.stream.Stream;
051import org.apache.hadoop.conf.Configuration;
052import org.apache.hadoop.fs.Path;
053import org.apache.hadoop.hbase.CellComparatorImpl;
054import org.apache.hadoop.hbase.ExtendedCell;
055import org.apache.hadoop.hbase.HBaseConfiguration;
056import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
057import org.apache.hadoop.hbase.KeyValue;
058import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
060import org.apache.hadoop.hbase.client.RegionInfo;
061import org.apache.hadoop.hbase.client.RegionInfoBuilder;
062import org.apache.hadoop.hbase.io.TimeRange;
063import org.apache.hadoop.hbase.io.hfile.HFile;
064import org.apache.hadoop.hbase.regionserver.BloomType;
065import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
066import org.apache.hadoop.hbase.regionserver.HStore;
067import org.apache.hadoop.hbase.regionserver.HStoreFile;
068import org.apache.hadoop.hbase.regionserver.InternalScanner;
069import org.apache.hadoop.hbase.regionserver.ScanInfo;
070import org.apache.hadoop.hbase.regionserver.ScanType;
071import org.apache.hadoop.hbase.regionserver.ScannerContext;
072import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
073import org.apache.hadoop.hbase.regionserver.StoreEngine;
074import org.apache.hadoop.hbase.regionserver.StoreFileReader;
075import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
076import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
077import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
078import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
079import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
080import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
081import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
082import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
083import org.apache.hadoop.hbase.testclassification.MediumTests;
084import org.apache.hadoop.hbase.testclassification.RegionServerTests;
085import org.apache.hadoop.hbase.util.Bytes;
086import org.apache.hadoop.hbase.util.ConcatenatedLists;
087import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
088import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
089import org.junit.jupiter.api.Tag;
090import org.junit.jupiter.api.TestTemplate;
091import org.junit.jupiter.params.provider.Arguments;
092import org.mockito.ArgumentMatcher;
093
094import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
095import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
096
097@Tag(RegionServerTests.TAG)
098@Tag(MediumTests.TAG)
099@HBaseParameterizedTestTemplate(name = "{index}: usePrivateReaders={0}")
100public class TestStripeCompactionPolicy {
101
  // Row keys used by the tests below, both for building cells and as stripe boundaries.
  private static final byte[] KEY_A = Bytes.toBytes("aaa");
  private static final byte[] KEY_B = Bytes.toBytes("bbb");
  private static final byte[] KEY_C = Bytes.toBytes("ccc");
  private static final byte[] KEY_D = Bytes.toBytes("ddd");
  private static final byte[] KEY_E = Bytes.toBytes("eee");
  // One KeyValue per row key above; every one is constructed with 0L as the second argument.
  private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
  private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
  private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
  private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
  private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);

  // Defaults consumed by createPolicy(...):
  // written to StripeStoreConfig.SIZE_TO_SPLIT_KEY.
  private static long defaultSplitSize = 18;
  // written to StripeStoreConfig.SPLIT_PARTS_KEY.
  private static float defaultSplitCount = 1.8F;
  // written to StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY.
  private final static int defaultInitialCount = 1;
  // returned by the mocked StoreConfigInformation#getStoreFileTtl() when a TTL is requested.
  private static long defaultTtl = 1000 * 1000;
117
118  public static Stream<Arguments> parameters() {
119    return Stream.of(Arguments.of(true), Arguments.of(false));
120  }
121
  // Test parameter; its values come from parameters(). Where it is consumed is not visible in
  // this part of the file - presumably by the file/reader creation helpers (TODO confirm).
  private final boolean usePrivateReaders;

  /**
   * Creates the test instance for one parameter value.
   * @param usePrivateReaders value of the {@code usePrivateReaders} template parameter
   */
  public TestStripeCompactionPolicy(boolean usePrivateReaders) {
    this.usePrivateReaders = usePrivateReaders;
  }
127
128  @TestTemplate
129  public void testNoStripesFromFlush() throws Exception {
130    Configuration conf = HBaseConfiguration.create();
131    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
132    StripeCompactionPolicy policy = createPolicy(conf);
133    StripeInformationProvider si = createStripesL0Only(0, 0);
134
135    KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
136    KeyValue[][] expected = new KeyValue[][] { input };
137    verifyFlush(policy, si, input, expected, null);
138  }
139
140  @TestTemplate
141  public void testOldStripesFromFlush() throws Exception {
142    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
143    StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);
144
145    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
146    KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
147      new KeyValue[] { KV_C, KV_C }, new KeyValue[] { KV_D, KV_E } };
148    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
149  }
150
151  @TestTemplate
152  public void testNewStripesFromFlush() throws Exception {
153    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
154    StripeInformationProvider si = createStripesL0Only(0, 0);
155    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
156    // Starts with one stripe; unlike flush results, must have metadata
157    KeyValue[][] expected = new KeyValue[][] { input };
158    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
159  }
160
161  @TestTemplate
162  public void testSingleStripeCompaction() throws Exception {
163    // Create a special policy that only compacts single stripes, using standard methods.
164    Configuration conf = HBaseConfiguration.create();
165    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
166    conf.unset("hbase.hstore.compaction.min.size");
167    conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F);
168    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3);
169    conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4);
170    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits
171    StoreConfigInformation sci = mock(StoreConfigInformation.class);
172    when(sci.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
173    StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
174    StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) {
175      @Override
176      public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
177        List<HStoreFile> filesCompacting, boolean isOffpeak) throws IOException {
178        if (!filesCompacting.isEmpty()) {
179          return null;
180        }
181        return selectSingleStripeCompaction(si, false, false, isOffpeak);
182      }
183
184      @Override
185      public boolean needsCompactions(StripeInformationProvider si,
186        List<HStoreFile> filesCompacting) {
187        if (!filesCompacting.isEmpty()) {
188          return false;
189        }
190        return needsSingleStripeCompaction(si);
191      }
192    };
193
194    // No compaction due to min files or ratio
195    StripeInformationProvider si =
196      createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L });
197    verifyNoCompaction(policy, si);
198    // No compaction due to min files or ratio - will report needed, but not do any.
199    si = createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L },
200      new Long[] { 5L, 1L, 1L });
201    assertNull(policy.selectCompaction(si, al(), false));
202    assertTrue(policy.needsCompactions(si, al()));
203    // One stripe has possible compaction
204    si = createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L },
205      new Long[] { 5L, 4L, 3L });
206    verifySingleStripeCompaction(policy, si, 2, null);
207    // Several stripes have possible compactions; choose best quality (removes most files)
208    si = createStripesWithSizes(0, 0, new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L },
209      new Long[] { 3L, 2L, 2L, 1L });
210    verifySingleStripeCompaction(policy, si, 2, null);
211    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L },
212      new Long[] { 3L, 2L, 2L });
213    verifySingleStripeCompaction(policy, si, 1, null);
214    // Or with smallest files, if the count is the same
215    si = createStripesWithSizes(0, 0, new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L },
216      new Long[] { 3L, 2L, 2L });
217    verifySingleStripeCompaction(policy, si, 1, null);
218    // Verify max count is respected.
219    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L });
220    List<HStoreFile> sfs = si.getStripes().get(1).subList(1, 5);
221    verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
222    // Verify ratio is applied.
223    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L });
224    sfs = si.getStripes().get(1).subList(1, 5);
225    verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
226  }
227
228  @TestTemplate
229  public void testWithParallelCompaction() throws Exception {
230    // TODO: currently only one compaction at a time per store is allowed. If this changes,
231    // the appropriate file exclusion testing would need to be done in respective tests.
232    assertNull(createPolicy(HBaseConfiguration.create())
233      .selectCompaction(mock(StripeInformationProvider.class), al(createFile()), false));
234  }
235
236  @TestTemplate
237  public void testWithReferences() throws Exception {
238    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
239    StripeCompactor sc = mock(StripeCompactor.class);
240    HStoreFile ref = createFile();
241    when(ref.isReference()).thenReturn(true);
242    StripeInformationProvider si = mock(StripeInformationProvider.class);
243    Collection<HStoreFile> sfs = al(ref, createFile());
244    when(si.getStoreFiles()).thenReturn(sfs);
245
246    assertTrue(policy.needsCompactions(si, al()));
247    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
248    // UnmodifiableCollection does not implement equals so we need to change it here to a
249    // collection that implements it.
250    assertEquals(si.getStoreFiles(), new ArrayList<>(scr.getRequest().getFiles()));
251    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
252    verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
253      aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), any(), any());
254  }
255
256  @TestTemplate
257  public void testInitialCountFromL0() throws Exception {
258    Configuration conf = HBaseConfiguration.create();
259    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
260    StripeCompactionPolicy policy =
261      createPolicy(conf, defaultSplitSize, defaultSplitCount, 2, false);
262    StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8);
263    verifyCompaction(policy, si, si.getStoreFiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true);
264    si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts.
265    verifyCompaction(policy, si, si.getStoreFiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true);
266    policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false);
267    verifyCompaction(policy, si, si.getStoreFiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true);
268  }
269
270  @TestTemplate
271  public void testSelectL0Compaction() throws Exception {
272    // test select ALL L0 files when L0 files count > MIN_FILES_L0_KEY
273    Configuration conf = HBaseConfiguration.create();
274    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
275    StripeCompactionPolicy policy = createPolicy(conf);
276    StripeCompactionPolicy.StripeInformationProvider si = createStripesWithSizes(10, 10L,
277      new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, new Long[] { 3L, 2L, 2L });
278    StripeCompactionPolicy.StripeCompactionRequest cr = policy.selectCompaction(si, al(), false);
279    assertNotNull(cr);
280    assertEquals(10, cr.getRequest().getFiles().size());
281    verifyCollectionsEqual(si.getLevel0Files(), cr.getRequest().getFiles());
282
283    // test select partial L0 files when size of L0 files > HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY
284    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 100L);
285    policy = createPolicy(conf);
286    si = createStripesWithSizes(5, 50L, new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L },
287      new Long[] { 3L, 2L, 2L });
288    cr = policy.selectCompaction(si, al(), false);
289    assertNotNull(cr);
290    assertEquals(2, cr.getRequest().getFiles().size());
291    verifyCollectionsEqual(si.getLevel0Files().subList(0, 2), cr.getRequest().getFiles());
292
293    // test select partial L0 files when count of L0 files > MAX_FILES_KEY
294    conf.setInt(MAX_FILES_KEY, 6);
295    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 1000L);
296    policy = createPolicy(conf);
297    si = createStripesWithSizes(10, 10L, new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L },
298      new Long[] { 3L, 2L, 2L });
299    cr = policy.selectCompaction(si, al(), false);
300    assertNotNull(cr);
301    assertEquals(6, cr.getRequest().getFiles().size());
302    verifyCollectionsEqual(si.getLevel0Files().subList(0, 6), cr.getRequest().getFiles());
303  }
304
305  @TestTemplate
306  public void testExistingStripesFromL0() throws Exception {
307    Configuration conf = HBaseConfiguration.create();
308    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3);
309    StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A);
310    verifyCompaction(createPolicy(conf), si, si.getLevel0Files(), null, null,
311      si.getStripeBoundaries());
312  }
313
314  @TestTemplate
315  public void testNothingToCompactFromL0() throws Exception {
316    Configuration conf = HBaseConfiguration.create();
317    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
318    StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10);
319    StripeCompactionPolicy policy = createPolicy(conf);
320    verifyNoCompaction(policy, si);
321
322    si = createStripes(3, KEY_A);
323    verifyNoCompaction(policy, si);
324  }
325
326  @TestTemplate
327  public void testCheckExpiredStripeCompaction() throws Exception {
328    Configuration conf = HBaseConfiguration.create();
329    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 5);
330    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
331
332    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
333    long now = defaultTtl + 2;
334    edge.setValue(now);
335    EnvironmentEdgeManager.injectEdge(edge);
336    HStoreFile expiredFile = createFile(10), notExpiredFile = createFile(10);
337    when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
338    when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
339    List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
340    List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
341
342    StripeCompactionPolicy policy =
343      createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
344    // Merge expired if there are eligible stripes.
345    StripeCompactionPolicy.StripeInformationProvider si =
346      createStripesWithFiles(mixed, mixed, mixed);
347    assertFalse(policy.needsCompactions(si, al()));
348
349    si = createStripesWithFiles(mixed, mixed, mixed, expired);
350    assertFalse(policy.needsSingleStripeCompaction(si));
351    assertTrue(policy.hasExpiredStripes(si));
352    assertTrue(policy.needsCompactions(si, al()));
353  }
354
355  @TestTemplate
356  public void testSplitOffStripe() throws Exception {
357    Configuration conf = HBaseConfiguration.create();
358    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
359    conf.unset("hbase.hstore.compaction.min.size");
360    // First test everything with default split count of 2, then split into more.
361    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
362    Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L };
363    Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L };
364    long splitTargetSize = (long) (defaultSplitSize / defaultSplitCount);
365    // Don't split if not eligible for compaction.
366    StripeCompactionPolicy.StripeInformationProvider si =
367      createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L });
368    assertNull(createPolicy(conf).selectCompaction(si, al(), false));
369    // Make sure everything is eligible.
370    conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f);
371    StripeCompactionPolicy policy = createPolicy(conf);
372    verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize);
373    // Add some extra stripes...
374    si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit);
375    verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize);
376    // In the middle.
377    si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit);
378    verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize);
379    // No split-off with different config (larger split size).
380    // However, in this case some eligible stripe will just be compacted alone.
381    StripeCompactionPolicy specPolicy =
382      createPolicy(conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false);
383    verifySingleStripeCompaction(specPolicy, si, 1, null);
384  }
385
386  @TestTemplate
387  public void testSplitOffStripeOffPeak() throws Exception {
388    // for HBASE-11439
389    Configuration conf = HBaseConfiguration.create();
390
391    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
392    conf.unset("hbase.hstore.compaction.min.size");
393
394    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
395    // Select the last 2 files.
396    StripeCompactionPolicy.StripeInformationProvider si =
397      createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L });
398    assertEquals(2,
399      createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles().size());
400    // Make sure everything is eligible in offpeak.
401    conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f);
402    assertEquals(3,
403      createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles().size());
404  }
405
406  @TestTemplate
407  public void testSplitOffStripeDropDeletes() throws Exception {
408    Configuration conf = HBaseConfiguration.create();
409    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
410    StripeCompactionPolicy policy = createPolicy(conf);
411    Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 };
412    Long[] noSplit = new Long[] { 1L };
413    long splitTargetSize = (long) (defaultSplitSize / defaultSplitCount);
414
415    // Verify the deletes can be dropped if there are no L0 files.
416    StripeCompactionPolicy.StripeInformationProvider si =
417      createStripesWithSizes(0, 0, noSplit, toSplit);
418    verifyWholeStripesCompaction(policy, si, 1, 1, true, null, splitTargetSize);
419    // But cannot be dropped if there are.
420    si = createStripesWithSizes(2, 2, noSplit, toSplit);
421    verifyWholeStripesCompaction(policy, si, 1, 1, false, null, splitTargetSize);
422  }
423
424  @SuppressWarnings("unchecked")
425  @TestTemplate
426  public void testMergeExpiredFiles() throws Exception {
427    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
428    long now = defaultTtl + 2;
429    edge.setValue(now);
430    EnvironmentEdgeManager.injectEdge(edge);
431    try {
432      HStoreFile expiredFile = createFile(), notExpiredFile = createFile();
433      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
434      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
435      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
436      List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
437      List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
438
439      StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(), defaultSplitSize,
440        defaultSplitCount, defaultInitialCount, true);
441      // Merge expired if there are eligible stripes.
442      StripeCompactionPolicy.StripeInformationProvider si =
443        createStripesWithFiles(expired, expired, expired);
444      verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false);
445      // Don't merge if nothing expired.
446      si = createStripesWithFiles(notExpired, notExpired, notExpired);
447      assertNull(policy.selectCompaction(si, al(), false));
448      // Merge one expired stripe with next.
449      si = createStripesWithFiles(notExpired, expired, notExpired);
450      verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false);
451      // Merge the biggest run out of multiple options.
452      // Merge one expired stripe with next.
453      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
454      verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false);
455      // Stripe with a subset of expired files is not merged.
456      si = createStripesWithFiles(expired, expired, notExpired, expired, mixed);
457      verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false);
458    } finally {
459      EnvironmentEdgeManager.reset();
460    }
461  }
462
463  @SuppressWarnings("unchecked")
464  @TestTemplate
465  public void testMergeExpiredStripes() throws Exception {
466    // HBASE-11397
467    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
468    long now = defaultTtl + 2;
469    edge.setValue(now);
470    EnvironmentEdgeManager.injectEdge(edge);
471    try {
472      HStoreFile expiredFile = createFile(), notExpiredFile = createFile();
473      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
474      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
475      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
476      List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
477
478      StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(), defaultSplitSize,
479        defaultSplitCount, defaultInitialCount, true);
480
481      // Merge all three expired stripes into one.
482      StripeCompactionPolicy.StripeInformationProvider si =
483        createStripesWithFiles(expired, expired, expired);
484      verifyMergeCompatcion(policy, si, 0, 2);
485
486      // Merge two adjacent expired stripes into one.
487      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
488      verifyMergeCompatcion(policy, si, 3, 4);
489    } finally {
490      EnvironmentEdgeManager.reset();
491    }
492  }
493
494  @SuppressWarnings("unchecked")
495  private static StripeCompactionPolicy.StripeInformationProvider
496    createStripesWithFiles(List<HStoreFile>... stripeFiles) throws Exception {
497    return createStripesWithFiles(createBoundaries(stripeFiles.length),
498      Lists.newArrayList(stripeFiles), new ArrayList<>());
499  }
500
501  @TestTemplate
502  public void testSingleStripeDropDeletes() throws Exception {
503    Configuration conf = HBaseConfiguration.create();
504    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
505    conf.unset("hbase.hstore.compaction.min.size");
506    StripeCompactionPolicy policy = createPolicy(conf);
507    // Verify the deletes can be dropped if there are no L0 files.
508    Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } };
509    StripeInformationProvider si = createStripesWithSizes(0, 0, stripes);
510    verifySingleStripeCompaction(policy, si, 0, true);
511    // But cannot be dropped if there are.
512    si = createStripesWithSizes(2, 2, stripes);
513    verifySingleStripeCompaction(policy, si, 0, false);
514    // Unless there are enough to cause L0 compaction.
515    si = createStripesWithSizes(6, 2, stripes);
516    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
517    sfs.addSublist(si.getLevel0Files());
518    sfs.addSublist(si.getStripes().get(0));
519    verifyCompaction(policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries());
520    // If we cannot actually compact all files in some stripe, L0 is chosen.
521    si = createStripesWithSizes(6, 2,
522      new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } });
523    verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
524    // even if L0 has no file
525    // if all files of stripe aren't selected, delete must not be dropped.
526    stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } };
527    si = createStripesWithSizes(0, 0, stripes);
528    List<HStoreFile> compactFile = new ArrayList<>();
529    Iterator<HStoreFile> iter = si.getStripes().get(0).listIterator(1);
530    while (iter.hasNext()) {
531      compactFile.add(iter.next());
532    }
533    verifyCompaction(policy, si, compactFile, false, 1, null, si.getStartRow(0), si.getEndRow(0),
534      true);
535  }
536
537  @TestTemplate
538  public void testCheckExpiredL0Compaction() throws Exception {
539    Configuration conf = HBaseConfiguration.create();
540    int minL0 = 100;
541    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, minL0);
542    conf.setInt(MIN_FILES_KEY, 4);
543
544    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
545    long now = defaultTtl + 2;
546    edge.setValue(now);
547    EnvironmentEdgeManager.injectEdge(edge);
548    HStoreFile expiredFile = createFile(10), notExpiredFile = createFile(10);
549    when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
550    when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
551    List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
552    List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
553
554    StripeCompactionPolicy policy =
555      createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
556    // Merge expired if there are eligible stripes.
557    StripeCompactionPolicy.StripeInformationProvider si =
558      createStripesWithFiles(null, new ArrayList<>(), mixed);
559    assertFalse(policy.needsCompactions(si, al()));
560
561    List<HStoreFile> largeMixed = new ArrayList<>();
562    for (int i = 0; i < minL0 - 1; i++) {
563      largeMixed.add(i % 2 == 0 ? notExpiredFile : expiredFile);
564    }
565    si = createStripesWithFiles(null, new ArrayList<>(), largeMixed);
566    assertFalse(policy.needsCompactions(si, al()));
567
568    si = createStripesWithFiles(null, new ArrayList<>(), expired);
569    assertFalse(policy.needsSingleStripeCompaction(si));
570    assertFalse(policy.hasExpiredStripes(si));
571    assertTrue(policy.allL0FilesExpired(si));
572    assertTrue(policy.needsCompactions(si, al()));
573  }
574
575  /********* HELPER METHODS ************/
  /**
   * Creates a policy using the class-level defaults for split size, split count and initial
   * stripe count, and without a TTL.
   * @param conf configuration to build the policy from; it is modified by the delegate, which
   *             writes the split/initial-count keys into it
   * @return the new policy
   */
  private static StripeCompactionPolicy createPolicy(Configuration conf) throws Exception {
    return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false);
  }
579
580  private static StripeCompactionPolicy createPolicy(Configuration conf, long splitSize,
581    float splitCount, int initialCount, boolean hasTtl) throws Exception {
582    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize);
583    conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount);
584    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount);
585    StoreConfigInformation sci = mock(StoreConfigInformation.class);
586    when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE);
587    when(sci.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
588    StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
589    return new StripeCompactionPolicy(conf, sci, ssc);
590  }
591
592  private static ArrayList<HStoreFile> al(HStoreFile... sfs) {
593    return new ArrayList<>(Arrays.asList(sfs));
594  }
595
596  private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si,
597    int from, int to) throws Exception {
598    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
599    Collection<HStoreFile> sfs = getAllFiles(si, from, to);
600    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
601
602    // All the Stripes are expired, so the Compactor will not create any Writers. We need to create
603    // an empty file to preserve metadata
604    StripeCompactor sc = createCompactor();
605    List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
606    assertEquals(1, paths.size());
607  }
608
609  /**
610   * Verify the compaction that includes several entire stripes.
611   * @param policy      Policy to test.
612   * @param si          Stripe information pre-set with stripes to test.
613   * @param from        Starting stripe.
614   * @param to          Ending stripe (inclusive).
615   * @param dropDeletes Whether to drop deletes from compaction range.
616   * @param count       Expected # of resulting stripes, null if not checked.
617   * @param size        Expected target stripe size, null if not checked.
618   */
619  private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
620    StripeInformationProvider si, int from, int to, Boolean dropDeletes, Integer count, Long size,
621    boolean needsCompaction) throws IOException {
622    verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes, count, size,
623      si.getStartRow(from), si.getEndRow(to), needsCompaction);
624  }
625
626  private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
627    StripeInformationProvider si, int from, int to, Boolean dropDeletes, Integer count, Long size)
628    throws IOException {
629    verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true);
630  }
631
632  private void verifySingleStripeCompaction(StripeCompactionPolicy policy,
633    StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException {
634    verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true);
635  }
636
637  /**
638   * Verify no compaction is needed or selected.
639   * @param policy Policy to test.
640   * @param si     Stripe information pre-set with stripes to test.
641   */
642  private void verifyNoCompaction(StripeCompactionPolicy policy, StripeInformationProvider si)
643    throws IOException {
644    assertNull(policy.selectCompaction(si, al(), false));
645    assertFalse(policy.needsCompactions(si, al()));
646  }
647
648  /**
649   * Verify arbitrary compaction.
650   * @param policy          Policy to test.
651   * @param si              Stripe information pre-set with stripes to test.
652   * @param sfs             Files that should be compacted.
653   * @param dropDeletesFrom Row from which to drop deletes.
654   * @param dropDeletesTo   Row to which to drop deletes.
655   * @param boundaries      Expected target stripe boundaries.
656   */
657  private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
658    Collection<HStoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo,
659    final List<byte[]> boundaries) throws Exception {
660    StripeCompactor sc = mock(StripeCompactor.class);
661    assertTrue(policy.needsCompactions(si, al()));
662    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
663    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
664    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
665    verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
666      @Override
667      public boolean matches(List<byte[]> argument) {
668        List<byte[]> other = argument;
669        if (other.size() != boundaries.size()) {
670          return false;
671        }
672        for (int i = 0; i < other.size(); ++i) {
673          if (!Bytes.equals(other.get(i), boundaries.get(i))) {
674            return false;
675          }
676        }
677        return true;
678      }
679    }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
680      dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), any(), any());
681  }
682
683  /**
684   * Verify arbitrary compaction.
685   * @param policy      Policy to test.
686   * @param si          Stripe information pre-set with stripes to test.
687   * @param sfs         Files that should be compacted.
688   * @param dropDeletes Whether to drop deletes from compaction range.
689   * @param count       Expected # of resulting stripes, null if not checked.
690   * @param size        Expected target stripe size, null if not checked.
691   * @param start       Left boundary of the compaction.
692   * @param end         Right boundary of the compaction.
693   */
694  private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
695    Collection<HStoreFile> sfs, Boolean dropDeletes, Integer count, Long size, byte[] start,
696    byte[] end, boolean needsCompaction) throws IOException {
697    StripeCompactor sc = mock(StripeCompactor.class);
698    assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
699    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
700    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
701    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
702    verify(sc, times(1)).compact(eq(scr.getRequest()),
703      count == null ? anyInt() : eq(count.intValue()),
704      size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
705      dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), any(), any());
706  }
707
708  /** Verify arbitrary flush. */
709  protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
710    KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
711    StoreFileWritersCapture writers = new StoreFileWritersCapture();
712    StripeStoreFlusher.StripeFlushRequest req =
713      policy.selectFlush(CellComparatorImpl.COMPARATOR, si, input.length);
714    StripeMultiFileWriter mw = req.createWriter();
715    mw.init(null, writers);
716    for (KeyValue kv : input) {
717      mw.append(kv);
718    }
719    boolean hasMetadata = boundaries != null;
720    mw.commitWriters(0, false);
721    writers.verifyKvs(expected, true, hasMetadata);
722    if (hasMetadata) {
723      writers.verifyBoundaries(boundaries);
724    }
725  }
726
727  private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
728    return dropDeletes == null
729      ? any()
730      : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));
731  }
732
733  private void verifyCollectionsEqual(Collection<HStoreFile> sfs, Collection<HStoreFile> scr) {
734    // Dumb.
735    assertEquals(sfs.size(), scr.size());
736    assertTrue(scr.containsAll(sfs));
737  }
738
739  private static List<HStoreFile> getAllFiles(StripeInformationProvider si, int fromStripe,
740    int toStripe) {
741    ArrayList<HStoreFile> expected = new ArrayList<>();
742    for (int i = fromStripe; i <= toStripe; ++i) {
743      expected.addAll(si.getStripes().get(i));
744    }
745    return expected;
746  }
747
748  /**
749   * @param l0Count    Number of L0 files.
750   * @param boundaries Target boundaries.
751   * @return Mock stripes.
752   */
753  private static StripeInformationProvider createStripes(int l0Count, byte[]... boundaries)
754    throws Exception {
755    List<Long> l0Sizes = new ArrayList<>();
756    for (int i = 0; i < l0Count; ++i) {
757      l0Sizes.add(5L);
758    }
759    List<List<Long>> sizes = new ArrayList<>();
760    for (int i = 0; i <= boundaries.length; ++i) {
761      sizes.add(Arrays.asList(Long.valueOf(5)));
762    }
763    return createStripes(Arrays.asList(boundaries), sizes, l0Sizes);
764  }
765
766  /**
767   * @param l0Count Number of L0 files.
768   * @param l0Size  Size of each file.
769   * @return Mock stripes.
770   */
771  private static StripeInformationProvider createStripesL0Only(int l0Count, long l0Size)
772    throws Exception {
773    List<Long> l0Sizes = new ArrayList<>();
774    for (int i = 0; i < l0Count; ++i) {
775      l0Sizes.add(l0Size);
776    }
777    return createStripes(null, new ArrayList<>(), l0Sizes);
778  }
779
780  /**
781   * @param l0Count Number of L0 files.
782   * @param l0Size  Size of each file.
783   * @param sizes   Sizes of the files; each sub-array representing a stripe.
784   * @return Mock stripes.
785   */
786  private static StripeInformationProvider createStripesWithSizes(int l0Count, long l0Size,
787    Long[]... sizes) throws Exception {
788    ArrayList<List<Long>> sizeList = new ArrayList<>(sizes.length);
789    for (Long[] size : sizes) {
790      sizeList.add(Arrays.asList(size));
791    }
792    return createStripesWithSizes(l0Count, l0Size, sizeList);
793  }
794
795  private static StripeInformationProvider createStripesWithSizes(int l0Count, long l0Size,
796    List<List<Long>> sizes) throws Exception {
797    List<byte[]> boundaries = createBoundaries(sizes.size());
798    List<Long> l0Sizes = new ArrayList<>();
799    for (int i = 0; i < l0Count; ++i) {
800      l0Sizes.add(l0Size);
801    }
802    return createStripes(boundaries, sizes, l0Sizes);
803  }
804
805  private static List<byte[]> createBoundaries(int stripeCount) {
806    byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E };
807    assert stripeCount <= keys.length + 1;
808    List<byte[]> boundaries = new ArrayList<>();
809    boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1));
810    return boundaries;
811  }
812
813  private static StripeInformationProvider createStripes(List<byte[]> boundaries,
814    List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception {
815    List<List<HStoreFile>> stripeFiles = new ArrayList<>(stripeSizes.size());
816    for (List<Long> sizes : stripeSizes) {
817      List<HStoreFile> sfs = new ArrayList<>(sizes.size());
818      for (Long size : sizes) {
819        sfs.add(createFile(size));
820      }
821      stripeFiles.add(sfs);
822    }
823    List<HStoreFile> l0Files = new ArrayList<>();
824    for (Long size : l0Sizes) {
825      l0Files.add(createFile(size));
826    }
827    return createStripesWithFiles(boundaries, stripeFiles, l0Files);
828  }
829
830  /**
831   * This method actually does all the work.
832   */
833  private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries,
834    List<List<HStoreFile>> stripeFiles, List<HStoreFile> l0Files) throws Exception {
835    ArrayList<ImmutableList<HStoreFile>> stripes = new ArrayList<>();
836    ArrayList<byte[]> boundariesList = new ArrayList<>();
837    StripeInformationProvider si = mock(StripeInformationProvider.class);
838    if (!stripeFiles.isEmpty()) {
839      assert stripeFiles.size() == (boundaries.size() + 1);
840      boundariesList.add(OPEN_KEY);
841      for (int i = 0; i <= boundaries.size(); ++i) {
842        byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1));
843        byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i));
844        boundariesList.add(endKey);
845        for (HStoreFile sf : stripeFiles.get(i)) {
846          setFileStripe(sf, startKey, endKey);
847        }
848        stripes.add(ImmutableList.copyOf(stripeFiles.get(i)));
849        when(si.getStartRow(eq(i))).thenReturn(startKey);
850        when(si.getEndRow(eq(i))).thenReturn(endKey);
851      }
852    }
853    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
854    sfs.addAllSublists(stripes);
855    sfs.addSublist(l0Files);
856    when(si.getStoreFiles()).thenReturn(sfs);
857    when(si.getStripes()).thenReturn(stripes);
858    when(si.getStripeBoundaries()).thenReturn(boundariesList);
859    when(si.getStripeCount()).thenReturn(stripes.size());
860    when(si.getLevel0Files()).thenReturn(l0Files);
861    return si;
862  }
863
864  private static HStoreFile createFile(long size) throws Exception {
865    HStoreFile sf = mock(HStoreFile.class);
866    when(sf.getPath()).thenReturn(new Path("moo"));
867    StoreFileReader r = mock(StoreFileReader.class);
868    when(r.getEntries()).thenReturn(size);
869    when(r.length()).thenReturn(size);
870    when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
871    when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
872    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(),
873      anyBoolean())).thenReturn(mock(StoreFileScanner.class));
874    when(sf.getReader()).thenReturn(r);
875    when(sf.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty());
876    when(r.getMaxTimestamp()).thenReturn(TimeRange.INITIAL_MAX_TIMESTAMP);
877    return sf;
878  }
879
880  private static HStoreFile createFile() throws Exception {
881    return createFile(0);
882  }
883
884  private static void setFileStripe(HStoreFile sf, byte[] startKey, byte[] endKey) {
885    when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
886    when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
887  }
888
889  private StripeCompactor createCompactor() throws Exception {
890    ColumnFamilyDescriptor familyDescriptor =
891      ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("foo"));
892    StoreFileWritersCapture writers = new StoreFileWritersCapture();
893    HStore store = mock(HStore.class);
894    RegionInfo info = mock(RegionInfo.class);
895    when(info.getRegionNameAsString()).thenReturn("testRegion");
896    when(store.getColumnFamilyDescriptor()).thenReturn(familyDescriptor);
897    when(store.getRegionInfo()).thenReturn(info);
898    StoreEngine storeEngine = mock(StoreEngine.class);
899    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
900    when(store.getStoreEngine()).thenReturn(storeEngine);
901
902    Configuration conf = HBaseConfiguration.create();
903    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
904    final Scanner scanner = new Scanner();
905    return new StripeCompactor(conf, store) {
906      @Override
907      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
908        List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
909        byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
910        return scanner;
911      }
912
913      @Override
914      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
915        List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
916        long earliestPutTs) throws IOException {
917        return scanner;
918      }
919    };
920  }
921
922  private static class Scanner implements InternalScanner {
923    private final ArrayList<KeyValue> kvs;
924
925    public Scanner(KeyValue... kvs) {
926      this.kvs = new ArrayList<>(Arrays.asList(kvs));
927    }
928
929    @Override
930    public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
931      throws IOException {
932      if (kvs.isEmpty()) {
933        return false;
934      }
935      result.add(kvs.remove(0));
936      return !kvs.isEmpty();
937    }
938
939    @Override
940    public void close() throws IOException {
941    }
942  }
943}