001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; 021import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES; 022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; 023import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028import static org.mockito.Matchers.any; 029import static org.mockito.Mockito.doAnswer; 030import static org.mockito.Mockito.mock; 031import static org.mockito.Mockito.spy; 032import static org.mockito.Mockito.when; 033 034import java.io.IOException; 035import java.util.ArrayList; 036import java.util.Collection; 037import java.util.List; 038import java.util.Optional; 039import java.util.concurrent.CountDownLatch; 040import java.util.concurrent.TimeUnit; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FSDataOutputStream; 043import org.apache.hadoop.fs.FileStatus; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.hbase.ChoreService; 047import org.apache.hadoop.hbase.HBaseClassTestRule; 048import org.apache.hadoop.hbase.HBaseConfiguration; 049import org.apache.hadoop.hbase.HBaseTestCase; 050import org.apache.hadoop.hbase.HBaseTestingUtility; 051import org.apache.hadoop.hbase.HColumnDescriptor; 052import org.apache.hadoop.hbase.HConstants; 053import org.apache.hadoop.hbase.HTableDescriptor; 054import org.apache.hadoop.hbase.Waiter; 055import org.apache.hadoop.hbase.client.Delete; 056import org.apache.hadoop.hbase.client.Durability; 057import org.apache.hadoop.hbase.client.Put; 058import org.apache.hadoop.hbase.client.Table; 059import org.apache.hadoop.hbase.io.hfile.HFileScanner; 060import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 061import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 062import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 063import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 064import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 065import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 066import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 067import org.apache.hadoop.hbase.security.User; 068import org.apache.hadoop.hbase.testclassification.MediumTests; 069import org.apache.hadoop.hbase.testclassification.RegionServerTests; 070import org.apache.hadoop.hbase.util.Bytes; 071import org.apache.hadoop.hbase.util.Threads; 072import org.apache.hadoop.hbase.wal.WAL; 073import org.junit.After; 074import org.junit.Assume; 075import org.junit.Before; 076import org.junit.ClassRule; 077import org.junit.Rule; 078import org.junit.Test; 079import org.junit.experimental.categories.Category; 080import org.junit.rules.TestName; 081import org.mockito.Mockito; 082import org.mockito.invocation.InvocationOnMock; 083import org.mockito.stubbing.Answer; 084 085/** 086 * Test compaction framework and common functions 087 */ 088@Category({RegionServerTests.class, MediumTests.class}) 089public class TestCompaction { 090 091 @ClassRule 092 public static final HBaseClassTestRule CLASS_RULE = 093 HBaseClassTestRule.forClass(TestCompaction.class); 094 095 @Rule public TestName name = new TestName(); 096 private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU(); 097 protected Configuration conf = UTIL.getConfiguration(); 098 099 private HRegion r = null; 100 private HTableDescriptor htd = null; 101 private static final byte [] COLUMN_FAMILY = fam1; 102 private final byte [] STARTROW = Bytes.toBytes(START_KEY); 103 private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY; 104 private int compactionThreshold; 105 private byte[] secondRowBytes, thirdRowBytes; 106 private static final long MAX_FILES_TO_COMPACT = 10; 107 private final byte[] FAMILY = Bytes.toBytes("cf"); 108 109 /** constructor */ 110 public TestCompaction() { 111 super(); 112 113 // Set cache flush size to 1MB 114 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); 115 conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100); 116 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 117 NoLimitThroughputController.class.getName()); 118 compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); 119 120 secondRowBytes = START_KEY_BYTES.clone(); 121 // Increment the least significant character so we get to next row. 122 secondRowBytes[START_KEY_BYTES.length - 1]++; 123 thirdRowBytes = START_KEY_BYTES.clone(); 124 thirdRowBytes[START_KEY_BYTES.length - 1] = 125 (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2); 126 } 127 128 @Before 129 public void setUp() throws Exception { 130 this.htd = UTIL.createTableDescriptor(name.getMethodName()); 131 if (name.getMethodName().equals("testCompactionSeqId")) { 132 UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10"); 133 UTIL.getConfiguration().set( 134 DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, 135 DummyCompactor.class.getName()); 136 HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); 137 hcd.setMaxVersions(65536); 138 this.htd.addFamily(hcd); 139 } 140 this.r = UTIL.createLocalHRegion(htd, null, null); 141 } 142 143 @After 144 public void tearDown() throws Exception { 145 WAL wal = r.getWAL(); 146 this.r.close(); 147 wal.close(); 148 } 149 150 /** 151 * Verify that you can stop a long-running compaction 152 * (used during RS shutdown) 153 * @throws Exception 154 */ 155 @Test 156 public void testInterruptCompaction() throws Exception { 157 assertEquals(0, count()); 158 159 // lower the polling interval for this test 160 int origWI = HStore.closeCheckInterval; 161 HStore.closeCheckInterval = 10*1000; // 10 KB 162 163 try { 164 // Create a couple store files w/ 15KB (over 10KB interval) 165 int jmax = (int) Math.ceil(15.0/compactionThreshold); 166 byte [] pad = new byte[1000]; // 1 KB chunk 167 for (int i = 0; i < compactionThreshold; i++) { 168 Table loader = new RegionAsTable(r); 169 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 170 p.setDurability(Durability.SKIP_WAL); 171 for (int j = 0; j < jmax; j++) { 172 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 173 } 174 HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 175 loader.put(p); 176 r.flush(true); 177 } 178 179 HRegion spyR = spy(r); 180 doAnswer(new Answer() { 181 @Override 182 public Object answer(InvocationOnMock invocation) throws Throwable { 183 r.writestate.writesEnabled = false; 184 return invocation.callRealMethod(); 185 } 186 }).when(spyR).doRegionCompactionPrep(); 187 188 // force a minor compaction, but not before requesting a stop 189 spyR.compactStores(); 190 191 // ensure that the compaction stopped, all old files are intact, 192 HStore s = r.getStore(COLUMN_FAMILY); 193 assertEquals(compactionThreshold, s.getStorefilesCount()); 194 assertTrue(s.getStorefilesSize() > 15*1000); 195 // and no new store files persisted past compactStores() 196 // only one empty dir exists in temp dir 197 FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir()); 198 assertEquals(1, ls.length); 199 Path storeTempDir = new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY)); 200 assertTrue(r.getFilesystem().exists(storeTempDir)); 201 ls = r.getFilesystem().listStatus(storeTempDir); 202 assertEquals(0, ls.length); 203 } finally { 204 // don't mess up future tests 205 r.writestate.writesEnabled = true; 206 HStore.closeCheckInterval = origWI; 207 208 // Delete all Store information once done using 209 for (int i = 0; i < compactionThreshold; i++) { 210 Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i))); 211 byte [][] famAndQf = {COLUMN_FAMILY, null}; 212 delete.addFamily(famAndQf[0]); 213 r.delete(delete); 214 } 215 r.flush(true); 216 217 // Multiple versions allowed for an entry, so the delete isn't enough 218 // Lower TTL and expire to ensure that all our entries have been wiped 219 final int ttl = 1000; 220 for (HStore store : this.r.stores.values()) { 221 ScanInfo old = store.getScanInfo(); 222 ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells()); 223 store.setScanInfo(si); 224 } 225 Thread.sleep(ttl); 226 227 r.compact(true); 228 assertEquals(0, count()); 229 } 230 } 231 232 private int count() throws IOException { 233 int count = 0; 234 for (HStoreFile f: this.r.stores. 235 get(COLUMN_FAMILY_TEXT).getStorefiles()) { 236 HFileScanner scanner = f.getReader().getScanner(false, false); 237 if (!scanner.seekTo()) { 238 continue; 239 } 240 do { 241 count++; 242 } while(scanner.next()); 243 } 244 return count; 245 } 246 247 private void createStoreFile(final HRegion region) throws IOException { 248 createStoreFile(region, Bytes.toString(COLUMN_FAMILY)); 249 } 250 251 private void createStoreFile(final HRegion region, String family) throws IOException { 252 Table loader = new RegionAsTable(region); 253 HBaseTestCase.addContent(loader, family); 254 region.flush(true); 255 } 256 257 @Test 258 public void testCompactionWithCorruptResult() throws Exception { 259 int nfiles = 10; 260 for (int i = 0; i < nfiles; i++) { 261 createStoreFile(r); 262 } 263 HStore store = r.getStore(COLUMN_FAMILY); 264 265 Collection<HStoreFile> storeFiles = store.getStorefiles(); 266 DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor(); 267 tool.compactForTesting(storeFiles, false); 268 269 // Now lets corrupt the compacted file. 270 FileSystem fs = store.getFileSystem(); 271 // default compaction policy created one and only one new compacted file 272 Path dstPath = store.getRegionFileSystem().createTempName(); 273 FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, 1024L, null); 274 stream.writeChars("CORRUPT FILE!!!!"); 275 stream.close(); 276 Path origPath = store.getRegionFileSystem().commitStoreFile( 277 Bytes.toString(COLUMN_FAMILY), dstPath); 278 279 try { 280 ((HStore)store).moveFileIntoPlace(origPath); 281 } catch (Exception e) { 282 // The complete compaction should fail and the corrupt file should remain 283 // in the 'tmp' directory; 284 assertTrue(fs.exists(origPath)); 285 assertFalse(fs.exists(dstPath)); 286 System.out.println("testCompactionWithCorruptResult Passed"); 287 return; 288 } 289 fail("testCompactionWithCorruptResult failed since no exception was" + 290 "thrown while completing a corrupt file"); 291 } 292 293 /** 294 * Create a custom compaction request and be sure that we can track it through the queue, knowing 295 * when the compaction is completed. 296 */ 297 @Test 298 public void testTrackingCompactionRequest() throws Exception { 299 // setup a compact/split thread on a mock server 300 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 301 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 302 CompactSplit thread = new CompactSplit(mockServer); 303 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 304 305 // setup a region/store with some files 306 HStore store = r.getStore(COLUMN_FAMILY); 307 createStoreFile(r); 308 for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) { 309 createStoreFile(r); 310 } 311 312 CountDownLatch latch = new CountDownLatch(1); 313 Tracker tracker = new Tracker(latch); 314 thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker, 315 null); 316 // wait for the latch to complete. 317 latch.await(); 318 319 thread.interruptIfNecessary(); 320 } 321 322 @Test 323 public void testCompactionFailure() throws Exception { 324 // setup a compact/split thread on a mock server 325 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 326 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 327 CompactSplit thread = new CompactSplit(mockServer); 328 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 329 330 // setup a region/store with some files 331 HStore store = r.getStore(COLUMN_FAMILY); 332 createStoreFile(r); 333 for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { 334 createStoreFile(r); 335 } 336 337 HRegion mockRegion = Mockito.spy(r); 338 Mockito.when(mockRegion.checkSplit()). 339 thenThrow(new RuntimeException("Thrown intentionally by test!")); 340 341 MetricsRegionWrapper metricsWrapper = new MetricsRegionWrapperImpl(r); 342 343 long preCompletedCount = metricsWrapper.getNumCompactionsCompleted(); 344 long preFailedCount = metricsWrapper.getNumCompactionsFailed(); 345 346 CountDownLatch latch = new CountDownLatch(1); 347 Tracker tracker = new Tracker(latch); 348 thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, 349 tracker, null); 350 // wait for the latch to complete. 351 latch.await(120, TimeUnit.SECONDS); 352 353 // compaction should have completed and been marked as failed due to error in split request 354 long postCompletedCount = metricsWrapper.getNumCompactionsCompleted(); 355 long postFailedCount = metricsWrapper.getNumCompactionsFailed(); 356 357 assertTrue("Completed count should have increased (pre=" + preCompletedCount + 358 ", post="+postCompletedCount+")", 359 postCompletedCount > preCompletedCount); 360 assertTrue("Failed count should have increased (pre=" + preFailedCount + 361 ", post=" + postFailedCount + ")", 362 postFailedCount > preFailedCount); 363 } 364 365 /** 366 * Test no new Compaction requests are generated after calling stop compactions 367 */ 368 @Test 369 public void testStopStartCompaction() throws IOException { 370 // setup a compact/split thread on a mock server 371 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 372 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 373 final CompactSplit thread = new CompactSplit(mockServer); 374 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 375 // setup a region/store with some files 376 HStore store = r.getStore(COLUMN_FAMILY); 377 createStoreFile(r); 378 for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { 379 createStoreFile(r); 380 } 381 thread.switchCompaction(false); 382 thread.requestCompaction(r, store, "test", Store.PRIORITY_USER, 383 CompactionLifeCycleTracker.DUMMY, null); 384 assertFalse(thread.isCompactionsEnabled()); 385 int longCompactions = thread.getLongCompactions().getActiveCount(); 386 int shortCompactions = thread.getShortCompactions().getActiveCount(); 387 assertEquals("longCompactions=" + longCompactions + "," + 388 "shortCompactions=" + shortCompactions, 0, longCompactions + shortCompactions); 389 thread.switchCompaction(true); 390 assertTrue(thread.isCompactionsEnabled()); 391 // Make sure no compactions have run. 392 assertEquals(0, thread.getLongCompactions().getCompletedTaskCount() + 393 thread.getShortCompactions().getCompletedTaskCount()); 394 // Request a compaction and make sure it is submitted successfully. 395 thread.requestCompaction(r, store, "test", Store.PRIORITY_USER, 396 CompactionLifeCycleTracker.DUMMY, null); 397 // Wait until the compaction finishes. 398 Waiter.waitFor(UTIL.getConfiguration(), 5000, 399 (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount() + 400 thread.getShortCompactions().getCompletedTaskCount() == 1); 401 // Make sure there are no compactions running. 402 assertEquals(0, thread.getLongCompactions().getActiveCount() 403 + thread.getShortCompactions().getActiveCount()); 404 } 405 406 @Test public void testInterruptingRunningCompactions() throws Exception { 407 // setup a compact/split thread on a mock server 408 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 409 WaitThroughPutController.class.getName()); 410 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 411 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 412 CompactSplit thread = new CompactSplit(mockServer); 413 414 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 415 416 // setup a region/store with some files 417 HStore store = r.getStore(COLUMN_FAMILY); 418 int jmax = (int) Math.ceil(15.0 / compactionThreshold); 419 byte[] pad = new byte[1000]; // 1 KB chunk 420 for (int i = 0; i < compactionThreshold; i++) { 421 Table loader = new RegionAsTable(r); 422 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 423 p.setDurability(Durability.SKIP_WAL); 424 for (int j = 0; j < jmax; j++) { 425 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 426 } 427 HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 428 loader.put(p); 429 r.flush(true); 430 } 431 HStore s = r.getStore(COLUMN_FAMILY); 432 int initialFiles = s.getStorefilesCount(); 433 434 thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, 435 CompactionLifeCycleTracker.DUMMY, null); 436 437 Thread.sleep(3000); 438 thread.switchCompaction(false); 439 assertEquals(initialFiles, s.getStorefilesCount()); 440 //don't mess up future tests 441 thread.switchCompaction(true); 442 } 443 444 /** 445 * HBASE-7947: Regression test to ensure adding to the correct list in the 446 * {@link CompactSplit} 447 * @throws Exception on failure 448 */ 449 @Test 450 public void testMultipleCustomCompactionRequests() throws Exception { 451 // setup a compact/split thread on a mock server 452 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 453 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 454 CompactSplit thread = new CompactSplit(mockServer); 455 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 456 457 // setup a region/store with some files 458 int numStores = r.getStores().size(); 459 CountDownLatch latch = new CountDownLatch(numStores); 460 Tracker tracker = new Tracker(latch); 461 // create some store files and setup requests for each store on which we want to do a 462 // compaction 463 for (HStore store : r.getStores()) { 464 createStoreFile(r, store.getColumnFamilyName()); 465 createStoreFile(r, store.getColumnFamilyName()); 466 createStoreFile(r, store.getColumnFamilyName()); 467 thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER, 468 tracker, null); 469 } 470 // wait for the latch to complete. 471 latch.await(); 472 473 thread.interruptIfNecessary(); 474 } 475 476 class StoreMockMaker extends StatefulStoreMockMaker { 477 public ArrayList<HStoreFile> compacting = new ArrayList<>(); 478 public ArrayList<HStoreFile> notCompacting = new ArrayList<>(); 479 private final ArrayList<Integer> results; 480 481 public StoreMockMaker(ArrayList<Integer> results) { 482 this.results = results; 483 } 484 485 public class TestCompactionContext extends CompactionContext { 486 487 private List<HStoreFile> selectedFiles; 488 489 public TestCompactionContext(List<HStoreFile> selectedFiles) { 490 super(); 491 this.selectedFiles = selectedFiles; 492 } 493 494 @Override 495 public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { 496 return new ArrayList<>(); 497 } 498 499 @Override 500 public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction, 501 boolean mayUseOffPeak, boolean forceMajor) throws IOException { 502 this.request = new CompactionRequestImpl(selectedFiles); 503 this.request.setPriority(getPriority()); 504 return true; 505 } 506 507 @Override 508 public List<Path> compact(ThroughputController throughputController, User user) 509 throws IOException { 510 finishCompaction(this.selectedFiles); 511 return new ArrayList<>(); 512 } 513 } 514 515 @Override 516 public synchronized Optional<CompactionContext> selectCompaction() { 517 CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting)); 518 compacting.addAll(notCompacting); 519 notCompacting.clear(); 520 try { 521 ctx.select(null, false, false, false); 522 } catch (IOException ex) { 523 fail("Shouldn't happen"); 524 } 525 return Optional.of(ctx); 526 } 527 528 @Override 529 public synchronized void cancelCompaction(Object object) { 530 TestCompactionContext ctx = (TestCompactionContext)object; 531 compacting.removeAll(ctx.selectedFiles); 532 notCompacting.addAll(ctx.selectedFiles); 533 } 534 535 public synchronized void finishCompaction(List<HStoreFile> sfs) { 536 if (sfs.isEmpty()) return; 537 synchronized (results) { 538 results.add(sfs.size()); 539 } 540 compacting.removeAll(sfs); 541 } 542 543 @Override 544 public int getPriority() { 545 return 7 - compacting.size() - notCompacting.size(); 546 } 547 } 548 549 public class BlockingStoreMockMaker extends StatefulStoreMockMaker { 550 BlockingCompactionContext blocked = null; 551 552 public class BlockingCompactionContext extends CompactionContext { 553 public volatile boolean isInCompact = false; 554 555 public void unblock() { 556 synchronized (this) { 557 this.notifyAll(); 558 } 559 } 560 561 @Override 562 public List<Path> compact(ThroughputController throughputController, User user) 563 throws IOException { 564 try { 565 isInCompact = true; 566 synchronized (this) { 567 this.wait(); 568 } 569 } catch (InterruptedException e) { 570 Assume.assumeNoException(e); 571 } 572 return new ArrayList<>(); 573 } 574 575 @Override 576 public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { 577 return new ArrayList<>(); 578 } 579 580 @Override 581 public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e) 582 throws IOException { 583 this.request = new CompactionRequestImpl(new ArrayList<>()); 584 return true; 585 } 586 } 587 588 @Override 589 public Optional<CompactionContext> selectCompaction() { 590 this.blocked = new BlockingCompactionContext(); 591 try { 592 this.blocked.select(null, false, false, false); 593 } catch (IOException ex) { 594 fail("Shouldn't happen"); 595 } 596 return Optional.of(blocked); 597 } 598 599 @Override 600 public void cancelCompaction(Object object) {} 601 602 @Override 603 public int getPriority() { 604 return Integer.MIN_VALUE; // some invalid value, see createStoreMock 605 } 606 607 public BlockingCompactionContext waitForBlocking() { 608 while (this.blocked == null || !this.blocked.isInCompact) { 609 Threads.sleepWithoutInterrupt(50); 610 } 611 BlockingCompactionContext ctx = this.blocked; 612 this.blocked = null; 613 return ctx; 614 } 615 616 @Override 617 public HStore createStoreMock(String name) throws Exception { 618 return createStoreMock(Integer.MIN_VALUE, name); 619 } 620 621 public HStore createStoreMock(int priority, String name) throws Exception { 622 // Override the mock to always return the specified priority. 623 HStore s = super.createStoreMock(name); 624 when(s.getCompactPriority()).thenReturn(priority); 625 return s; 626 } 627 } 628 629 /** Test compaction priority management and multiple compactions per store (HBASE-8665). */ 630 @Test 631 public void testCompactionQueuePriorities() throws Exception { 632 // Setup a compact/split thread on a mock server. 633 final Configuration conf = HBaseConfiguration.create(); 634 HRegionServer mockServer = mock(HRegionServer.class); 635 when(mockServer.isStopped()).thenReturn(false); 636 when(mockServer.getConfiguration()).thenReturn(conf); 637 when(mockServer.getChoreService()).thenReturn(new ChoreService("test")); 638 CompactSplit cst = new CompactSplit(mockServer); 639 when(mockServer.getCompactSplitThread()).thenReturn(cst); 640 //prevent large compaction thread pool stealing job from small compaction queue. 641 cst.shutdownLongCompactions(); 642 // Set up the region mock that redirects compactions. 643 HRegion r = mock(HRegion.class); 644 when( 645 r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() { 646 @Override 647 public Boolean answer(InvocationOnMock invocation) throws Throwable { 648 invocation.<CompactionContext>getArgument(0).compact(invocation.getArgument(2), null); 649 return true; 650 } 651 }); 652 653 // Set up store mocks for 2 "real" stores and the one we use for blocking CST. 654 ArrayList<Integer> results = new ArrayList<>(); 655 StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results); 656 HStore store = sm.createStoreMock("store1"); 657 HStore store2 = sm2.createStoreMock("store2"); 658 BlockingStoreMockMaker blocker = new BlockingStoreMockMaker(); 659 660 // First, block the compaction thread so that we could muck with queue. 661 cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1"); 662 BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking(); 663 664 // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively. 665 for (int i = 0; i < 4; ++i) { 666 sm.notCompacting.add(createFile()); 667 } 668 cst.requestSystemCompaction(r, store, "s1-pri3"); 669 for (int i = 0; i < 3; ++i) { 670 sm2.notCompacting.add(createFile()); 671 } 672 cst.requestSystemCompaction(r, store2, "s2-pri4"); 673 // Now add 2 more files to store1 and queue compaction - pri 1. 674 for (int i = 0; i < 2; ++i) { 675 sm.notCompacting.add(createFile()); 676 } 677 cst.requestSystemCompaction(r, store, "s1-pri1"); 678 // Finally add blocking compaction with priority 2. 679 cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2"); 680 681 // Unblock the blocking compaction; we should run pri1 and become block again in pri2. 682 currentBlock.unblock(); 683 currentBlock = blocker.waitForBlocking(); 684 // Pri1 should have "compacted" all 6 files. 685 assertEquals(1, results.size()); 686 assertEquals(6, results.get(0).intValue()); 687 // Add 2 files to store 1 (it has 2 files now). 688 for (int i = 0; i < 2; ++i) { 689 sm.notCompacting.add(createFile()); 690 } 691 // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority 692 // is 5, however, so it must not preempt store 2. Add blocking compaction at the end. 693 cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7"); 694 currentBlock.unblock(); 695 currentBlock = blocker.waitForBlocking(); 696 assertEquals(3, results.size()); 697 assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files. 698 assertEquals(2, results.get(2).intValue()); 699 700 currentBlock.unblock(); 701 cst.interruptIfNecessary(); 702 } 703 704 /** 705 * Firstly write 10 cells (with different time stamp) to a qualifier and flush 706 * to hfile1, then write 10 cells (with different time stamp) to the same 707 * qualifier and flush to hfile2. The latest cell (cell-A) in hfile1 and the 708 * oldest cell (cell-B) in hfile2 are with the same time stamp but different 709 * sequence id, and will get scanned successively during compaction. 710 * <p/> 711 * We set compaction.kv.max to 10 so compaction will scan 10 versions each 712 * round, meanwhile we set keepSeqIdPeriod=0 in {@link DummyCompactor} so all 713 * 10 versions of hfile2 will be written out with seqId cleaned (set to 0) 714 * including cell-B, then when scanner goes to cell-A it will cause a scan 715 * out-of-order assertion error before HBASE-16931 716 * 717 * @throws Exception 718 * if error occurs during the test 719 */ 720 @Test 721 public void testCompactionSeqId() throws Exception { 722 final byte[] ROW = Bytes.toBytes("row"); 723 final byte[] QUALIFIER = Bytes.toBytes("qualifier"); 724 725 long timestamp = 10000; 726 727 // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9 728 // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8 729 // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7 730 // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6 731 // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5 732 // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4 733 // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3 734 // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2 735 // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1 736 // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0 737 for (int i = 0; i < 10; i++) { 738 Put put = new Put(ROW); 739 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); 740 r.put(put); 741 } 742 r.flush(true); 743 744 // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18 745 // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17 746 // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16 747 // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15 748 // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14 749 // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13 750 // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12 751 // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11 752 // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10 753 // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9 754 for (int i = 18; i > 8; i--) { 755 Put put = new Put(ROW); 756 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); 757 r.put(put); 758 } 759 r.flush(true); 760 r.compact(true); 761 } 762 763 public static class DummyCompactor extends DefaultCompactor { 764 public DummyCompactor(Configuration conf, HStore store) { 765 super(conf, store); 766 this.keepSeqIdPeriod = 0; 767 } 768 } 769 770 private static HStoreFile createFile() throws Exception { 771 HStoreFile sf = mock(HStoreFile.class); 772 when(sf.getPath()).thenReturn(new Path("file")); 773 StoreFileReader r = mock(StoreFileReader.class); 774 when(r.length()).thenReturn(10L); 775 when(sf.getReader()).thenReturn(r); 776 return sf; 777 } 778 779 /** 780 * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction 781 * finishes. 782 */ 783 public static class Tracker implements CompactionLifeCycleTracker { 784 785 private final CountDownLatch done; 786 787 public Tracker(CountDownLatch done) { 788 this.done = done; 789 } 790 791 @Override 792 public void afterExecution(Store store) { 793 done.countDown(); 794 } 795 } 796 797 /** 798 * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction 799 * finishes. 800 */ 801 public static class WaitThroughPutController extends NoLimitThroughputController{ 802 803 public WaitThroughPutController() { 804 } 805 806 @Override 807 public long control(String compactionName, long size) throws InterruptedException { 808 Thread.sleep(6000000); 809 return 6000000; 810 } 811 } 812}