001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; 021import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES; 022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; 023import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028import static org.mockito.Matchers.any; 029import static org.mockito.Mockito.doAnswer; 030import static org.mockito.Mockito.mock; 031import static org.mockito.Mockito.spy; 032import static org.mockito.Mockito.when; 033 034import java.io.IOException; 035import java.util.ArrayList; 036import java.util.Collection; 037import java.util.List; 038import java.util.Optional; 039import java.util.concurrent.CountDownLatch; 040import java.util.concurrent.TimeUnit; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FSDataOutputStream; 043import org.apache.hadoop.fs.FileStatus; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.hbase.ChoreService; 047import org.apache.hadoop.hbase.HBaseClassTestRule; 048import org.apache.hadoop.hbase.HBaseConfiguration; 049import org.apache.hadoop.hbase.HBaseTestCase; 050import org.apache.hadoop.hbase.HBaseTestingUtility; 051import org.apache.hadoop.hbase.HColumnDescriptor; 052import org.apache.hadoop.hbase.HConstants; 053import org.apache.hadoop.hbase.HTableDescriptor; 054import org.apache.hadoop.hbase.client.Delete; 055import org.apache.hadoop.hbase.client.Durability; 056import org.apache.hadoop.hbase.client.Put; 057import org.apache.hadoop.hbase.client.Table; 058import org.apache.hadoop.hbase.io.hfile.HFileScanner; 059import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 060import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 061import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 062import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 063import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 064import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 065import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 066import org.apache.hadoop.hbase.security.User; 067import org.apache.hadoop.hbase.testclassification.MediumTests; 068import org.apache.hadoop.hbase.testclassification.RegionServerTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.Threads; 071import org.apache.hadoop.hbase.wal.WAL; 072import org.junit.After; 073import org.junit.Assume; 074import org.junit.Before; 075import org.junit.ClassRule; 076import org.junit.Rule; 077import org.junit.Test; 078import org.junit.experimental.categories.Category; 079import org.junit.rules.TestName; 080import org.mockito.Mockito; 081import org.mockito.invocation.InvocationOnMock; 082import org.mockito.stubbing.Answer; 083 084/** 085 * Test compaction framework and common functions 086 */ 087@Category({RegionServerTests.class, MediumTests.class}) 088public class TestCompaction { 089 090 @ClassRule 091 public static final HBaseClassTestRule CLASS_RULE = 092 HBaseClassTestRule.forClass(TestCompaction.class); 093 094 @Rule public TestName name = new TestName(); 095 private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU(); 096 protected Configuration conf = UTIL.getConfiguration(); 097 098 private HRegion r = null; 099 private HTableDescriptor htd = null; 100 private static final byte [] COLUMN_FAMILY = fam1; 101 private final byte [] STARTROW = Bytes.toBytes(START_KEY); 102 private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY; 103 private int compactionThreshold; 104 private byte[] secondRowBytes, thirdRowBytes; 105 private static final long MAX_FILES_TO_COMPACT = 10; 106 private final byte[] FAMILY = Bytes.toBytes("cf"); 107 108 /** constructor */ 109 public TestCompaction() { 110 super(); 111 112 // Set cache flush size to 1MB 113 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); 114 conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100); 115 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 116 NoLimitThroughputController.class.getName()); 117 compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); 118 119 secondRowBytes = START_KEY_BYTES.clone(); 120 // Increment the least significant character so we get to next row. 121 secondRowBytes[START_KEY_BYTES.length - 1]++; 122 thirdRowBytes = START_KEY_BYTES.clone(); 123 thirdRowBytes[START_KEY_BYTES.length - 1] = 124 (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2); 125 } 126 127 @Before 128 public void setUp() throws Exception { 129 this.htd = UTIL.createTableDescriptor(name.getMethodName()); 130 if (name.getMethodName().equals("testCompactionSeqId")) { 131 UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10"); 132 UTIL.getConfiguration().set( 133 DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, 134 DummyCompactor.class.getName()); 135 HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); 136 hcd.setMaxVersions(65536); 137 this.htd.addFamily(hcd); 138 } 139 this.r = UTIL.createLocalHRegion(htd, null, null); 140 } 141 142 @After 143 public void tearDown() throws Exception { 144 WAL wal = r.getWAL(); 145 this.r.close(); 146 wal.close(); 147 } 148 149 /** 150 * Verify that you can stop a long-running compaction 151 * (used during RS shutdown) 152 * @throws Exception 153 */ 154 @Test 155 public void testInterruptCompaction() throws Exception { 156 assertEquals(0, count()); 157 158 // lower the polling interval for this test 159 int origWI = HStore.closeCheckInterval; 160 HStore.closeCheckInterval = 10*1000; // 10 KB 161 162 try { 163 // Create a couple store files w/ 15KB (over 10KB interval) 164 int jmax = (int) Math.ceil(15.0/compactionThreshold); 165 byte [] pad = new byte[1000]; // 1 KB chunk 166 for (int i = 0; i < compactionThreshold; i++) { 167 Table loader = new RegionAsTable(r); 168 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 169 p.setDurability(Durability.SKIP_WAL); 170 for (int j = 0; j < jmax; j++) { 171 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 172 } 173 HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 174 loader.put(p); 175 r.flush(true); 176 } 177 178 HRegion spyR = spy(r); 179 doAnswer(new Answer() { 180 @Override 181 public Object answer(InvocationOnMock invocation) throws Throwable { 182 r.writestate.writesEnabled = false; 183 return invocation.callRealMethod(); 184 } 185 }).when(spyR).doRegionCompactionPrep(); 186 187 // force a minor compaction, but not before requesting a stop 188 spyR.compactStores(); 189 190 // ensure that the compaction stopped, all old files are intact, 191 HStore s = r.getStore(COLUMN_FAMILY); 192 assertEquals(compactionThreshold, s.getStorefilesCount()); 193 assertTrue(s.getStorefilesSize() > 15*1000); 194 // and no new store files persisted past compactStores() 195 // only one empty dir exists in temp dir 196 FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir()); 197 assertEquals(1, ls.length); 198 Path storeTempDir = new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY)); 199 assertTrue(r.getFilesystem().exists(storeTempDir)); 200 ls = r.getFilesystem().listStatus(storeTempDir); 201 assertEquals(0, ls.length); 202 } finally { 203 // don't mess up future tests 204 r.writestate.writesEnabled = true; 205 HStore.closeCheckInterval = origWI; 206 207 // Delete all Store information once done using 208 for (int i = 0; i < compactionThreshold; i++) { 209 Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i))); 210 byte [][] famAndQf = {COLUMN_FAMILY, null}; 211 delete.addFamily(famAndQf[0]); 212 r.delete(delete); 213 } 214 r.flush(true); 215 216 // Multiple versions allowed for an entry, so the delete isn't enough 217 // Lower TTL and expire to ensure that all our entries have been wiped 218 final int ttl = 1000; 219 for (HStore store : this.r.stores.values()) { 220 ScanInfo old = store.getScanInfo(); 221 ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells()); 222 store.setScanInfo(si); 223 } 224 Thread.sleep(ttl); 225 226 r.compact(true); 227 assertEquals(0, count()); 228 } 229 } 230 231 private int count() throws IOException { 232 int count = 0; 233 for (HStoreFile f: this.r.stores. 234 get(COLUMN_FAMILY_TEXT).getStorefiles()) { 235 HFileScanner scanner = f.getReader().getScanner(false, false); 236 if (!scanner.seekTo()) { 237 continue; 238 } 239 do { 240 count++; 241 } while(scanner.next()); 242 } 243 return count; 244 } 245 246 private void createStoreFile(final HRegion region) throws IOException { 247 createStoreFile(region, Bytes.toString(COLUMN_FAMILY)); 248 } 249 250 private void createStoreFile(final HRegion region, String family) throws IOException { 251 Table loader = new RegionAsTable(region); 252 HBaseTestCase.addContent(loader, family); 253 region.flush(true); 254 } 255 256 @Test 257 public void testCompactionWithCorruptResult() throws Exception { 258 int nfiles = 10; 259 for (int i = 0; i < nfiles; i++) { 260 createStoreFile(r); 261 } 262 HStore store = r.getStore(COLUMN_FAMILY); 263 264 Collection<HStoreFile> storeFiles = store.getStorefiles(); 265 DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor(); 266 tool.compactForTesting(storeFiles, false); 267 268 // Now lets corrupt the compacted file. 269 FileSystem fs = store.getFileSystem(); 270 // default compaction policy created one and only one new compacted file 271 Path dstPath = store.getRegionFileSystem().createTempName(); 272 FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, 1024L, null); 273 stream.writeChars("CORRUPT FILE!!!!"); 274 stream.close(); 275 Path origPath = store.getRegionFileSystem().commitStoreFile( 276 Bytes.toString(COLUMN_FAMILY), dstPath); 277 278 try { 279 ((HStore)store).moveFileIntoPlace(origPath); 280 } catch (Exception e) { 281 // The complete compaction should fail and the corrupt file should remain 282 // in the 'tmp' directory; 283 assertTrue(fs.exists(origPath)); 284 assertFalse(fs.exists(dstPath)); 285 System.out.println("testCompactionWithCorruptResult Passed"); 286 return; 287 } 288 fail("testCompactionWithCorruptResult failed since no exception was" + 289 "thrown while completing a corrupt file"); 290 } 291 292 /** 293 * Create a custom compaction request and be sure that we can track it through the queue, knowing 294 * when the compaction is completed. 295 */ 296 @Test 297 public void testTrackingCompactionRequest() throws Exception { 298 // setup a compact/split thread on a mock server 299 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 300 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 301 CompactSplit thread = new CompactSplit(mockServer); 302 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 303 304 // setup a region/store with some files 305 HStore store = r.getStore(COLUMN_FAMILY); 306 createStoreFile(r); 307 for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) { 308 createStoreFile(r); 309 } 310 311 CountDownLatch latch = new CountDownLatch(1); 312 Tracker tracker = new Tracker(latch); 313 thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker, 314 null); 315 // wait for the latch to complete. 316 latch.await(); 317 318 thread.interruptIfNecessary(); 319 } 320 321 @Test 322 public void testCompactionFailure() throws Exception { 323 // setup a compact/split thread on a mock server 324 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 325 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 326 CompactSplit thread = new CompactSplit(mockServer); 327 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 328 329 // setup a region/store with some files 330 HStore store = r.getStore(COLUMN_FAMILY); 331 createStoreFile(r); 332 for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { 333 createStoreFile(r); 334 } 335 336 HRegion mockRegion = Mockito.spy(r); 337 Mockito.when(mockRegion.checkSplit()).thenThrow(new IndexOutOfBoundsException()); 338 339 MetricsRegionWrapper metricsWrapper = new MetricsRegionWrapperImpl(r); 340 341 long preCompletedCount = metricsWrapper.getNumCompactionsCompleted(); 342 long preFailedCount = metricsWrapper.getNumCompactionsFailed(); 343 344 CountDownLatch latch = new CountDownLatch(1); 345 Tracker tracker = new Tracker(latch); 346 thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, 347 tracker, null); 348 // wait for the latch to complete. 349 latch.await(120, TimeUnit.SECONDS); 350 351 // compaction should have completed and been marked as failed due to error in split request 352 long postCompletedCount = metricsWrapper.getNumCompactionsCompleted(); 353 long postFailedCount = metricsWrapper.getNumCompactionsFailed(); 354 355 assertTrue("Completed count should have increased (pre=" + preCompletedCount + 356 ", post="+postCompletedCount+")", 357 postCompletedCount > preCompletedCount); 358 assertTrue("Failed count should have increased (pre=" + preFailedCount + 359 ", post=" + postFailedCount + ")", 360 postFailedCount > preFailedCount); 361 } 362 363 /** 364 * Test no new Compaction requests are generated after calling stop compactions 365 */ 366 @Test public void testStopStartCompaction() throws IOException { 367 // setup a compact/split thread on a mock server 368 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 369 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 370 CompactSplit thread = new CompactSplit(mockServer); 371 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 372 // setup a region/store with some files 373 HStore store = r.getStore(COLUMN_FAMILY); 374 createStoreFile(r); 375 for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { 376 createStoreFile(r); 377 } 378 thread.switchCompaction(false); 379 thread 380 .requestCompaction(r, store, "test", Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, 381 null); 382 assertEquals(false, thread.isCompactionsEnabled()); 383 assertEquals(0, thread.getLongCompactions().getActiveCount() + thread.getShortCompactions() 384 .getActiveCount()); 385 thread.switchCompaction(true); 386 assertEquals(true, thread.isCompactionsEnabled()); 387 thread 388 .requestCompaction(r, store, "test", Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, 389 null); 390 assertEquals(1, thread.getLongCompactions().getActiveCount() + thread.getShortCompactions() 391 .getActiveCount()); 392 } 393 394 @Test public void testInterruptingRunningCompactions() throws Exception { 395 // setup a compact/split thread on a mock server 396 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 397 WaitThroughPutController.class.getName()); 398 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 399 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 400 CompactSplit thread = new CompactSplit(mockServer); 401 402 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 403 404 // setup a region/store with some files 405 HStore store = r.getStore(COLUMN_FAMILY); 406 int jmax = (int) Math.ceil(15.0 / compactionThreshold); 407 byte[] pad = new byte[1000]; // 1 KB chunk 408 for (int i = 0; i < compactionThreshold; i++) { 409 Table loader = new RegionAsTable(r); 410 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 411 p.setDurability(Durability.SKIP_WAL); 412 for (int j = 0; j < jmax; j++) { 413 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 414 } 415 HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 416 loader.put(p); 417 r.flush(true); 418 } 419 HStore s = r.getStore(COLUMN_FAMILY); 420 int initialFiles = s.getStorefilesCount(); 421 422 thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, 423 CompactionLifeCycleTracker.DUMMY, null); 424 425 Thread.sleep(3000); 426 thread.switchCompaction(false); 427 assertEquals(initialFiles, s.getStorefilesCount()); 428 //don't mess up future tests 429 thread.switchCompaction(true); 430 } 431 432 /** 433 * HBASE-7947: Regression test to ensure adding to the correct list in the 434 * {@link CompactSplit} 435 * @throws Exception on failure 436 */ 437 @Test 438 public void testMultipleCustomCompactionRequests() throws Exception { 439 // setup a compact/split thread on a mock server 440 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 441 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 442 CompactSplit thread = new CompactSplit(mockServer); 443 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 444 445 // setup a region/store with some files 446 int numStores = r.getStores().size(); 447 CountDownLatch latch = new CountDownLatch(numStores); 448 Tracker tracker = new Tracker(latch); 449 // create some store files and setup requests for each store on which we want to do a 450 // compaction 451 for (HStore store : r.getStores()) { 452 createStoreFile(r, store.getColumnFamilyName()); 453 createStoreFile(r, store.getColumnFamilyName()); 454 createStoreFile(r, store.getColumnFamilyName()); 455 thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER, 456 tracker, null); 457 } 458 // wait for the latch to complete. 459 latch.await(); 460 461 thread.interruptIfNecessary(); 462 } 463 464 class StoreMockMaker extends StatefulStoreMockMaker { 465 public ArrayList<HStoreFile> compacting = new ArrayList<>(); 466 public ArrayList<HStoreFile> notCompacting = new ArrayList<>(); 467 private final ArrayList<Integer> results; 468 469 public StoreMockMaker(ArrayList<Integer> results) { 470 this.results = results; 471 } 472 473 public class TestCompactionContext extends CompactionContext { 474 475 private List<HStoreFile> selectedFiles; 476 477 public TestCompactionContext(List<HStoreFile> selectedFiles) { 478 super(); 479 this.selectedFiles = selectedFiles; 480 } 481 482 @Override 483 public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { 484 return new ArrayList<>(); 485 } 486 487 @Override 488 public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction, 489 boolean mayUseOffPeak, boolean forceMajor) throws IOException { 490 this.request = new CompactionRequestImpl(selectedFiles); 491 this.request.setPriority(getPriority()); 492 return true; 493 } 494 495 @Override 496 public List<Path> compact(ThroughputController throughputController, User user) 497 throws IOException { 498 finishCompaction(this.selectedFiles); 499 return new ArrayList<>(); 500 } 501 } 502 503 @Override 504 public synchronized Optional<CompactionContext> selectCompaction() { 505 CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting)); 506 compacting.addAll(notCompacting); 507 notCompacting.clear(); 508 try { 509 ctx.select(null, false, false, false); 510 } catch (IOException ex) { 511 fail("Shouldn't happen"); 512 } 513 return Optional.of(ctx); 514 } 515 516 @Override 517 public synchronized void cancelCompaction(Object object) { 518 TestCompactionContext ctx = (TestCompactionContext)object; 519 compacting.removeAll(ctx.selectedFiles); 520 notCompacting.addAll(ctx.selectedFiles); 521 } 522 523 public synchronized void finishCompaction(List<HStoreFile> sfs) { 524 if (sfs.isEmpty()) return; 525 synchronized (results) { 526 results.add(sfs.size()); 527 } 528 compacting.removeAll(sfs); 529 } 530 531 @Override 532 public int getPriority() { 533 return 7 - compacting.size() - notCompacting.size(); 534 } 535 } 536 537 public class BlockingStoreMockMaker extends StatefulStoreMockMaker { 538 BlockingCompactionContext blocked = null; 539 540 public class BlockingCompactionContext extends CompactionContext { 541 public volatile boolean isInCompact = false; 542 543 public void unblock() { 544 synchronized (this) { 545 this.notifyAll(); 546 } 547 } 548 549 @Override 550 public List<Path> compact(ThroughputController throughputController, User user) 551 throws IOException { 552 try { 553 isInCompact = true; 554 synchronized (this) { 555 this.wait(); 556 } 557 } catch (InterruptedException e) { 558 Assume.assumeNoException(e); 559 } 560 return new ArrayList<>(); 561 } 562 563 @Override 564 public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { 565 return new ArrayList<>(); 566 } 567 568 @Override 569 public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e) 570 throws IOException { 571 this.request = new CompactionRequestImpl(new ArrayList<>()); 572 return true; 573 } 574 } 575 576 @Override 577 public Optional<CompactionContext> selectCompaction() { 578 this.blocked = new BlockingCompactionContext(); 579 try { 580 this.blocked.select(null, false, false, false); 581 } catch (IOException ex) { 582 fail("Shouldn't happen"); 583 } 584 return Optional.of(blocked); 585 } 586 587 @Override 588 public void cancelCompaction(Object object) {} 589 590 @Override 591 public int getPriority() { 592 return Integer.MIN_VALUE; // some invalid value, see createStoreMock 593 } 594 595 public BlockingCompactionContext waitForBlocking() { 596 while (this.blocked == null || !this.blocked.isInCompact) { 597 Threads.sleepWithoutInterrupt(50); 598 } 599 BlockingCompactionContext ctx = this.blocked; 600 this.blocked = null; 601 return ctx; 602 } 603 604 @Override 605 public HStore createStoreMock(String name) throws Exception { 606 return createStoreMock(Integer.MIN_VALUE, name); 607 } 608 609 public HStore createStoreMock(int priority, String name) throws Exception { 610 // Override the mock to always return the specified priority. 611 HStore s = super.createStoreMock(name); 612 when(s.getCompactPriority()).thenReturn(priority); 613 return s; 614 } 615 } 616 617 /** Test compaction priority management and multiple compactions per store (HBASE-8665). */ 618 @Test 619 public void testCompactionQueuePriorities() throws Exception { 620 // Setup a compact/split thread on a mock server. 621 final Configuration conf = HBaseConfiguration.create(); 622 HRegionServer mockServer = mock(HRegionServer.class); 623 when(mockServer.isStopped()).thenReturn(false); 624 when(mockServer.getConfiguration()).thenReturn(conf); 625 when(mockServer.getChoreService()).thenReturn(new ChoreService("test")); 626 CompactSplit cst = new CompactSplit(mockServer); 627 when(mockServer.getCompactSplitThread()).thenReturn(cst); 628 //prevent large compaction thread pool stealing job from small compaction queue. 629 cst.shutdownLongCompactions(); 630 // Set up the region mock that redirects compactions. 631 HRegion r = mock(HRegion.class); 632 when( 633 r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() { 634 @Override 635 public Boolean answer(InvocationOnMock invocation) throws Throwable { 636 invocation.<CompactionContext>getArgument(0).compact(invocation.getArgument(2), null); 637 return true; 638 } 639 }); 640 641 // Set up store mocks for 2 "real" stores and the one we use for blocking CST. 642 ArrayList<Integer> results = new ArrayList<>(); 643 StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results); 644 HStore store = sm.createStoreMock("store1"); 645 HStore store2 = sm2.createStoreMock("store2"); 646 BlockingStoreMockMaker blocker = new BlockingStoreMockMaker(); 647 648 // First, block the compaction thread so that we could muck with queue. 649 cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1"); 650 BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking(); 651 652 // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively. 653 for (int i = 0; i < 4; ++i) { 654 sm.notCompacting.add(createFile()); 655 } 656 cst.requestSystemCompaction(r, store, "s1-pri3"); 657 for (int i = 0; i < 3; ++i) { 658 sm2.notCompacting.add(createFile()); 659 } 660 cst.requestSystemCompaction(r, store2, "s2-pri4"); 661 // Now add 2 more files to store1 and queue compaction - pri 1. 662 for (int i = 0; i < 2; ++i) { 663 sm.notCompacting.add(createFile()); 664 } 665 cst.requestSystemCompaction(r, store, "s1-pri1"); 666 // Finally add blocking compaction with priority 2. 667 cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2"); 668 669 // Unblock the blocking compaction; we should run pri1 and become block again in pri2. 670 currentBlock.unblock(); 671 currentBlock = blocker.waitForBlocking(); 672 // Pri1 should have "compacted" all 6 files. 673 assertEquals(1, results.size()); 674 assertEquals(6, results.get(0).intValue()); 675 // Add 2 files to store 1 (it has 2 files now). 676 for (int i = 0; i < 2; ++i) { 677 sm.notCompacting.add(createFile()); 678 } 679 // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority 680 // is 5, however, so it must not preempt store 2. Add blocking compaction at the end. 681 cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7"); 682 currentBlock.unblock(); 683 currentBlock = blocker.waitForBlocking(); 684 assertEquals(3, results.size()); 685 assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files. 686 assertEquals(2, results.get(2).intValue()); 687 688 currentBlock.unblock(); 689 cst.interruptIfNecessary(); 690 } 691 692 /** 693 * Firstly write 10 cells (with different time stamp) to a qualifier and flush 694 * to hfile1, then write 10 cells (with different time stamp) to the same 695 * qualifier and flush to hfile2. The latest cell (cell-A) in hfile1 and the 696 * oldest cell (cell-B) in hfile2 are with the same time stamp but different 697 * sequence id, and will get scanned successively during compaction. 698 * <p/> 699 * We set compaction.kv.max to 10 so compaction will scan 10 versions each 700 * round, meanwhile we set keepSeqIdPeriod=0 in {@link DummyCompactor} so all 701 * 10 versions of hfile2 will be written out with seqId cleaned (set to 0) 702 * including cell-B, then when scanner goes to cell-A it will cause a scan 703 * out-of-order assertion error before HBASE-16931 704 * 705 * @throws Exception 706 * if error occurs during the test 707 */ 708 @Test 709 public void testCompactionSeqId() throws Exception { 710 final byte[] ROW = Bytes.toBytes("row"); 711 final byte[] QUALIFIER = Bytes.toBytes("qualifier"); 712 713 long timestamp = 10000; 714 715 // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9 716 // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8 717 // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7 718 // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6 719 // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5 720 // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4 721 // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3 722 // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2 723 // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1 724 // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0 725 for (int i = 0; i < 10; i++) { 726 Put put = new Put(ROW); 727 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); 728 r.put(put); 729 } 730 r.flush(true); 731 732 // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18 733 // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17 734 // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16 735 // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15 736 // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14 737 // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13 738 // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12 739 // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11 740 // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10 741 // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9 742 for (int i = 18; i > 8; i--) { 743 Put put = new Put(ROW); 744 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); 745 r.put(put); 746 } 747 r.flush(true); 748 r.compact(true); 749 } 750 751 public static class DummyCompactor extends DefaultCompactor { 752 public DummyCompactor(Configuration conf, HStore store) { 753 super(conf, store); 754 this.keepSeqIdPeriod = 0; 755 } 756 } 757 758 private static HStoreFile createFile() throws Exception { 759 HStoreFile sf = mock(HStoreFile.class); 760 when(sf.getPath()).thenReturn(new Path("file")); 761 StoreFileReader r = mock(StoreFileReader.class); 762 when(r.length()).thenReturn(10L); 763 when(sf.getReader()).thenReturn(r); 764 return sf; 765 } 766 767 /** 768 * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction 769 * finishes. 770 */ 771 public static class Tracker implements CompactionLifeCycleTracker { 772 773 private final CountDownLatch done; 774 775 public Tracker(CountDownLatch done) { 776 this.done = done; 777 } 778 779 @Override 780 public void afterExecution(Store store) { 781 done.countDown(); 782 } 783 } 784 785 /** 786 * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction 787 * finishes. 788 */ 789 public static class WaitThroughPutController extends NoLimitThroughputController{ 790 791 public WaitThroughPutController() { 792 } 793 794 @Override 795 public long control(String compactionName, long size) throws InterruptedException { 796 Thread.sleep(6000000); 797 return 6000000; 798 } 799 } 800}