001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.AdditionalMatchers.aryEq;
026import static org.mockito.ArgumentMatchers.any;
027import static org.mockito.ArgumentMatchers.anyBoolean;
028import static org.mockito.ArgumentMatchers.anyInt;
029import static org.mockito.ArgumentMatchers.anyLong;
030import static org.mockito.ArgumentMatchers.anyString;
031import static org.mockito.ArgumentMatchers.argThat;
032import static org.mockito.ArgumentMatchers.eq;
033import static org.mockito.ArgumentMatchers.isNull;
034import static org.mockito.Mockito.mock;
035import static org.mockito.Mockito.only;
036import static org.mockito.Mockito.times;
037import static org.mockito.Mockito.verify;
038import static org.mockito.Mockito.when;
039import java.io.IOException;
040import java.util.ArrayList;
041import java.util.Arrays;
042import java.util.Collection;
043import java.util.Iterator;
044import java.util.List;
045import java.util.OptionalLong;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.fs.Path;
048import org.apache.hadoop.hbase.Cell;
049import org.apache.hadoop.hbase.CellComparatorImpl;
050import org.apache.hadoop.hbase.HBaseClassTestRule;
051import org.apache.hadoop.hbase.HBaseConfiguration;
052import org.apache.hadoop.hbase.HColumnDescriptor;
053import org.apache.hadoop.hbase.HRegionInfo;
054import org.apache.hadoop.hbase.KeyValue;
055import org.apache.hadoop.hbase.io.hfile.HFile;
056import org.apache.hadoop.hbase.regionserver.BloomType;
057import org.apache.hadoop.hbase.regionserver.HStore;
058import org.apache.hadoop.hbase.regionserver.HStoreFile;
059import org.apache.hadoop.hbase.regionserver.InternalScanner;
060import org.apache.hadoop.hbase.regionserver.ScanInfo;
061import org.apache.hadoop.hbase.regionserver.ScanType;
062import org.apache.hadoop.hbase.regionserver.ScannerContext;
063import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
064import org.apache.hadoop.hbase.regionserver.StoreFileReader;
065import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
066import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
067import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
068import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
069import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
070import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
071import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
072import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
073import org.apache.hadoop.hbase.testclassification.MediumTests;
074import org.apache.hadoop.hbase.testclassification.RegionServerTests;
075import org.apache.hadoop.hbase.util.Bytes;
076import org.apache.hadoop.hbase.util.ConcatenatedLists;
077import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
078import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
079import org.junit.ClassRule;
080import org.junit.Test;
081import org.junit.experimental.categories.Category;
082import org.junit.runner.RunWith;
083import org.junit.runners.Parameterized;
084import org.junit.runners.Parameterized.Parameter;
085import org.junit.runners.Parameterized.Parameters;
086import org.mockito.ArgumentMatcher;
087import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
088import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
089
090@RunWith(Parameterized.class)
091@Category({RegionServerTests.class, MediumTests.class})
092public class TestStripeCompactionPolicy {
093
094  @ClassRule
095  public static final HBaseClassTestRule CLASS_RULE =
096      HBaseClassTestRule.forClass(TestStripeCompactionPolicy.class);
097
098  private static final byte[] KEY_A = Bytes.toBytes("aaa");
099  private static final byte[] KEY_B = Bytes.toBytes("bbb");
100  private static final byte[] KEY_C = Bytes.toBytes("ccc");
101  private static final byte[] KEY_D = Bytes.toBytes("ddd");
102  private static final byte[] KEY_E = Bytes.toBytes("eee");
103  private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
104  private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
105  private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
106  private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
107  private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);
108
109
110  private static long defaultSplitSize = 18;
111  private static float defaultSplitCount = 1.8F;
112  private final static int defaultInitialCount = 1;
113  private static long defaultTtl = 1000 * 1000;
114
115  @Parameters(name = "{index}: usePrivateReaders={0}")
116  public static Iterable<Object[]> data() {
117    return Arrays.asList(new Object[] { true }, new Object[] { false });
118  }
119
120  @Parameter
121  public boolean usePrivateReaders;
122
123  @Test
124  public void testNoStripesFromFlush() throws Exception {
125    Configuration conf = HBaseConfiguration.create();
126    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
127    StripeCompactionPolicy policy = createPolicy(conf);
128    StripeInformationProvider si = createStripesL0Only(0, 0);
129
130    KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
131    KeyValue[][] expected = new KeyValue[][] { input };
132    verifyFlush(policy, si, input, expected, null);
133  }
134
135  @Test
136  public void testOldStripesFromFlush() throws Exception {
137    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
138    StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);
139
140    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
141    KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
142      new KeyValue[] { KV_C, KV_C }, new KeyValue[] {  KV_D, KV_E } };
143    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
144  }
145
146  @Test
147  public void testNewStripesFromFlush() throws Exception {
148    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
149    StripeInformationProvider si = createStripesL0Only(0, 0);
150    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
151    // Starts with one stripe; unlike flush results, must have metadata
152    KeyValue[][] expected = new KeyValue[][] { input };
153    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
154  }
155
156  @Test
157  public void testSingleStripeCompaction() throws Exception {
158    // Create a special policy that only compacts single stripes, using standard methods.
159    Configuration conf = HBaseConfiguration.create();
160    // Test depends on this not being set to pass.  Default breaks test.  TODO: Revisit.
161    conf.unset("hbase.hstore.compaction.min.size");
162    conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F);
163    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3);
164    conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4);
165    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits
166    StoreConfigInformation sci = mock(StoreConfigInformation.class);
167    when(sci.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
168    StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
169    StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) {
170      @Override
171      public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
172          List<HStoreFile> filesCompacting, boolean isOffpeak) throws IOException {
173        if (!filesCompacting.isEmpty()) {
174          return null;
175        }
176        return selectSingleStripeCompaction(si, false, false, isOffpeak);
177      }
178
179      @Override
180      public boolean needsCompactions(
181          StripeInformationProvider si, List<HStoreFile> filesCompacting) {
182        if (!filesCompacting.isEmpty()) {
183          return false;
184        }
185        return needsSingleStripeCompaction(si);
186      }
187    };
188
189    // No compaction due to min files or ratio
190    StripeInformationProvider si = createStripesWithSizes(0, 0,
191        new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L });
192    verifyNoCompaction(policy, si);
193    // No compaction due to min files or ratio - will report needed, but not do any.
194    si = createStripesWithSizes(0, 0,
195        new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L, 1L });
196    assertNull(policy.selectCompaction(si, al(), false));
197    assertTrue(policy.needsCompactions(si, al()));
198    // One stripe has possible compaction
199    si = createStripesWithSizes(0, 0,
200        new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 4L, 3L });
201    verifySingleStripeCompaction(policy, si, 2, null);
202    // Several stripes have possible compactions; choose best quality (removes most files)
203    si = createStripesWithSizes(0, 0,
204        new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L, 1L });
205    verifySingleStripeCompaction(policy, si, 2, null);
206    si = createStripesWithSizes(0, 0,
207        new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L });
208    verifySingleStripeCompaction(policy, si, 1, null);
209    // Or with smallest files, if the count is the same
210    si = createStripesWithSizes(0, 0,
211        new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L }, new Long[] { 3L, 2L, 2L });
212    verifySingleStripeCompaction(policy, si, 1, null);
213    // Verify max count is respected.
214    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L });
215    List<HStoreFile> sfs = si.getStripes().get(1).subList(1, 5);
216    verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
217    // Verify ratio is applied.
218    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L });
219    sfs = si.getStripes().get(1).subList(1, 5);
220    verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
221  }
222
223  @Test
224  public void testWithParallelCompaction() throws Exception {
225    // TODO: currently only one compaction at a time per store is allowed. If this changes,
226    //       the appropriate file exclusion testing would need to be done in respective tests.
227    assertNull(createPolicy(HBaseConfiguration.create()).selectCompaction(
228        mock(StripeInformationProvider.class), al(createFile()), false));
229  }
230
231  @Test
232  public void testWithReferences() throws Exception {
233    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
234    StripeCompactor sc = mock(StripeCompactor.class);
235    HStoreFile ref = createFile();
236    when(ref.isReference()).thenReturn(true);
237    StripeInformationProvider si = mock(StripeInformationProvider.class);
238    Collection<HStoreFile> sfs = al(ref, createFile());
239    when(si.getStorefiles()).thenReturn(sfs);
240
241    assertTrue(policy.needsCompactions(si, al()));
242    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
243    // UnmodifiableCollection does not implement equals so we need to change it here to a
244    // collection that implements it.
245    assertEquals(si.getStorefiles(), new ArrayList<>(scr.getRequest().getFiles()));
246    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
247    verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
248      aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
249      any(), any());
250  }
251
252  @Test
253  public void testInitialCountFromL0() throws Exception {
254    Configuration conf = HBaseConfiguration.create();
255    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
256    StripeCompactionPolicy policy = createPolicy(
257        conf, defaultSplitSize, defaultSplitCount, 2, false);
258    StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8);
259    verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true);
260    si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts.
261    verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true);
262    policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false);
263    verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true);
264  }
265
266  @Test
267  public void testExistingStripesFromL0() throws Exception {
268    Configuration conf = HBaseConfiguration.create();
269    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3);
270    StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A);
271    verifyCompaction(
272        createPolicy(conf), si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
273  }
274
275  @Test
276  public void testNothingToCompactFromL0() throws Exception {
277    Configuration conf = HBaseConfiguration.create();
278    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
279    StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10);
280    StripeCompactionPolicy policy = createPolicy(conf);
281    verifyNoCompaction(policy, si);
282
283    si = createStripes(3, KEY_A);
284    verifyNoCompaction(policy, si);
285  }
286
287  @Test
288  public void testSplitOffStripe() throws Exception {
289    Configuration conf = HBaseConfiguration.create();
290    // Test depends on this not being set to pass.  Default breaks test.  TODO: Revisit.
291    conf.unset("hbase.hstore.compaction.min.size");
292    // First test everything with default split count of 2, then split into more.
293    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
294    Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L };
295    Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L };
296    long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
297    // Don't split if not eligible for compaction.
298    StripeCompactionPolicy.StripeInformationProvider si =
299        createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L });
300    assertNull(createPolicy(conf).selectCompaction(si, al(), false));
301    // Make sure everything is eligible.
302    conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f);
303    StripeCompactionPolicy policy = createPolicy(conf);
304    verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize);
305    // Add some extra stripes...
306    si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit);
307    verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize);
308    // In the middle.
309    si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit);
310    verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize);
311    // No split-off with different config (larger split size).
312    // However, in this case some eligible stripe will just be compacted alone.
313    StripeCompactionPolicy specPolicy = createPolicy(
314        conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false);
315    verifySingleStripeCompaction(specPolicy, si, 1, null);
316  }
317
318  @Test
319  public void testSplitOffStripeOffPeak() throws Exception {
320    // for HBASE-11439
321    Configuration conf = HBaseConfiguration.create();
322
323    // Test depends on this not being set to pass.  Default breaks test.  TODO: Revisit.
324    conf.unset("hbase.hstore.compaction.min.size");
325
326    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
327    // Select the last 2 files.
328    StripeCompactionPolicy.StripeInformationProvider si =
329        createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L });
330    assertEquals(2, createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles()
331        .size());
332    // Make sure everything is eligible in offpeak.
333    conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f);
334    assertEquals(3, createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles()
335        .size());
336  }
337
338  @Test
339  public void testSplitOffStripeDropDeletes() throws Exception {
340    Configuration conf = HBaseConfiguration.create();
341    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
342    StripeCompactionPolicy policy = createPolicy(conf);
343    Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 };
344    Long[] noSplit = new Long[] { 1L };
345    long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
346
347    // Verify the deletes can be dropped if there are no L0 files.
348    StripeCompactionPolicy.StripeInformationProvider si =
349        createStripesWithSizes(0, 0, noSplit, toSplit);
350    verifyWholeStripesCompaction(policy, si, 1, 1,    true, null, splitTargetSize);
351    // But cannot be dropped if there are.
352    si = createStripesWithSizes(2, 2, noSplit, toSplit);
353    verifyWholeStripesCompaction(policy, si, 1, 1,    false, null, splitTargetSize);
354  }
355
356  @SuppressWarnings("unchecked")
357  @Test
358  public void testMergeExpiredFiles() throws Exception {
359    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
360    long now = defaultTtl + 2;
361    edge.setValue(now);
362    EnvironmentEdgeManager.injectEdge(edge);
363    try {
364      HStoreFile expiredFile = createFile(), notExpiredFile = createFile();
365      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
366      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
367      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
368      List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
369      List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
370
371      StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(),
372          defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
373      // Merge expired if there are eligible stripes.
374      StripeCompactionPolicy.StripeInformationProvider si =
375          createStripesWithFiles(expired, expired, expired);
376      verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false);
377      // Don't merge if nothing expired.
378      si = createStripesWithFiles(notExpired, notExpired, notExpired);
379      assertNull(policy.selectCompaction(si, al(), false));
380      // Merge one expired stripe with next.
381      si = createStripesWithFiles(notExpired, expired, notExpired);
382      verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false);
383      // Merge the biggest run out of multiple options.
384      // Merge one expired stripe with next.
385      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
386      verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false);
387      // Stripe with a subset of expired files is not merged.
388      si = createStripesWithFiles(expired, expired, notExpired, expired, mixed);
389      verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false);
390    } finally {
391      EnvironmentEdgeManager.reset();
392    }
393  }
394
395  @SuppressWarnings("unchecked")
396  @Test
397  public void testMergeExpiredStripes() throws Exception {
398    // HBASE-11397
399    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
400    long now = defaultTtl + 2;
401    edge.setValue(now);
402    EnvironmentEdgeManager.injectEdge(edge);
403    try {
404      HStoreFile expiredFile = createFile(), notExpiredFile = createFile();
405      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
406      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
407      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
408      List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
409
410      StripeCompactionPolicy policy =
411          createPolicy(HBaseConfiguration.create(), defaultSplitSize, defaultSplitCount,
412            defaultInitialCount, true);
413
414      // Merge all three expired stripes into one.
415      StripeCompactionPolicy.StripeInformationProvider si =
416          createStripesWithFiles(expired, expired, expired);
417      verifyMergeCompatcion(policy, si, 0, 2);
418
419      // Merge two adjacent expired stripes into one.
420      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
421      verifyMergeCompatcion(policy, si, 3, 4);
422    } finally {
423      EnvironmentEdgeManager.reset();
424    }
425  }
426
427  @SuppressWarnings("unchecked")
428  private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles(
429      List<HStoreFile>... stripeFiles) throws Exception {
430    return createStripesWithFiles(createBoundaries(stripeFiles.length),
431        Lists.newArrayList(stripeFiles), new ArrayList<>());
432  }
433
434  @Test
435  public void testSingleStripeDropDeletes() throws Exception {
436    Configuration conf = HBaseConfiguration.create();
437    // Test depends on this not being set to pass.  Default breaks test.  TODO: Revisit.
438    conf.unset("hbase.hstore.compaction.min.size");
439    StripeCompactionPolicy policy = createPolicy(conf);
440    // Verify the deletes can be dropped if there are no L0 files.
441    Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } };
442    StripeInformationProvider si = createStripesWithSizes(0, 0, stripes);
443    verifySingleStripeCompaction(policy, si, 0, true);
444    // But cannot be dropped if there are.
445    si = createStripesWithSizes(2, 2, stripes);
446    verifySingleStripeCompaction(policy, si, 0, false);
447    // Unless there are enough to cause L0 compaction.
448    si = createStripesWithSizes(6, 2, stripes);
449    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
450    sfs.addSublist(si.getLevel0Files());
451    sfs.addSublist(si.getStripes().get(0));
452    verifyCompaction(
453        policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries());
454    // If we cannot actually compact all files in some stripe, L0 is chosen.
455    si = createStripesWithSizes(6, 2,
456        new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } });
457    verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
458    // even if L0 has no file
459    // if all files of stripe aren't selected, delete must not be dropped.
460    stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } };
461    si = createStripesWithSizes(0, 0, stripes);
462    List<HStoreFile> compactFile = new ArrayList<>();
463    Iterator<HStoreFile> iter = si.getStripes().get(0).listIterator(1);
464    while (iter.hasNext()) {
465      compactFile.add(iter.next());
466    }
467    verifyCompaction(policy, si, compactFile, false, 1, null, si.getStartRow(0), si.getEndRow(0),
468      true);
469  }
470
471  /********* HELPER METHODS ************/
472  private static StripeCompactionPolicy createPolicy(
473      Configuration conf) throws Exception {
474    return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false);
475  }
476
477  private static StripeCompactionPolicy createPolicy(Configuration conf,
478      long splitSize, float splitCount, int initialCount, boolean hasTtl) throws Exception {
479    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize);
480    conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount);
481    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount);
482    StoreConfigInformation sci = mock(StoreConfigInformation.class);
483    when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE);
484    when(sci.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
485    StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
486    return new StripeCompactionPolicy(conf, sci, ssc);
487  }
488
489  private static ArrayList<HStoreFile> al(HStoreFile... sfs) {
490    return new ArrayList<>(Arrays.asList(sfs));
491  }
492
493  private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si,
494      int from, int to) throws Exception {
495    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
496    Collection<HStoreFile> sfs = getAllFiles(si, from, to);
497    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
498
499    // All the Stripes are expired, so the Compactor will not create any Writers. We need to create
500    // an empty file to preserve metadata
501    StripeCompactor sc = createCompactor();
502    List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
503    assertEquals(1, paths.size());
504  }
505
506  /**
507   * Verify the compaction that includes several entire stripes.
508   * @param policy Policy to test.
509   * @param si Stripe information pre-set with stripes to test.
510   * @param from Starting stripe.
511   * @param to Ending stripe (inclusive).
512   * @param dropDeletes Whether to drop deletes from compaction range.
513   * @param count Expected # of resulting stripes, null if not checked.
514   * @param size Expected target stripe size, null if not checked.
515   */
516  private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
517      StripeInformationProvider si, int from, int to, Boolean dropDeletes,
518      Integer count, Long size, boolean needsCompaction) throws IOException {
519    verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes,
520        count, size, si.getStartRow(from), si.getEndRow(to), needsCompaction);
521  }
522
523  private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
524      StripeInformationProvider si, int from, int to, Boolean dropDeletes,
525      Integer count, Long size) throws IOException {
526    verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true);
527  }
528
529  private void verifySingleStripeCompaction(StripeCompactionPolicy policy,
530      StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException {
531    verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true);
532  }
533
534  /**
535   * Verify no compaction is needed or selected.
536   * @param policy Policy to test.
537   * @param si Stripe information pre-set with stripes to test.
538   */
539  private void verifyNoCompaction(
540      StripeCompactionPolicy policy, StripeInformationProvider si) throws IOException {
541    assertNull(policy.selectCompaction(si, al(), false));
542    assertFalse(policy.needsCompactions(si, al()));
543  }
544
545  /**
546   * Verify arbitrary compaction.
547   * @param policy Policy to test.
548   * @param si Stripe information pre-set with stripes to test.
549   * @param sfs Files that should be compacted.
550   * @param dropDeletesFrom Row from which to drop deletes.
551   * @param dropDeletesTo Row to which to drop deletes.
552   * @param boundaries Expected target stripe boundaries.
553   */
554  private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
555      Collection<HStoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo,
556      final List<byte[]> boundaries) throws Exception {
557    StripeCompactor sc = mock(StripeCompactor.class);
558    assertTrue(policy.needsCompactions(si, al()));
559    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
560    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
561    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
562    verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
563      @Override
564      public boolean matches(List<byte[]> argument) {
565        List<byte[]> other = argument;
566        if (other.size() != boundaries.size()) {
567          return false;
568        }
569        for (int i = 0; i < other.size(); ++i) {
570          if (!Bytes.equals(other.get(i), boundaries.get(i))) {
571            return false;
572          }
573        }
574        return true;
575      }
576    }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
577      dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
578      any(), any());
579  }
580
581  /**
582   * Verify arbitrary compaction.
583   * @param policy Policy to test.
584   * @param si Stripe information pre-set with stripes to test.
585   * @param sfs Files that should be compacted.
586   * @param dropDeletes Whether to drop deletes from compaction range.
587   * @param count Expected # of resulting stripes, null if not checked.
588   * @param size Expected target stripe size, null if not checked.
589   * @param start Left boundary of the compaction.
590   * @param end Right boundary of the compaction.
591   */
592  private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
593      Collection<HStoreFile> sfs, Boolean dropDeletes, Integer count, Long size,
594      byte[] start, byte[] end, boolean needsCompaction) throws IOException {
595    StripeCompactor sc = mock(StripeCompactor.class);
596    assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
597    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
598    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
599    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
600    verify(sc, times(1)).compact(eq(scr.getRequest()),
601      count == null ? anyInt() : eq(count.intValue()),
602      size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
603      dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
604      any(), any());
605  }
606
607  /** Verify arbitrary flush. */
608  protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
609      KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
610    StoreFileWritersCapture writers = new StoreFileWritersCapture();
611    StripeStoreFlusher.StripeFlushRequest req =
612      policy.selectFlush(CellComparatorImpl.COMPARATOR, si, input.length);
613    StripeMultiFileWriter mw = req.createWriter();
614    mw.init(null, writers);
615    for (KeyValue kv : input) {
616      mw.append(kv);
617    }
618    boolean hasMetadata = boundaries != null;
619    mw.commitWriters(0, false);
620    writers.verifyKvs(expected, true, hasMetadata);
621    if (hasMetadata) {
622      writers.verifyBoundaries(boundaries);
623    }
624  }
625
626
627  private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
628    return dropDeletes == null ? any()
629            : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));
630  }
631
632  private void verifyCollectionsEqual(Collection<HStoreFile> sfs, Collection<HStoreFile> scr) {
633    // Dumb.
634    assertEquals(sfs.size(), scr.size());
635    assertTrue(scr.containsAll(sfs));
636  }
637
638  private static List<HStoreFile> getAllFiles(
639      StripeInformationProvider si, int fromStripe, int toStripe) {
640    ArrayList<HStoreFile> expected = new ArrayList<>();
641    for (int i = fromStripe; i <= toStripe; ++i) {
642      expected.addAll(si.getStripes().get(i));
643    }
644    return expected;
645  }
646
647  /**
648   * @param l0Count Number of L0 files.
649   * @param boundaries Target boundaries.
650   * @return Mock stripes.
651   */
652  private static StripeInformationProvider createStripes(
653      int l0Count, byte[]... boundaries) throws Exception {
654    List<Long> l0Sizes = new ArrayList<>();
655    for (int i = 0; i < l0Count; ++i) {
656      l0Sizes.add(5L);
657    }
658    List<List<Long>> sizes = new ArrayList<>();
659    for (int i = 0; i <= boundaries.length; ++i) {
660      sizes.add(Arrays.asList(Long.valueOf(5)));
661    }
662    return createStripes(Arrays.asList(boundaries), sizes, l0Sizes);
663  }
664
665  /**
666   * @param l0Count Number of L0 files.
667   * @param l0Size Size of each file.
668   * @return Mock stripes.
669   */
670  private static StripeInformationProvider createStripesL0Only(
671      int l0Count, long l0Size) throws Exception {
672    List<Long> l0Sizes = new ArrayList<>();
673    for (int i = 0; i < l0Count; ++i) {
674      l0Sizes.add(l0Size);
675    }
676    return createStripes(null, new ArrayList<>(), l0Sizes);
677  }
678
679  /**
680   * @param l0Count Number of L0 files.
681   * @param l0Size Size of each file.
682   * @param sizes Sizes of the files; each sub-array representing a stripe.
683   * @return Mock stripes.
684   */
685  private static StripeInformationProvider createStripesWithSizes(
686      int l0Count, long l0Size, Long[]... sizes) throws Exception {
687    ArrayList<List<Long>> sizeList = new ArrayList<>(sizes.length);
688    for (Long[] size : sizes) {
689      sizeList.add(Arrays.asList(size));
690    }
691    return createStripesWithSizes(l0Count, l0Size, sizeList);
692  }
693
694  private static StripeInformationProvider createStripesWithSizes(
695      int l0Count, long l0Size, List<List<Long>> sizes) throws Exception {
696    List<byte[]> boundaries = createBoundaries(sizes.size());
697    List<Long> l0Sizes = new ArrayList<>();
698    for (int i = 0; i < l0Count; ++i) {
699      l0Sizes.add(l0Size);
700    }
701    return createStripes(boundaries, sizes, l0Sizes);
702  }
703
704  private static List<byte[]> createBoundaries(int stripeCount) {
705    byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E };
706    assert stripeCount <= keys.length + 1;
707    List<byte[]> boundaries = new ArrayList<>();
708    boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1));
709    return boundaries;
710  }
711
712  private static StripeInformationProvider createStripes(List<byte[]> boundaries,
713      List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception {
714    List<List<HStoreFile>> stripeFiles = new ArrayList<>(stripeSizes.size());
715    for (List<Long> sizes : stripeSizes) {
716      List<HStoreFile> sfs = new ArrayList<>(sizes.size());
717      for (Long size : sizes) {
718        sfs.add(createFile(size));
719      }
720      stripeFiles.add(sfs);
721    }
722    List<HStoreFile> l0Files = new ArrayList<>();
723    for (Long size : l0Sizes) {
724      l0Files.add(createFile(size));
725    }
726    return createStripesWithFiles(boundaries, stripeFiles, l0Files);
727  }
728
729  /**
730   * This method actually does all the work.
731   */
732  private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries,
733      List<List<HStoreFile>> stripeFiles, List<HStoreFile> l0Files) throws Exception {
734    ArrayList<ImmutableList<HStoreFile>> stripes = new ArrayList<>();
735    ArrayList<byte[]> boundariesList = new ArrayList<>();
736    StripeInformationProvider si = mock(StripeInformationProvider.class);
737    if (!stripeFiles.isEmpty()) {
738      assert stripeFiles.size() == (boundaries.size() + 1);
739      boundariesList.add(OPEN_KEY);
740      for (int i = 0; i <= boundaries.size(); ++i) {
741        byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1));
742        byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i));
743        boundariesList.add(endKey);
744        for (HStoreFile sf : stripeFiles.get(i)) {
745          setFileStripe(sf, startKey, endKey);
746        }
747        stripes.add(ImmutableList.copyOf(stripeFiles.get(i)));
748        when(si.getStartRow(eq(i))).thenReturn(startKey);
749        when(si.getEndRow(eq(i))).thenReturn(endKey);
750      }
751    }
752    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
753    sfs.addAllSublists(stripes);
754    sfs.addSublist(l0Files);
755    when(si.getStorefiles()).thenReturn(sfs);
756    when(si.getStripes()).thenReturn(stripes);
757    when(si.getStripeBoundaries()).thenReturn(boundariesList);
758    when(si.getStripeCount()).thenReturn(stripes.size());
759    when(si.getLevel0Files()).thenReturn(l0Files);
760    return si;
761  }
762
763  private static HStoreFile createFile(long size) throws Exception {
764    HStoreFile sf = mock(HStoreFile.class);
765    when(sf.getPath()).thenReturn(new Path("moo"));
766    StoreFileReader r = mock(StoreFileReader.class);
767    when(r.getEntries()).thenReturn(size);
768    when(r.length()).thenReturn(size);
769    when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
770    when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
771    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(),
772      anyBoolean())).thenReturn(mock(StoreFileScanner.class));
773    when(sf.getReader()).thenReturn(r);
774    when(sf.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty());
775    return sf;
776  }
777
778  private static HStoreFile createFile() throws Exception {
779    return createFile(0);
780  }
781
782  private static void setFileStripe(HStoreFile sf, byte[] startKey, byte[] endKey) {
783    when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
784    when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
785  }
786
787  private StripeCompactor createCompactor() throws Exception {
788    HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
789    StoreFileWritersCapture writers = new StoreFileWritersCapture();
790    HStore store = mock(HStore.class);
791    HRegionInfo info = mock(HRegionInfo.class);
792    when(info.getRegionNameAsString()).thenReturn("testRegion");
793    when(store.getColumnFamilyDescriptor()).thenReturn(col);
794    when(store.getRegionInfo()).thenReturn(info);
795    when(
796      store.createWriterInTmp(anyLong(), any(), anyBoolean(),
797        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
798    when(
799      store.createWriterInTmp(anyLong(), any(), anyBoolean(),
800        anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
801
802    Configuration conf = HBaseConfiguration.create();
803    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
804    final Scanner scanner = new Scanner();
805    return new StripeCompactor(conf, store) {
806      @Override
807      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
808          List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
809          byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
810        return scanner;
811      }
812
813      @Override
814      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
815          List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
816          long earliestPutTs) throws IOException {
817        return scanner;
818      }
819    };
820  }
821
822  private static class Scanner implements InternalScanner {
823    private final ArrayList<KeyValue> kvs;
824
825    public Scanner(KeyValue... kvs) {
826      this.kvs = new ArrayList<>(Arrays.asList(kvs));
827    }
828
829    @Override
830    public boolean next(List<Cell> result, ScannerContext scannerContext)
831        throws IOException {
832      if (kvs.isEmpty()) {
833        return false;
834      }
835      result.add(kvs.remove(0));
836      return !kvs.isEmpty();
837    }
838
839    @Override
840    public void close() throws IOException {
841    }
842  }
843}