001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.compactions; 019 020import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MAX_FILES_KEY; 021import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MIN_FILES_KEY; 022import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY; 023import static org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNotNull; 027import static org.junit.Assert.assertNull; 028import static org.junit.Assert.assertTrue; 029import static org.mockito.AdditionalMatchers.aryEq; 030import static org.mockito.ArgumentMatchers.any; 031import static org.mockito.ArgumentMatchers.anyBoolean; 032import static org.mockito.ArgumentMatchers.anyInt; 033import static org.mockito.ArgumentMatchers.anyLong; 034import static org.mockito.ArgumentMatchers.argThat; 035import static org.mockito.ArgumentMatchers.eq; 036import static org.mockito.ArgumentMatchers.isNull; 037import static org.mockito.Mockito.mock; 038import static org.mockito.Mockito.only; 039import static org.mockito.Mockito.times; 040import static org.mockito.Mockito.verify; 041import static org.mockito.Mockito.when; 042 043import java.io.IOException; 044import java.util.ArrayList; 045import java.util.Arrays; 046import java.util.Collection; 047import java.util.Iterator; 048import java.util.List; 049import java.util.OptionalLong; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.fs.Path; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.CellComparatorImpl; 054import org.apache.hadoop.hbase.HBaseClassTestRule; 055import org.apache.hadoop.hbase.HBaseConfiguration; 056import org.apache.hadoop.hbase.HColumnDescriptor; 057import org.apache.hadoop.hbase.HRegionInfo; 058import org.apache.hadoop.hbase.KeyValue; 059import org.apache.hadoop.hbase.io.TimeRange; 060import org.apache.hadoop.hbase.io.hfile.HFile; 061import org.apache.hadoop.hbase.regionserver.BloomType; 062import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams; 063import org.apache.hadoop.hbase.regionserver.HStore; 064import org.apache.hadoop.hbase.regionserver.HStoreFile; 065import org.apache.hadoop.hbase.regionserver.InternalScanner; 066import org.apache.hadoop.hbase.regionserver.ScanInfo; 067import org.apache.hadoop.hbase.regionserver.ScanType; 068import org.apache.hadoop.hbase.regionserver.ScannerContext; 069import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; 070import org.apache.hadoop.hbase.regionserver.StoreEngine; 071import org.apache.hadoop.hbase.regionserver.StoreFileReader; 072import org.apache.hadoop.hbase.regionserver.StoreFileScanner; 073import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; 074import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; 075import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager; 076import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher; 077import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider; 078import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture; 079import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 080import org.apache.hadoop.hbase.testclassification.MediumTests; 081import org.apache.hadoop.hbase.testclassification.RegionServerTests; 082import org.apache.hadoop.hbase.util.Bytes; 083import org.apache.hadoop.hbase.util.ConcatenatedLists; 084import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 085import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 086import org.junit.ClassRule; 087import org.junit.Test; 088import org.junit.experimental.categories.Category; 089import org.junit.runner.RunWith; 090import org.junit.runners.Parameterized; 091import org.junit.runners.Parameterized.Parameter; 092import org.junit.runners.Parameterized.Parameters; 093import org.mockito.ArgumentMatcher; 094 095import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 096import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 097 098@RunWith(Parameterized.class) 099@Category({ RegionServerTests.class, MediumTests.class }) 100public class TestStripeCompactionPolicy { 101 102 @ClassRule 103 public static final HBaseClassTestRule CLASS_RULE = 104 HBaseClassTestRule.forClass(TestStripeCompactionPolicy.class); 105 106 private static final byte[] KEY_A = Bytes.toBytes("aaa"); 107 private static final byte[] KEY_B = Bytes.toBytes("bbb"); 108 private static final byte[] KEY_C = Bytes.toBytes("ccc"); 109 private static final byte[] KEY_D = Bytes.toBytes("ddd"); 110 private static final byte[] KEY_E = Bytes.toBytes("eee"); 111 private static final KeyValue KV_A = new KeyValue(KEY_A, 0L); 112 private static final KeyValue KV_B = new KeyValue(KEY_B, 0L); 113 private static final KeyValue KV_C = new KeyValue(KEY_C, 0L); 114 private static final KeyValue KV_D = new KeyValue(KEY_D, 0L); 115 private static final KeyValue KV_E = new KeyValue(KEY_E, 0L); 116 117 private static long defaultSplitSize = 18; 118 private static float defaultSplitCount = 1.8F; 119 private final static int defaultInitialCount = 1; 120 private static long defaultTtl = 1000 * 1000; 121 122 @Parameters(name = "{index}: usePrivateReaders={0}") 123 public static Iterable<Object[]> data() { 124 return Arrays.asList(new Object[] { true }, new Object[] { false }); 125 } 126 127 @Parameter 128 public boolean usePrivateReaders; 129 130 @Test 131 public void testNoStripesFromFlush() throws Exception { 132 Configuration conf = HBaseConfiguration.create(); 133 conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true); 134 StripeCompactionPolicy policy = createPolicy(conf); 135 StripeInformationProvider si = createStripesL0Only(0, 0); 136 137 KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E }; 138 KeyValue[][] expected = new KeyValue[][] { input }; 139 verifyFlush(policy, si, input, expected, null); 140 } 141 142 @Test 143 public void testOldStripesFromFlush() throws Exception { 144 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create()); 145 StripeInformationProvider si = createStripes(0, KEY_C, KEY_D); 146 147 KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E }; 148 KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B }, 149 new KeyValue[] { KV_C, KV_C }, new KeyValue[] { KV_D, KV_E } }; 150 verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY }); 151 } 152 153 @Test 154 public void testNewStripesFromFlush() throws Exception { 155 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create()); 156 StripeInformationProvider si = createStripesL0Only(0, 0); 157 KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E }; 158 // Starts with one stripe; unlike flush results, must have metadata 159 KeyValue[][] expected = new KeyValue[][] { input }; 160 verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY }); 161 } 162 163 @Test 164 public void testSingleStripeCompaction() throws Exception { 165 // Create a special policy that only compacts single stripes, using standard methods. 166 Configuration conf = HBaseConfiguration.create(); 167 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 168 conf.unset("hbase.hstore.compaction.min.size"); 169 conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F); 170 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3); 171 conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4); 172 conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits 173 StoreConfigInformation sci = mock(StoreConfigInformation.class); 174 when(sci.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO); 175 StripeStoreConfig ssc = new StripeStoreConfig(conf, sci); 176 StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) { 177 @Override 178 public StripeCompactionRequest selectCompaction(StripeInformationProvider si, 179 List<HStoreFile> filesCompacting, boolean isOffpeak) throws IOException { 180 if (!filesCompacting.isEmpty()) { 181 return null; 182 } 183 return selectSingleStripeCompaction(si, false, false, isOffpeak); 184 } 185 186 @Override 187 public boolean needsCompactions(StripeInformationProvider si, 188 List<HStoreFile> filesCompacting) { 189 if (!filesCompacting.isEmpty()) { 190 return false; 191 } 192 return needsSingleStripeCompaction(si); 193 } 194 }; 195 196 // No compaction due to min files or ratio 197 StripeInformationProvider si = 198 createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L }); 199 verifyNoCompaction(policy, si); 200 // No compaction due to min files or ratio - will report needed, but not do any. 201 si = createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L }, 202 new Long[] { 5L, 1L, 1L }); 203 assertNull(policy.selectCompaction(si, al(), false)); 204 assertTrue(policy.needsCompactions(si, al())); 205 // One stripe has possible compaction 206 si = createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L }, 207 new Long[] { 5L, 4L, 3L }); 208 verifySingleStripeCompaction(policy, si, 2, null); 209 // Several stripes have possible compactions; choose best quality (removes most files) 210 si = createStripesWithSizes(0, 0, new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L }, 211 new Long[] { 3L, 2L, 2L, 1L }); 212 verifySingleStripeCompaction(policy, si, 2, null); 213 si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L }, 214 new Long[] { 3L, 2L, 2L }); 215 verifySingleStripeCompaction(policy, si, 1, null); 216 // Or with smallest files, if the count is the same 217 si = createStripesWithSizes(0, 0, new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L }, 218 new Long[] { 3L, 2L, 2L }); 219 verifySingleStripeCompaction(policy, si, 1, null); 220 // Verify max count is respected. 221 si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L }); 222 List<HStoreFile> sfs = si.getStripes().get(1).subList(1, 5); 223 verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true); 224 // Verify ratio is applied. 225 si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L }); 226 sfs = si.getStripes().get(1).subList(1, 5); 227 verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true); 228 } 229 230 @Test 231 public void testWithParallelCompaction() throws Exception { 232 // TODO: currently only one compaction at a time per store is allowed. If this changes, 233 // the appropriate file exclusion testing would need to be done in respective tests. 234 assertNull(createPolicy(HBaseConfiguration.create()) 235 .selectCompaction(mock(StripeInformationProvider.class), al(createFile()), false)); 236 } 237 238 @Test 239 public void testWithReferences() throws Exception { 240 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create()); 241 StripeCompactor sc = mock(StripeCompactor.class); 242 HStoreFile ref = createFile(); 243 when(ref.isReference()).thenReturn(true); 244 StripeInformationProvider si = mock(StripeInformationProvider.class); 245 Collection<HStoreFile> sfs = al(ref, createFile()); 246 when(si.getStorefiles()).thenReturn(sfs); 247 248 assertTrue(policy.needsCompactions(si, al())); 249 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 250 // UnmodifiableCollection does not implement equals so we need to change it here to a 251 // collection that implements it. 252 assertEquals(si.getStorefiles(), new ArrayList<>(scr.getRequest().getFiles())); 253 scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 254 verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), 255 aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), any(), any()); 256 } 257 258 @Test 259 public void testInitialCountFromL0() throws Exception { 260 Configuration conf = HBaseConfiguration.create(); 261 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2); 262 StripeCompactionPolicy policy = 263 createPolicy(conf, defaultSplitSize, defaultSplitCount, 2, false); 264 StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8); 265 verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true); 266 si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts. 267 verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true); 268 policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false); 269 verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true); 270 } 271 272 @Test 273 public void testSelectL0Compaction() throws Exception { 274 // test select ALL L0 files when L0 files count > MIN_FILES_L0_KEY 275 Configuration conf = HBaseConfiguration.create(); 276 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4); 277 StripeCompactionPolicy policy = createPolicy(conf); 278 StripeCompactionPolicy.StripeInformationProvider si = createStripesWithSizes(10, 10L, 279 new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, new Long[] { 3L, 2L, 2L }); 280 StripeCompactionPolicy.StripeCompactionRequest cr = policy.selectCompaction(si, al(), false); 281 assertNotNull(cr); 282 assertEquals(10, cr.getRequest().getFiles().size()); 283 verifyCollectionsEqual(si.getLevel0Files(), cr.getRequest().getFiles()); 284 285 // test select partial L0 files when size of L0 files > HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY 286 conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 100L); 287 policy = createPolicy(conf); 288 si = createStripesWithSizes(5, 50L, new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, 289 new Long[] { 3L, 2L, 2L }); 290 cr = policy.selectCompaction(si, al(), false); 291 assertNotNull(cr); 292 assertEquals(2, cr.getRequest().getFiles().size()); 293 verifyCollectionsEqual(si.getLevel0Files().subList(0, 2), cr.getRequest().getFiles()); 294 295 // test select partial L0 files when count of L0 files > MAX_FILES_KEY 296 conf.setInt(MAX_FILES_KEY, 6); 297 conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 1000L); 298 policy = createPolicy(conf); 299 si = createStripesWithSizes(10, 10L, new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, 300 new Long[] { 3L, 2L, 2L }); 301 cr = policy.selectCompaction(si, al(), false); 302 assertNotNull(cr); 303 assertEquals(6, cr.getRequest().getFiles().size()); 304 verifyCollectionsEqual(si.getLevel0Files().subList(0, 6), cr.getRequest().getFiles()); 305 } 306 307 @Test 308 public void testExistingStripesFromL0() throws Exception { 309 Configuration conf = HBaseConfiguration.create(); 310 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3); 311 StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A); 312 verifyCompaction(createPolicy(conf), si, si.getLevel0Files(), null, null, 313 si.getStripeBoundaries()); 314 } 315 316 @Test 317 public void testNothingToCompactFromL0() throws Exception { 318 Configuration conf = HBaseConfiguration.create(); 319 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4); 320 StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10); 321 StripeCompactionPolicy policy = createPolicy(conf); 322 verifyNoCompaction(policy, si); 323 324 si = createStripes(3, KEY_A); 325 verifyNoCompaction(policy, si); 326 } 327 328 @Test 329 public void testCheckExpiredStripeCompaction() throws Exception { 330 Configuration conf = HBaseConfiguration.create(); 331 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 5); 332 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4); 333 334 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 335 long now = defaultTtl + 2; 336 edge.setValue(now); 337 EnvironmentEdgeManager.injectEdge(edge); 338 HStoreFile expiredFile = createFile(10), notExpiredFile = createFile(10); 339 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1); 340 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1); 341 List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile); 342 List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile); 343 344 StripeCompactionPolicy policy = 345 createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, true); 346 // Merge expired if there are eligible stripes. 347 StripeCompactionPolicy.StripeInformationProvider si = 348 createStripesWithFiles(mixed, mixed, mixed); 349 assertFalse(policy.needsCompactions(si, al())); 350 351 si = createStripesWithFiles(mixed, mixed, mixed, expired); 352 assertFalse(policy.needsSingleStripeCompaction(si)); 353 assertTrue(policy.hasExpiredStripes(si)); 354 assertTrue(policy.needsCompactions(si, al())); 355 } 356 357 @Test 358 public void testSplitOffStripe() throws Exception { 359 Configuration conf = HBaseConfiguration.create(); 360 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 361 conf.unset("hbase.hstore.compaction.min.size"); 362 // First test everything with default split count of 2, then split into more. 363 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2); 364 Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L }; 365 Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L }; 366 long splitTargetSize = (long) (defaultSplitSize / defaultSplitCount); 367 // Don't split if not eligible for compaction. 368 StripeCompactionPolicy.StripeInformationProvider si = 369 createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L }); 370 assertNull(createPolicy(conf).selectCompaction(si, al(), false)); 371 // Make sure everything is eligible. 372 conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f); 373 StripeCompactionPolicy policy = createPolicy(conf); 374 verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize); 375 // Add some extra stripes... 376 si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit); 377 verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize); 378 // In the middle. 379 si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit); 380 verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize); 381 // No split-off with different config (larger split size). 382 // However, in this case some eligible stripe will just be compacted alone. 383 StripeCompactionPolicy specPolicy = 384 createPolicy(conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false); 385 verifySingleStripeCompaction(specPolicy, si, 1, null); 386 } 387 388 @Test 389 public void testSplitOffStripeOffPeak() throws Exception { 390 // for HBASE-11439 391 Configuration conf = HBaseConfiguration.create(); 392 393 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 394 conf.unset("hbase.hstore.compaction.min.size"); 395 396 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2); 397 // Select the last 2 files. 398 StripeCompactionPolicy.StripeInformationProvider si = 399 createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L }); 400 assertEquals(2, 401 createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles().size()); 402 // Make sure everything is eligible in offpeak. 403 conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f); 404 assertEquals(3, 405 createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles().size()); 406 } 407 408 @Test 409 public void testSplitOffStripeDropDeletes() throws Exception { 410 Configuration conf = HBaseConfiguration.create(); 411 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2); 412 StripeCompactionPolicy policy = createPolicy(conf); 413 Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 }; 414 Long[] noSplit = new Long[] { 1L }; 415 long splitTargetSize = (long) (defaultSplitSize / defaultSplitCount); 416 417 // Verify the deletes can be dropped if there are no L0 files. 418 StripeCompactionPolicy.StripeInformationProvider si = 419 createStripesWithSizes(0, 0, noSplit, toSplit); 420 verifyWholeStripesCompaction(policy, si, 1, 1, true, null, splitTargetSize); 421 // But cannot be dropped if there are. 422 si = createStripesWithSizes(2, 2, noSplit, toSplit); 423 verifyWholeStripesCompaction(policy, si, 1, 1, false, null, splitTargetSize); 424 } 425 426 @SuppressWarnings("unchecked") 427 @Test 428 public void testMergeExpiredFiles() throws Exception { 429 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 430 long now = defaultTtl + 2; 431 edge.setValue(now); 432 EnvironmentEdgeManager.injectEdge(edge); 433 try { 434 HStoreFile expiredFile = createFile(), notExpiredFile = createFile(); 435 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1); 436 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1); 437 List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile); 438 List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile); 439 List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile); 440 441 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(), defaultSplitSize, 442 defaultSplitCount, defaultInitialCount, true); 443 // Merge expired if there are eligible stripes. 444 StripeCompactionPolicy.StripeInformationProvider si = 445 createStripesWithFiles(expired, expired, expired); 446 verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false); 447 // Don't merge if nothing expired. 448 si = createStripesWithFiles(notExpired, notExpired, notExpired); 449 assertNull(policy.selectCompaction(si, al(), false)); 450 // Merge one expired stripe with next. 451 si = createStripesWithFiles(notExpired, expired, notExpired); 452 verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false); 453 // Merge the biggest run out of multiple options. 454 // Merge one expired stripe with next. 455 si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired); 456 verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false); 457 // Stripe with a subset of expired files is not merged. 458 si = createStripesWithFiles(expired, expired, notExpired, expired, mixed); 459 verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false); 460 } finally { 461 EnvironmentEdgeManager.reset(); 462 } 463 } 464 465 @SuppressWarnings("unchecked") 466 @Test 467 public void testMergeExpiredStripes() throws Exception { 468 // HBASE-11397 469 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 470 long now = defaultTtl + 2; 471 edge.setValue(now); 472 EnvironmentEdgeManager.injectEdge(edge); 473 try { 474 HStoreFile expiredFile = createFile(), notExpiredFile = createFile(); 475 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1); 476 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1); 477 List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile); 478 List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile); 479 480 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(), defaultSplitSize, 481 defaultSplitCount, defaultInitialCount, true); 482 483 // Merge all three expired stripes into one. 484 StripeCompactionPolicy.StripeInformationProvider si = 485 createStripesWithFiles(expired, expired, expired); 486 verifyMergeCompatcion(policy, si, 0, 2); 487 488 // Merge two adjacent expired stripes into one. 489 si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired); 490 verifyMergeCompatcion(policy, si, 3, 4); 491 } finally { 492 EnvironmentEdgeManager.reset(); 493 } 494 } 495 496 @SuppressWarnings("unchecked") 497 private static StripeCompactionPolicy.StripeInformationProvider 498 createStripesWithFiles(List<HStoreFile>... stripeFiles) throws Exception { 499 return createStripesWithFiles(createBoundaries(stripeFiles.length), 500 Lists.newArrayList(stripeFiles), new ArrayList<>()); 501 } 502 503 @Test 504 public void testSingleStripeDropDeletes() throws Exception { 505 Configuration conf = HBaseConfiguration.create(); 506 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 507 conf.unset("hbase.hstore.compaction.min.size"); 508 StripeCompactionPolicy policy = createPolicy(conf); 509 // Verify the deletes can be dropped if there are no L0 files. 510 Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } }; 511 StripeInformationProvider si = createStripesWithSizes(0, 0, stripes); 512 verifySingleStripeCompaction(policy, si, 0, true); 513 // But cannot be dropped if there are. 514 si = createStripesWithSizes(2, 2, stripes); 515 verifySingleStripeCompaction(policy, si, 0, false); 516 // Unless there are enough to cause L0 compaction. 517 si = createStripesWithSizes(6, 2, stripes); 518 ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>(); 519 sfs.addSublist(si.getLevel0Files()); 520 sfs.addSublist(si.getStripes().get(0)); 521 verifyCompaction(policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries()); 522 // If we cannot actually compact all files in some stripe, L0 is chosen. 523 si = createStripesWithSizes(6, 2, 524 new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } }); 525 verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries()); 526 // even if L0 has no file 527 // if all files of stripe aren't selected, delete must not be dropped. 528 stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } }; 529 si = createStripesWithSizes(0, 0, stripes); 530 List<HStoreFile> compactFile = new ArrayList<>(); 531 Iterator<HStoreFile> iter = si.getStripes().get(0).listIterator(1); 532 while (iter.hasNext()) { 533 compactFile.add(iter.next()); 534 } 535 verifyCompaction(policy, si, compactFile, false, 1, null, si.getStartRow(0), si.getEndRow(0), 536 true); 537 } 538 539 @Test 540 public void testCheckExpiredL0Compaction() throws Exception { 541 Configuration conf = HBaseConfiguration.create(); 542 int minL0 = 100; 543 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, minL0); 544 conf.setInt(MIN_FILES_KEY, 4); 545 546 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 547 long now = defaultTtl + 2; 548 edge.setValue(now); 549 EnvironmentEdgeManager.injectEdge(edge); 550 HStoreFile expiredFile = createFile(10), notExpiredFile = createFile(10); 551 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1); 552 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1); 553 List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile); 554 List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile); 555 556 StripeCompactionPolicy policy = 557 createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, true); 558 // Merge expired if there are eligible stripes. 559 StripeCompactionPolicy.StripeInformationProvider si = 560 createStripesWithFiles(null, new ArrayList<>(), mixed); 561 assertFalse(policy.needsCompactions(si, al())); 562 563 List<HStoreFile> largeMixed = new ArrayList<>(); 564 for (int i = 0; i < minL0 - 1; i++) { 565 largeMixed.add(i % 2 == 0 ? notExpiredFile : expiredFile); 566 } 567 si = createStripesWithFiles(null, new ArrayList<>(), largeMixed); 568 assertFalse(policy.needsCompactions(si, al())); 569 570 si = createStripesWithFiles(null, new ArrayList<>(), expired); 571 assertFalse(policy.needsSingleStripeCompaction(si)); 572 assertFalse(policy.hasExpiredStripes(si)); 573 assertTrue(policy.allL0FilesExpired(si)); 574 assertTrue(policy.needsCompactions(si, al())); 575 } 576 577 /********* HELPER METHODS ************/ 578 private static StripeCompactionPolicy createPolicy(Configuration conf) throws Exception { 579 return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false); 580 } 581 582 private static StripeCompactionPolicy createPolicy(Configuration conf, long splitSize, 583 float splitCount, int initialCount, boolean hasTtl) throws Exception { 584 conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize); 585 conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount); 586 conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount); 587 StoreConfigInformation sci = mock(StoreConfigInformation.class); 588 when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE); 589 when(sci.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO); 590 StripeStoreConfig ssc = new StripeStoreConfig(conf, sci); 591 return new StripeCompactionPolicy(conf, sci, ssc); 592 } 593 594 private static ArrayList<HStoreFile> al(HStoreFile... sfs) { 595 return new ArrayList<>(Arrays.asList(sfs)); 596 } 597 598 private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si, 599 int from, int to) throws Exception { 600 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 601 Collection<HStoreFile> sfs = getAllFiles(si, from, to); 602 verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); 603 604 // All the Stripes are expired, so the Compactor will not create any Writers. We need to create 605 // an empty file to preserve metadata 606 StripeCompactor sc = createCompactor(); 607 List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 608 assertEquals(1, paths.size()); 609 } 610 611 /** 612 * Verify the compaction that includes several entire stripes. 613 * @param policy Policy to test. 614 * @param si Stripe information pre-set with stripes to test. 615 * @param from Starting stripe. 616 * @param to Ending stripe (inclusive). 617 * @param dropDeletes Whether to drop deletes from compaction range. 618 * @param count Expected # of resulting stripes, null if not checked. 619 * @param size Expected target stripe size, null if not checked. 620 */ 621 private void verifyWholeStripesCompaction(StripeCompactionPolicy policy, 622 StripeInformationProvider si, int from, int to, Boolean dropDeletes, Integer count, Long size, 623 boolean needsCompaction) throws IOException { 624 verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes, count, size, 625 si.getStartRow(from), si.getEndRow(to), needsCompaction); 626 } 627 628 private void verifyWholeStripesCompaction(StripeCompactionPolicy policy, 629 StripeInformationProvider si, int from, int to, Boolean dropDeletes, Integer count, Long size) 630 throws IOException { 631 verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true); 632 } 633 634 private void verifySingleStripeCompaction(StripeCompactionPolicy policy, 635 StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException { 636 verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true); 637 } 638 639 /** 640 * Verify no compaction is needed or selected. 641 * @param policy Policy to test. 642 * @param si Stripe information pre-set with stripes to test. 643 */ 644 private void verifyNoCompaction(StripeCompactionPolicy policy, StripeInformationProvider si) 645 throws IOException { 646 assertNull(policy.selectCompaction(si, al(), false)); 647 assertFalse(policy.needsCompactions(si, al())); 648 } 649 650 /** 651 * Verify arbitrary compaction. 652 * @param policy Policy to test. 653 * @param si Stripe information pre-set with stripes to test. 654 * @param sfs Files that should be compacted. 655 * @param dropDeletesFrom Row from which to drop deletes. 656 * @param dropDeletesTo Row to which to drop deletes. 657 * @param boundaries Expected target stripe boundaries. 658 */ 659 private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si, 660 Collection<HStoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo, 661 final List<byte[]> boundaries) throws Exception { 662 StripeCompactor sc = mock(StripeCompactor.class); 663 assertTrue(policy.needsCompactions(si, al())); 664 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 665 verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); 666 scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 667 verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() { 668 @Override 669 public boolean matches(List<byte[]> argument) { 670 List<byte[]> other = argument; 671 if (other.size() != boundaries.size()) { 672 return false; 673 } 674 for (int i = 0; i < other.size(); ++i) { 675 if (!Bytes.equals(other.get(i), boundaries.get(i))) { 676 return false; 677 } 678 } 679 return true; 680 } 681 }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom), 682 dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), any(), any()); 683 } 684 685 /** 686 * Verify arbitrary compaction. 687 * @param policy Policy to test. 688 * @param si Stripe information pre-set with stripes to test. 689 * @param sfs Files that should be compacted. 690 * @param dropDeletes Whether to drop deletes from compaction range. 691 * @param count Expected # of resulting stripes, null if not checked. 692 * @param size Expected target stripe size, null if not checked. 693 * @param start Left boundary of the compaction. 694 * @param end Right boundary of the compaction. 695 */ 696 private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si, 697 Collection<HStoreFile> sfs, Boolean dropDeletes, Integer count, Long size, byte[] start, 698 byte[] end, boolean needsCompaction) throws IOException { 699 StripeCompactor sc = mock(StripeCompactor.class); 700 assertTrue(!needsCompaction || policy.needsCompactions(si, al())); 701 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 702 verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); 703 scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 704 verify(sc, times(1)).compact(eq(scr.getRequest()), 705 count == null ? anyInt() : eq(count.intValue()), 706 size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end), 707 dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), any(), any()); 708 } 709 710 /** Verify arbitrary flush. */ 711 protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si, 712 KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException { 713 StoreFileWritersCapture writers = new StoreFileWritersCapture(); 714 StripeStoreFlusher.StripeFlushRequest req = 715 policy.selectFlush(CellComparatorImpl.COMPARATOR, si, input.length); 716 StripeMultiFileWriter mw = req.createWriter(); 717 mw.init(null, writers); 718 for (KeyValue kv : input) { 719 mw.append(kv); 720 } 721 boolean hasMetadata = boundaries != null; 722 mw.commitWriters(0, false); 723 writers.verifyKvs(expected, true, hasMetadata); 724 if (hasMetadata) { 725 writers.verifyBoundaries(boundaries); 726 } 727 } 728 729 private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) { 730 return dropDeletes == null 731 ? any() 732 : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class)); 733 } 734 735 private void verifyCollectionsEqual(Collection<HStoreFile> sfs, Collection<HStoreFile> scr) { 736 // Dumb. 737 assertEquals(sfs.size(), scr.size()); 738 assertTrue(scr.containsAll(sfs)); 739 } 740 741 private static List<HStoreFile> getAllFiles(StripeInformationProvider si, int fromStripe, 742 int toStripe) { 743 ArrayList<HStoreFile> expected = new ArrayList<>(); 744 for (int i = fromStripe; i <= toStripe; ++i) { 745 expected.addAll(si.getStripes().get(i)); 746 } 747 return expected; 748 } 749 750 /** 751 * @param l0Count Number of L0 files. 752 * @param boundaries Target boundaries. 753 * @return Mock stripes. 754 */ 755 private static StripeInformationProvider createStripes(int l0Count, byte[]... boundaries) 756 throws Exception { 757 List<Long> l0Sizes = new ArrayList<>(); 758 for (int i = 0; i < l0Count; ++i) { 759 l0Sizes.add(5L); 760 } 761 List<List<Long>> sizes = new ArrayList<>(); 762 for (int i = 0; i <= boundaries.length; ++i) { 763 sizes.add(Arrays.asList(Long.valueOf(5))); 764 } 765 return createStripes(Arrays.asList(boundaries), sizes, l0Sizes); 766 } 767 768 /** 769 * @param l0Count Number of L0 files. 770 * @param l0Size Size of each file. 771 * @return Mock stripes. 772 */ 773 private static StripeInformationProvider createStripesL0Only(int l0Count, long l0Size) 774 throws Exception { 775 List<Long> l0Sizes = new ArrayList<>(); 776 for (int i = 0; i < l0Count; ++i) { 777 l0Sizes.add(l0Size); 778 } 779 return createStripes(null, new ArrayList<>(), l0Sizes); 780 } 781 782 /** 783 * @param l0Count Number of L0 files. 784 * @param l0Size Size of each file. 785 * @param sizes Sizes of the files; each sub-array representing a stripe. 786 * @return Mock stripes. 787 */ 788 private static StripeInformationProvider createStripesWithSizes(int l0Count, long l0Size, 789 Long[]... sizes) throws Exception { 790 ArrayList<List<Long>> sizeList = new ArrayList<>(sizes.length); 791 for (Long[] size : sizes) { 792 sizeList.add(Arrays.asList(size)); 793 } 794 return createStripesWithSizes(l0Count, l0Size, sizeList); 795 } 796 797 private static StripeInformationProvider createStripesWithSizes(int l0Count, long l0Size, 798 List<List<Long>> sizes) throws Exception { 799 List<byte[]> boundaries = createBoundaries(sizes.size()); 800 List<Long> l0Sizes = new ArrayList<>(); 801 for (int i = 0; i < l0Count; ++i) { 802 l0Sizes.add(l0Size); 803 } 804 return createStripes(boundaries, sizes, l0Sizes); 805 } 806 807 private static List<byte[]> createBoundaries(int stripeCount) { 808 byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E }; 809 assert stripeCount <= keys.length + 1; 810 List<byte[]> boundaries = new ArrayList<>(); 811 boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1)); 812 return boundaries; 813 } 814 815 private static StripeInformationProvider createStripes(List<byte[]> boundaries, 816 List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception { 817 List<List<HStoreFile>> stripeFiles = new ArrayList<>(stripeSizes.size()); 818 for (List<Long> sizes : stripeSizes) { 819 List<HStoreFile> sfs = new ArrayList<>(sizes.size()); 820 for (Long size : sizes) { 821 sfs.add(createFile(size)); 822 } 823 stripeFiles.add(sfs); 824 } 825 List<HStoreFile> l0Files = new ArrayList<>(); 826 for (Long size : l0Sizes) { 827 l0Files.add(createFile(size)); 828 } 829 return createStripesWithFiles(boundaries, stripeFiles, l0Files); 830 } 831 832 /** 833 * This method actually does all the work. 834 */ 835 private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries, 836 List<List<HStoreFile>> stripeFiles, List<HStoreFile> l0Files) throws Exception { 837 ArrayList<ImmutableList<HStoreFile>> stripes = new ArrayList<>(); 838 ArrayList<byte[]> boundariesList = new ArrayList<>(); 839 StripeInformationProvider si = mock(StripeInformationProvider.class); 840 if (!stripeFiles.isEmpty()) { 841 assert stripeFiles.size() == (boundaries.size() + 1); 842 boundariesList.add(OPEN_KEY); 843 for (int i = 0; i <= boundaries.size(); ++i) { 844 byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1)); 845 byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i)); 846 boundariesList.add(endKey); 847 for (HStoreFile sf : stripeFiles.get(i)) { 848 setFileStripe(sf, startKey, endKey); 849 } 850 stripes.add(ImmutableList.copyOf(stripeFiles.get(i))); 851 when(si.getStartRow(eq(i))).thenReturn(startKey); 852 when(si.getEndRow(eq(i))).thenReturn(endKey); 853 } 854 } 855 ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>(); 856 sfs.addAllSublists(stripes); 857 sfs.addSublist(l0Files); 858 when(si.getStorefiles()).thenReturn(sfs); 859 when(si.getStripes()).thenReturn(stripes); 860 when(si.getStripeBoundaries()).thenReturn(boundariesList); 861 when(si.getStripeCount()).thenReturn(stripes.size()); 862 when(si.getLevel0Files()).thenReturn(l0Files); 863 return si; 864 } 865 866 private static HStoreFile createFile(long size) throws Exception { 867 HStoreFile sf = mock(HStoreFile.class); 868 when(sf.getPath()).thenReturn(new Path("moo")); 869 StoreFileReader r = mock(StoreFileReader.class); 870 when(r.getEntries()).thenReturn(size); 871 when(r.length()).thenReturn(size); 872 when(r.getBloomFilterType()).thenReturn(BloomType.NONE); 873 when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); 874 when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(), 875 anyBoolean())).thenReturn(mock(StoreFileScanner.class)); 876 when(sf.getReader()).thenReturn(r); 877 when(sf.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty()); 878 when(r.getMaxTimestamp()).thenReturn(TimeRange.INITIAL_MAX_TIMESTAMP); 879 return sf; 880 } 881 882 private static HStoreFile createFile() throws Exception { 883 return createFile(0); 884 } 885 886 private static void setFileStripe(HStoreFile sf, byte[] startKey, byte[] endKey) { 887 when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey); 888 when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey); 889 } 890 891 private StripeCompactor createCompactor() throws Exception { 892 HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo")); 893 StoreFileWritersCapture writers = new StoreFileWritersCapture(); 894 HStore store = mock(HStore.class); 895 HRegionInfo info = mock(HRegionInfo.class); 896 when(info.getRegionNameAsString()).thenReturn("testRegion"); 897 when(store.getColumnFamilyDescriptor()).thenReturn(col); 898 when(store.getRegionInfo()).thenReturn(info); 899 StoreEngine storeEngine = mock(StoreEngine.class); 900 when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers); 901 when(store.getStoreEngine()).thenReturn(storeEngine); 902 903 Configuration conf = HBaseConfiguration.create(); 904 conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders); 905 final Scanner scanner = new Scanner(); 906 return new StripeCompactor(conf, store) { 907 @Override 908 protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, 909 List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs, 910 byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { 911 return scanner; 912 } 913 914 @Override 915 protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, 916 List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint, 917 long earliestPutTs) throws IOException { 918 return scanner; 919 } 920 }; 921 } 922 923 private static class Scanner implements InternalScanner { 924 private final ArrayList<KeyValue> kvs; 925 926 public Scanner(KeyValue... kvs) { 927 this.kvs = new ArrayList<>(Arrays.asList(kvs)); 928 } 929 930 @Override 931 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { 932 if (kvs.isEmpty()) { 933 return false; 934 } 935 result.add(kvs.remove(0)); 936 return !kvs.isEmpty(); 937 } 938 939 @Override 940 public void close() throws IOException { 941 } 942 } 943}