001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MAX_FILES_KEY;
021import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
022import static org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertNull;
027import static org.junit.Assert.assertTrue;
028import static org.mockito.AdditionalMatchers.aryEq;
029import static org.mockito.ArgumentMatchers.any;
030import static org.mockito.ArgumentMatchers.anyBoolean;
031import static org.mockito.ArgumentMatchers.anyInt;
032import static org.mockito.ArgumentMatchers.anyLong;
033import static org.mockito.ArgumentMatchers.argThat;
034import static org.mockito.ArgumentMatchers.eq;
035import static org.mockito.ArgumentMatchers.isNull;
036import static org.mockito.Mockito.mock;
037import static org.mockito.Mockito.only;
038import static org.mockito.Mockito.times;
039import static org.mockito.Mockito.verify;
040import static org.mockito.Mockito.when;
041import java.io.IOException;
042import java.util.ArrayList;
043import java.util.Arrays;
044import java.util.Collection;
045import java.util.Iterator;
046import java.util.List;
047import java.util.OptionalLong;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.Cell;
051import org.apache.hadoop.hbase.CellComparatorImpl;
052import org.apache.hadoop.hbase.HBaseClassTestRule;
053import org.apache.hadoop.hbase.HBaseConfiguration;
054import org.apache.hadoop.hbase.KeyValue;
055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
056import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
057import org.apache.hadoop.hbase.client.RegionInfo;
058import org.apache.hadoop.hbase.client.RegionInfoBuilder;
059import org.apache.hadoop.hbase.io.TimeRange;
060import org.apache.hadoop.hbase.io.hfile.HFile;
061import org.apache.hadoop.hbase.regionserver.BloomType;
062import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
063import org.apache.hadoop.hbase.regionserver.HStore;
064import org.apache.hadoop.hbase.regionserver.HStoreFile;
065import org.apache.hadoop.hbase.regionserver.InternalScanner;
066import org.apache.hadoop.hbase.regionserver.ScanInfo;
067import org.apache.hadoop.hbase.regionserver.ScanType;
068import org.apache.hadoop.hbase.regionserver.ScannerContext;
069import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
070import org.apache.hadoop.hbase.regionserver.StoreEngine;
071import org.apache.hadoop.hbase.regionserver.StoreFileReader;
072import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
073import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
074import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
075import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
076import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
077import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
078import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
079import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
080import org.apache.hadoop.hbase.testclassification.MediumTests;
081import org.apache.hadoop.hbase.testclassification.RegionServerTests;
082import org.apache.hadoop.hbase.util.Bytes;
083import org.apache.hadoop.hbase.util.ConcatenatedLists;
084import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
085import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
086import org.junit.ClassRule;
087import org.junit.Test;
088import org.junit.experimental.categories.Category;
089import org.junit.runner.RunWith;
090import org.junit.runners.Parameterized;
091import org.junit.runners.Parameterized.Parameter;
092import org.junit.runners.Parameterized.Parameters;
093import org.mockito.ArgumentMatcher;
094import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
095import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
096
097@RunWith(Parameterized.class)
098@Category({RegionServerTests.class, MediumTests.class})
099public class TestStripeCompactionPolicy {
100
101  @ClassRule
102  public static final HBaseClassTestRule CLASS_RULE =
103      HBaseClassTestRule.forClass(TestStripeCompactionPolicy.class);
104
105  private static final byte[] KEY_A = Bytes.toBytes("aaa");
106  private static final byte[] KEY_B = Bytes.toBytes("bbb");
107  private static final byte[] KEY_C = Bytes.toBytes("ccc");
108  private static final byte[] KEY_D = Bytes.toBytes("ddd");
109  private static final byte[] KEY_E = Bytes.toBytes("eee");
110  private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
111  private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
112  private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
113  private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
114  private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);
115
116
117  private static long defaultSplitSize = 18;
118  private static float defaultSplitCount = 1.8F;
119  private final static int defaultInitialCount = 1;
120  private static long defaultTtl = 1000 * 1000;
121
122  @Parameters(name = "{index}: usePrivateReaders={0}")
123  public static Iterable<Object[]> data() {
124    return Arrays.asList(new Object[] { true }, new Object[] { false });
125  }
126
127  @Parameter
128  public boolean usePrivateReaders;
129
130  @Test
131  public void testNoStripesFromFlush() throws Exception {
132    Configuration conf = HBaseConfiguration.create();
133    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
134    StripeCompactionPolicy policy = createPolicy(conf);
135    StripeInformationProvider si = createStripesL0Only(0, 0);
136
137    KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
138    KeyValue[][] expected = new KeyValue[][] { input };
139    verifyFlush(policy, si, input, expected, null);
140  }
141
142  @Test
143  public void testOldStripesFromFlush() throws Exception {
144    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
145    StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);
146
147    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
148    KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
149      new KeyValue[] { KV_C, KV_C }, new KeyValue[] {  KV_D, KV_E } };
150    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
151  }
152
153  @Test
154  public void testNewStripesFromFlush() throws Exception {
155    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
156    StripeInformationProvider si = createStripesL0Only(0, 0);
157    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
158    // Starts with one stripe; unlike flush results, must have metadata
159    KeyValue[][] expected = new KeyValue[][] { input };
160    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
161  }
162
163  @Test
164  public void testSingleStripeCompaction() throws Exception {
165    // Create a special policy that only compacts single stripes, using standard methods.
166    Configuration conf = HBaseConfiguration.create();
167    // Test depends on this not being set to pass.  Default breaks test.  TODO: Revisit.
168    conf.unset("hbase.hstore.compaction.min.size");
169    conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F);
170    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3);
171    conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4);
172    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits
173    StoreConfigInformation sci = mock(StoreConfigInformation.class);
174    when(sci.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
175    StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
176    StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) {
177      @Override
178      public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
179          List<HStoreFile> filesCompacting, boolean isOffpeak) throws IOException {
180        if (!filesCompacting.isEmpty()) {
181          return null;
182        }
183        return selectSingleStripeCompaction(si, false, false, isOffpeak);
184      }
185
186      @Override
187      public boolean needsCompactions(
188          StripeInformationProvider si, List<HStoreFile> filesCompacting) {
189        if (!filesCompacting.isEmpty()) {
190          return false;
191        }
192        return needsSingleStripeCompaction(si);
193      }
194    };
195
196    // No compaction due to min files or ratio
197    StripeInformationProvider si = createStripesWithSizes(0, 0,
198        new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L });
199    verifyNoCompaction(policy, si);
200    // No compaction due to min files or ratio - will report needed, but not do any.
201    si = createStripesWithSizes(0, 0,
202        new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L, 1L });
203    assertNull(policy.selectCompaction(si, al(), false));
204    assertTrue(policy.needsCompactions(si, al()));
205    // One stripe has possible compaction
206    si = createStripesWithSizes(0, 0,
207        new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 4L, 3L });
208    verifySingleStripeCompaction(policy, si, 2, null);
209    // Several stripes have possible compactions; choose best quality (removes most files)
210    si = createStripesWithSizes(0, 0,
211        new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L, 1L });
212    verifySingleStripeCompaction(policy, si, 2, null);
213    si = createStripesWithSizes(0, 0,
214        new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L });
215    verifySingleStripeCompaction(policy, si, 1, null);
216    // Or with smallest files, if the count is the same
217    si = createStripesWithSizes(0, 0,
218        new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L }, new Long[] { 3L, 2L, 2L });
219    verifySingleStripeCompaction(policy, si, 1, null);
220    // Verify max count is respected.
221    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L });
222    List<HStoreFile> sfs = si.getStripes().get(1).subList(1, 5);
223    verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
224    // Verify ratio is applied.
225    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L });
226    sfs = si.getStripes().get(1).subList(1, 5);
227    verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
228  }
229
230  @Test
231  public void testWithParallelCompaction() throws Exception {
232    // TODO: currently only one compaction at a time per store is allowed. If this changes,
233    //       the appropriate file exclusion testing would need to be done in respective tests.
234    assertNull(createPolicy(HBaseConfiguration.create()).selectCompaction(
235        mock(StripeInformationProvider.class), al(createFile()), false));
236  }
237
238  @Test
239  public void testWithReferences() throws Exception {
240    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
241    StripeCompactor sc = mock(StripeCompactor.class);
242    HStoreFile ref = createFile();
243    when(ref.isReference()).thenReturn(true);
244    StripeInformationProvider si = mock(StripeInformationProvider.class);
245    Collection<HStoreFile> sfs = al(ref, createFile());
246    when(si.getStorefiles()).thenReturn(sfs);
247
248    assertTrue(policy.needsCompactions(si, al()));
249    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
250    // UnmodifiableCollection does not implement equals so we need to change it here to a
251    // collection that implements it.
252    assertEquals(si.getStorefiles(), new ArrayList<>(scr.getRequest().getFiles()));
253    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
254    verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
255      aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
256      any(), any());
257  }
258
259  @Test
260  public void testInitialCountFromL0() throws Exception {
261    Configuration conf = HBaseConfiguration.create();
262    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
263    StripeCompactionPolicy policy = createPolicy(
264        conf, defaultSplitSize, defaultSplitCount, 2, false);
265    StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8);
266    verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true);
267    si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts.
268    verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true);
269    policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false);
270    verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true);
271  }
272
273  @Test
274  public void testSelectL0Compaction() throws Exception {
275    //test select ALL L0 files when L0 files count > MIN_FILES_L0_KEY
276    Configuration conf = HBaseConfiguration.create();
277    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
278    StripeCompactionPolicy policy = createPolicy(conf);
279    StripeCompactionPolicy.StripeInformationProvider si = createStripesWithSizes(10, 10L,
280      new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, new Long[] { 3L, 2L, 2L });
281    StripeCompactionPolicy.StripeCompactionRequest cr = policy.selectCompaction(si, al(), false);
282    assertNotNull(cr);
283    assertEquals(10, cr.getRequest().getFiles().size());
284    verifyCollectionsEqual(si.getLevel0Files(), cr.getRequest().getFiles());
285
286    // test select partial L0 files when size of L0 files > HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY
287    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 100L);
288    policy = createPolicy(conf);
289    si = createStripesWithSizes(5, 50L,
290        new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, new Long[] { 3L, 2L, 2L });
291    cr = policy.selectCompaction(si, al(), false);
292    assertNotNull(cr);
293    assertEquals(2, cr.getRequest().getFiles().size());
294    verifyCollectionsEqual(si.getLevel0Files().subList(0, 2), cr.getRequest().getFiles());
295
296    // test select partial L0 files when count of L0 files > MAX_FILES_KEY
297    conf.setInt(MAX_FILES_KEY, 6);
298    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 1000L);
299    policy = createPolicy(conf);
300    si = createStripesWithSizes(10, 10L,
301      new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, new Long[] { 3L, 2L, 2L });
302    cr = policy.selectCompaction(si, al(), false);
303    assertNotNull(cr);
304    assertEquals(6, cr.getRequest().getFiles().size());
305    verifyCollectionsEqual(si.getLevel0Files().subList(0, 6), cr.getRequest().getFiles());
306  }
307
308  @Test
309  public void testExistingStripesFromL0() throws Exception {
310    Configuration conf = HBaseConfiguration.create();
311    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3);
312    StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A);
313    verifyCompaction(
314        createPolicy(conf), si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
315  }
316
317  @Test
318  public void testNothingToCompactFromL0() throws Exception {
319    Configuration conf = HBaseConfiguration.create();
320    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
321    StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10);
322    StripeCompactionPolicy policy = createPolicy(conf);
323    verifyNoCompaction(policy, si);
324
325    si = createStripes(3, KEY_A);
326    verifyNoCompaction(policy, si);
327  }
328
329  @Test
330  public void testCheckExpiredStripeCompaction() throws Exception {
331    Configuration conf = HBaseConfiguration.create();
332    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 5);
333    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
334
335    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
336    long now = defaultTtl + 2;
337    edge.setValue(now);
338    EnvironmentEdgeManager.injectEdge(edge);
339    HStoreFile expiredFile = createFile(10), notExpiredFile = createFile(10);
340    when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
341    when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
342    List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
343    List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
344
345    StripeCompactionPolicy policy =
346      createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
347    // Merge expired if there are eligible stripes.
348    StripeCompactionPolicy.StripeInformationProvider si =
349      createStripesWithFiles(mixed, mixed, mixed);
350    assertFalse(policy.needsCompactions(si, al()));
351
352    si = createStripesWithFiles(mixed, mixed, mixed, expired);
353    assertFalse(policy.needsSingleStripeCompaction(si));
354    assertTrue(policy.hasExpiredStripes(si));
355    assertTrue(policy.needsCompactions(si, al()));
356  }
357
358  @Test
359  public void testSplitOffStripe() throws Exception {
360    Configuration conf = HBaseConfiguration.create();
361    // Test depends on this not being set to pass.  Default breaks test.  TODO: Revisit.
362    conf.unset("hbase.hstore.compaction.min.size");
363    // First test everything with default split count of 2, then split into more.
364    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
365    Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L };
366    Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L };
367    long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
368    // Don't split if not eligible for compaction.
369    StripeCompactionPolicy.StripeInformationProvider si =
370        createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L });
371    assertNull(createPolicy(conf).selectCompaction(si, al(), false));
372    // Make sure everything is eligible.
373    conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f);
374    StripeCompactionPolicy policy = createPolicy(conf);
375    verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize);
376    // Add some extra stripes...
377    si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit);
378    verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize);
379    // In the middle.
380    si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit);
381    verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize);
382    // No split-off with different config (larger split size).
383    // However, in this case some eligible stripe will just be compacted alone.
384    StripeCompactionPolicy specPolicy = createPolicy(
385        conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false);
386    verifySingleStripeCompaction(specPolicy, si, 1, null);
387  }
388
389  @Test
390  public void testSplitOffStripeOffPeak() throws Exception {
391    // for HBASE-11439
392    Configuration conf = HBaseConfiguration.create();
393
394    // Test depends on this not being set to pass.  Default breaks test.  TODO: Revisit.
395    conf.unset("hbase.hstore.compaction.min.size");
396
397    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
398    // Select the last 2 files.
399    StripeCompactionPolicy.StripeInformationProvider si =
400        createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L });
401    assertEquals(2, createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles()
402        .size());
403    // Make sure everything is eligible in offpeak.
404    conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f);
405    assertEquals(3, createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles()
406        .size());
407  }
408
409  @Test
410  public void testSplitOffStripeDropDeletes() throws Exception {
411    Configuration conf = HBaseConfiguration.create();
412    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
413    StripeCompactionPolicy policy = createPolicy(conf);
414    Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 };
415    Long[] noSplit = new Long[] { 1L };
416    long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
417
418    // Verify the deletes can be dropped if there are no L0 files.
419    StripeCompactionPolicy.StripeInformationProvider si =
420        createStripesWithSizes(0, 0, noSplit, toSplit);
421    verifyWholeStripesCompaction(policy, si, 1, 1,    true, null, splitTargetSize);
422    // But cannot be dropped if there are.
423    si = createStripesWithSizes(2, 2, noSplit, toSplit);
424    verifyWholeStripesCompaction(policy, si, 1, 1,    false, null, splitTargetSize);
425  }
426
427  @SuppressWarnings("unchecked")
428  @Test
429  public void testMergeExpiredFiles() throws Exception {
430    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
431    long now = defaultTtl + 2;
432    edge.setValue(now);
433    EnvironmentEdgeManager.injectEdge(edge);
434    try {
435      HStoreFile expiredFile = createFile(), notExpiredFile = createFile();
436      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
437      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
438      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
439      List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
440      List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
441
442      StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(),
443          defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
444      // Merge expired if there are eligible stripes.
445      StripeCompactionPolicy.StripeInformationProvider si =
446          createStripesWithFiles(expired, expired, expired);
447      verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false);
448      // Don't merge if nothing expired.
449      si = createStripesWithFiles(notExpired, notExpired, notExpired);
450      assertNull(policy.selectCompaction(si, al(), false));
451      // Merge one expired stripe with next.
452      si = createStripesWithFiles(notExpired, expired, notExpired);
453      verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false);
454      // Merge the biggest run out of multiple options.
455      // Merge one expired stripe with next.
456      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
457      verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false);
458      // Stripe with a subset of expired files is not merged.
459      si = createStripesWithFiles(expired, expired, notExpired, expired, mixed);
460      verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false);
461    } finally {
462      EnvironmentEdgeManager.reset();
463    }
464  }
465
466  @SuppressWarnings("unchecked")
467  @Test
468  public void testMergeExpiredStripes() throws Exception {
469    // HBASE-11397
470    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
471    long now = defaultTtl + 2;
472    edge.setValue(now);
473    EnvironmentEdgeManager.injectEdge(edge);
474    try {
475      HStoreFile expiredFile = createFile(), notExpiredFile = createFile();
476      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
477      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
478      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
479      List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
480
481      StripeCompactionPolicy policy =
482          createPolicy(HBaseConfiguration.create(), defaultSplitSize, defaultSplitCount,
483            defaultInitialCount, true);
484
485      // Merge all three expired stripes into one.
486      StripeCompactionPolicy.StripeInformationProvider si =
487          createStripesWithFiles(expired, expired, expired);
488      verifyMergeCompatcion(policy, si, 0, 2);
489
490      // Merge two adjacent expired stripes into one.
491      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
492      verifyMergeCompatcion(policy, si, 3, 4);
493    } finally {
494      EnvironmentEdgeManager.reset();
495    }
496  }
497
498  @SuppressWarnings("unchecked")
499  private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles(
500      List<HStoreFile>... stripeFiles) throws Exception {
501    return createStripesWithFiles(createBoundaries(stripeFiles.length),
502        Lists.newArrayList(stripeFiles), new ArrayList<>());
503  }
504
505  @Test
506  public void testSingleStripeDropDeletes() throws Exception {
507    Configuration conf = HBaseConfiguration.create();
508    // Test depends on this not being set to pass.  Default breaks test.  TODO: Revisit.
509    conf.unset("hbase.hstore.compaction.min.size");
510    StripeCompactionPolicy policy = createPolicy(conf);
511    // Verify the deletes can be dropped if there are no L0 files.
512    Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } };
513    StripeInformationProvider si = createStripesWithSizes(0, 0, stripes);
514    verifySingleStripeCompaction(policy, si, 0, true);
515    // But cannot be dropped if there are.
516    si = createStripesWithSizes(2, 2, stripes);
517    verifySingleStripeCompaction(policy, si, 0, false);
518    // Unless there are enough to cause L0 compaction.
519    si = createStripesWithSizes(6, 2, stripes);
520    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
521    sfs.addSublist(si.getLevel0Files());
522    sfs.addSublist(si.getStripes().get(0));
523    verifyCompaction(
524        policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries());
525    // If we cannot actually compact all files in some stripe, L0 is chosen.
526    si = createStripesWithSizes(6, 2,
527        new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } });
528    verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
529    // even if L0 has no file
530    // if all files of stripe aren't selected, delete must not be dropped.
531    stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } };
532    si = createStripesWithSizes(0, 0, stripes);
533    List<HStoreFile> compactFile = new ArrayList<>();
534    Iterator<HStoreFile> iter = si.getStripes().get(0).listIterator(1);
535    while (iter.hasNext()) {
536      compactFile.add(iter.next());
537    }
538    verifyCompaction(policy, si, compactFile, false, 1, null, si.getStartRow(0), si.getEndRow(0),
539      true);
540  }
541
542  /********* HELPER METHODS ************/
543  private static StripeCompactionPolicy createPolicy(
544      Configuration conf) throws Exception {
545    return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false);
546  }
547
548  private static StripeCompactionPolicy createPolicy(Configuration conf,
549      long splitSize, float splitCount, int initialCount, boolean hasTtl) throws Exception {
550    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize);
551    conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount);
552    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount);
553    StoreConfigInformation sci = mock(StoreConfigInformation.class);
554    when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE);
555    when(sci.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
556    StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
557    return new StripeCompactionPolicy(conf, sci, ssc);
558  }
559
560  private static ArrayList<HStoreFile> al(HStoreFile... sfs) {
561    return new ArrayList<>(Arrays.asList(sfs));
562  }
563
564  private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si,
565      int from, int to) throws Exception {
566    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
567    Collection<HStoreFile> sfs = getAllFiles(si, from, to);
568    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
569
570    // All the Stripes are expired, so the Compactor will not create any Writers. We need to create
571    // an empty file to preserve metadata
572    StripeCompactor sc = createCompactor();
573    List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
574    assertEquals(1, paths.size());
575  }
576
577  /**
578   * Verify the compaction that includes several entire stripes.
579   * @param policy Policy to test.
580   * @param si Stripe information pre-set with stripes to test.
581   * @param from Starting stripe.
582   * @param to Ending stripe (inclusive).
583   * @param dropDeletes Whether to drop deletes from compaction range.
584   * @param count Expected # of resulting stripes, null if not checked.
585   * @param size Expected target stripe size, null if not checked.
586   */
587  private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
588      StripeInformationProvider si, int from, int to, Boolean dropDeletes,
589      Integer count, Long size, boolean needsCompaction) throws IOException {
590    verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes,
591        count, size, si.getStartRow(from), si.getEndRow(to), needsCompaction);
592  }
593
594  private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
595      StripeInformationProvider si, int from, int to, Boolean dropDeletes,
596      Integer count, Long size) throws IOException {
597    verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true);
598  }
599
600  private void verifySingleStripeCompaction(StripeCompactionPolicy policy,
601      StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException {
602    verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true);
603  }
604
605  /**
606   * Verify no compaction is needed or selected.
607   * @param policy Policy to test.
608   * @param si Stripe information pre-set with stripes to test.
609   */
610  private void verifyNoCompaction(
611      StripeCompactionPolicy policy, StripeInformationProvider si) throws IOException {
612    assertNull(policy.selectCompaction(si, al(), false));
613    assertFalse(policy.needsCompactions(si, al()));
614  }
615
616  /**
617   * Verify arbitrary compaction.
618   * @param policy Policy to test.
619   * @param si Stripe information pre-set with stripes to test.
620   * @param sfs Files that should be compacted.
621   * @param dropDeletesFrom Row from which to drop deletes.
622   * @param dropDeletesTo Row to which to drop deletes.
623   * @param boundaries Expected target stripe boundaries.
624   */
625  private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
626      Collection<HStoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo,
627      final List<byte[]> boundaries) throws Exception {
628    StripeCompactor sc = mock(StripeCompactor.class);
629    assertTrue(policy.needsCompactions(si, al()));
630    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
631    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
632    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
633    verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
634      @Override
635      public boolean matches(List<byte[]> argument) {
636        List<byte[]> other = argument;
637        if (other.size() != boundaries.size()) {
638          return false;
639        }
640        for (int i = 0; i < other.size(); ++i) {
641          if (!Bytes.equals(other.get(i), boundaries.get(i))) {
642            return false;
643          }
644        }
645        return true;
646      }
647    }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
648      dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
649      any(), any());
650  }
651
652  /**
653   * Verify arbitrary compaction.
654   * @param policy Policy to test.
655   * @param si Stripe information pre-set with stripes to test.
656   * @param sfs Files that should be compacted.
657   * @param dropDeletes Whether to drop deletes from compaction range.
658   * @param count Expected # of resulting stripes, null if not checked.
659   * @param size Expected target stripe size, null if not checked.
660   * @param start Left boundary of the compaction.
661   * @param end Right boundary of the compaction.
662   */
663  private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
664      Collection<HStoreFile> sfs, Boolean dropDeletes, Integer count, Long size,
665      byte[] start, byte[] end, boolean needsCompaction) throws IOException {
666    StripeCompactor sc = mock(StripeCompactor.class);
667    assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
668    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
669    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
670    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
671    verify(sc, times(1)).compact(eq(scr.getRequest()),
672      count == null ? anyInt() : eq(count.intValue()),
673      size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
674      dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
675      any(), any());
676  }
677
678  /** Verify arbitrary flush. */
679  protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
680      KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
681    StoreFileWritersCapture writers = new StoreFileWritersCapture();
682    StripeStoreFlusher.StripeFlushRequest req =
683      policy.selectFlush(CellComparatorImpl.COMPARATOR, si, input.length);
684    StripeMultiFileWriter mw = req.createWriter();
685    mw.init(null, writers);
686    for (KeyValue kv : input) {
687      mw.append(kv);
688    }
689    boolean hasMetadata = boundaries != null;
690    mw.commitWriters(0, false);
691    writers.verifyKvs(expected, true, hasMetadata);
692    if (hasMetadata) {
693      writers.verifyBoundaries(boundaries);
694    }
695  }
696
697
698  private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
699    return dropDeletes == null ? any()
700            : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));
701  }
702
703  private void verifyCollectionsEqual(Collection<HStoreFile> sfs, Collection<HStoreFile> scr) {
704    // Dumb.
705    assertEquals(sfs.size(), scr.size());
706    assertTrue(scr.containsAll(sfs));
707  }
708
709  private static List<HStoreFile> getAllFiles(
710      StripeInformationProvider si, int fromStripe, int toStripe) {
711    ArrayList<HStoreFile> expected = new ArrayList<>();
712    for (int i = fromStripe; i <= toStripe; ++i) {
713      expected.addAll(si.getStripes().get(i));
714    }
715    return expected;
716  }
717
718  /**
719   * @param l0Count Number of L0 files.
720   * @param boundaries Target boundaries.
721   * @return Mock stripes.
722   */
723  private static StripeInformationProvider createStripes(
724      int l0Count, byte[]... boundaries) throws Exception {
725    List<Long> l0Sizes = new ArrayList<>();
726    for (int i = 0; i < l0Count; ++i) {
727      l0Sizes.add(5L);
728    }
729    List<List<Long>> sizes = new ArrayList<>();
730    for (int i = 0; i <= boundaries.length; ++i) {
731      sizes.add(Arrays.asList(Long.valueOf(5)));
732    }
733    return createStripes(Arrays.asList(boundaries), sizes, l0Sizes);
734  }
735
736  /**
737   * @param l0Count Number of L0 files.
738   * @param l0Size Size of each file.
739   * @return Mock stripes.
740   */
741  private static StripeInformationProvider createStripesL0Only(
742      int l0Count, long l0Size) throws Exception {
743    List<Long> l0Sizes = new ArrayList<>();
744    for (int i = 0; i < l0Count; ++i) {
745      l0Sizes.add(l0Size);
746    }
747    return createStripes(null, new ArrayList<>(), l0Sizes);
748  }
749
750  /**
751   * @param l0Count Number of L0 files.
752   * @param l0Size Size of each file.
753   * @param sizes Sizes of the files; each sub-array representing a stripe.
754   * @return Mock stripes.
755   */
756  private static StripeInformationProvider createStripesWithSizes(
757      int l0Count, long l0Size, Long[]... sizes) throws Exception {
758    ArrayList<List<Long>> sizeList = new ArrayList<>(sizes.length);
759    for (Long[] size : sizes) {
760      sizeList.add(Arrays.asList(size));
761    }
762    return createStripesWithSizes(l0Count, l0Size, sizeList);
763  }
764
765  private static StripeInformationProvider createStripesWithSizes(
766      int l0Count, long l0Size, List<List<Long>> sizes) throws Exception {
767    List<byte[]> boundaries = createBoundaries(sizes.size());
768    List<Long> l0Sizes = new ArrayList<>();
769    for (int i = 0; i < l0Count; ++i) {
770      l0Sizes.add(l0Size);
771    }
772    return createStripes(boundaries, sizes, l0Sizes);
773  }
774
775  private static List<byte[]> createBoundaries(int stripeCount) {
776    byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E };
777    assert stripeCount <= keys.length + 1;
778    List<byte[]> boundaries = new ArrayList<>();
779    boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1));
780    return boundaries;
781  }
782
783  private static StripeInformationProvider createStripes(List<byte[]> boundaries,
784      List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception {
785    List<List<HStoreFile>> stripeFiles = new ArrayList<>(stripeSizes.size());
786    for (List<Long> sizes : stripeSizes) {
787      List<HStoreFile> sfs = new ArrayList<>(sizes.size());
788      for (Long size : sizes) {
789        sfs.add(createFile(size));
790      }
791      stripeFiles.add(sfs);
792    }
793    List<HStoreFile> l0Files = new ArrayList<>();
794    for (Long size : l0Sizes) {
795      l0Files.add(createFile(size));
796    }
797    return createStripesWithFiles(boundaries, stripeFiles, l0Files);
798  }
799
800  /**
801   * This method actually does all the work.
802   */
803  private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries,
804      List<List<HStoreFile>> stripeFiles, List<HStoreFile> l0Files) throws Exception {
805    ArrayList<ImmutableList<HStoreFile>> stripes = new ArrayList<>();
806    ArrayList<byte[]> boundariesList = new ArrayList<>();
807    StripeInformationProvider si = mock(StripeInformationProvider.class);
808    if (!stripeFiles.isEmpty()) {
809      assert stripeFiles.size() == (boundaries.size() + 1);
810      boundariesList.add(OPEN_KEY);
811      for (int i = 0; i <= boundaries.size(); ++i) {
812        byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1));
813        byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i));
814        boundariesList.add(endKey);
815        for (HStoreFile sf : stripeFiles.get(i)) {
816          setFileStripe(sf, startKey, endKey);
817        }
818        stripes.add(ImmutableList.copyOf(stripeFiles.get(i)));
819        when(si.getStartRow(eq(i))).thenReturn(startKey);
820        when(si.getEndRow(eq(i))).thenReturn(endKey);
821      }
822    }
823    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
824    sfs.addAllSublists(stripes);
825    sfs.addSublist(l0Files);
826    when(si.getStorefiles()).thenReturn(sfs);
827    when(si.getStripes()).thenReturn(stripes);
828    when(si.getStripeBoundaries()).thenReturn(boundariesList);
829    when(si.getStripeCount()).thenReturn(stripes.size());
830    when(si.getLevel0Files()).thenReturn(l0Files);
831    return si;
832  }
833
834  private static HStoreFile createFile(long size) throws Exception {
835    HStoreFile sf = mock(HStoreFile.class);
836    when(sf.getPath()).thenReturn(new Path("moo"));
837    StoreFileReader r = mock(StoreFileReader.class);
838    when(r.getEntries()).thenReturn(size);
839    when(r.length()).thenReturn(size);
840    when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
841    when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
842    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(),
843      anyBoolean())).thenReturn(mock(StoreFileScanner.class));
844    when(sf.getReader()).thenReturn(r);
845    when(sf.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty());
846    when(r.getMaxTimestamp()).thenReturn(TimeRange.INITIAL_MAX_TIMESTAMP);
847    return sf;
848  }
849
850  private static HStoreFile createFile() throws Exception {
851    return createFile(0);
852  }
853
854  private static void setFileStripe(HStoreFile sf, byte[] startKey, byte[] endKey) {
855    when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
856    when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
857  }
858
859  private StripeCompactor createCompactor() throws Exception {
860    ColumnFamilyDescriptor familyDescriptor =
861      ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("foo"));
862    StoreFileWritersCapture writers = new StoreFileWritersCapture();
863    HStore store = mock(HStore.class);
864    RegionInfo info = mock(RegionInfo.class);
865    when(info.getRegionNameAsString()).thenReturn("testRegion");
866    when(store.getColumnFamilyDescriptor()).thenReturn(familyDescriptor);
867    when(store.getRegionInfo()).thenReturn(info);
868    StoreEngine storeEngine = mock(StoreEngine.class);
869    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
870    when(store.getStoreEngine()).thenReturn(storeEngine);
871
872    Configuration conf = HBaseConfiguration.create();
873    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
874    final Scanner scanner = new Scanner();
875    return new StripeCompactor(conf, store) {
876      @Override
877      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
878          List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
879          byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
880        return scanner;
881      }
882
883      @Override
884      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
885          List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
886          long earliestPutTs) throws IOException {
887        return scanner;
888      }
889    };
890  }
891
892  private static class Scanner implements InternalScanner {
893    private final ArrayList<KeyValue> kvs;
894
895    public Scanner(KeyValue... kvs) {
896      this.kvs = new ArrayList<>(Arrays.asList(kvs));
897    }
898
899    @Override
900    public boolean next(List<Cell> result, ScannerContext scannerContext)
901        throws IOException {
902      if (kvs.isEmpty()) {
903        return false;
904      }
905      result.add(kvs.remove(0));
906      return !kvs.isEmpty();
907    }
908
909    @Override
910    public void close() throws IOException {
911    }
912  }
913}