001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.AdditionalMatchers.aryEq;
026import static org.mockito.Matchers.any;
027import static org.mockito.Matchers.anyBoolean;
028import static org.mockito.Matchers.anyInt;
029import static org.mockito.Matchers.anyLong;
030import static org.mockito.Matchers.argThat;
031import static org.mockito.Matchers.eq;
032import static org.mockito.Matchers.isNull;
033import static org.mockito.Mockito.mock;
034import static org.mockito.Mockito.only;
035import static org.mockito.Mockito.times;
036import static org.mockito.Mockito.verify;
037import static org.mockito.Mockito.when;
038import java.io.IOException;
039import java.util.ArrayList;
040import java.util.Arrays;
041import java.util.Collection;
042import java.util.Iterator;
043import java.util.List;
044import java.util.OptionalLong;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.hbase.Cell;
048import org.apache.hadoop.hbase.CellComparatorImpl;
049import org.apache.hadoop.hbase.HBaseClassTestRule;
050import org.apache.hadoop.hbase.HBaseConfiguration;
051import org.apache.hadoop.hbase.HColumnDescriptor;
052import org.apache.hadoop.hbase.HRegionInfo;
053import org.apache.hadoop.hbase.KeyValue;
054import org.apache.hadoop.hbase.io.hfile.HFile;
055import org.apache.hadoop.hbase.regionserver.BloomType;
056import org.apache.hadoop.hbase.regionserver.HStore;
057import org.apache.hadoop.hbase.regionserver.HStoreFile;
058import org.apache.hadoop.hbase.regionserver.InternalScanner;
059import org.apache.hadoop.hbase.regionserver.ScanInfo;
060import org.apache.hadoop.hbase.regionserver.ScanType;
061import org.apache.hadoop.hbase.regionserver.ScannerContext;
062import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
063import org.apache.hadoop.hbase.regionserver.StoreFileReader;
064import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
065import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
066import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
067import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
068import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
069import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
070import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
071import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
072import org.apache.hadoop.hbase.testclassification.MediumTests;
073import org.apache.hadoop.hbase.testclassification.RegionServerTests;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.ConcatenatedLists;
076import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
077import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
078import org.junit.ClassRule;
079import org.junit.Test;
080import org.junit.experimental.categories.Category;
081import org.junit.runner.RunWith;
082import org.junit.runners.Parameterized;
083import org.junit.runners.Parameterized.Parameter;
084import org.junit.runners.Parameterized.Parameters;
085import org.mockito.ArgumentMatcher;
086import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
087import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
088
089@RunWith(Parameterized.class)
090@Category({RegionServerTests.class, MediumTests.class})
091public class TestStripeCompactionPolicy {
092
093  @ClassRule
094  public static final HBaseClassTestRule CLASS_RULE =
095      HBaseClassTestRule.forClass(TestStripeCompactionPolicy.class);
096
097  private static final byte[] KEY_A = Bytes.toBytes("aaa");
098  private static final byte[] KEY_B = Bytes.toBytes("bbb");
099  private static final byte[] KEY_C = Bytes.toBytes("ccc");
100  private static final byte[] KEY_D = Bytes.toBytes("ddd");
101  private static final byte[] KEY_E = Bytes.toBytes("eee");
102  private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
103  private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
104  private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
105  private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
106  private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);
107
108
109  private static long defaultSplitSize = 18;
110  private static float defaultSplitCount = 1.8F;
111  private final static int defaultInitialCount = 1;
112  private static long defaultTtl = 1000 * 1000;
113
114  @Parameters(name = "{index}: usePrivateReaders={0}")
115  public static Iterable<Object[]> data() {
116    return Arrays.asList(new Object[] { true }, new Object[] { false });
117  }
118
119  @Parameter
120  public boolean usePrivateReaders;
121
122  @Test
123  public void testNoStripesFromFlush() throws Exception {
124    Configuration conf = HBaseConfiguration.create();
125    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
126    StripeCompactionPolicy policy = createPolicy(conf);
127    StripeInformationProvider si = createStripesL0Only(0, 0);
128
129    KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
130    KeyValue[][] expected = new KeyValue[][] { input };
131    verifyFlush(policy, si, input, expected, null);
132  }
133
134  @Test
135  public void testOldStripesFromFlush() throws Exception {
136    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
137    StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);
138
139    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
140    KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
141      new KeyValue[] { KV_C, KV_C }, new KeyValue[] {  KV_D, KV_E } };
142    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
143  }
144
145  @Test
146  public void testNewStripesFromFlush() throws Exception {
147    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
148    StripeInformationProvider si = createStripesL0Only(0, 0);
149    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
150    // Starts with one stripe; unlike flush results, must have metadata
151    KeyValue[][] expected = new KeyValue[][] { input };
152    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
153  }
154
155  @Test
156  public void testSingleStripeCompaction() throws Exception {
157    // Create a special policy that only compacts single stripes, using standard methods.
158    Configuration conf = HBaseConfiguration.create();
159    // Test depends on this not being set to pass.  Default breaks test.  TODO: Revisit.
160    conf.unset("hbase.hstore.compaction.min.size");
161    conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F);
162    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3);
163    conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4);
164    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits
165    StoreConfigInformation sci = mock(StoreConfigInformation.class);
166    when(sci.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
167    StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
168    StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) {
169      @Override
170      public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
171          List<HStoreFile> filesCompacting, boolean isOffpeak) throws IOException {
172        if (!filesCompacting.isEmpty()) {
173          return null;
174        }
175        return selectSingleStripeCompaction(si, false, false, isOffpeak);
176      }
177
178      @Override
179      public boolean needsCompactions(
180          StripeInformationProvider si, List<HStoreFile> filesCompacting) {
181        if (!filesCompacting.isEmpty()) {
182          return false;
183        }
184        return needsSingleStripeCompaction(si);
185      }
186    };
187
188    // No compaction due to min files or ratio
189    StripeInformationProvider si = createStripesWithSizes(0, 0,
190        new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L });
191    verifyNoCompaction(policy, si);
192    // No compaction due to min files or ratio - will report needed, but not do any.
193    si = createStripesWithSizes(0, 0,
194        new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L, 1L });
195    assertNull(policy.selectCompaction(si, al(), false));
196    assertTrue(policy.needsCompactions(si, al()));
197    // One stripe has possible compaction
198    si = createStripesWithSizes(0, 0,
199        new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 4L, 3L });
200    verifySingleStripeCompaction(policy, si, 2, null);
201    // Several stripes have possible compactions; choose best quality (removes most files)
202    si = createStripesWithSizes(0, 0,
203        new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L, 1L });
204    verifySingleStripeCompaction(policy, si, 2, null);
205    si = createStripesWithSizes(0, 0,
206        new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L });
207    verifySingleStripeCompaction(policy, si, 1, null);
208    // Or with smallest files, if the count is the same
209    si = createStripesWithSizes(0, 0,
210        new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L }, new Long[] { 3L, 2L, 2L });
211    verifySingleStripeCompaction(policy, si, 1, null);
212    // Verify max count is respected.
213    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L });
214    List<HStoreFile> sfs = si.getStripes().get(1).subList(1, 5);
215    verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
216    // Verify ratio is applied.
217    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L });
218    sfs = si.getStripes().get(1).subList(1, 5);
219    verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
220  }
221
222  @Test
223  public void testWithParallelCompaction() throws Exception {
224    // TODO: currently only one compaction at a time per store is allowed. If this changes,
225    //       the appropriate file exclusion testing would need to be done in respective tests.
226    assertNull(createPolicy(HBaseConfiguration.create()).selectCompaction(
227        mock(StripeInformationProvider.class), al(createFile()), false));
228  }
229
230  @Test
231  public void testWithReferences() throws Exception {
232    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
233    StripeCompactor sc = mock(StripeCompactor.class);
234    HStoreFile ref = createFile();
235    when(ref.isReference()).thenReturn(true);
236    StripeInformationProvider si = mock(StripeInformationProvider.class);
237    Collection<HStoreFile> sfs = al(ref, createFile());
238    when(si.getStorefiles()).thenReturn(sfs);
239
240    assertTrue(policy.needsCompactions(si, al()));
241    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
242    // UnmodifiableCollection does not implement equals so we need to change it here to a
243    // collection that implements it.
244    assertEquals(si.getStorefiles(), new ArrayList<>(scr.getRequest().getFiles()));
245    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
246    verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
247      aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
248      any(), any());
249  }
250
251  @Test
252  public void testInitialCountFromL0() throws Exception {
253    Configuration conf = HBaseConfiguration.create();
254    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
255    StripeCompactionPolicy policy = createPolicy(
256        conf, defaultSplitSize, defaultSplitCount, 2, false);
257    StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8);
258    verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true);
259    si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts.
260    verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true);
261    policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false);
262    verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true);
263  }
264
265  @Test
266  public void testExistingStripesFromL0() throws Exception {
267    Configuration conf = HBaseConfiguration.create();
268    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3);
269    StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A);
270    verifyCompaction(
271        createPolicy(conf), si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
272  }
273
274  @Test
275  public void testNothingToCompactFromL0() throws Exception {
276    Configuration conf = HBaseConfiguration.create();
277    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
278    StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10);
279    StripeCompactionPolicy policy = createPolicy(conf);
280    verifyNoCompaction(policy, si);
281
282    si = createStripes(3, KEY_A);
283    verifyNoCompaction(policy, si);
284  }
285
286  @Test
287  public void testSplitOffStripe() throws Exception {
288    Configuration conf = HBaseConfiguration.create();
289    // Test depends on this not being set to pass.  Default breaks test.  TODO: Revisit.
290    conf.unset("hbase.hstore.compaction.min.size");
291    // First test everything with default split count of 2, then split into more.
292    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
293    Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L };
294    Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L };
295    long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
296    // Don't split if not eligible for compaction.
297    StripeCompactionPolicy.StripeInformationProvider si =
298        createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L });
299    assertNull(createPolicy(conf).selectCompaction(si, al(), false));
300    // Make sure everything is eligible.
301    conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f);
302    StripeCompactionPolicy policy = createPolicy(conf);
303    verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize);
304    // Add some extra stripes...
305    si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit);
306    verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize);
307    // In the middle.
308    si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit);
309    verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize);
310    // No split-off with different config (larger split size).
311    // However, in this case some eligible stripe will just be compacted alone.
312    StripeCompactionPolicy specPolicy = createPolicy(
313        conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false);
314    verifySingleStripeCompaction(specPolicy, si, 1, null);
315  }
316
317  @Test
318  public void testSplitOffStripeOffPeak() throws Exception {
319    // for HBASE-11439
320    Configuration conf = HBaseConfiguration.create();
321
322    // Test depends on this not being set to pass.  Default breaks test.  TODO: Revisit.
323    conf.unset("hbase.hstore.compaction.min.size");
324
325    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
326    // Select the last 2 files.
327    StripeCompactionPolicy.StripeInformationProvider si =
328        createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L });
329    assertEquals(2, createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles()
330        .size());
331    // Make sure everything is eligible in offpeak.
332    conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f);
333    assertEquals(3, createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles()
334        .size());
335  }
336
337  @Test
338  public void testSplitOffStripeDropDeletes() throws Exception {
339    Configuration conf = HBaseConfiguration.create();
340    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
341    StripeCompactionPolicy policy = createPolicy(conf);
342    Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 };
343    Long[] noSplit = new Long[] { 1L };
344    long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
345
346    // Verify the deletes can be dropped if there are no L0 files.
347    StripeCompactionPolicy.StripeInformationProvider si =
348        createStripesWithSizes(0, 0, noSplit, toSplit);
349    verifyWholeStripesCompaction(policy, si, 1, 1,    true, null, splitTargetSize);
350    // But cannot be dropped if there are.
351    si = createStripesWithSizes(2, 2, noSplit, toSplit);
352    verifyWholeStripesCompaction(policy, si, 1, 1,    false, null, splitTargetSize);
353  }
354
355  @SuppressWarnings("unchecked")
356  @Test
357  public void testMergeExpiredFiles() throws Exception {
358    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
359    long now = defaultTtl + 2;
360    edge.setValue(now);
361    EnvironmentEdgeManager.injectEdge(edge);
362    try {
363      HStoreFile expiredFile = createFile(), notExpiredFile = createFile();
364      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
365      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
366      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
367      List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
368      List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
369
370      StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(),
371          defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
372      // Merge expired if there are eligible stripes.
373      StripeCompactionPolicy.StripeInformationProvider si =
374          createStripesWithFiles(expired, expired, expired);
375      verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false);
376      // Don't merge if nothing expired.
377      si = createStripesWithFiles(notExpired, notExpired, notExpired);
378      assertNull(policy.selectCompaction(si, al(), false));
379      // Merge one expired stripe with next.
380      si = createStripesWithFiles(notExpired, expired, notExpired);
381      verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false);
382      // Merge the biggest run out of multiple options.
383      // Merge one expired stripe with next.
384      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
385      verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false);
386      // Stripe with a subset of expired files is not merged.
387      si = createStripesWithFiles(expired, expired, notExpired, expired, mixed);
388      verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false);
389    } finally {
390      EnvironmentEdgeManager.reset();
391    }
392  }
393
394  @SuppressWarnings("unchecked")
395  @Test
396  public void testMergeExpiredStripes() throws Exception {
397    // HBASE-11397
398    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
399    long now = defaultTtl + 2;
400    edge.setValue(now);
401    EnvironmentEdgeManager.injectEdge(edge);
402    try {
403      HStoreFile expiredFile = createFile(), notExpiredFile = createFile();
404      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
405      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
406      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
407      List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
408
409      StripeCompactionPolicy policy =
410          createPolicy(HBaseConfiguration.create(), defaultSplitSize, defaultSplitCount,
411            defaultInitialCount, true);
412
413      // Merge all three expired stripes into one.
414      StripeCompactionPolicy.StripeInformationProvider si =
415          createStripesWithFiles(expired, expired, expired);
416      verifyMergeCompatcion(policy, si, 0, 2);
417
418      // Merge two adjacent expired stripes into one.
419      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
420      verifyMergeCompatcion(policy, si, 3, 4);
421    } finally {
422      EnvironmentEdgeManager.reset();
423    }
424  }
425
426  @SuppressWarnings("unchecked")
427  private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles(
428      List<HStoreFile>... stripeFiles) throws Exception {
429    return createStripesWithFiles(createBoundaries(stripeFiles.length),
430        Lists.newArrayList(stripeFiles), new ArrayList<>());
431  }
432
433  @Test
434  public void testSingleStripeDropDeletes() throws Exception {
435    Configuration conf = HBaseConfiguration.create();
436    // Test depends on this not being set to pass.  Default breaks test.  TODO: Revisit.
437    conf.unset("hbase.hstore.compaction.min.size");
438    StripeCompactionPolicy policy = createPolicy(conf);
439    // Verify the deletes can be dropped if there are no L0 files.
440    Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } };
441    StripeInformationProvider si = createStripesWithSizes(0, 0, stripes);
442    verifySingleStripeCompaction(policy, si, 0, true);
443    // But cannot be dropped if there are.
444    si = createStripesWithSizes(2, 2, stripes);
445    verifySingleStripeCompaction(policy, si, 0, false);
446    // Unless there are enough to cause L0 compaction.
447    si = createStripesWithSizes(6, 2, stripes);
448    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
449    sfs.addSublist(si.getLevel0Files());
450    sfs.addSublist(si.getStripes().get(0));
451    verifyCompaction(
452        policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries());
453    // If we cannot actually compact all files in some stripe, L0 is chosen.
454    si = createStripesWithSizes(6, 2,
455        new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } });
456    verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
457    // even if L0 has no file
458    // if all files of stripe aren't selected, delete must not be dropped.
459    stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } };
460    si = createStripesWithSizes(0, 0, stripes);
461    List<HStoreFile> compactFile = new ArrayList<>();
462    Iterator<HStoreFile> iter = si.getStripes().get(0).listIterator(1);
463    while (iter.hasNext()) {
464      compactFile.add(iter.next());
465    }
466    verifyCompaction(policy, si, compactFile, false, 1, null, si.getStartRow(0), si.getEndRow(0),
467      true);
468  }
469
470  /********* HELPER METHODS ************/
471  private static StripeCompactionPolicy createPolicy(
472      Configuration conf) throws Exception {
473    return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false);
474  }
475
476  private static StripeCompactionPolicy createPolicy(Configuration conf,
477      long splitSize, float splitCount, int initialCount, boolean hasTtl) throws Exception {
478    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize);
479    conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount);
480    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount);
481    StoreConfigInformation sci = mock(StoreConfigInformation.class);
482    when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE);
483    when(sci.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
484    StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
485    return new StripeCompactionPolicy(conf, sci, ssc);
486  }
487
488  private static ArrayList<HStoreFile> al(HStoreFile... sfs) {
489    return new ArrayList<>(Arrays.asList(sfs));
490  }
491
492  private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si,
493      int from, int to) throws Exception {
494    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
495    Collection<HStoreFile> sfs = getAllFiles(si, from, to);
496    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
497
498    // All the Stripes are expired, so the Compactor will not create any Writers. We need to create
499    // an empty file to preserve metadata
500    StripeCompactor sc = createCompactor();
501    List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
502    assertEquals(1, paths.size());
503  }
504
505  /**
506   * Verify the compaction that includes several entire stripes.
507   * @param policy Policy to test.
508   * @param si Stripe information pre-set with stripes to test.
509   * @param from Starting stripe.
510   * @param to Ending stripe (inclusive).
511   * @param dropDeletes Whether to drop deletes from compaction range.
512   * @param count Expected # of resulting stripes, null if not checked.
513   * @param size Expected target stripe size, null if not checked.
514   */
515  private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
516      StripeInformationProvider si, int from, int to, Boolean dropDeletes,
517      Integer count, Long size, boolean needsCompaction) throws IOException {
518    verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes,
519        count, size, si.getStartRow(from), si.getEndRow(to), needsCompaction);
520  }
521
522  private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
523      StripeInformationProvider si, int from, int to, Boolean dropDeletes,
524      Integer count, Long size) throws IOException {
525    verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true);
526  }
527
528  private void verifySingleStripeCompaction(StripeCompactionPolicy policy,
529      StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException {
530    verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true);
531  }
532
533  /**
534   * Verify no compaction is needed or selected.
535   * @param policy Policy to test.
536   * @param si Stripe information pre-set with stripes to test.
537   */
538  private void verifyNoCompaction(
539      StripeCompactionPolicy policy, StripeInformationProvider si) throws IOException {
540    assertNull(policy.selectCompaction(si, al(), false));
541    assertFalse(policy.needsCompactions(si, al()));
542  }
543
544  /**
545   * Verify arbitrary compaction.
546   * @param policy Policy to test.
547   * @param si Stripe information pre-set with stripes to test.
548   * @param sfs Files that should be compacted.
549   * @param dropDeletesFrom Row from which to drop deletes.
550   * @param dropDeletesTo Row to which to drop deletes.
551   * @param boundaries Expected target stripe boundaries.
552   */
553  private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
554      Collection<HStoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo,
555      final List<byte[]> boundaries) throws Exception {
556    StripeCompactor sc = mock(StripeCompactor.class);
557    assertTrue(policy.needsCompactions(si, al()));
558    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
559    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
560    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
561    verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
562      @Override
563      public boolean matches(List<byte[]> argument) {
564        List<byte[]> other = argument;
565        if (other.size() != boundaries.size()) {
566          return false;
567        }
568        for (int i = 0; i < other.size(); ++i) {
569          if (!Bytes.equals(other.get(i), boundaries.get(i))) {
570            return false;
571          }
572        }
573        return true;
574      }
575    }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
576      dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
577      any(), any());
578  }
579
580  /**
581   * Verify arbitrary compaction.
582   * @param policy Policy to test.
583   * @param si Stripe information pre-set with stripes to test.
584   * @param sfs Files that should be compacted.
585   * @param dropDeletes Whether to drop deletes from compaction range.
586   * @param count Expected # of resulting stripes, null if not checked.
587   * @param size Expected target stripe size, null if not checked.
588   * @param start Left boundary of the compaction.
589   * @param end Right boundary of the compaction.
590   */
591  private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
592      Collection<HStoreFile> sfs, Boolean dropDeletes, Integer count, Long size,
593      byte[] start, byte[] end, boolean needsCompaction) throws IOException {
594    StripeCompactor sc = mock(StripeCompactor.class);
595    assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
596    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
597    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
598    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
599    verify(sc, times(1)).compact(eq(scr.getRequest()),
600      count == null ? anyInt() : eq(count.intValue()),
601      size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
602      dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
603      any(), any());
604  }
605
606  /** Verify arbitrary flush. */
607  protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
608      KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
609    StoreFileWritersCapture writers = new StoreFileWritersCapture();
610    StripeStoreFlusher.StripeFlushRequest req =
611      policy.selectFlush(CellComparatorImpl.COMPARATOR, si, input.length);
612    StripeMultiFileWriter mw = req.createWriter();
613    mw.init(null, writers);
614    for (KeyValue kv : input) {
615      mw.append(kv);
616    }
617    boolean hasMetadata = boundaries != null;
618    mw.commitWriters(0, false);
619    writers.verifyKvs(expected, true, hasMetadata);
620    if (hasMetadata) {
621      writers.verifyBoundaries(boundaries);
622    }
623  }
624
625
626  private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
627    return dropDeletes == null ? any()
628            : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));
629  }
630
631  private void verifyCollectionsEqual(Collection<HStoreFile> sfs, Collection<HStoreFile> scr) {
632    // Dumb.
633    assertEquals(sfs.size(), scr.size());
634    assertTrue(scr.containsAll(sfs));
635  }
636
637  private static List<HStoreFile> getAllFiles(
638      StripeInformationProvider si, int fromStripe, int toStripe) {
639    ArrayList<HStoreFile> expected = new ArrayList<>();
640    for (int i = fromStripe; i <= toStripe; ++i) {
641      expected.addAll(si.getStripes().get(i));
642    }
643    return expected;
644  }
645
646  /**
647   * @param l0Count Number of L0 files.
648   * @param boundaries Target boundaries.
649   * @return Mock stripes.
650   */
651  private static StripeInformationProvider createStripes(
652      int l0Count, byte[]... boundaries) throws Exception {
653    List<Long> l0Sizes = new ArrayList<>();
654    for (int i = 0; i < l0Count; ++i) {
655      l0Sizes.add(5L);
656    }
657    List<List<Long>> sizes = new ArrayList<>();
658    for (int i = 0; i <= boundaries.length; ++i) {
659      sizes.add(Arrays.asList(Long.valueOf(5)));
660    }
661    return createStripes(Arrays.asList(boundaries), sizes, l0Sizes);
662  }
663
664  /**
665   * @param l0Count Number of L0 files.
666   * @param l0Size Size of each file.
667   * @return Mock stripes.
668   */
669  private static StripeInformationProvider createStripesL0Only(
670      int l0Count, long l0Size) throws Exception {
671    List<Long> l0Sizes = new ArrayList<>();
672    for (int i = 0; i < l0Count; ++i) {
673      l0Sizes.add(l0Size);
674    }
675    return createStripes(null, new ArrayList<>(), l0Sizes);
676  }
677
678  /**
679   * @param l0Count Number of L0 files.
680   * @param l0Size Size of each file.
681   * @param sizes Sizes of the files; each sub-array representing a stripe.
682   * @return Mock stripes.
683   */
684  private static StripeInformationProvider createStripesWithSizes(
685      int l0Count, long l0Size, Long[]... sizes) throws Exception {
686    ArrayList<List<Long>> sizeList = new ArrayList<>(sizes.length);
687    for (Long[] size : sizes) {
688      sizeList.add(Arrays.asList(size));
689    }
690    return createStripesWithSizes(l0Count, l0Size, sizeList);
691  }
692
693  private static StripeInformationProvider createStripesWithSizes(
694      int l0Count, long l0Size, List<List<Long>> sizes) throws Exception {
695    List<byte[]> boundaries = createBoundaries(sizes.size());
696    List<Long> l0Sizes = new ArrayList<>();
697    for (int i = 0; i < l0Count; ++i) {
698      l0Sizes.add(l0Size);
699    }
700    return createStripes(boundaries, sizes, l0Sizes);
701  }
702
703  private static List<byte[]> createBoundaries(int stripeCount) {
704    byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E };
705    assert stripeCount <= keys.length + 1;
706    List<byte[]> boundaries = new ArrayList<>();
707    boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1));
708    return boundaries;
709  }
710
711  private static StripeInformationProvider createStripes(List<byte[]> boundaries,
712      List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception {
713    List<List<HStoreFile>> stripeFiles = new ArrayList<>(stripeSizes.size());
714    for (List<Long> sizes : stripeSizes) {
715      List<HStoreFile> sfs = new ArrayList<>(sizes.size());
716      for (Long size : sizes) {
717        sfs.add(createFile(size));
718      }
719      stripeFiles.add(sfs);
720    }
721    List<HStoreFile> l0Files = new ArrayList<>();
722    for (Long size : l0Sizes) {
723      l0Files.add(createFile(size));
724    }
725    return createStripesWithFiles(boundaries, stripeFiles, l0Files);
726  }
727
728  /**
729   * This method actually does all the work.
730   */
731  private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries,
732      List<List<HStoreFile>> stripeFiles, List<HStoreFile> l0Files) throws Exception {
733    ArrayList<ImmutableList<HStoreFile>> stripes = new ArrayList<>();
734    ArrayList<byte[]> boundariesList = new ArrayList<>();
735    StripeInformationProvider si = mock(StripeInformationProvider.class);
736    if (!stripeFiles.isEmpty()) {
737      assert stripeFiles.size() == (boundaries.size() + 1);
738      boundariesList.add(OPEN_KEY);
739      for (int i = 0; i <= boundaries.size(); ++i) {
740        byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1));
741        byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i));
742        boundariesList.add(endKey);
743        for (HStoreFile sf : stripeFiles.get(i)) {
744          setFileStripe(sf, startKey, endKey);
745        }
746        stripes.add(ImmutableList.copyOf(stripeFiles.get(i)));
747        when(si.getStartRow(eq(i))).thenReturn(startKey);
748        when(si.getEndRow(eq(i))).thenReturn(endKey);
749      }
750    }
751    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
752    sfs.addAllSublists(stripes);
753    sfs.addSublist(l0Files);
754    when(si.getStorefiles()).thenReturn(sfs);
755    when(si.getStripes()).thenReturn(stripes);
756    when(si.getStripeBoundaries()).thenReturn(boundariesList);
757    when(si.getStripeCount()).thenReturn(stripes.size());
758    when(si.getLevel0Files()).thenReturn(l0Files);
759    return si;
760  }
761
762  private static HStoreFile createFile(long size) throws Exception {
763    HStoreFile sf = mock(HStoreFile.class);
764    when(sf.getPath()).thenReturn(new Path("moo"));
765    StoreFileReader r = mock(StoreFileReader.class);
766    when(r.getEntries()).thenReturn(size);
767    when(r.length()).thenReturn(size);
768    when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
769    when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
770    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(),
771      anyBoolean())).thenReturn(mock(StoreFileScanner.class));
772    when(sf.getReader()).thenReturn(r);
773    when(sf.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty());
774    return sf;
775  }
776
777  private static HStoreFile createFile() throws Exception {
778    return createFile(0);
779  }
780
781  private static void setFileStripe(HStoreFile sf, byte[] startKey, byte[] endKey) {
782    when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
783    when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
784  }
785
786  private StripeCompactor createCompactor() throws Exception {
787    HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
788    StoreFileWritersCapture writers = new StoreFileWritersCapture();
789    HStore store = mock(HStore.class);
790    HRegionInfo info = mock(HRegionInfo.class);
791    when(info.getRegionNameAsString()).thenReturn("testRegion");
792    when(store.getColumnFamilyDescriptor()).thenReturn(col);
793    when(store.getRegionInfo()).thenReturn(info);
794    when(
795      store.createWriterInTmp(anyLong(), any(), anyBoolean(),
796        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
797    when(
798      store.createWriterInTmp(anyLong(), any(), anyBoolean(),
799        anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
800
801    Configuration conf = HBaseConfiguration.create();
802    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
803    final Scanner scanner = new Scanner();
804    return new StripeCompactor(conf, store) {
805      @Override
806      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
807          List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
808          byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
809        return scanner;
810      }
811
812      @Override
813      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
814          List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
815          long earliestPutTs) throws IOException {
816        return scanner;
817      }
818    };
819  }
820
821  private static class Scanner implements InternalScanner {
822    private final ArrayList<KeyValue> kvs;
823
824    public Scanner(KeyValue... kvs) {
825      this.kvs = new ArrayList<>(Arrays.asList(kvs));
826    }
827
828    @Override
829    public boolean next(List<Cell> result, ScannerContext scannerContext)
830        throws IOException {
831      if (kvs.isEmpty()) {
832        return false;
833      }
834      result.add(kvs.remove(0));
835      return !kvs.isEmpty();
836    }
837
838    @Override
839    public void close() throws IOException {
840    }
841  }
842}