001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.compactions; 019 020import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025import static org.mockito.AdditionalMatchers.aryEq; 026import static org.mockito.ArgumentMatchers.any; 027import static org.mockito.ArgumentMatchers.anyBoolean; 028import static org.mockito.ArgumentMatchers.anyInt; 029import static org.mockito.ArgumentMatchers.anyLong; 030import static org.mockito.ArgumentMatchers.anyString; 031import static org.mockito.ArgumentMatchers.argThat; 032import static org.mockito.ArgumentMatchers.eq; 033import static org.mockito.ArgumentMatchers.isNull; 034import static org.mockito.Mockito.mock; 035import static org.mockito.Mockito.only; 036import static org.mockito.Mockito.times; 037import static org.mockito.Mockito.verify; 038import static org.mockito.Mockito.when; 039import java.io.IOException; 040import java.util.ArrayList; 041import java.util.Arrays; 042import java.util.Collection; 043import java.util.Iterator; 044import java.util.List; 045import java.util.OptionalLong; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.Path; 048import org.apache.hadoop.hbase.Cell; 049import org.apache.hadoop.hbase.CellComparatorImpl; 050import org.apache.hadoop.hbase.HBaseClassTestRule; 051import org.apache.hadoop.hbase.HBaseConfiguration; 052import org.apache.hadoop.hbase.HColumnDescriptor; 053import org.apache.hadoop.hbase.HRegionInfo; 054import org.apache.hadoop.hbase.KeyValue; 055import org.apache.hadoop.hbase.io.hfile.HFile; 056import org.apache.hadoop.hbase.regionserver.BloomType; 057import org.apache.hadoop.hbase.regionserver.HStore; 058import org.apache.hadoop.hbase.regionserver.HStoreFile; 059import org.apache.hadoop.hbase.regionserver.InternalScanner; 060import org.apache.hadoop.hbase.regionserver.ScanInfo; 061import org.apache.hadoop.hbase.regionserver.ScanType; 062import org.apache.hadoop.hbase.regionserver.ScannerContext; 063import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; 064import org.apache.hadoop.hbase.regionserver.StoreFileReader; 065import org.apache.hadoop.hbase.regionserver.StoreFileScanner; 066import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; 067import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; 068import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager; 069import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher; 070import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider; 071import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture; 072import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 073import org.apache.hadoop.hbase.testclassification.MediumTests; 074import org.apache.hadoop.hbase.testclassification.RegionServerTests; 075import org.apache.hadoop.hbase.util.Bytes; 076import org.apache.hadoop.hbase.util.ConcatenatedLists; 077import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 078import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 079import org.junit.ClassRule; 080import org.junit.Test; 081import org.junit.experimental.categories.Category; 082import org.junit.runner.RunWith; 083import org.junit.runners.Parameterized; 084import org.junit.runners.Parameterized.Parameter; 085import org.junit.runners.Parameterized.Parameters; 086import org.mockito.ArgumentMatcher; 087import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 088import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 089 090@RunWith(Parameterized.class) 091@Category({RegionServerTests.class, MediumTests.class}) 092public class TestStripeCompactionPolicy { 093 094 @ClassRule 095 public static final HBaseClassTestRule CLASS_RULE = 096 HBaseClassTestRule.forClass(TestStripeCompactionPolicy.class); 097 098 private static final byte[] KEY_A = Bytes.toBytes("aaa"); 099 private static final byte[] KEY_B = Bytes.toBytes("bbb"); 100 private static final byte[] KEY_C = Bytes.toBytes("ccc"); 101 private static final byte[] KEY_D = Bytes.toBytes("ddd"); 102 private static final byte[] KEY_E = Bytes.toBytes("eee"); 103 private static final KeyValue KV_A = new KeyValue(KEY_A, 0L); 104 private static final KeyValue KV_B = new KeyValue(KEY_B, 0L); 105 private static final KeyValue KV_C = new KeyValue(KEY_C, 0L); 106 private static final KeyValue KV_D = new KeyValue(KEY_D, 0L); 107 private static final KeyValue KV_E = new KeyValue(KEY_E, 0L); 108 109 110 private static long defaultSplitSize = 18; 111 private static float defaultSplitCount = 1.8F; 112 private final static int defaultInitialCount = 1; 113 private static long defaultTtl = 1000 * 1000; 114 115 @Parameters(name = "{index}: usePrivateReaders={0}") 116 public static Iterable<Object[]> data() { 117 return Arrays.asList(new Object[] { true }, new Object[] { false }); 118 } 119 120 @Parameter 121 public boolean usePrivateReaders; 122 123 @Test 124 public void testNoStripesFromFlush() throws Exception { 125 Configuration conf = HBaseConfiguration.create(); 126 conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true); 127 StripeCompactionPolicy policy = createPolicy(conf); 128 StripeInformationProvider si = createStripesL0Only(0, 0); 129 130 KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E }; 131 KeyValue[][] expected = new KeyValue[][] { input }; 132 verifyFlush(policy, si, input, expected, null); 133 } 134 135 @Test 136 public void testOldStripesFromFlush() throws Exception { 137 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create()); 138 StripeInformationProvider si = createStripes(0, KEY_C, KEY_D); 139 140 KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E }; 141 KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B }, 142 new KeyValue[] { KV_C, KV_C }, new KeyValue[] { KV_D, KV_E } }; 143 verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY }); 144 } 145 146 @Test 147 public void testNewStripesFromFlush() throws Exception { 148 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create()); 149 StripeInformationProvider si = createStripesL0Only(0, 0); 150 KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E }; 151 // Starts with one stripe; unlike flush results, must have metadata 152 KeyValue[][] expected = new KeyValue[][] { input }; 153 verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY }); 154 } 155 156 @Test 157 public void testSingleStripeCompaction() throws Exception { 158 // Create a special policy that only compacts single stripes, using standard methods. 159 Configuration conf = HBaseConfiguration.create(); 160 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 161 conf.unset("hbase.hstore.compaction.min.size"); 162 conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F); 163 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3); 164 conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4); 165 conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits 166 StoreConfigInformation sci = mock(StoreConfigInformation.class); 167 when(sci.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO); 168 StripeStoreConfig ssc = new StripeStoreConfig(conf, sci); 169 StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) { 170 @Override 171 public StripeCompactionRequest selectCompaction(StripeInformationProvider si, 172 List<HStoreFile> filesCompacting, boolean isOffpeak) throws IOException { 173 if (!filesCompacting.isEmpty()) { 174 return null; 175 } 176 return selectSingleStripeCompaction(si, false, false, isOffpeak); 177 } 178 179 @Override 180 public boolean needsCompactions( 181 StripeInformationProvider si, List<HStoreFile> filesCompacting) { 182 if (!filesCompacting.isEmpty()) { 183 return false; 184 } 185 return needsSingleStripeCompaction(si); 186 } 187 }; 188 189 // No compaction due to min files or ratio 190 StripeInformationProvider si = createStripesWithSizes(0, 0, 191 new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L }); 192 verifyNoCompaction(policy, si); 193 // No compaction due to min files or ratio - will report needed, but not do any. 194 si = createStripesWithSizes(0, 0, 195 new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L, 1L }); 196 assertNull(policy.selectCompaction(si, al(), false)); 197 assertTrue(policy.needsCompactions(si, al())); 198 // One stripe has possible compaction 199 si = createStripesWithSizes(0, 0, 200 new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 4L, 3L }); 201 verifySingleStripeCompaction(policy, si, 2, null); 202 // Several stripes have possible compactions; choose best quality (removes most files) 203 si = createStripesWithSizes(0, 0, 204 new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L, 1L }); 205 verifySingleStripeCompaction(policy, si, 2, null); 206 si = createStripesWithSizes(0, 0, 207 new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L }); 208 verifySingleStripeCompaction(policy, si, 1, null); 209 // Or with smallest files, if the count is the same 210 si = createStripesWithSizes(0, 0, 211 new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L }, new Long[] { 3L, 2L, 2L }); 212 verifySingleStripeCompaction(policy, si, 1, null); 213 // Verify max count is respected. 214 si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L }); 215 List<HStoreFile> sfs = si.getStripes().get(1).subList(1, 5); 216 verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true); 217 // Verify ratio is applied. 218 si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L }); 219 sfs = si.getStripes().get(1).subList(1, 5); 220 verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true); 221 } 222 223 @Test 224 public void testWithParallelCompaction() throws Exception { 225 // TODO: currently only one compaction at a time per store is allowed. If this changes, 226 // the appropriate file exclusion testing would need to be done in respective tests. 227 assertNull(createPolicy(HBaseConfiguration.create()).selectCompaction( 228 mock(StripeInformationProvider.class), al(createFile()), false)); 229 } 230 231 @Test 232 public void testWithReferences() throws Exception { 233 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create()); 234 StripeCompactor sc = mock(StripeCompactor.class); 235 HStoreFile ref = createFile(); 236 when(ref.isReference()).thenReturn(true); 237 StripeInformationProvider si = mock(StripeInformationProvider.class); 238 Collection<HStoreFile> sfs = al(ref, createFile()); 239 when(si.getStorefiles()).thenReturn(sfs); 240 241 assertTrue(policy.needsCompactions(si, al())); 242 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 243 // UnmodifiableCollection does not implement equals so we need to change it here to a 244 // collection that implements it. 245 assertEquals(si.getStorefiles(), new ArrayList<>(scr.getRequest().getFiles())); 246 scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 247 verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), 248 aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), 249 any(), any()); 250 } 251 252 @Test 253 public void testInitialCountFromL0() throws Exception { 254 Configuration conf = HBaseConfiguration.create(); 255 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2); 256 StripeCompactionPolicy policy = createPolicy( 257 conf, defaultSplitSize, defaultSplitCount, 2, false); 258 StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8); 259 verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true); 260 si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts. 261 verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true); 262 policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false); 263 verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true); 264 } 265 266 @Test 267 public void testExistingStripesFromL0() throws Exception { 268 Configuration conf = HBaseConfiguration.create(); 269 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3); 270 StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A); 271 verifyCompaction( 272 createPolicy(conf), si, si.getLevel0Files(), null, null, si.getStripeBoundaries()); 273 } 274 275 @Test 276 public void testNothingToCompactFromL0() throws Exception { 277 Configuration conf = HBaseConfiguration.create(); 278 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4); 279 StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10); 280 StripeCompactionPolicy policy = createPolicy(conf); 281 verifyNoCompaction(policy, si); 282 283 si = createStripes(3, KEY_A); 284 verifyNoCompaction(policy, si); 285 } 286 287 @Test 288 public void testSplitOffStripe() throws Exception { 289 Configuration conf = HBaseConfiguration.create(); 290 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 291 conf.unset("hbase.hstore.compaction.min.size"); 292 // First test everything with default split count of 2, then split into more. 293 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2); 294 Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L }; 295 Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L }; 296 long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount); 297 // Don't split if not eligible for compaction. 298 StripeCompactionPolicy.StripeInformationProvider si = 299 createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L }); 300 assertNull(createPolicy(conf).selectCompaction(si, al(), false)); 301 // Make sure everything is eligible. 302 conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f); 303 StripeCompactionPolicy policy = createPolicy(conf); 304 verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize); 305 // Add some extra stripes... 306 si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit); 307 verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize); 308 // In the middle. 309 si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit); 310 verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize); 311 // No split-off with different config (larger split size). 312 // However, in this case some eligible stripe will just be compacted alone. 313 StripeCompactionPolicy specPolicy = createPolicy( 314 conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false); 315 verifySingleStripeCompaction(specPolicy, si, 1, null); 316 } 317 318 @Test 319 public void testSplitOffStripeOffPeak() throws Exception { 320 // for HBASE-11439 321 Configuration conf = HBaseConfiguration.create(); 322 323 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 324 conf.unset("hbase.hstore.compaction.min.size"); 325 326 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2); 327 // Select the last 2 files. 328 StripeCompactionPolicy.StripeInformationProvider si = 329 createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L }); 330 assertEquals(2, createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles() 331 .size()); 332 // Make sure everything is eligible in offpeak. 333 conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f); 334 assertEquals(3, createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles() 335 .size()); 336 } 337 338 @Test 339 public void testSplitOffStripeDropDeletes() throws Exception { 340 Configuration conf = HBaseConfiguration.create(); 341 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2); 342 StripeCompactionPolicy policy = createPolicy(conf); 343 Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 }; 344 Long[] noSplit = new Long[] { 1L }; 345 long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount); 346 347 // Verify the deletes can be dropped if there are no L0 files. 348 StripeCompactionPolicy.StripeInformationProvider si = 349 createStripesWithSizes(0, 0, noSplit, toSplit); 350 verifyWholeStripesCompaction(policy, si, 1, 1, true, null, splitTargetSize); 351 // But cannot be dropped if there are. 352 si = createStripesWithSizes(2, 2, noSplit, toSplit); 353 verifyWholeStripesCompaction(policy, si, 1, 1, false, null, splitTargetSize); 354 } 355 356 @SuppressWarnings("unchecked") 357 @Test 358 public void testMergeExpiredFiles() throws Exception { 359 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 360 long now = defaultTtl + 2; 361 edge.setValue(now); 362 EnvironmentEdgeManager.injectEdge(edge); 363 try { 364 HStoreFile expiredFile = createFile(), notExpiredFile = createFile(); 365 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1); 366 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1); 367 List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile); 368 List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile); 369 List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile); 370 371 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(), 372 defaultSplitSize, defaultSplitCount, defaultInitialCount, true); 373 // Merge expired if there are eligible stripes. 374 StripeCompactionPolicy.StripeInformationProvider si = 375 createStripesWithFiles(expired, expired, expired); 376 verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false); 377 // Don't merge if nothing expired. 378 si = createStripesWithFiles(notExpired, notExpired, notExpired); 379 assertNull(policy.selectCompaction(si, al(), false)); 380 // Merge one expired stripe with next. 381 si = createStripesWithFiles(notExpired, expired, notExpired); 382 verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false); 383 // Merge the biggest run out of multiple options. 384 // Merge one expired stripe with next. 385 si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired); 386 verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false); 387 // Stripe with a subset of expired files is not merged. 388 si = createStripesWithFiles(expired, expired, notExpired, expired, mixed); 389 verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false); 390 } finally { 391 EnvironmentEdgeManager.reset(); 392 } 393 } 394 395 @SuppressWarnings("unchecked") 396 @Test 397 public void testMergeExpiredStripes() throws Exception { 398 // HBASE-11397 399 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 400 long now = defaultTtl + 2; 401 edge.setValue(now); 402 EnvironmentEdgeManager.injectEdge(edge); 403 try { 404 HStoreFile expiredFile = createFile(), notExpiredFile = createFile(); 405 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1); 406 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1); 407 List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile); 408 List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile); 409 410 StripeCompactionPolicy policy = 411 createPolicy(HBaseConfiguration.create(), defaultSplitSize, defaultSplitCount, 412 defaultInitialCount, true); 413 414 // Merge all three expired stripes into one. 415 StripeCompactionPolicy.StripeInformationProvider si = 416 createStripesWithFiles(expired, expired, expired); 417 verifyMergeCompatcion(policy, si, 0, 2); 418 419 // Merge two adjacent expired stripes into one. 420 si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired); 421 verifyMergeCompatcion(policy, si, 3, 4); 422 } finally { 423 EnvironmentEdgeManager.reset(); 424 } 425 } 426 427 @SuppressWarnings("unchecked") 428 private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles( 429 List<HStoreFile>... stripeFiles) throws Exception { 430 return createStripesWithFiles(createBoundaries(stripeFiles.length), 431 Lists.newArrayList(stripeFiles), new ArrayList<>()); 432 } 433 434 @Test 435 public void testSingleStripeDropDeletes() throws Exception { 436 Configuration conf = HBaseConfiguration.create(); 437 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 438 conf.unset("hbase.hstore.compaction.min.size"); 439 StripeCompactionPolicy policy = createPolicy(conf); 440 // Verify the deletes can be dropped if there are no L0 files. 441 Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } }; 442 StripeInformationProvider si = createStripesWithSizes(0, 0, stripes); 443 verifySingleStripeCompaction(policy, si, 0, true); 444 // But cannot be dropped if there are. 445 si = createStripesWithSizes(2, 2, stripes); 446 verifySingleStripeCompaction(policy, si, 0, false); 447 // Unless there are enough to cause L0 compaction. 448 si = createStripesWithSizes(6, 2, stripes); 449 ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>(); 450 sfs.addSublist(si.getLevel0Files()); 451 sfs.addSublist(si.getStripes().get(0)); 452 verifyCompaction( 453 policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries()); 454 // If we cannot actually compact all files in some stripe, L0 is chosen. 455 si = createStripesWithSizes(6, 2, 456 new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } }); 457 verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries()); 458 // even if L0 has no file 459 // if all files of stripe aren't selected, delete must not be dropped. 460 stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } }; 461 si = createStripesWithSizes(0, 0, stripes); 462 List<HStoreFile> compactFile = new ArrayList<>(); 463 Iterator<HStoreFile> iter = si.getStripes().get(0).listIterator(1); 464 while (iter.hasNext()) { 465 compactFile.add(iter.next()); 466 } 467 verifyCompaction(policy, si, compactFile, false, 1, null, si.getStartRow(0), si.getEndRow(0), 468 true); 469 } 470 471 /********* HELPER METHODS ************/ 472 private static StripeCompactionPolicy createPolicy( 473 Configuration conf) throws Exception { 474 return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false); 475 } 476 477 private static StripeCompactionPolicy createPolicy(Configuration conf, 478 long splitSize, float splitCount, int initialCount, boolean hasTtl) throws Exception { 479 conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize); 480 conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount); 481 conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount); 482 StoreConfigInformation sci = mock(StoreConfigInformation.class); 483 when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE); 484 when(sci.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO); 485 StripeStoreConfig ssc = new StripeStoreConfig(conf, sci); 486 return new StripeCompactionPolicy(conf, sci, ssc); 487 } 488 489 private static ArrayList<HStoreFile> al(HStoreFile... sfs) { 490 return new ArrayList<>(Arrays.asList(sfs)); 491 } 492 493 private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si, 494 int from, int to) throws Exception { 495 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 496 Collection<HStoreFile> sfs = getAllFiles(si, from, to); 497 verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); 498 499 // All the Stripes are expired, so the Compactor will not create any Writers. We need to create 500 // an empty file to preserve metadata 501 StripeCompactor sc = createCompactor(); 502 List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 503 assertEquals(1, paths.size()); 504 } 505 506 /** 507 * Verify the compaction that includes several entire stripes. 508 * @param policy Policy to test. 509 * @param si Stripe information pre-set with stripes to test. 510 * @param from Starting stripe. 511 * @param to Ending stripe (inclusive). 512 * @param dropDeletes Whether to drop deletes from compaction range. 513 * @param count Expected # of resulting stripes, null if not checked. 514 * @param size Expected target stripe size, null if not checked. 515 */ 516 private void verifyWholeStripesCompaction(StripeCompactionPolicy policy, 517 StripeInformationProvider si, int from, int to, Boolean dropDeletes, 518 Integer count, Long size, boolean needsCompaction) throws IOException { 519 verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes, 520 count, size, si.getStartRow(from), si.getEndRow(to), needsCompaction); 521 } 522 523 private void verifyWholeStripesCompaction(StripeCompactionPolicy policy, 524 StripeInformationProvider si, int from, int to, Boolean dropDeletes, 525 Integer count, Long size) throws IOException { 526 verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true); 527 } 528 529 private void verifySingleStripeCompaction(StripeCompactionPolicy policy, 530 StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException { 531 verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true); 532 } 533 534 /** 535 * Verify no compaction is needed or selected. 536 * @param policy Policy to test. 537 * @param si Stripe information pre-set with stripes to test. 538 */ 539 private void verifyNoCompaction( 540 StripeCompactionPolicy policy, StripeInformationProvider si) throws IOException { 541 assertNull(policy.selectCompaction(si, al(), false)); 542 assertFalse(policy.needsCompactions(si, al())); 543 } 544 545 /** 546 * Verify arbitrary compaction. 547 * @param policy Policy to test. 548 * @param si Stripe information pre-set with stripes to test. 549 * @param sfs Files that should be compacted. 550 * @param dropDeletesFrom Row from which to drop deletes. 551 * @param dropDeletesTo Row to which to drop deletes. 552 * @param boundaries Expected target stripe boundaries. 553 */ 554 private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si, 555 Collection<HStoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo, 556 final List<byte[]> boundaries) throws Exception { 557 StripeCompactor sc = mock(StripeCompactor.class); 558 assertTrue(policy.needsCompactions(si, al())); 559 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 560 verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); 561 scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 562 verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() { 563 @Override 564 public boolean matches(List<byte[]> argument) { 565 List<byte[]> other = argument; 566 if (other.size() != boundaries.size()) { 567 return false; 568 } 569 for (int i = 0; i < other.size(); ++i) { 570 if (!Bytes.equals(other.get(i), boundaries.get(i))) { 571 return false; 572 } 573 } 574 return true; 575 } 576 }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom), 577 dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), 578 any(), any()); 579 } 580 581 /** 582 * Verify arbitrary compaction. 583 * @param policy Policy to test. 584 * @param si Stripe information pre-set with stripes to test. 585 * @param sfs Files that should be compacted. 586 * @param dropDeletes Whether to drop deletes from compaction range. 587 * @param count Expected # of resulting stripes, null if not checked. 588 * @param size Expected target stripe size, null if not checked. 589 * @param start Left boundary of the compaction. 590 * @param end Right boundary of the compaction. 591 */ 592 private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si, 593 Collection<HStoreFile> sfs, Boolean dropDeletes, Integer count, Long size, 594 byte[] start, byte[] end, boolean needsCompaction) throws IOException { 595 StripeCompactor sc = mock(StripeCompactor.class); 596 assertTrue(!needsCompaction || policy.needsCompactions(si, al())); 597 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 598 verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); 599 scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 600 verify(sc, times(1)).compact(eq(scr.getRequest()), 601 count == null ? anyInt() : eq(count.intValue()), 602 size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end), 603 dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), 604 any(), any()); 605 } 606 607 /** Verify arbitrary flush. */ 608 protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si, 609 KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException { 610 StoreFileWritersCapture writers = new StoreFileWritersCapture(); 611 StripeStoreFlusher.StripeFlushRequest req = 612 policy.selectFlush(CellComparatorImpl.COMPARATOR, si, input.length); 613 StripeMultiFileWriter mw = req.createWriter(); 614 mw.init(null, writers); 615 for (KeyValue kv : input) { 616 mw.append(kv); 617 } 618 boolean hasMetadata = boundaries != null; 619 mw.commitWriters(0, false); 620 writers.verifyKvs(expected, true, hasMetadata); 621 if (hasMetadata) { 622 writers.verifyBoundaries(boundaries); 623 } 624 } 625 626 627 private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) { 628 return dropDeletes == null ? any() 629 : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class)); 630 } 631 632 private void verifyCollectionsEqual(Collection<HStoreFile> sfs, Collection<HStoreFile> scr) { 633 // Dumb. 634 assertEquals(sfs.size(), scr.size()); 635 assertTrue(scr.containsAll(sfs)); 636 } 637 638 private static List<HStoreFile> getAllFiles( 639 StripeInformationProvider si, int fromStripe, int toStripe) { 640 ArrayList<HStoreFile> expected = new ArrayList<>(); 641 for (int i = fromStripe; i <= toStripe; ++i) { 642 expected.addAll(si.getStripes().get(i)); 643 } 644 return expected; 645 } 646 647 /** 648 * @param l0Count Number of L0 files. 649 * @param boundaries Target boundaries. 650 * @return Mock stripes. 651 */ 652 private static StripeInformationProvider createStripes( 653 int l0Count, byte[]... boundaries) throws Exception { 654 List<Long> l0Sizes = new ArrayList<>(); 655 for (int i = 0; i < l0Count; ++i) { 656 l0Sizes.add(5L); 657 } 658 List<List<Long>> sizes = new ArrayList<>(); 659 for (int i = 0; i <= boundaries.length; ++i) { 660 sizes.add(Arrays.asList(Long.valueOf(5))); 661 } 662 return createStripes(Arrays.asList(boundaries), sizes, l0Sizes); 663 } 664 665 /** 666 * @param l0Count Number of L0 files. 667 * @param l0Size Size of each file. 668 * @return Mock stripes. 669 */ 670 private static StripeInformationProvider createStripesL0Only( 671 int l0Count, long l0Size) throws Exception { 672 List<Long> l0Sizes = new ArrayList<>(); 673 for (int i = 0; i < l0Count; ++i) { 674 l0Sizes.add(l0Size); 675 } 676 return createStripes(null, new ArrayList<>(), l0Sizes); 677 } 678 679 /** 680 * @param l0Count Number of L0 files. 681 * @param l0Size Size of each file. 682 * @param sizes Sizes of the files; each sub-array representing a stripe. 683 * @return Mock stripes. 684 */ 685 private static StripeInformationProvider createStripesWithSizes( 686 int l0Count, long l0Size, Long[]... sizes) throws Exception { 687 ArrayList<List<Long>> sizeList = new ArrayList<>(sizes.length); 688 for (Long[] size : sizes) { 689 sizeList.add(Arrays.asList(size)); 690 } 691 return createStripesWithSizes(l0Count, l0Size, sizeList); 692 } 693 694 private static StripeInformationProvider createStripesWithSizes( 695 int l0Count, long l0Size, List<List<Long>> sizes) throws Exception { 696 List<byte[]> boundaries = createBoundaries(sizes.size()); 697 List<Long> l0Sizes = new ArrayList<>(); 698 for (int i = 0; i < l0Count; ++i) { 699 l0Sizes.add(l0Size); 700 } 701 return createStripes(boundaries, sizes, l0Sizes); 702 } 703 704 private static List<byte[]> createBoundaries(int stripeCount) { 705 byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E }; 706 assert stripeCount <= keys.length + 1; 707 List<byte[]> boundaries = new ArrayList<>(); 708 boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1)); 709 return boundaries; 710 } 711 712 private static StripeInformationProvider createStripes(List<byte[]> boundaries, 713 List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception { 714 List<List<HStoreFile>> stripeFiles = new ArrayList<>(stripeSizes.size()); 715 for (List<Long> sizes : stripeSizes) { 716 List<HStoreFile> sfs = new ArrayList<>(sizes.size()); 717 for (Long size : sizes) { 718 sfs.add(createFile(size)); 719 } 720 stripeFiles.add(sfs); 721 } 722 List<HStoreFile> l0Files = new ArrayList<>(); 723 for (Long size : l0Sizes) { 724 l0Files.add(createFile(size)); 725 } 726 return createStripesWithFiles(boundaries, stripeFiles, l0Files); 727 } 728 729 /** 730 * This method actually does all the work. 731 */ 732 private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries, 733 List<List<HStoreFile>> stripeFiles, List<HStoreFile> l0Files) throws Exception { 734 ArrayList<ImmutableList<HStoreFile>> stripes = new ArrayList<>(); 735 ArrayList<byte[]> boundariesList = new ArrayList<>(); 736 StripeInformationProvider si = mock(StripeInformationProvider.class); 737 if (!stripeFiles.isEmpty()) { 738 assert stripeFiles.size() == (boundaries.size() + 1); 739 boundariesList.add(OPEN_KEY); 740 for (int i = 0; i <= boundaries.size(); ++i) { 741 byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1)); 742 byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i)); 743 boundariesList.add(endKey); 744 for (HStoreFile sf : stripeFiles.get(i)) { 745 setFileStripe(sf, startKey, endKey); 746 } 747 stripes.add(ImmutableList.copyOf(stripeFiles.get(i))); 748 when(si.getStartRow(eq(i))).thenReturn(startKey); 749 when(si.getEndRow(eq(i))).thenReturn(endKey); 750 } 751 } 752 ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>(); 753 sfs.addAllSublists(stripes); 754 sfs.addSublist(l0Files); 755 when(si.getStorefiles()).thenReturn(sfs); 756 when(si.getStripes()).thenReturn(stripes); 757 when(si.getStripeBoundaries()).thenReturn(boundariesList); 758 when(si.getStripeCount()).thenReturn(stripes.size()); 759 when(si.getLevel0Files()).thenReturn(l0Files); 760 return si; 761 } 762 763 private static HStoreFile createFile(long size) throws Exception { 764 HStoreFile sf = mock(HStoreFile.class); 765 when(sf.getPath()).thenReturn(new Path("moo")); 766 StoreFileReader r = mock(StoreFileReader.class); 767 when(r.getEntries()).thenReturn(size); 768 when(r.length()).thenReturn(size); 769 when(r.getBloomFilterType()).thenReturn(BloomType.NONE); 770 when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); 771 when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(), 772 anyBoolean())).thenReturn(mock(StoreFileScanner.class)); 773 when(sf.getReader()).thenReturn(r); 774 when(sf.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty()); 775 return sf; 776 } 777 778 private static HStoreFile createFile() throws Exception { 779 return createFile(0); 780 } 781 782 private static void setFileStripe(HStoreFile sf, byte[] startKey, byte[] endKey) { 783 when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey); 784 when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey); 785 } 786 787 private StripeCompactor createCompactor() throws Exception { 788 HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo")); 789 StoreFileWritersCapture writers = new StoreFileWritersCapture(); 790 HStore store = mock(HStore.class); 791 HRegionInfo info = mock(HRegionInfo.class); 792 when(info.getRegionNameAsString()).thenReturn("testRegion"); 793 when(store.getColumnFamilyDescriptor()).thenReturn(col); 794 when(store.getRegionInfo()).thenReturn(info); 795 when( 796 store.createWriterInTmp(anyLong(), any(), anyBoolean(), 797 anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers); 798 when( 799 store.createWriterInTmp(anyLong(), any(), anyBoolean(), 800 anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers); 801 802 Configuration conf = HBaseConfiguration.create(); 803 conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders); 804 final Scanner scanner = new Scanner(); 805 return new StripeCompactor(conf, store) { 806 @Override 807 protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, 808 List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs, 809 byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { 810 return scanner; 811 } 812 813 @Override 814 protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, 815 List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint, 816 long earliestPutTs) throws IOException { 817 return scanner; 818 } 819 }; 820 } 821 822 private static class Scanner implements InternalScanner { 823 private final ArrayList<KeyValue> kvs; 824 825 public Scanner(KeyValue... kvs) { 826 this.kvs = new ArrayList<>(Arrays.asList(kvs)); 827 } 828 829 @Override 830 public boolean next(List<Cell> result, ScannerContext scannerContext) 831 throws IOException { 832 if (kvs.isEmpty()) { 833 return false; 834 } 835 result.add(kvs.remove(0)); 836 return !kvs.isEmpty(); 837 } 838 839 @Override 840 public void close() throws IOException { 841 } 842 } 843}