001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; 021import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES; 022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; 023import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 024import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY; 025import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY; 026import static org.junit.Assert.assertEquals; 027import static org.junit.Assert.assertFalse; 028import static org.junit.Assert.assertThrows; 029import static org.junit.Assert.assertTrue; 030import static org.junit.Assert.fail; 031import static org.mockito.ArgumentMatchers.any; 032import static org.mockito.Mockito.doAnswer; 033import static org.mockito.Mockito.mock; 034import static org.mockito.Mockito.spy; 035import static org.mockito.Mockito.when; 036 037import java.io.IOException; 038import java.util.ArrayList; 039import java.util.Collection; 040import java.util.Collections; 041import java.util.List; 042import java.util.Optional; 043import java.util.concurrent.CountDownLatch; 044import java.util.concurrent.TimeUnit; 045import org.apache.hadoop.conf.Configuration; 046import org.apache.hadoop.fs.FSDataOutputStream; 047import org.apache.hadoop.fs.FileStatus; 048import org.apache.hadoop.fs.FileSystem; 049import org.apache.hadoop.fs.Path; 050import org.apache.hadoop.hbase.ChoreService; 051import org.apache.hadoop.hbase.HBaseClassTestRule; 052import org.apache.hadoop.hbase.HBaseConfiguration; 053import org.apache.hadoop.hbase.HBaseTestingUtility; 054import org.apache.hadoop.hbase.HColumnDescriptor; 055import org.apache.hadoop.hbase.HConstants; 056import org.apache.hadoop.hbase.HTableDescriptor; 057import org.apache.hadoop.hbase.HTestConst; 058import org.apache.hadoop.hbase.Waiter; 059import org.apache.hadoop.hbase.client.Delete; 060import org.apache.hadoop.hbase.client.Durability; 061import org.apache.hadoop.hbase.client.Put; 062import org.apache.hadoop.hbase.client.Table; 063import org.apache.hadoop.hbase.io.hfile.HFileScanner; 064import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 065import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 066import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 067import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 068import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 069import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 070import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 071import org.apache.hadoop.hbase.security.User; 072import org.apache.hadoop.hbase.testclassification.MediumTests; 073import org.apache.hadoop.hbase.testclassification.RegionServerTests; 074import org.apache.hadoop.hbase.util.Bytes; 075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 076import org.apache.hadoop.hbase.util.Threads; 077import org.apache.hadoop.hbase.wal.WAL; 078import org.junit.After; 079import org.junit.Assume; 080import org.junit.Before; 081import org.junit.ClassRule; 082import org.junit.Rule; 083import org.junit.Test; 084import org.junit.experimental.categories.Category; 085import org.junit.rules.TestName; 086import org.mockito.Mockito; 087import org.mockito.invocation.InvocationOnMock; 088import org.mockito.stubbing.Answer; 089 090/** 091 * Test compaction framework and common functions 092 */ 093@Category({ RegionServerTests.class, MediumTests.class }) 094public class TestCompaction { 095 096 @ClassRule 097 public static final HBaseClassTestRule CLASS_RULE = 098 HBaseClassTestRule.forClass(TestCompaction.class); 099 100 @Rule 101 public TestName name = new TestName(); 102 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 103 protected Configuration conf = UTIL.getConfiguration(); 104 105 private HRegion r = null; 106 private HTableDescriptor htd = null; 107 private static final byte[] COLUMN_FAMILY = fam1; 108 private final byte[] STARTROW = Bytes.toBytes(START_KEY); 109 private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY; 110 private int compactionThreshold; 111 private byte[] secondRowBytes, thirdRowBytes; 112 private static final long MAX_FILES_TO_COMPACT = 10; 113 private final byte[] FAMILY = Bytes.toBytes("cf"); 114 115 /** constructor */ 116 public TestCompaction() { 117 super(); 118 119 // Set cache flush size to 1MB 120 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); 121 conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100); 122 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 123 NoLimitThroughputController.class.getName()); 124 compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); 125 126 secondRowBytes = START_KEY_BYTES.clone(); 127 // Increment the least significant character so we get to next row. 128 secondRowBytes[START_KEY_BYTES.length - 1]++; 129 thirdRowBytes = START_KEY_BYTES.clone(); 130 thirdRowBytes[START_KEY_BYTES.length - 1] = 131 (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2); 132 } 133 134 @Before 135 public void setUp() throws Exception { 136 this.htd = UTIL.createTableDescriptor(name.getMethodName()); 137 if (name.getMethodName().equals("testCompactionSeqId")) { 138 UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10"); 139 UTIL.getConfiguration().set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, 140 DummyCompactor.class.getName()); 141 HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); 142 hcd.setMaxVersions(65536); 143 this.htd.addFamily(hcd); 144 } 145 this.r = UTIL.createLocalHRegion(htd, null, null); 146 } 147 148 @After 149 public void tearDown() throws Exception { 150 WAL wal = r.getWAL(); 151 this.r.close(); 152 wal.close(); 153 } 154 155 /** 156 * Verify that you can stop a long-running compaction (used during RS shutdown) 157 */ 158 @Test 159 public void testInterruptCompactionBySize() throws Exception { 160 assertEquals(0, count()); 161 162 // lower the polling interval for this test 163 conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */); 164 165 try { 166 // Create a couple store files w/ 15KB (over 10KB interval) 167 int jmax = (int) Math.ceil(15.0 / compactionThreshold); 168 byte[] pad = new byte[1000]; // 1 KB chunk 169 for (int i = 0; i < compactionThreshold; i++) { 170 Table loader = new RegionAsTable(r); 171 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 172 p.setDurability(Durability.SKIP_WAL); 173 for (int j = 0; j < jmax; j++) { 174 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 175 } 176 HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 177 loader.put(p); 178 r.flush(true); 179 } 180 181 HRegion spyR = spy(r); 182 doAnswer(new Answer<Object>() { 183 @Override 184 public Object answer(InvocationOnMock invocation) throws Throwable { 185 r.writestate.writesEnabled = false; 186 return invocation.callRealMethod(); 187 } 188 }).when(spyR).doRegionCompactionPrep(); 189 190 // force a minor compaction, but not before requesting a stop 191 spyR.compactStores(); 192 193 // ensure that the compaction stopped, all old files are intact, 194 HStore s = r.getStore(COLUMN_FAMILY); 195 assertEquals(compactionThreshold, s.getStorefilesCount()); 196 assertTrue(s.getStorefilesSize() > 15 * 1000); 197 // and no new store files persisted past compactStores() 198 // only one empty dir exists in temp dir 199 FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir()); 200 assertEquals(1, ls.length); 201 Path storeTempDir = 202 new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY)); 203 assertTrue(r.getFilesystem().exists(storeTempDir)); 204 ls = r.getFilesystem().listStatus(storeTempDir); 205 assertEquals(0, ls.length); 206 } finally { 207 // don't mess up future tests 208 r.writestate.writesEnabled = true; 209 conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */); 210 211 // Delete all Store information once done using 212 for (int i = 0; i < compactionThreshold; i++) { 213 Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i))); 214 byte[][] famAndQf = { COLUMN_FAMILY, null }; 215 delete.addFamily(famAndQf[0]); 216 r.delete(delete); 217 } 218 r.flush(true); 219 220 // Multiple versions allowed for an entry, so the delete isn't enough 221 // Lower TTL and expire to ensure that all our entries have been wiped 222 final int ttl = 1000; 223 for (HStore store : this.r.stores.values()) { 224 ScanInfo old = store.getScanInfo(); 225 ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells()); 226 store.setScanInfo(si); 227 } 228 Thread.sleep(ttl); 229 230 r.compact(true); 231 assertEquals(0, count()); 232 } 233 } 234 235 @Test 236 public void testInterruptCompactionByTime() throws Exception { 237 assertEquals(0, count()); 238 239 // lower the polling interval for this test 240 conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */); 241 242 try { 243 // Create a couple store files w/ 15KB (over 10KB interval) 244 int jmax = (int) Math.ceil(15.0 / compactionThreshold); 245 byte[] pad = new byte[1000]; // 1 KB chunk 246 for (int i = 0; i < compactionThreshold; i++) { 247 Table loader = new RegionAsTable(r); 248 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 249 p.setDurability(Durability.SKIP_WAL); 250 for (int j = 0; j < jmax; j++) { 251 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 252 } 253 HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 254 loader.put(p); 255 r.flush(true); 256 } 257 258 HRegion spyR = spy(r); 259 doAnswer(new Answer() { 260 @Override 261 public Object answer(InvocationOnMock invocation) throws Throwable { 262 r.writestate.writesEnabled = false; 263 return invocation.callRealMethod(); 264 } 265 }).when(spyR).doRegionCompactionPrep(); 266 267 // force a minor compaction, but not before requesting a stop 268 spyR.compactStores(); 269 270 // ensure that the compaction stopped, all old files are intact, 271 HStore s = r.getStore(COLUMN_FAMILY); 272 assertEquals(compactionThreshold, s.getStorefilesCount()); 273 assertTrue(s.getStorefilesSize() > 15 * 1000); 274 // and no new store files persisted past compactStores() 275 // only one empty dir exists in temp dir 276 FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir()); 277 assertEquals(1, ls.length); 278 Path storeTempDir = 279 new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY)); 280 assertTrue(r.getFilesystem().exists(storeTempDir)); 281 ls = r.getFilesystem().listStatus(storeTempDir); 282 assertEquals(0, ls.length); 283 } finally { 284 // don't mess up future tests 285 r.writestate.writesEnabled = true; 286 conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */); 287 288 // Delete all Store information once done using 289 for (int i = 0; i < compactionThreshold; i++) { 290 Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i))); 291 byte[][] famAndQf = { COLUMN_FAMILY, null }; 292 delete.addFamily(famAndQf[0]); 293 r.delete(delete); 294 } 295 r.flush(true); 296 297 // Multiple versions allowed for an entry, so the delete isn't enough 298 // Lower TTL and expire to ensure that all our entries have been wiped 299 final int ttl = 1000; 300 for (HStore store : this.r.stores.values()) { 301 ScanInfo old = store.getScanInfo(); 302 ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells()); 303 store.setScanInfo(si); 304 } 305 Thread.sleep(ttl); 306 307 r.compact(true); 308 assertEquals(0, count()); 309 } 310 } 311 312 private int count() throws IOException { 313 int count = 0; 314 for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) { 315 HFileScanner scanner = f.getReader().getScanner(false, false); 316 if (!scanner.seekTo()) { 317 continue; 318 } 319 do { 320 count++; 321 } while (scanner.next()); 322 } 323 return count; 324 } 325 326 private void createStoreFile(final HRegion region) throws IOException { 327 createStoreFile(region, Bytes.toString(COLUMN_FAMILY)); 328 } 329 330 private void createStoreFile(final HRegion region, String family) throws IOException { 331 Table loader = new RegionAsTable(region); 332 HTestConst.addContent(loader, family); 333 region.flush(true); 334 } 335 336 @Test 337 public void testCompactionWithCorruptResult() throws Exception { 338 int nfiles = 10; 339 for (int i = 0; i < nfiles; i++) { 340 createStoreFile(r); 341 } 342 HStore store = r.getStore(COLUMN_FAMILY); 343 344 Collection<HStoreFile> storeFiles = store.getStorefiles(); 345 DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor(); 346 CompactionRequestImpl request = new CompactionRequestImpl(storeFiles); 347 tool.compact(request, NoLimitThroughputController.INSTANCE, null); 348 349 // Now lets corrupt the compacted file. 350 FileSystem fs = store.getFileSystem(); 351 // default compaction policy created one and only one new compacted file 352 Path tmpPath = store.getRegionFileSystem().createTempName(); 353 try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) { 354 stream.writeChars("CORRUPT FILE!!!!"); 355 } 356 // The complete compaction should fail and the corrupt file should remain 357 // in the 'tmp' directory; 358 assertThrows(IOException.class, () -> store.doCompaction(null, null, null, 359 EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath))); 360 assertTrue(fs.exists(tmpPath)); 361 } 362 363 /** 364 * Create a custom compaction request and be sure that we can track it through the queue, knowing 365 * when the compaction is completed. 366 */ 367 @Test 368 public void testTrackingCompactionRequest() throws Exception { 369 // setup a compact/split thread on a mock server 370 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 371 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 372 CompactSplit thread = new CompactSplit(mockServer); 373 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 374 375 // setup a region/store with some files 376 HStore store = r.getStore(COLUMN_FAMILY); 377 createStoreFile(r); 378 for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) { 379 createStoreFile(r); 380 } 381 382 CountDownLatch latch = new CountDownLatch(1); 383 Tracker tracker = new Tracker(latch); 384 thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker, null); 385 // wait for the latch to complete. 386 latch.await(); 387 388 thread.interruptIfNecessary(); 389 } 390 391 @Test 392 public void testCompactionFailure() throws Exception { 393 // setup a compact/split thread on a mock server 394 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 395 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 396 CompactSplit thread = new CompactSplit(mockServer); 397 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 398 399 // setup a region/store with some files 400 HStore store = r.getStore(COLUMN_FAMILY); 401 createStoreFile(r); 402 for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { 403 createStoreFile(r); 404 } 405 406 HRegion mockRegion = Mockito.spy(r); 407 Mockito.when(mockRegion.checkSplit()) 408 .thenThrow(new RuntimeException("Thrown intentionally by test!")); 409 410 try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) { 411 412 long preCompletedCount = metricsWrapper.getNumCompactionsCompleted(); 413 long preFailedCount = metricsWrapper.getNumCompactionsFailed(); 414 415 CountDownLatch latch = new CountDownLatch(1); 416 Tracker tracker = new Tracker(latch); 417 thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, tracker, 418 null); 419 // wait for the latch to complete. 420 latch.await(120, TimeUnit.SECONDS); 421 422 // compaction should have completed and been marked as failed due to error in split request 423 long postCompletedCount = metricsWrapper.getNumCompactionsCompleted(); 424 long postFailedCount = metricsWrapper.getNumCompactionsFailed(); 425 426 assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post=" 427 + postCompletedCount + ")", postCompletedCount > preCompletedCount); 428 assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post=" 429 + postFailedCount + ")", postFailedCount > preFailedCount); 430 } 431 } 432 433 /** 434 * Test no new Compaction requests are generated after calling stop compactions 435 */ 436 @Test 437 public void testStopStartCompaction() throws IOException { 438 // setup a compact/split thread on a mock server 439 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 440 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 441 final CompactSplit thread = new CompactSplit(mockServer); 442 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 443 // setup a region/store with some files 444 HStore store = r.getStore(COLUMN_FAMILY); 445 createStoreFile(r); 446 for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { 447 createStoreFile(r); 448 } 449 thread.switchCompaction(false); 450 thread.requestCompaction(r, store, "test", Store.PRIORITY_USER, 451 CompactionLifeCycleTracker.DUMMY, null); 452 assertFalse(thread.isCompactionsEnabled()); 453 int longCompactions = thread.getLongCompactions().getActiveCount(); 454 int shortCompactions = thread.getShortCompactions().getActiveCount(); 455 assertEquals( 456 "longCompactions=" + longCompactions + "," + "shortCompactions=" + shortCompactions, 0, 457 longCompactions + shortCompactions); 458 thread.switchCompaction(true); 459 assertTrue(thread.isCompactionsEnabled()); 460 // Make sure no compactions have run. 461 assertEquals(0, thread.getLongCompactions().getCompletedTaskCount() 462 + thread.getShortCompactions().getCompletedTaskCount()); 463 // Request a compaction and make sure it is submitted successfully. 464 thread.requestCompaction(r, store, "test", Store.PRIORITY_USER, 465 CompactionLifeCycleTracker.DUMMY, null); 466 // Wait until the compaction finishes. 467 Waiter.waitFor(UTIL.getConfiguration(), 5000, 468 (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount() 469 + thread.getShortCompactions().getCompletedTaskCount() == 1); 470 // Make sure there are no compactions running. 471 assertEquals(0, 472 thread.getLongCompactions().getActiveCount() + thread.getShortCompactions().getActiveCount()); 473 } 474 475 @Test 476 public void testInterruptingRunningCompactions() throws Exception { 477 // setup a compact/split thread on a mock server 478 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 479 WaitThroughPutController.class.getName()); 480 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 481 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 482 CompactSplit thread = new CompactSplit(mockServer); 483 484 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 485 486 // setup a region/store with some files 487 HStore store = r.getStore(COLUMN_FAMILY); 488 int jmax = (int) Math.ceil(15.0 / compactionThreshold); 489 byte[] pad = new byte[1000]; // 1 KB chunk 490 for (int i = 0; i < compactionThreshold; i++) { 491 Table loader = new RegionAsTable(r); 492 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 493 p.setDurability(Durability.SKIP_WAL); 494 for (int j = 0; j < jmax; j++) { 495 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 496 } 497 HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 498 loader.put(p); 499 r.flush(true); 500 } 501 HStore s = r.getStore(COLUMN_FAMILY); 502 int initialFiles = s.getStorefilesCount(); 503 504 thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, 505 CompactionLifeCycleTracker.DUMMY, null); 506 507 Thread.sleep(3000); 508 thread.switchCompaction(false); 509 assertEquals(initialFiles, s.getStorefilesCount()); 510 // don't mess up future tests 511 thread.switchCompaction(true); 512 } 513 514 /** 515 * HBASE-7947: Regression test to ensure adding to the correct list in the {@link CompactSplit} 516 * @throws Exception on failure 517 */ 518 @Test 519 public void testMultipleCustomCompactionRequests() throws Exception { 520 // setup a compact/split thread on a mock server 521 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 522 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 523 CompactSplit thread = new CompactSplit(mockServer); 524 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 525 526 // setup a region/store with some files 527 int numStores = r.getStores().size(); 528 CountDownLatch latch = new CountDownLatch(numStores); 529 Tracker tracker = new Tracker(latch); 530 // create some store files and setup requests for each store on which we want to do a 531 // compaction 532 for (HStore store : r.getStores()) { 533 createStoreFile(r, store.getColumnFamilyName()); 534 createStoreFile(r, store.getColumnFamilyName()); 535 createStoreFile(r, store.getColumnFamilyName()); 536 thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER, tracker, 537 null); 538 } 539 // wait for the latch to complete. 540 latch.await(); 541 542 thread.interruptIfNecessary(); 543 } 544 545 class StoreMockMaker extends StatefulStoreMockMaker { 546 public ArrayList<HStoreFile> compacting = new ArrayList<>(); 547 public ArrayList<HStoreFile> notCompacting = new ArrayList<>(); 548 private final ArrayList<Integer> results; 549 550 public StoreMockMaker(ArrayList<Integer> results) { 551 this.results = results; 552 } 553 554 public class TestCompactionContext extends CompactionContext { 555 556 private List<HStoreFile> selectedFiles; 557 558 public TestCompactionContext(List<HStoreFile> selectedFiles) { 559 super(); 560 this.selectedFiles = selectedFiles; 561 } 562 563 @Override 564 public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { 565 return new ArrayList<>(); 566 } 567 568 @Override 569 public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction, 570 boolean mayUseOffPeak, boolean forceMajor) throws IOException { 571 this.request = new CompactionRequestImpl(selectedFiles); 572 this.request.setPriority(getPriority()); 573 return true; 574 } 575 576 @Override 577 public List<Path> compact(ThroughputController throughputController, User user) 578 throws IOException { 579 finishCompaction(this.selectedFiles); 580 return new ArrayList<>(); 581 } 582 } 583 584 @Override 585 public synchronized Optional<CompactionContext> selectCompaction() { 586 CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting)); 587 compacting.addAll(notCompacting); 588 notCompacting.clear(); 589 try { 590 ctx.select(null, false, false, false); 591 } catch (IOException ex) { 592 fail("Shouldn't happen"); 593 } 594 return Optional.of(ctx); 595 } 596 597 @Override 598 public synchronized void cancelCompaction(Object object) { 599 TestCompactionContext ctx = (TestCompactionContext) object; 600 compacting.removeAll(ctx.selectedFiles); 601 notCompacting.addAll(ctx.selectedFiles); 602 } 603 604 public synchronized void finishCompaction(List<HStoreFile> sfs) { 605 if (sfs.isEmpty()) return; 606 synchronized (results) { 607 results.add(sfs.size()); 608 } 609 compacting.removeAll(sfs); 610 } 611 612 @Override 613 public int getPriority() { 614 return 7 - compacting.size() - notCompacting.size(); 615 } 616 } 617 618 public class BlockingStoreMockMaker extends StatefulStoreMockMaker { 619 BlockingCompactionContext blocked = null; 620 621 public class BlockingCompactionContext extends CompactionContext { 622 public volatile boolean isInCompact = false; 623 624 public void unblock() { 625 synchronized (this) { 626 this.notifyAll(); 627 } 628 } 629 630 @Override 631 public List<Path> compact(ThroughputController throughputController, User user) 632 throws IOException { 633 try { 634 isInCompact = true; 635 synchronized (this) { 636 this.wait(); 637 } 638 } catch (InterruptedException e) { 639 Assume.assumeNoException(e); 640 } 641 return new ArrayList<>(); 642 } 643 644 @Override 645 public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { 646 return new ArrayList<>(); 647 } 648 649 @Override 650 public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e) 651 throws IOException { 652 this.request = new CompactionRequestImpl(new ArrayList<>()); 653 return true; 654 } 655 } 656 657 @Override 658 public Optional<CompactionContext> selectCompaction() { 659 this.blocked = new BlockingCompactionContext(); 660 try { 661 this.blocked.select(null, false, false, false); 662 } catch (IOException ex) { 663 fail("Shouldn't happen"); 664 } 665 return Optional.of(blocked); 666 } 667 668 @Override 669 public void cancelCompaction(Object object) { 670 } 671 672 @Override 673 public int getPriority() { 674 return Integer.MIN_VALUE; // some invalid value, see createStoreMock 675 } 676 677 public BlockingCompactionContext waitForBlocking() { 678 while (this.blocked == null || !this.blocked.isInCompact) { 679 Threads.sleepWithoutInterrupt(50); 680 } 681 BlockingCompactionContext ctx = this.blocked; 682 this.blocked = null; 683 return ctx; 684 } 685 686 @Override 687 public HStore createStoreMock(String name) throws Exception { 688 return createStoreMock(Integer.MIN_VALUE, name); 689 } 690 691 public HStore createStoreMock(int priority, String name) throws Exception { 692 // Override the mock to always return the specified priority. 693 HStore s = super.createStoreMock(name); 694 when(s.getCompactPriority()).thenReturn(priority); 695 return s; 696 } 697 } 698 699 /** Test compaction priority management and multiple compactions per store (HBASE-8665). */ 700 @Test 701 public void testCompactionQueuePriorities() throws Exception { 702 // Setup a compact/split thread on a mock server. 703 final Configuration conf = HBaseConfiguration.create(); 704 HRegionServer mockServer = mock(HRegionServer.class); 705 when(mockServer.isStopped()).thenReturn(false); 706 when(mockServer.getConfiguration()).thenReturn(conf); 707 when(mockServer.getChoreService()).thenReturn(new ChoreService("test")); 708 CompactSplit cst = new CompactSplit(mockServer); 709 when(mockServer.getCompactSplitThread()).thenReturn(cst); 710 // prevent large compaction thread pool stealing job from small compaction queue. 711 cst.shutdownLongCompactions(); 712 // Set up the region mock that redirects compactions. 713 HRegion r = mock(HRegion.class); 714 when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() { 715 @Override 716 public Boolean answer(InvocationOnMock invocation) throws Throwable { 717 invocation.<CompactionContext> getArgument(0).compact(invocation.getArgument(2), null); 718 return true; 719 } 720 }); 721 722 // Set up store mocks for 2 "real" stores and the one we use for blocking CST. 723 ArrayList<Integer> results = new ArrayList<>(); 724 StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results); 725 HStore store = sm.createStoreMock("store1"); 726 HStore store2 = sm2.createStoreMock("store2"); 727 BlockingStoreMockMaker blocker = new BlockingStoreMockMaker(); 728 729 // First, block the compaction thread so that we could muck with queue. 730 cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1"); 731 BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking(); 732 733 // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively. 734 for (int i = 0; i < 4; ++i) { 735 sm.notCompacting.add(createFile()); 736 } 737 cst.requestSystemCompaction(r, store, "s1-pri3"); 738 for (int i = 0; i < 3; ++i) { 739 sm2.notCompacting.add(createFile()); 740 } 741 cst.requestSystemCompaction(r, store2, "s2-pri4"); 742 // Now add 2 more files to store1 and queue compaction - pri 1. 743 for (int i = 0; i < 2; ++i) { 744 sm.notCompacting.add(createFile()); 745 } 746 cst.requestSystemCompaction(r, store, "s1-pri1"); 747 // Finally add blocking compaction with priority 2. 748 cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2"); 749 750 // Unblock the blocking compaction; we should run pri1 and become block again in pri2. 751 currentBlock.unblock(); 752 currentBlock = blocker.waitForBlocking(); 753 // Pri1 should have "compacted" all 6 files. 754 assertEquals(1, results.size()); 755 assertEquals(6, results.get(0).intValue()); 756 // Add 2 files to store 1 (it has 2 files now). 757 for (int i = 0; i < 2; ++i) { 758 sm.notCompacting.add(createFile()); 759 } 760 // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority 761 // is 5, however, so it must not preempt store 2. Add blocking compaction at the end. 762 cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7"); 763 currentBlock.unblock(); 764 currentBlock = blocker.waitForBlocking(); 765 assertEquals(3, results.size()); 766 assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files. 767 assertEquals(2, results.get(2).intValue()); 768 769 currentBlock.unblock(); 770 cst.interruptIfNecessary(); 771 } 772 773 /** 774 * Firstly write 10 cells (with different time stamp) to a qualifier and flush to hfile1, then 775 * write 10 cells (with different time stamp) to the same qualifier and flush to hfile2. The 776 * latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 are with the same time 777 * stamp but different sequence id, and will get scanned successively during compaction. 778 * <p/> 779 * We set compaction.kv.max to 10 so compaction will scan 10 versions each round, meanwhile we set 780 * keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 will be written out 781 * with seqId cleaned (set to 0) including cell-B, then when scanner goes to cell-A it will cause 782 * a scan out-of-order assertion error before HBASE-16931 n * if error occurs during the test 783 */ 784 @Test 785 public void testCompactionSeqId() throws Exception { 786 final byte[] ROW = Bytes.toBytes("row"); 787 final byte[] QUALIFIER = Bytes.toBytes("qualifier"); 788 789 long timestamp = 10000; 790 791 // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9 792 // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8 793 // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7 794 // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6 795 // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5 796 // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4 797 // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3 798 // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2 799 // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1 800 // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0 801 for (int i = 0; i < 10; i++) { 802 Put put = new Put(ROW); 803 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); 804 r.put(put); 805 } 806 r.flush(true); 807 808 // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18 809 // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17 810 // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16 811 // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15 812 // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14 813 // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13 814 // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12 815 // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11 816 // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10 817 // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9 818 for (int i = 18; i > 8; i--) { 819 Put put = new Put(ROW); 820 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); 821 r.put(put); 822 } 823 r.flush(true); 824 r.compact(true); 825 } 826 827 public static class DummyCompactor extends DefaultCompactor { 828 public DummyCompactor(Configuration conf, HStore store) { 829 super(conf, store); 830 this.keepSeqIdPeriod = 0; 831 } 832 } 833 834 private static HStoreFile createFile() throws Exception { 835 HStoreFile sf = mock(HStoreFile.class); 836 when(sf.getPath()).thenReturn(new Path("file")); 837 StoreFileReader r = mock(StoreFileReader.class); 838 when(r.length()).thenReturn(10L); 839 when(sf.getReader()).thenReturn(r); 840 return sf; 841 } 842 843 /** 844 * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction 845 * finishes. 846 */ 847 public static class Tracker implements CompactionLifeCycleTracker { 848 849 private final CountDownLatch done; 850 851 public Tracker(CountDownLatch done) { 852 this.done = done; 853 } 854 855 @Override 856 public void afterExecution(Store store) { 857 done.countDown(); 858 } 859 } 860 861 /** 862 * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction 863 * finishes. 864 */ 865 public static class WaitThroughPutController extends NoLimitThroughputController { 866 867 public WaitThroughPutController() { 868 } 869 870 @Override 871 public long control(String compactionName, long size) throws InterruptedException { 872 Thread.sleep(6000000); 873 return 6000000; 874 } 875 } 876}