001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.compactions; 019 020import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025import static org.mockito.AdditionalMatchers.aryEq; 026import static org.mockito.Matchers.any; 027import static org.mockito.Matchers.anyBoolean; 028import static org.mockito.Matchers.anyInt; 029import static org.mockito.Matchers.anyLong; 030import static org.mockito.Matchers.argThat; 031import static org.mockito.Matchers.eq; 032import static org.mockito.Matchers.isNull; 033import static org.mockito.Mockito.mock; 034import static org.mockito.Mockito.only; 035import static org.mockito.Mockito.times; 036import static org.mockito.Mockito.verify; 037import static org.mockito.Mockito.when; 038import java.io.IOException; 039import java.util.ArrayList; 040import java.util.Arrays; 041import java.util.Collection; 042import java.util.Iterator; 043import java.util.List; 044import java.util.OptionalLong; 045import org.apache.hadoop.conf.Configuration; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.hbase.Cell; 048import org.apache.hadoop.hbase.CellComparatorImpl; 049import org.apache.hadoop.hbase.HBaseClassTestRule; 050import org.apache.hadoop.hbase.HBaseConfiguration; 051import org.apache.hadoop.hbase.HColumnDescriptor; 052import org.apache.hadoop.hbase.HRegionInfo; 053import org.apache.hadoop.hbase.KeyValue; 054import org.apache.hadoop.hbase.io.hfile.HFile; 055import org.apache.hadoop.hbase.regionserver.BloomType; 056import org.apache.hadoop.hbase.regionserver.HStore; 057import org.apache.hadoop.hbase.regionserver.HStoreFile; 058import org.apache.hadoop.hbase.regionserver.InternalScanner; 059import org.apache.hadoop.hbase.regionserver.ScanInfo; 060import org.apache.hadoop.hbase.regionserver.ScanType; 061import org.apache.hadoop.hbase.regionserver.ScannerContext; 062import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; 063import org.apache.hadoop.hbase.regionserver.StoreFileReader; 064import org.apache.hadoop.hbase.regionserver.StoreFileScanner; 065import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; 066import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; 067import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager; 068import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher; 069import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider; 070import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture; 071import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 072import org.apache.hadoop.hbase.testclassification.MediumTests; 073import org.apache.hadoop.hbase.testclassification.RegionServerTests; 074import org.apache.hadoop.hbase.util.Bytes; 075import org.apache.hadoop.hbase.util.ConcatenatedLists; 076import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 077import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 078import org.junit.ClassRule; 079import org.junit.Test; 080import org.junit.experimental.categories.Category; 081import org.junit.runner.RunWith; 082import org.junit.runners.Parameterized; 083import org.junit.runners.Parameterized.Parameter; 084import org.junit.runners.Parameterized.Parameters; 085import org.mockito.ArgumentMatcher; 086import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 087import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 088 089@RunWith(Parameterized.class) 090@Category({RegionServerTests.class, MediumTests.class}) 091public class TestStripeCompactionPolicy { 092 093 @ClassRule 094 public static final HBaseClassTestRule CLASS_RULE = 095 HBaseClassTestRule.forClass(TestStripeCompactionPolicy.class); 096 097 private static final byte[] KEY_A = Bytes.toBytes("aaa"); 098 private static final byte[] KEY_B = Bytes.toBytes("bbb"); 099 private static final byte[] KEY_C = Bytes.toBytes("ccc"); 100 private static final byte[] KEY_D = Bytes.toBytes("ddd"); 101 private static final byte[] KEY_E = Bytes.toBytes("eee"); 102 private static final KeyValue KV_A = new KeyValue(KEY_A, 0L); 103 private static final KeyValue KV_B = new KeyValue(KEY_B, 0L); 104 private static final KeyValue KV_C = new KeyValue(KEY_C, 0L); 105 private static final KeyValue KV_D = new KeyValue(KEY_D, 0L); 106 private static final KeyValue KV_E = new KeyValue(KEY_E, 0L); 107 108 109 private static long defaultSplitSize = 18; 110 private static float defaultSplitCount = 1.8F; 111 private final static int defaultInitialCount = 1; 112 private static long defaultTtl = 1000 * 1000; 113 114 @Parameters(name = "{index}: usePrivateReaders={0}") 115 public static Iterable<Object[]> data() { 116 return Arrays.asList(new Object[] { true }, new Object[] { false }); 117 } 118 119 @Parameter 120 public boolean usePrivateReaders; 121 122 @Test 123 public void testNoStripesFromFlush() throws Exception { 124 Configuration conf = HBaseConfiguration.create(); 125 conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true); 126 StripeCompactionPolicy policy = createPolicy(conf); 127 StripeInformationProvider si = createStripesL0Only(0, 0); 128 129 KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E }; 130 KeyValue[][] expected = new KeyValue[][] { input }; 131 verifyFlush(policy, si, input, expected, null); 132 } 133 134 @Test 135 public void testOldStripesFromFlush() throws Exception { 136 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create()); 137 StripeInformationProvider si = createStripes(0, KEY_C, KEY_D); 138 139 KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E }; 140 KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B }, 141 new KeyValue[] { KV_C, KV_C }, new KeyValue[] { KV_D, KV_E } }; 142 verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY }); 143 } 144 145 @Test 146 public void testNewStripesFromFlush() throws Exception { 147 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create()); 148 StripeInformationProvider si = createStripesL0Only(0, 0); 149 KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E }; 150 // Starts with one stripe; unlike flush results, must have metadata 151 KeyValue[][] expected = new KeyValue[][] { input }; 152 verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY }); 153 } 154 155 @Test 156 public void testSingleStripeCompaction() throws Exception { 157 // Create a special policy that only compacts single stripes, using standard methods. 158 Configuration conf = HBaseConfiguration.create(); 159 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 160 conf.unset("hbase.hstore.compaction.min.size"); 161 conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F); 162 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3); 163 conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4); 164 conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits 165 StoreConfigInformation sci = mock(StoreConfigInformation.class); 166 when(sci.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO); 167 StripeStoreConfig ssc = new StripeStoreConfig(conf, sci); 168 StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) { 169 @Override 170 public StripeCompactionRequest selectCompaction(StripeInformationProvider si, 171 List<HStoreFile> filesCompacting, boolean isOffpeak) throws IOException { 172 if (!filesCompacting.isEmpty()) { 173 return null; 174 } 175 return selectSingleStripeCompaction(si, false, false, isOffpeak); 176 } 177 178 @Override 179 public boolean needsCompactions( 180 StripeInformationProvider si, List<HStoreFile> filesCompacting) { 181 if (!filesCompacting.isEmpty()) { 182 return false; 183 } 184 return needsSingleStripeCompaction(si); 185 } 186 }; 187 188 // No compaction due to min files or ratio 189 StripeInformationProvider si = createStripesWithSizes(0, 0, 190 new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L }); 191 verifyNoCompaction(policy, si); 192 // No compaction due to min files or ratio - will report needed, but not do any. 193 si = createStripesWithSizes(0, 0, 194 new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L, 1L }); 195 assertNull(policy.selectCompaction(si, al(), false)); 196 assertTrue(policy.needsCompactions(si, al())); 197 // One stripe has possible compaction 198 si = createStripesWithSizes(0, 0, 199 new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 4L, 3L }); 200 verifySingleStripeCompaction(policy, si, 2, null); 201 // Several stripes have possible compactions; choose best quality (removes most files) 202 si = createStripesWithSizes(0, 0, 203 new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L, 1L }); 204 verifySingleStripeCompaction(policy, si, 2, null); 205 si = createStripesWithSizes(0, 0, 206 new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L }); 207 verifySingleStripeCompaction(policy, si, 1, null); 208 // Or with smallest files, if the count is the same 209 si = createStripesWithSizes(0, 0, 210 new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L }, new Long[] { 3L, 2L, 2L }); 211 verifySingleStripeCompaction(policy, si, 1, null); 212 // Verify max count is respected. 213 si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L }); 214 List<HStoreFile> sfs = si.getStripes().get(1).subList(1, 5); 215 verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true); 216 // Verify ratio is applied. 217 si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L }); 218 sfs = si.getStripes().get(1).subList(1, 5); 219 verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true); 220 } 221 222 @Test 223 public void testWithParallelCompaction() throws Exception { 224 // TODO: currently only one compaction at a time per store is allowed. If this changes, 225 // the appropriate file exclusion testing would need to be done in respective tests. 226 assertNull(createPolicy(HBaseConfiguration.create()).selectCompaction( 227 mock(StripeInformationProvider.class), al(createFile()), false)); 228 } 229 230 @Test 231 public void testWithReferences() throws Exception { 232 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create()); 233 StripeCompactor sc = mock(StripeCompactor.class); 234 HStoreFile ref = createFile(); 235 when(ref.isReference()).thenReturn(true); 236 StripeInformationProvider si = mock(StripeInformationProvider.class); 237 Collection<HStoreFile> sfs = al(ref, createFile()); 238 when(si.getStorefiles()).thenReturn(sfs); 239 240 assertTrue(policy.needsCompactions(si, al())); 241 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 242 // UnmodifiableCollection does not implement equals so we need to change it here to a 243 // collection that implements it. 244 assertEquals(si.getStorefiles(), new ArrayList<>(scr.getRequest().getFiles())); 245 scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 246 verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), 247 aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), 248 any(), any()); 249 } 250 251 @Test 252 public void testInitialCountFromL0() throws Exception { 253 Configuration conf = HBaseConfiguration.create(); 254 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2); 255 StripeCompactionPolicy policy = createPolicy( 256 conf, defaultSplitSize, defaultSplitCount, 2, false); 257 StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8); 258 verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true); 259 si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts. 260 verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true); 261 policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false); 262 verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true); 263 } 264 265 @Test 266 public void testExistingStripesFromL0() throws Exception { 267 Configuration conf = HBaseConfiguration.create(); 268 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3); 269 StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A); 270 verifyCompaction( 271 createPolicy(conf), si, si.getLevel0Files(), null, null, si.getStripeBoundaries()); 272 } 273 274 @Test 275 public void testNothingToCompactFromL0() throws Exception { 276 Configuration conf = HBaseConfiguration.create(); 277 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4); 278 StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10); 279 StripeCompactionPolicy policy = createPolicy(conf); 280 verifyNoCompaction(policy, si); 281 282 si = createStripes(3, KEY_A); 283 verifyNoCompaction(policy, si); 284 } 285 286 @Test 287 public void testSplitOffStripe() throws Exception { 288 Configuration conf = HBaseConfiguration.create(); 289 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 290 conf.unset("hbase.hstore.compaction.min.size"); 291 // First test everything with default split count of 2, then split into more. 292 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2); 293 Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L }; 294 Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L }; 295 long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount); 296 // Don't split if not eligible for compaction. 297 StripeCompactionPolicy.StripeInformationProvider si = 298 createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L }); 299 assertNull(createPolicy(conf).selectCompaction(si, al(), false)); 300 // Make sure everything is eligible. 301 conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f); 302 StripeCompactionPolicy policy = createPolicy(conf); 303 verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize); 304 // Add some extra stripes... 305 si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit); 306 verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize); 307 // In the middle. 308 si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit); 309 verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize); 310 // No split-off with different config (larger split size). 311 // However, in this case some eligible stripe will just be compacted alone. 312 StripeCompactionPolicy specPolicy = createPolicy( 313 conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false); 314 verifySingleStripeCompaction(specPolicy, si, 1, null); 315 } 316 317 @Test 318 public void testSplitOffStripeOffPeak() throws Exception { 319 // for HBASE-11439 320 Configuration conf = HBaseConfiguration.create(); 321 322 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 323 conf.unset("hbase.hstore.compaction.min.size"); 324 325 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2); 326 // Select the last 2 files. 327 StripeCompactionPolicy.StripeInformationProvider si = 328 createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L }); 329 assertEquals(2, createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles() 330 .size()); 331 // Make sure everything is eligible in offpeak. 332 conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f); 333 assertEquals(3, createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles() 334 .size()); 335 } 336 337 @Test 338 public void testSplitOffStripeDropDeletes() throws Exception { 339 Configuration conf = HBaseConfiguration.create(); 340 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2); 341 StripeCompactionPolicy policy = createPolicy(conf); 342 Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 }; 343 Long[] noSplit = new Long[] { 1L }; 344 long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount); 345 346 // Verify the deletes can be dropped if there are no L0 files. 347 StripeCompactionPolicy.StripeInformationProvider si = 348 createStripesWithSizes(0, 0, noSplit, toSplit); 349 verifyWholeStripesCompaction(policy, si, 1, 1, true, null, splitTargetSize); 350 // But cannot be dropped if there are. 351 si = createStripesWithSizes(2, 2, noSplit, toSplit); 352 verifyWholeStripesCompaction(policy, si, 1, 1, false, null, splitTargetSize); 353 } 354 355 @SuppressWarnings("unchecked") 356 @Test 357 public void testMergeExpiredFiles() throws Exception { 358 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 359 long now = defaultTtl + 2; 360 edge.setValue(now); 361 EnvironmentEdgeManager.injectEdge(edge); 362 try { 363 HStoreFile expiredFile = createFile(), notExpiredFile = createFile(); 364 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1); 365 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1); 366 List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile); 367 List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile); 368 List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile); 369 370 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(), 371 defaultSplitSize, defaultSplitCount, defaultInitialCount, true); 372 // Merge expired if there are eligible stripes. 373 StripeCompactionPolicy.StripeInformationProvider si = 374 createStripesWithFiles(expired, expired, expired); 375 verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false); 376 // Don't merge if nothing expired. 377 si = createStripesWithFiles(notExpired, notExpired, notExpired); 378 assertNull(policy.selectCompaction(si, al(), false)); 379 // Merge one expired stripe with next. 380 si = createStripesWithFiles(notExpired, expired, notExpired); 381 verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false); 382 // Merge the biggest run out of multiple options. 383 // Merge one expired stripe with next. 384 si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired); 385 verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false); 386 // Stripe with a subset of expired files is not merged. 387 si = createStripesWithFiles(expired, expired, notExpired, expired, mixed); 388 verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false); 389 } finally { 390 EnvironmentEdgeManager.reset(); 391 } 392 } 393 394 @SuppressWarnings("unchecked") 395 @Test 396 public void testMergeExpiredStripes() throws Exception { 397 // HBASE-11397 398 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 399 long now = defaultTtl + 2; 400 edge.setValue(now); 401 EnvironmentEdgeManager.injectEdge(edge); 402 try { 403 HStoreFile expiredFile = createFile(), notExpiredFile = createFile(); 404 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1); 405 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1); 406 List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile); 407 List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile); 408 409 StripeCompactionPolicy policy = 410 createPolicy(HBaseConfiguration.create(), defaultSplitSize, defaultSplitCount, 411 defaultInitialCount, true); 412 413 // Merge all three expired stripes into one. 414 StripeCompactionPolicy.StripeInformationProvider si = 415 createStripesWithFiles(expired, expired, expired); 416 verifyMergeCompatcion(policy, si, 0, 2); 417 418 // Merge two adjacent expired stripes into one. 419 si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired); 420 verifyMergeCompatcion(policy, si, 3, 4); 421 } finally { 422 EnvironmentEdgeManager.reset(); 423 } 424 } 425 426 @SuppressWarnings("unchecked") 427 private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles( 428 List<HStoreFile>... stripeFiles) throws Exception { 429 return createStripesWithFiles(createBoundaries(stripeFiles.length), 430 Lists.newArrayList(stripeFiles), new ArrayList<>()); 431 } 432 433 @Test 434 public void testSingleStripeDropDeletes() throws Exception { 435 Configuration conf = HBaseConfiguration.create(); 436 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 437 conf.unset("hbase.hstore.compaction.min.size"); 438 StripeCompactionPolicy policy = createPolicy(conf); 439 // Verify the deletes can be dropped if there are no L0 files. 440 Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } }; 441 StripeInformationProvider si = createStripesWithSizes(0, 0, stripes); 442 verifySingleStripeCompaction(policy, si, 0, true); 443 // But cannot be dropped if there are. 444 si = createStripesWithSizes(2, 2, stripes); 445 verifySingleStripeCompaction(policy, si, 0, false); 446 // Unless there are enough to cause L0 compaction. 447 si = createStripesWithSizes(6, 2, stripes); 448 ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>(); 449 sfs.addSublist(si.getLevel0Files()); 450 sfs.addSublist(si.getStripes().get(0)); 451 verifyCompaction( 452 policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries()); 453 // If we cannot actually compact all files in some stripe, L0 is chosen. 454 si = createStripesWithSizes(6, 2, 455 new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } }); 456 verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries()); 457 // even if L0 has no file 458 // if all files of stripe aren't selected, delete must not be dropped. 459 stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } }; 460 si = createStripesWithSizes(0, 0, stripes); 461 List<HStoreFile> compactFile = new ArrayList<>(); 462 Iterator<HStoreFile> iter = si.getStripes().get(0).listIterator(1); 463 while (iter.hasNext()) { 464 compactFile.add(iter.next()); 465 } 466 verifyCompaction(policy, si, compactFile, false, 1, null, si.getStartRow(0), si.getEndRow(0), 467 true); 468 } 469 470 /********* HELPER METHODS ************/ 471 private static StripeCompactionPolicy createPolicy( 472 Configuration conf) throws Exception { 473 return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false); 474 } 475 476 private static StripeCompactionPolicy createPolicy(Configuration conf, 477 long splitSize, float splitCount, int initialCount, boolean hasTtl) throws Exception { 478 conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize); 479 conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount); 480 conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount); 481 StoreConfigInformation sci = mock(StoreConfigInformation.class); 482 when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE); 483 when(sci.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO); 484 StripeStoreConfig ssc = new StripeStoreConfig(conf, sci); 485 return new StripeCompactionPolicy(conf, sci, ssc); 486 } 487 488 private static ArrayList<HStoreFile> al(HStoreFile... sfs) { 489 return new ArrayList<>(Arrays.asList(sfs)); 490 } 491 492 private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si, 493 int from, int to) throws Exception { 494 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 495 Collection<HStoreFile> sfs = getAllFiles(si, from, to); 496 verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); 497 498 // All the Stripes are expired, so the Compactor will not create any Writers. We need to create 499 // an empty file to preserve metadata 500 StripeCompactor sc = createCompactor(); 501 List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 502 assertEquals(1, paths.size()); 503 } 504 505 /** 506 * Verify the compaction that includes several entire stripes. 507 * @param policy Policy to test. 508 * @param si Stripe information pre-set with stripes to test. 509 * @param from Starting stripe. 510 * @param to Ending stripe (inclusive). 511 * @param dropDeletes Whether to drop deletes from compaction range. 512 * @param count Expected # of resulting stripes, null if not checked. 513 * @param size Expected target stripe size, null if not checked. 514 */ 515 private void verifyWholeStripesCompaction(StripeCompactionPolicy policy, 516 StripeInformationProvider si, int from, int to, Boolean dropDeletes, 517 Integer count, Long size, boolean needsCompaction) throws IOException { 518 verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes, 519 count, size, si.getStartRow(from), si.getEndRow(to), needsCompaction); 520 } 521 522 private void verifyWholeStripesCompaction(StripeCompactionPolicy policy, 523 StripeInformationProvider si, int from, int to, Boolean dropDeletes, 524 Integer count, Long size) throws IOException { 525 verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true); 526 } 527 528 private void verifySingleStripeCompaction(StripeCompactionPolicy policy, 529 StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException { 530 verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true); 531 } 532 533 /** 534 * Verify no compaction is needed or selected. 535 * @param policy Policy to test. 536 * @param si Stripe information pre-set with stripes to test. 537 */ 538 private void verifyNoCompaction( 539 StripeCompactionPolicy policy, StripeInformationProvider si) throws IOException { 540 assertNull(policy.selectCompaction(si, al(), false)); 541 assertFalse(policy.needsCompactions(si, al())); 542 } 543 544 /** 545 * Verify arbitrary compaction. 546 * @param policy Policy to test. 547 * @param si Stripe information pre-set with stripes to test. 548 * @param sfs Files that should be compacted. 549 * @param dropDeletesFrom Row from which to drop deletes. 550 * @param dropDeletesTo Row to which to drop deletes. 551 * @param boundaries Expected target stripe boundaries. 552 */ 553 private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si, 554 Collection<HStoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo, 555 final List<byte[]> boundaries) throws Exception { 556 StripeCompactor sc = mock(StripeCompactor.class); 557 assertTrue(policy.needsCompactions(si, al())); 558 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 559 verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); 560 scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 561 verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() { 562 @Override 563 public boolean matches(List<byte[]> argument) { 564 List<byte[]> other = argument; 565 if (other.size() != boundaries.size()) { 566 return false; 567 } 568 for (int i = 0; i < other.size(); ++i) { 569 if (!Bytes.equals(other.get(i), boundaries.get(i))) { 570 return false; 571 } 572 } 573 return true; 574 } 575 }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom), 576 dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), 577 any(), any()); 578 } 579 580 /** 581 * Verify arbitrary compaction. 582 * @param policy Policy to test. 583 * @param si Stripe information pre-set with stripes to test. 584 * @param sfs Files that should be compacted. 585 * @param dropDeletes Whether to drop deletes from compaction range. 586 * @param count Expected # of resulting stripes, null if not checked. 587 * @param size Expected target stripe size, null if not checked. 588 * @param start Left boundary of the compaction. 589 * @param end Right boundary of the compaction. 590 */ 591 private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si, 592 Collection<HStoreFile> sfs, Boolean dropDeletes, Integer count, Long size, 593 byte[] start, byte[] end, boolean needsCompaction) throws IOException { 594 StripeCompactor sc = mock(StripeCompactor.class); 595 assertTrue(!needsCompaction || policy.needsCompactions(si, al())); 596 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 597 verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); 598 scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 599 verify(sc, times(1)).compact(eq(scr.getRequest()), 600 count == null ? anyInt() : eq(count.intValue()), 601 size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end), 602 dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), 603 any(), any()); 604 } 605 606 /** Verify arbitrary flush. */ 607 protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si, 608 KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException { 609 StoreFileWritersCapture writers = new StoreFileWritersCapture(); 610 StripeStoreFlusher.StripeFlushRequest req = 611 policy.selectFlush(CellComparatorImpl.COMPARATOR, si, input.length); 612 StripeMultiFileWriter mw = req.createWriter(); 613 mw.init(null, writers); 614 for (KeyValue kv : input) { 615 mw.append(kv); 616 } 617 boolean hasMetadata = boundaries != null; 618 mw.commitWriters(0, false); 619 writers.verifyKvs(expected, true, hasMetadata); 620 if (hasMetadata) { 621 writers.verifyBoundaries(boundaries); 622 } 623 } 624 625 626 private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) { 627 return dropDeletes == null ? any() 628 : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class)); 629 } 630 631 private void verifyCollectionsEqual(Collection<HStoreFile> sfs, Collection<HStoreFile> scr) { 632 // Dumb. 633 assertEquals(sfs.size(), scr.size()); 634 assertTrue(scr.containsAll(sfs)); 635 } 636 637 private static List<HStoreFile> getAllFiles( 638 StripeInformationProvider si, int fromStripe, int toStripe) { 639 ArrayList<HStoreFile> expected = new ArrayList<>(); 640 for (int i = fromStripe; i <= toStripe; ++i) { 641 expected.addAll(si.getStripes().get(i)); 642 } 643 return expected; 644 } 645 646 /** 647 * @param l0Count Number of L0 files. 648 * @param boundaries Target boundaries. 649 * @return Mock stripes. 650 */ 651 private static StripeInformationProvider createStripes( 652 int l0Count, byte[]... boundaries) throws Exception { 653 List<Long> l0Sizes = new ArrayList<>(); 654 for (int i = 0; i < l0Count; ++i) { 655 l0Sizes.add(5L); 656 } 657 List<List<Long>> sizes = new ArrayList<>(); 658 for (int i = 0; i <= boundaries.length; ++i) { 659 sizes.add(Arrays.asList(Long.valueOf(5))); 660 } 661 return createStripes(Arrays.asList(boundaries), sizes, l0Sizes); 662 } 663 664 /** 665 * @param l0Count Number of L0 files. 666 * @param l0Size Size of each file. 667 * @return Mock stripes. 668 */ 669 private static StripeInformationProvider createStripesL0Only( 670 int l0Count, long l0Size) throws Exception { 671 List<Long> l0Sizes = new ArrayList<>(); 672 for (int i = 0; i < l0Count; ++i) { 673 l0Sizes.add(l0Size); 674 } 675 return createStripes(null, new ArrayList<>(), l0Sizes); 676 } 677 678 /** 679 * @param l0Count Number of L0 files. 680 * @param l0Size Size of each file. 681 * @param sizes Sizes of the files; each sub-array representing a stripe. 682 * @return Mock stripes. 683 */ 684 private static StripeInformationProvider createStripesWithSizes( 685 int l0Count, long l0Size, Long[]... sizes) throws Exception { 686 ArrayList<List<Long>> sizeList = new ArrayList<>(sizes.length); 687 for (Long[] size : sizes) { 688 sizeList.add(Arrays.asList(size)); 689 } 690 return createStripesWithSizes(l0Count, l0Size, sizeList); 691 } 692 693 private static StripeInformationProvider createStripesWithSizes( 694 int l0Count, long l0Size, List<List<Long>> sizes) throws Exception { 695 List<byte[]> boundaries = createBoundaries(sizes.size()); 696 List<Long> l0Sizes = new ArrayList<>(); 697 for (int i = 0; i < l0Count; ++i) { 698 l0Sizes.add(l0Size); 699 } 700 return createStripes(boundaries, sizes, l0Sizes); 701 } 702 703 private static List<byte[]> createBoundaries(int stripeCount) { 704 byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E }; 705 assert stripeCount <= keys.length + 1; 706 List<byte[]> boundaries = new ArrayList<>(); 707 boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1)); 708 return boundaries; 709 } 710 711 private static StripeInformationProvider createStripes(List<byte[]> boundaries, 712 List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception { 713 List<List<HStoreFile>> stripeFiles = new ArrayList<>(stripeSizes.size()); 714 for (List<Long> sizes : stripeSizes) { 715 List<HStoreFile> sfs = new ArrayList<>(sizes.size()); 716 for (Long size : sizes) { 717 sfs.add(createFile(size)); 718 } 719 stripeFiles.add(sfs); 720 } 721 List<HStoreFile> l0Files = new ArrayList<>(); 722 for (Long size : l0Sizes) { 723 l0Files.add(createFile(size)); 724 } 725 return createStripesWithFiles(boundaries, stripeFiles, l0Files); 726 } 727 728 /** 729 * This method actually does all the work. 730 */ 731 private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries, 732 List<List<HStoreFile>> stripeFiles, List<HStoreFile> l0Files) throws Exception { 733 ArrayList<ImmutableList<HStoreFile>> stripes = new ArrayList<>(); 734 ArrayList<byte[]> boundariesList = new ArrayList<>(); 735 StripeInformationProvider si = mock(StripeInformationProvider.class); 736 if (!stripeFiles.isEmpty()) { 737 assert stripeFiles.size() == (boundaries.size() + 1); 738 boundariesList.add(OPEN_KEY); 739 for (int i = 0; i <= boundaries.size(); ++i) { 740 byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1)); 741 byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i)); 742 boundariesList.add(endKey); 743 for (HStoreFile sf : stripeFiles.get(i)) { 744 setFileStripe(sf, startKey, endKey); 745 } 746 stripes.add(ImmutableList.copyOf(stripeFiles.get(i))); 747 when(si.getStartRow(eq(i))).thenReturn(startKey); 748 when(si.getEndRow(eq(i))).thenReturn(endKey); 749 } 750 } 751 ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>(); 752 sfs.addAllSublists(stripes); 753 sfs.addSublist(l0Files); 754 when(si.getStorefiles()).thenReturn(sfs); 755 when(si.getStripes()).thenReturn(stripes); 756 when(si.getStripeBoundaries()).thenReturn(boundariesList); 757 when(si.getStripeCount()).thenReturn(stripes.size()); 758 when(si.getLevel0Files()).thenReturn(l0Files); 759 return si; 760 } 761 762 private static HStoreFile createFile(long size) throws Exception { 763 HStoreFile sf = mock(HStoreFile.class); 764 when(sf.getPath()).thenReturn(new Path("moo")); 765 StoreFileReader r = mock(StoreFileReader.class); 766 when(r.getEntries()).thenReturn(size); 767 when(r.length()).thenReturn(size); 768 when(r.getBloomFilterType()).thenReturn(BloomType.NONE); 769 when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); 770 when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(), 771 anyBoolean())).thenReturn(mock(StoreFileScanner.class)); 772 when(sf.getReader()).thenReturn(r); 773 when(sf.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty()); 774 return sf; 775 } 776 777 private static HStoreFile createFile() throws Exception { 778 return createFile(0); 779 } 780 781 private static void setFileStripe(HStoreFile sf, byte[] startKey, byte[] endKey) { 782 when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey); 783 when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey); 784 } 785 786 private StripeCompactor createCompactor() throws Exception { 787 HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo")); 788 StoreFileWritersCapture writers = new StoreFileWritersCapture(); 789 HStore store = mock(HStore.class); 790 HRegionInfo info = mock(HRegionInfo.class); 791 when(info.getRegionNameAsString()).thenReturn("testRegion"); 792 when(store.getColumnFamilyDescriptor()).thenReturn(col); 793 when(store.getRegionInfo()).thenReturn(info); 794 when( 795 store.createWriterInTmp(anyLong(), any(), anyBoolean(), 796 anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers); 797 when( 798 store.createWriterInTmp(anyLong(), any(), anyBoolean(), 799 anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers); 800 801 Configuration conf = HBaseConfiguration.create(); 802 conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders); 803 final Scanner scanner = new Scanner(); 804 return new StripeCompactor(conf, store) { 805 @Override 806 protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, 807 List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs, 808 byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { 809 return scanner; 810 } 811 812 @Override 813 protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, 814 List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint, 815 long earliestPutTs) throws IOException { 816 return scanner; 817 } 818 }; 819 } 820 821 private static class Scanner implements InternalScanner { 822 private final ArrayList<KeyValue> kvs; 823 824 public Scanner(KeyValue... kvs) { 825 this.kvs = new ArrayList<>(Arrays.asList(kvs)); 826 } 827 828 @Override 829 public boolean next(List<Cell> result, ScannerContext scannerContext) 830 throws IOException { 831 if (kvs.isEmpty()) { 832 return false; 833 } 834 result.add(kvs.remove(0)); 835 return !kvs.isEmpty(); 836 } 837 838 @Override 839 public void close() throws IOException { 840 } 841 } 842}