/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MAX_FILES_KEY;
import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MIN_FILES_KEY;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
import static org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.AdditionalMatchers.aryEq;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import org.mockito.ArgumentMatcher;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Tests for {@link StripeCompactionPolicy}: how flushes are laid out against existing stripes and
 * which files the policy selects for compaction (L0, single stripe, split-off, expired stripes).
 * Every test is run once per value supplied by {@link #parameters()}.
 */
@Tag(RegionServerTests.TAG)
@Tag(MediumTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: usePrivateReaders={0}")
public class TestStripeCompactionPolicy {

  private static final byte[] KEY_A = Bytes.toBytes("aaa");
  private static final byte[] KEY_B = Bytes.toBytes("bbb");
  private static final byte[] KEY_C = Bytes.toBytes("ccc");
  private static final byte[] KEY_D = Bytes.toBytes("ddd");
  private static final byte[] KEY_E = Bytes.toBytes("eee");
  private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
  private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
  private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
  private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
  private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);

  // Defaults handed to createPolicy(Configuration) unless a test overrides them explicitly.
  private static long defaultSplitSize = 18;
  private static float defaultSplitCount = 1.8F;
  private final static int defaultInitialCount = 1;
  private static long defaultTtl = 1000 * 1000;

  /** Supplies the two template invocations: usePrivateReaders = true and false. */
  public static Stream<Arguments> parameters() {
    return Stream.of(Arguments.of(true), Arguments.of(false));
  }

  private final boolean usePrivateReaders;

  public TestStripeCompactionPolicy(boolean usePrivateReaders) {
    this.usePrivateReaders = usePrivateReaders;
  }

  /** With flush-to-L0 enabled and no stripes, the whole input is expected as a single output. */
  @TestTemplate
  public void testNoStripesFromFlush() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
    StripeCompactionPolicy policy = createPolicy(conf);
    StripeInformationProvider si = createStripesL0Only(0, 0);

    KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
    KeyValue[][] expected = new KeyValue[][] { input };
    verifyFlush(policy, si, input, expected, null);
  }

  /** A flush against existing stripes is expected to be cut at the existing boundaries. */
  @TestTemplate
  public void testOldStripesFromFlush() throws Exception {
    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
    StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);

    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
    KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
      new KeyValue[] { KV_C, KV_C }, new KeyValue[] { KV_D, KV_E } };
    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
  }

  @TestTemplate
  public void testNewStripesFromFlush() throws Exception {
    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
    StripeInformationProvider si = createStripesL0Only(0, 0);
    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
    // Starts with one stripe; unlike flush results, must have metadata
    KeyValue[][] expected = new KeyValue[][] { input };
    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
  }

  @TestTemplate
  public void testSingleStripeCompaction() throws Exception {
    // Create a special policy that only compacts single stripes, using standard methods.
    Configuration conf = HBaseConfiguration.create();
    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
    conf.unset("hbase.hstore.compaction.min.size");
    conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F);
    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3);
    conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4);
    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure there are no splits
    StoreConfigInformation sci = mock(StoreConfigInformation.class);
    when(sci.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
    StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
    StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) {
      @Override
      public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
        List<HStoreFile> filesCompacting, boolean isOffpeak) throws IOException {
        if (!filesCompacting.isEmpty()) {
          return null;
        }
        return selectSingleStripeCompaction(si, false, false, isOffpeak);
      }

      @Override
      public boolean needsCompactions(StripeInformationProvider si,
        List<HStoreFile> filesCompacting) {
        if (!filesCompacting.isEmpty()) {
          return false;
        }
        return needsSingleStripeCompaction(si);
      }
    };

    // No compaction due to min files or ratio
    StripeInformationProvider si =
      createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L });
    verifyNoCompaction(policy, si);
    // No compaction due to min files or ratio - will report needed, but not do any.
    si = createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L },
      new Long[] { 5L, 1L, 1L });
    assertNull(policy.selectCompaction(si, al(), false));
    assertTrue(policy.needsCompactions(si, al()));
    // One stripe has possible compaction
    si = createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L },
      new Long[] { 5L, 4L, 3L });
    verifySingleStripeCompaction(policy, si, 2, null);
    // Several stripes have possible compactions; choose best quality (removes most files)
    si = createStripesWithSizes(0, 0, new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L },
      new Long[] { 3L, 2L, 2L, 1L });
    verifySingleStripeCompaction(policy, si, 2, null);
    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L },
      new Long[] { 3L, 2L, 2L });
    verifySingleStripeCompaction(policy, si, 1, null);
    // Or with smallest files, if the count is the same
    si = createStripesWithSizes(0, 0, new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L },
      new Long[] { 3L, 2L, 2L });
    verifySingleStripeCompaction(policy, si, 1, null);
    // Verify max count is respected.
    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L });
    List<HStoreFile> sfs = si.getStripes().get(1).subList(1, 5);
    verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
    // Verify ratio is applied.
    si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L });
    sfs = si.getStripes().get(1).subList(1, 5);
    verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
  }

  @TestTemplate
  public void testWithParallelCompaction() throws Exception {
    // TODO: currently only one compaction at a time per store is allowed. If this changes,
    // the appropriate file exclusion testing would need to be done in respective tests.
    assertNull(createPolicy(HBaseConfiguration.create())
      .selectCompaction(mock(StripeInformationProvider.class), al(createFile()), false));
  }

  /** When a reference file is present, all store files are expected to be selected. */
  @TestTemplate
  public void testWithReferences() throws Exception {
    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
    StripeCompactor sc = mock(StripeCompactor.class);
    HStoreFile ref = createFile();
    when(ref.isReference()).thenReturn(true);
    StripeInformationProvider si = mock(StripeInformationProvider.class);
    Collection<HStoreFile> sfs = al(ref, createFile());
    when(si.getStoreFiles()).thenReturn(sfs);

    assertTrue(policy.needsCompactions(si, al()));
    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
    // UnmodifiableCollection does not implement equals so we need to change it here to a
    // collection that implements it.
    assertEquals(si.getStoreFiles(), new ArrayList<>(scr.getRequest().getFiles()));
    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
    verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
      aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), any(), any());
  }

  @TestTemplate
  public void testInitialCountFromL0() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
    StripeCompactionPolicy policy =
      createPolicy(conf, defaultSplitSize, defaultSplitCount, 2, false);
    StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8);
    verifyCompaction(policy, si, si.getStoreFiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true);
    si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts.
    verifyCompaction(policy, si, si.getStoreFiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true);
    policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false);
    verifyCompaction(policy, si, si.getStoreFiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true);
  }

  @TestTemplate
  public void testSelectL0Compaction() throws Exception {
    // test select ALL L0 files when L0 files count > MIN_FILES_L0_KEY
    Configuration conf = HBaseConfiguration.create();
    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
    StripeCompactionPolicy policy = createPolicy(conf);
    StripeCompactionPolicy.StripeInformationProvider si = createStripesWithSizes(10, 10L,
      new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, new Long[] { 3L, 2L, 2L });
    StripeCompactionPolicy.StripeCompactionRequest cr = policy.selectCompaction(si, al(), false);
    assertNotNull(cr);
    assertEquals(10, cr.getRequest().getFiles().size());
    verifyCollectionsEqual(si.getLevel0Files(), cr.getRequest().getFiles());

    // test select partial L0 files when size of L0 files > HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY
    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 100L);
    policy = createPolicy(conf);
    si = createStripesWithSizes(5, 50L, new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L },
      new Long[] { 3L, 2L, 2L });
    cr = policy.selectCompaction(si, al(), false);
    assertNotNull(cr);
    assertEquals(2, cr.getRequest().getFiles().size());
    verifyCollectionsEqual(si.getLevel0Files().subList(0, 2), cr.getRequest().getFiles());

    // test select partial L0 files when count of L0 files > MAX_FILES_KEY
    conf.setInt(MAX_FILES_KEY, 6);
    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 1000L);
    policy = createPolicy(conf);
    si = createStripesWithSizes(10, 10L, new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L },
      new Long[] { 3L, 2L, 2L });
    cr = policy.selectCompaction(si, al(), false);
    assertNotNull(cr);
    assertEquals(6, cr.getRequest().getFiles().size());
    verifyCollectionsEqual(si.getLevel0Files().subList(0, 6), cr.getRequest().getFiles());
  }

  @TestTemplate
  public void testExistingStripesFromL0() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3);
    StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A);
    verifyCompaction(createPolicy(conf), si, si.getLevel0Files(), null, null,
      si.getStripeBoundaries());
  }

  @TestTemplate
  public void testNothingToCompactFromL0() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
    StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10);
    StripeCompactionPolicy policy = createPolicy(conf);
    verifyNoCompaction(policy, si);

    si = createStripes(3, KEY_A);
    verifyNoCompaction(policy, si);
  }

  /**
   * Checks that a stripe containing only expired files makes the policy report that compaction is
   * needed, while stripes with a mix of expired and live files do not.
   */
  @TestTemplate
  public void testCheckExpiredStripeCompaction() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 5);
    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);

    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
    long now = defaultTtl + 2;
    edge.setValue(now);
    EnvironmentEdgeManager.injectEdge(edge);
    // The injected clock is process-wide; always restore it so it cannot leak into other tests.
    try {
      HStoreFile expiredFile = createFile(10), notExpiredFile = createFile(10);
      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
      List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);

      StripeCompactionPolicy policy =
        createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
      // Merge expired if there are eligible stripes.
      StripeCompactionPolicy.StripeInformationProvider si =
        createStripesWithFiles(mixed, mixed, mixed);
      assertFalse(policy.needsCompactions(si, al()));

      si = createStripesWithFiles(mixed, mixed, mixed, expired);
      assertFalse(policy.needsSingleStripeCompaction(si));
      assertTrue(policy.hasExpiredStripes(si));
      assertTrue(policy.needsCompactions(si, al()));
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

  @TestTemplate
  public void testSplitOffStripe() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
    conf.unset("hbase.hstore.compaction.min.size");
    // First test everything with default split count of 2, then split into more.
    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
    Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L };
    Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L };
    long splitTargetSize = (long) (defaultSplitSize / defaultSplitCount);
    // Don't split if not eligible for compaction.
    StripeCompactionPolicy.StripeInformationProvider si =
      createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L });
    assertNull(createPolicy(conf).selectCompaction(si, al(), false));
    // Make sure everything is eligible.
    conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f);
    StripeCompactionPolicy policy = createPolicy(conf);
    verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize);
    // Add some extra stripes...
    si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit);
    verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize);
    // In the middle.
    si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit);
    verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize);
    // No split-off with different config (larger split size).
    // However, in this case some eligible stripe will just be compacted alone.
    StripeCompactionPolicy specPolicy =
      createPolicy(conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false);
    verifySingleStripeCompaction(specPolicy, si, 1, null);
  }

  @TestTemplate
  public void testSplitOffStripeOffPeak() throws Exception {
    // for HBASE-11439
    Configuration conf = HBaseConfiguration.create();

    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
    conf.unset("hbase.hstore.compaction.min.size");

    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
    // Select the last 2 files.
    StripeCompactionPolicy.StripeInformationProvider si =
      createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L });
    assertEquals(2,
      createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles().size());
    // Make sure everything is eligible in offpeak.
    conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f);
    assertEquals(3,
      createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles().size());
  }

  @TestTemplate
  public void testSplitOffStripeDropDeletes() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
    StripeCompactionPolicy policy = createPolicy(conf);
    Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 };
    Long[] noSplit = new Long[] { 1L };
    long splitTargetSize = (long) (defaultSplitSize / defaultSplitCount);

    // Verify the deletes can be dropped if there are no L0 files.
    StripeCompactionPolicy.StripeInformationProvider si =
      createStripesWithSizes(0, 0, noSplit, toSplit);
    verifyWholeStripesCompaction(policy, si, 1, 1, true, null, splitTargetSize);
    // But cannot be dropped if there are.
    si = createStripesWithSizes(2, 2, noSplit, toSplit);
    verifyWholeStripesCompaction(policy, si, 1, 1, false, null, splitTargetSize);
  }

  @SuppressWarnings("unchecked")
  @TestTemplate
  public void testMergeExpiredFiles() throws Exception {
    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
    long now = defaultTtl + 2;
    edge.setValue(now);
    EnvironmentEdgeManager.injectEdge(edge);
    try {
      HStoreFile expiredFile = createFile(), notExpiredFile = createFile();
      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
      List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
      List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);

      StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(), defaultSplitSize,
        defaultSplitCount, defaultInitialCount, true);
      // Merge expired if there are eligible stripes.
      StripeCompactionPolicy.StripeInformationProvider si =
        createStripesWithFiles(expired, expired, expired);
      verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false);
      // Don't merge if nothing expired.
      si = createStripesWithFiles(notExpired, notExpired, notExpired);
      assertNull(policy.selectCompaction(si, al(), false));
      // Merge one expired stripe with next.
      si = createStripesWithFiles(notExpired, expired, notExpired);
      verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false);
      // Merge the biggest run out of multiple options.
      // Merge one expired stripe with next.
      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
      verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false);
      // Stripe with a subset of expired files is not merged.
      si = createStripesWithFiles(expired, expired, notExpired, expired, mixed);
      verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false);
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

  @SuppressWarnings("unchecked")
  @TestTemplate
  public void testMergeExpiredStripes() throws Exception {
    // HBASE-11397
    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
    long now = defaultTtl + 2;
    edge.setValue(now);
    EnvironmentEdgeManager.injectEdge(edge);
    try {
      HStoreFile expiredFile = createFile(), notExpiredFile = createFile();
      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
      List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);

      StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(), defaultSplitSize,
        defaultSplitCount, defaultInitialCount, true);

      // Merge all three expired stripes into one.
      StripeCompactionPolicy.StripeInformationProvider si =
        createStripesWithFiles(expired, expired, expired);
      verifyMergeCompatcion(policy, si, 0, 2);

      // Merge two adjacent expired stripes into one.
      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
      verifyMergeCompatcion(policy, si, 3, 4);
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

  /** Builds stripe information from the given per-stripe file lists, with no L0 files. */
  @SuppressWarnings("unchecked")
  private static StripeCompactionPolicy.StripeInformationProvider
    createStripesWithFiles(List<HStoreFile>... stripeFiles) throws Exception {
    return createStripesWithFiles(createBoundaries(stripeFiles.length),
      Lists.newArrayList(stripeFiles), new ArrayList<>());
  }

  @TestTemplate
  public void testSingleStripeDropDeletes() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Test depends on this not being set to pass. Default breaks test. TODO: Revisit.
    conf.unset("hbase.hstore.compaction.min.size");
    StripeCompactionPolicy policy = createPolicy(conf);
    // Verify the deletes can be dropped if there are no L0 files.
    Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } };
    StripeInformationProvider si = createStripesWithSizes(0, 0, stripes);
    verifySingleStripeCompaction(policy, si, 0, true);
    // But cannot be dropped if there are.
    si = createStripesWithSizes(2, 2, stripes);
    verifySingleStripeCompaction(policy, si, 0, false);
    // Unless there are enough to cause L0 compaction.
    si = createStripesWithSizes(6, 2, stripes);
    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
    sfs.addSublist(si.getLevel0Files());
    sfs.addSublist(si.getStripes().get(0));
    verifyCompaction(policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries());
    // If we cannot actually compact all files in some stripe, L0 is chosen.
    si = createStripesWithSizes(6, 2,
      new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } });
    verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
    // even if L0 has no file
    // if all files of stripe aren't selected, delete must not be dropped.
    stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } };
    si = createStripesWithSizes(0, 0, stripes);
    List<HStoreFile> compactFile = new ArrayList<>();
    Iterator<HStoreFile> iter = si.getStripes().get(0).listIterator(1);
    while (iter.hasNext()) {
      compactFile.add(iter.next());
    }
    verifyCompaction(policy, si, compactFile, false, 1, null, si.getStartRow(0), si.getEndRow(0),
      true);
  }

  /**
   * Checks that the policy reports compaction as needed when every L0 file is expired, and does
   * not when L0 holds a mix of expired and live files below the configured L0 minimum.
   */
  @TestTemplate
  public void testCheckExpiredL0Compaction() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int minL0 = 100;
    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, minL0);
    conf.setInt(MIN_FILES_KEY, 4);

    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
    long now = defaultTtl + 2;
    edge.setValue(now);
    EnvironmentEdgeManager.injectEdge(edge);
    // The injected clock is process-wide; always restore it so it cannot leak into other tests.
    try {
      HStoreFile expiredFile = createFile(10), notExpiredFile = createFile(10);
      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
      List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
      List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);

      StripeCompactionPolicy policy =
        createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
      // Merge expired if there are eligible stripes.
      StripeCompactionPolicy.StripeInformationProvider si =
        createStripesWithFiles(null, new ArrayList<>(), mixed);
      assertFalse(policy.needsCompactions(si, al()));

      List<HStoreFile> largeMixed = new ArrayList<>();
      for (int i = 0; i < minL0 - 1; i++) {
        largeMixed.add(i % 2 == 0 ? notExpiredFile : expiredFile);
      }
      si = createStripesWithFiles(null, new ArrayList<>(), largeMixed);
      assertFalse(policy.needsCompactions(si, al()));

      si = createStripesWithFiles(null, new ArrayList<>(), expired);
      assertFalse(policy.needsSingleStripeCompaction(si));
      assertFalse(policy.hasExpiredStripes(si));
      assertTrue(policy.allL0FilesExpired(si));
      assertTrue(policy.needsCompactions(si, al()));
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

  /********* HELPER METHODS ************/
  /** Creates a policy with the class-level default split size/count, initial count and no TTL. */
  private static StripeCompactionPolicy createPolicy(Configuration conf) throws Exception {
    return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false);
  }

  /**
   * Creates a policy backed by a mocked {@link StoreConfigInformation}. Note that the split size,
   * split count and initial stripe count are written into the supplied {@code conf}.
   * @param conf Configuration to update and build the policy from.
   * @param splitSize Value stored under {@link StripeStoreConfig#SIZE_TO_SPLIT_KEY}.
   * @param splitCount Value stored under {@link StripeStoreConfig#SPLIT_PARTS_KEY}.
   * @param initialCount Value stored under {@link StripeStoreConfig#INITIAL_STRIPE_COUNT_KEY}.
   * @param hasTtl If true the mocked store reports {@code defaultTtl} as the store file TTL,
   *          otherwise {@code Long.MAX_VALUE}.
   */
  private static StripeCompactionPolicy createPolicy(Configuration conf, long splitSize,
    float splitCount, int initialCount, boolean hasTtl) throws Exception {
    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize);
    conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount);
    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount);
    StoreConfigInformation sci = mock(StoreConfigInformation.class);
    when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE);
    when(sci.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
    StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
    return new StripeCompactionPolicy(conf, sci, ssc);
  }

  /** Shorthand for a fresh, mutable {@link ArrayList} holding the given files. */
  private static ArrayList<HStoreFile> al(HStoreFile... sfs) {
    return new ArrayList<>(Arrays.asList(sfs));
  }

  /**
   * Verify that the policy selects all files of stripes {@code from}..{@code to} and that
   * executing the request yields exactly one output path.
   * @param policy Policy to test.
   * @param si Stripe information pre-set with stripes to test.
   * @param from Starting stripe.
   * @param to Ending stripe (inclusive).
   */
  private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si,
    int from, int to) throws Exception {
    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
    Collection<HStoreFile> sfs = getAllFiles(si, from, to);
    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());

    // All the Stripes are expired, so the Compactor will not create any Writers. We need to create
    // an empty file to preserve metadata
    StripeCompactor sc = createCompactor();
    List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
    assertEquals(1, paths.size());
  }

  /**
   * Verify the compaction that includes several entire stripes.
   * @param policy Policy to test.
   * @param si Stripe information pre-set with stripes to test.
   * @param from Starting stripe.
   * @param to Ending stripe (inclusive).
   * @param dropDeletes Whether to drop deletes from compaction range.
   * @param count Expected # of resulting stripes, null if not checked.
   * @param size Expected target stripe size, null if not checked.
   * @param needsCompaction Whether the policy must also report that compaction is needed; when
   *          false that check is skipped.
   */
  private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
    StripeInformationProvider si, int from, int to, Boolean dropDeletes, Integer count, Long size,
    boolean needsCompaction) throws IOException {
    verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes, count, size,
      si.getStartRow(from), si.getEndRow(to), needsCompaction);
  }

  /** Same as the eight-argument overload, with {@code needsCompaction} set to true. */
  private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
    StripeInformationProvider si, int from, int to, Boolean dropDeletes, Integer count, Long size)
    throws IOException {
    verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true);
  }

  /** Verify a compaction of exactly one whole stripe that is expected to produce one stripe. */
  private void verifySingleStripeCompaction(StripeCompactionPolicy policy,
    StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException {
    verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true);
  }

  /**
   * Verify no compaction is needed or selected.
   * @param policy Policy to test.
   * @param si Stripe information pre-set with stripes to test.
   */
  private void verifyNoCompaction(StripeCompactionPolicy policy, StripeInformationProvider si)
    throws IOException {
    assertNull(policy.selectCompaction(si, al(), false));
    assertFalse(policy.needsCompactions(si, al()));
  }

  /**
   * Verify arbitrary compaction.
   * @param policy Policy to test.
   * @param si Stripe information pre-set with stripes to test.
   * @param sfs Files that should be compacted.
   * @param dropDeletesFrom Row from which to drop deletes.
   * @param dropDeletesTo Row to which to drop deletes.
   * @param boundaries Expected target stripe boundaries.
   */
  private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
    Collection<HStoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo,
    final List<byte[]> boundaries) throws Exception {
    StripeCompactor sc = mock(StripeCompactor.class);
    assertTrue(policy.needsCompactions(si, al()));
    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
    verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
      @Override
      public boolean matches(List<byte[]> argument) {
        // byte[] has identity equals, so compare the boundary lists element by element.
        if (argument.size() != boundaries.size()) {
          return false;
        }
        for (int i = 0; i < argument.size(); ++i) {
          if (!Bytes.equals(argument.get(i), boundaries.get(i))) {
            return false;
          }
        }
        return true;
      }
    }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
      dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), any(), any());
  }

  /**
   * Verify arbitrary compaction.
   * @param policy Policy to test.
   * @param si Stripe information pre-set with stripes to test.
   * @param sfs Files that should be compacted.
   * @param dropDeletes Whether to drop deletes from compaction range.
   * @param count Expected # of resulting stripes, null if not checked.
   * @param size Expected target stripe size, null if not checked.
   * @param start Left boundary of the compaction.
   * @param end Right boundary of the compaction.
   * @param needsCompaction Whether the policy must also report that compaction is needed; when
   *          false that check is skipped.
   */
  private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
    Collection<HStoreFile> sfs, Boolean dropDeletes, Integer count, Long size, byte[] start,
    byte[] end, boolean needsCompaction) throws IOException {
    StripeCompactor sc = mock(StripeCompactor.class);
    assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
    verify(sc, times(1)).compact(eq(scr.getRequest()),
      count == null ? anyInt() : eq(count.intValue()),
      size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
      dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), any(), any());
  }

  /** Verify arbitrary flush.
*/ 709 protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si, 710 KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException { 711 StoreFileWritersCapture writers = new StoreFileWritersCapture(); 712 StripeStoreFlusher.StripeFlushRequest req = 713 policy.selectFlush(CellComparatorImpl.COMPARATOR, si, input.length); 714 StripeMultiFileWriter mw = req.createWriter(); 715 mw.init(null, writers); 716 for (KeyValue kv : input) { 717 mw.append(kv); 718 } 719 boolean hasMetadata = boundaries != null; 720 mw.commitWriters(0, false); 721 writers.verifyKvs(expected, true, hasMetadata); 722 if (hasMetadata) { 723 writers.verifyBoundaries(boundaries); 724 } 725 } 726 727 private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) { 728 return dropDeletes == null 729 ? any() 730 : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class)); 731 } 732 733 private void verifyCollectionsEqual(Collection<HStoreFile> sfs, Collection<HStoreFile> scr) { 734 // Dumb. 735 assertEquals(sfs.size(), scr.size()); 736 assertTrue(scr.containsAll(sfs)); 737 } 738 739 private static List<HStoreFile> getAllFiles(StripeInformationProvider si, int fromStripe, 740 int toStripe) { 741 ArrayList<HStoreFile> expected = new ArrayList<>(); 742 for (int i = fromStripe; i <= toStripe; ++i) { 743 expected.addAll(si.getStripes().get(i)); 744 } 745 return expected; 746 } 747 748 /** 749 * @param l0Count Number of L0 files. 750 * @param boundaries Target boundaries. 751 * @return Mock stripes. 752 */ 753 private static StripeInformationProvider createStripes(int l0Count, byte[]... 
boundaries) 754 throws Exception { 755 List<Long> l0Sizes = new ArrayList<>(); 756 for (int i = 0; i < l0Count; ++i) { 757 l0Sizes.add(5L); 758 } 759 List<List<Long>> sizes = new ArrayList<>(); 760 for (int i = 0; i <= boundaries.length; ++i) { 761 sizes.add(Arrays.asList(Long.valueOf(5))); 762 } 763 return createStripes(Arrays.asList(boundaries), sizes, l0Sizes); 764 } 765 766 /** 767 * @param l0Count Number of L0 files. 768 * @param l0Size Size of each file. 769 * @return Mock stripes. 770 */ 771 private static StripeInformationProvider createStripesL0Only(int l0Count, long l0Size) 772 throws Exception { 773 List<Long> l0Sizes = new ArrayList<>(); 774 for (int i = 0; i < l0Count; ++i) { 775 l0Sizes.add(l0Size); 776 } 777 return createStripes(null, new ArrayList<>(), l0Sizes); 778 } 779 780 /** 781 * @param l0Count Number of L0 files. 782 * @param l0Size Size of each file. 783 * @param sizes Sizes of the files; each sub-array representing a stripe. 784 * @return Mock stripes. 785 */ 786 private static StripeInformationProvider createStripesWithSizes(int l0Count, long l0Size, 787 Long[]... 
sizes) throws Exception { 788 ArrayList<List<Long>> sizeList = new ArrayList<>(sizes.length); 789 for (Long[] size : sizes) { 790 sizeList.add(Arrays.asList(size)); 791 } 792 return createStripesWithSizes(l0Count, l0Size, sizeList); 793 } 794 795 private static StripeInformationProvider createStripesWithSizes(int l0Count, long l0Size, 796 List<List<Long>> sizes) throws Exception { 797 List<byte[]> boundaries = createBoundaries(sizes.size()); 798 List<Long> l0Sizes = new ArrayList<>(); 799 for (int i = 0; i < l0Count; ++i) { 800 l0Sizes.add(l0Size); 801 } 802 return createStripes(boundaries, sizes, l0Sizes); 803 } 804 805 private static List<byte[]> createBoundaries(int stripeCount) { 806 byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E }; 807 assert stripeCount <= keys.length + 1; 808 List<byte[]> boundaries = new ArrayList<>(); 809 boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1)); 810 return boundaries; 811 } 812 813 private static StripeInformationProvider createStripes(List<byte[]> boundaries, 814 List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception { 815 List<List<HStoreFile>> stripeFiles = new ArrayList<>(stripeSizes.size()); 816 for (List<Long> sizes : stripeSizes) { 817 List<HStoreFile> sfs = new ArrayList<>(sizes.size()); 818 for (Long size : sizes) { 819 sfs.add(createFile(size)); 820 } 821 stripeFiles.add(sfs); 822 } 823 List<HStoreFile> l0Files = new ArrayList<>(); 824 for (Long size : l0Sizes) { 825 l0Files.add(createFile(size)); 826 } 827 return createStripesWithFiles(boundaries, stripeFiles, l0Files); 828 } 829 830 /** 831 * This method actually does all the work. 
832 */ 833 private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries, 834 List<List<HStoreFile>> stripeFiles, List<HStoreFile> l0Files) throws Exception { 835 ArrayList<ImmutableList<HStoreFile>> stripes = new ArrayList<>(); 836 ArrayList<byte[]> boundariesList = new ArrayList<>(); 837 StripeInformationProvider si = mock(StripeInformationProvider.class); 838 if (!stripeFiles.isEmpty()) { 839 assert stripeFiles.size() == (boundaries.size() + 1); 840 boundariesList.add(OPEN_KEY); 841 for (int i = 0; i <= boundaries.size(); ++i) { 842 byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1)); 843 byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i)); 844 boundariesList.add(endKey); 845 for (HStoreFile sf : stripeFiles.get(i)) { 846 setFileStripe(sf, startKey, endKey); 847 } 848 stripes.add(ImmutableList.copyOf(stripeFiles.get(i))); 849 when(si.getStartRow(eq(i))).thenReturn(startKey); 850 when(si.getEndRow(eq(i))).thenReturn(endKey); 851 } 852 } 853 ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>(); 854 sfs.addAllSublists(stripes); 855 sfs.addSublist(l0Files); 856 when(si.getStoreFiles()).thenReturn(sfs); 857 when(si.getStripes()).thenReturn(stripes); 858 when(si.getStripeBoundaries()).thenReturn(boundariesList); 859 when(si.getStripeCount()).thenReturn(stripes.size()); 860 when(si.getLevel0Files()).thenReturn(l0Files); 861 return si; 862 } 863 864 private static HStoreFile createFile(long size) throws Exception { 865 HStoreFile sf = mock(HStoreFile.class); 866 when(sf.getPath()).thenReturn(new Path("moo")); 867 StoreFileReader r = mock(StoreFileReader.class); 868 when(r.getEntries()).thenReturn(size); 869 when(r.length()).thenReturn(size); 870 when(r.getBloomFilterType()).thenReturn(BloomType.NONE); 871 when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); 872 when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(), 873 
anyBoolean())).thenReturn(mock(StoreFileScanner.class)); 874 when(sf.getReader()).thenReturn(r); 875 when(sf.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty()); 876 when(r.getMaxTimestamp()).thenReturn(TimeRange.INITIAL_MAX_TIMESTAMP); 877 return sf; 878 } 879 880 private static HStoreFile createFile() throws Exception { 881 return createFile(0); 882 } 883 884 private static void setFileStripe(HStoreFile sf, byte[] startKey, byte[] endKey) { 885 when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey); 886 when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey); 887 } 888 889 private StripeCompactor createCompactor() throws Exception { 890 ColumnFamilyDescriptor familyDescriptor = 891 ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("foo")); 892 StoreFileWritersCapture writers = new StoreFileWritersCapture(); 893 HStore store = mock(HStore.class); 894 RegionInfo info = mock(RegionInfo.class); 895 when(info.getRegionNameAsString()).thenReturn("testRegion"); 896 when(store.getColumnFamilyDescriptor()).thenReturn(familyDescriptor); 897 when(store.getRegionInfo()).thenReturn(info); 898 StoreEngine storeEngine = mock(StoreEngine.class); 899 when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers); 900 when(store.getStoreEngine()).thenReturn(storeEngine); 901 902 Configuration conf = HBaseConfiguration.create(); 903 conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders); 904 final Scanner scanner = new Scanner(); 905 return new StripeCompactor(conf, store) { 906 @Override 907 protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, 908 List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs, 909 byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { 910 return scanner; 911 } 912 913 @Override 914 protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, 915 List<StoreFileScanner> 
scanners, ScanType scanType, long smallestReadPoint, 916 long earliestPutTs) throws IOException { 917 return scanner; 918 } 919 }; 920 } 921 922 private static class Scanner implements InternalScanner { 923 private final ArrayList<KeyValue> kvs; 924 925 public Scanner(KeyValue... kvs) { 926 this.kvs = new ArrayList<>(Arrays.asList(kvs)); 927 } 928 929 @Override 930 public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext) 931 throws IOException { 932 if (kvs.isEmpty()) { 933 return false; 934 } 935 result.add(kvs.remove(0)); 936 return !kvs.isEmpty(); 937 } 938 939 @Override 940 public void close() throws IOException { 941 } 942 } 943}