001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; 021import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES; 022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; 023import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028import static org.mockito.Matchers.any; 029import static org.mockito.Mockito.doAnswer; 030import static org.mockito.Mockito.mock; 031import static org.mockito.Mockito.spy; 032import static org.mockito.Mockito.when; 033 034import java.io.IOException; 035import java.util.ArrayList; 036import java.util.Collection; 037import java.util.List; 038import java.util.Optional; 039import java.util.concurrent.CountDownLatch; 040import java.util.concurrent.TimeUnit; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FSDataOutputStream; 043import org.apache.hadoop.fs.FileStatus; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.hbase.ChoreService; 047import org.apache.hadoop.hbase.HBaseClassTestRule; 048import org.apache.hadoop.hbase.HBaseConfiguration; 049import org.apache.hadoop.hbase.HBaseTestCase; 050import org.apache.hadoop.hbase.HBaseTestingUtility; 051import org.apache.hadoop.hbase.HColumnDescriptor; 052import org.apache.hadoop.hbase.HConstants; 053import org.apache.hadoop.hbase.HTableDescriptor; 054import org.apache.hadoop.hbase.client.Delete; 055import org.apache.hadoop.hbase.client.Durability; 056import org.apache.hadoop.hbase.client.Put; 057import org.apache.hadoop.hbase.client.Table; 058import org.apache.hadoop.hbase.io.hfile.HFileScanner; 059import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 060import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 061import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 062import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 063import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 064import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 065import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 066import org.apache.hadoop.hbase.security.User; 067import org.apache.hadoop.hbase.testclassification.MediumTests; 068import org.apache.hadoop.hbase.testclassification.RegionServerTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.Threads; 071import org.apache.hadoop.hbase.wal.WAL; 072import org.junit.After; 073import org.junit.Assume; 074import org.junit.Before; 075import org.junit.ClassRule; 076import org.junit.Rule; 077import org.junit.Test; 078import org.junit.experimental.categories.Category; 079import org.junit.rules.TestName; 080import org.mockito.Mockito; 081import org.mockito.invocation.InvocationOnMock; 082import org.mockito.stubbing.Answer; 083 084/** 085 * Test compaction framework and common functions 086 */ 087@Category({RegionServerTests.class, MediumTests.class}) 088public class TestCompaction { 089 090 @ClassRule 091 public static final HBaseClassTestRule CLASS_RULE = 092 HBaseClassTestRule.forClass(TestCompaction.class); 093 094 @Rule public TestName name = new TestName(); 095 private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU(); 096 protected Configuration conf = UTIL.getConfiguration(); 097 098 private HRegion r = null; 099 private HTableDescriptor htd = null; 100 private static final byte [] COLUMN_FAMILY = fam1; 101 private final byte [] STARTROW = Bytes.toBytes(START_KEY); 102 private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY; 103 private int compactionThreshold; 104 private byte[] secondRowBytes, thirdRowBytes; 105 private static final long MAX_FILES_TO_COMPACT = 10; 106 private final byte[] FAMILY = Bytes.toBytes("cf"); 107 108 /** constructor */ 109 public TestCompaction() { 110 super(); 111 112 // Set cache flush size to 1MB 113 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); 114 conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100); 115 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 116 NoLimitThroughputController.class.getName()); 117 compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); 118 119 secondRowBytes = START_KEY_BYTES.clone(); 120 // Increment the least significant character so we get to next row. 121 secondRowBytes[START_KEY_BYTES.length - 1]++; 122 thirdRowBytes = START_KEY_BYTES.clone(); 123 thirdRowBytes[START_KEY_BYTES.length - 1] = 124 (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2); 125 } 126 127 @Before 128 public void setUp() throws Exception { 129 this.htd = UTIL.createTableDescriptor(name.getMethodName()); 130 if (name.getMethodName().equals("testCompactionSeqId")) { 131 UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10"); 132 UTIL.getConfiguration().set( 133 DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, 134 DummyCompactor.class.getName()); 135 HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); 136 hcd.setMaxVersions(65536); 137 this.htd.addFamily(hcd); 138 } 139 this.r = UTIL.createLocalHRegion(htd, null, null); 140 } 141 142 @After 143 public void tearDown() throws Exception { 144 WAL wal = r.getWAL(); 145 this.r.close(); 146 wal.close(); 147 } 148 149 /** 150 * Verify that you can stop a long-running compaction 151 * (used during RS shutdown) 152 * @throws Exception 153 */ 154 @Test 155 public void testInterruptCompaction() throws Exception { 156 assertEquals(0, count()); 157 158 // lower the polling interval for this test 159 int origWI = HStore.closeCheckInterval; 160 HStore.closeCheckInterval = 10*1000; // 10 KB 161 162 try { 163 // Create a couple store files w/ 15KB (over 10KB interval) 164 int jmax = (int) Math.ceil(15.0/compactionThreshold); 165 byte [] pad = new byte[1000]; // 1 KB chunk 166 for (int i = 0; i < compactionThreshold; i++) { 167 Table loader = new RegionAsTable(r); 168 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 169 p.setDurability(Durability.SKIP_WAL); 170 for (int j = 0; j < jmax; j++) { 171 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 172 } 173 HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 174 loader.put(p); 175 r.flush(true); 176 } 177 178 HRegion spyR = spy(r); 179 doAnswer(new Answer() { 180 @Override 181 public Object answer(InvocationOnMock invocation) throws Throwable { 182 r.writestate.writesEnabled = false; 183 return invocation.callRealMethod(); 184 } 185 }).when(spyR).doRegionCompactionPrep(); 186 187 // force a minor compaction, but not before requesting a stop 188 spyR.compactStores(); 189 190 // ensure that the compaction stopped, all old files are intact, 191 HStore s = r.getStore(COLUMN_FAMILY); 192 assertEquals(compactionThreshold, s.getStorefilesCount()); 193 assertTrue(s.getStorefilesSize() > 15*1000); 194 // and no new store files persisted past compactStores() 195 // only one empty dir exists in temp dir 196 FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir()); 197 assertEquals(1, ls.length); 198 Path storeTempDir = new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY)); 199 assertTrue(r.getFilesystem().exists(storeTempDir)); 200 ls = r.getFilesystem().listStatus(storeTempDir); 201 assertEquals(0, ls.length); 202 } finally { 203 // don't mess up future tests 204 r.writestate.writesEnabled = true; 205 HStore.closeCheckInterval = origWI; 206 207 // Delete all Store information once done using 208 for (int i = 0; i < compactionThreshold; i++) { 209 Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i))); 210 byte [][] famAndQf = {COLUMN_FAMILY, null}; 211 delete.addFamily(famAndQf[0]); 212 r.delete(delete); 213 } 214 r.flush(true); 215 216 // Multiple versions allowed for an entry, so the delete isn't enough 217 // Lower TTL and expire to ensure that all our entries have been wiped 218 final int ttl = 1000; 219 for (HStore store : this.r.stores.values()) { 220 ScanInfo old = store.getScanInfo(); 221 ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells()); 222 store.setScanInfo(si); 223 } 224 Thread.sleep(ttl); 225 226 r.compact(true); 227 assertEquals(0, count()); 228 } 229 } 230 231 private int count() throws IOException { 232 int count = 0; 233 for (HStoreFile f: this.r.stores. 234 get(COLUMN_FAMILY_TEXT).getStorefiles()) { 235 HFileScanner scanner = f.getReader().getScanner(false, false); 236 if (!scanner.seekTo()) { 237 continue; 238 } 239 do { 240 count++; 241 } while(scanner.next()); 242 } 243 return count; 244 } 245 246 private void createStoreFile(final HRegion region) throws IOException { 247 createStoreFile(region, Bytes.toString(COLUMN_FAMILY)); 248 } 249 250 private void createStoreFile(final HRegion region, String family) throws IOException { 251 Table loader = new RegionAsTable(region); 252 HBaseTestCase.addContent(loader, family); 253 region.flush(true); 254 } 255 256 @Test 257 public void testCompactionWithCorruptResult() throws Exception { 258 int nfiles = 10; 259 for (int i = 0; i < nfiles; i++) { 260 createStoreFile(r); 261 } 262 HStore store = r.getStore(COLUMN_FAMILY); 263 264 Collection<HStoreFile> storeFiles = store.getStorefiles(); 265 DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor(); 266 tool.compactForTesting(storeFiles, false); 267 268 // Now lets corrupt the compacted file. 269 FileSystem fs = store.getFileSystem(); 270 // default compaction policy created one and only one new compacted file 271 Path dstPath = store.getRegionFileSystem().createTempName(); 272 FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, 1024L, null); 273 stream.writeChars("CORRUPT FILE!!!!"); 274 stream.close(); 275 Path origPath = store.getRegionFileSystem().commitStoreFile( 276 Bytes.toString(COLUMN_FAMILY), dstPath); 277 278 try { 279 ((HStore)store).moveFileIntoPlace(origPath); 280 } catch (Exception e) { 281 // The complete compaction should fail and the corrupt file should remain 282 // in the 'tmp' directory; 283 assertTrue(fs.exists(origPath)); 284 assertFalse(fs.exists(dstPath)); 285 System.out.println("testCompactionWithCorruptResult Passed"); 286 return; 287 } 288 fail("testCompactionWithCorruptResult failed since no exception was" + 289 "thrown while completing a corrupt file"); 290 } 291 292 /** 293 * Create a custom compaction request and be sure that we can track it through the queue, knowing 294 * when the compaction is completed. 295 */ 296 @Test 297 public void testTrackingCompactionRequest() throws Exception { 298 // setup a compact/split thread on a mock server 299 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 300 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 301 CompactSplit thread = new CompactSplit(mockServer); 302 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 303 304 // setup a region/store with some files 305 HStore store = r.getStore(COLUMN_FAMILY); 306 createStoreFile(r); 307 for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) { 308 createStoreFile(r); 309 } 310 311 CountDownLatch latch = new CountDownLatch(1); 312 Tracker tracker = new Tracker(latch); 313 thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker, 314 null); 315 // wait for the latch to complete. 316 latch.await(); 317 318 thread.interruptIfNecessary(); 319 } 320 321 @Test 322 public void testCompactionFailure() throws Exception { 323 // setup a compact/split thread on a mock server 324 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 325 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 326 CompactSplit thread = new CompactSplit(mockServer); 327 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 328 329 // setup a region/store with some files 330 HStore store = r.getStore(COLUMN_FAMILY); 331 createStoreFile(r); 332 for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { 333 createStoreFile(r); 334 } 335 336 HRegion mockRegion = Mockito.spy(r); 337 Mockito.when(mockRegion.checkSplit()).thenThrow(new IndexOutOfBoundsException()); 338 339 MetricsRegionWrapper metricsWrapper = new MetricsRegionWrapperImpl(r); 340 341 long preCompletedCount = metricsWrapper.getNumCompactionsCompleted(); 342 long preFailedCount = metricsWrapper.getNumCompactionsFailed(); 343 344 CountDownLatch latch = new CountDownLatch(1); 345 Tracker tracker = new Tracker(latch); 346 thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, 347 tracker, null); 348 // wait for the latch to complete. 349 latch.await(120, TimeUnit.SECONDS); 350 351 // compaction should have completed and been marked as failed due to error in split request 352 long postCompletedCount = metricsWrapper.getNumCompactionsCompleted(); 353 long postFailedCount = metricsWrapper.getNumCompactionsFailed(); 354 355 assertTrue("Completed count should have increased (pre=" + preCompletedCount + 356 ", post="+postCompletedCount+")", 357 postCompletedCount > preCompletedCount); 358 assertTrue("Failed count should have increased (pre=" + preFailedCount + 359 ", post=" + postFailedCount + ")", 360 postFailedCount > preFailedCount); 361 } 362 363 /** 364 * HBASE-7947: Regression test to ensure adding to the correct list in the 365 * {@link CompactSplit} 366 * @throws Exception on failure 367 */ 368 @Test 369 public void testMultipleCustomCompactionRequests() throws Exception { 370 // setup a compact/split thread on a mock server 371 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 372 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 373 CompactSplit thread = new CompactSplit(mockServer); 374 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 375 376 // setup a region/store with some files 377 int numStores = r.getStores().size(); 378 CountDownLatch latch = new CountDownLatch(numStores); 379 Tracker tracker = new Tracker(latch); 380 // create some store files and setup requests for each store on which we want to do a 381 // compaction 382 for (HStore store : r.getStores()) { 383 createStoreFile(r, store.getColumnFamilyName()); 384 createStoreFile(r, store.getColumnFamilyName()); 385 createStoreFile(r, store.getColumnFamilyName()); 386 thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER, 387 tracker, null); 388 } 389 // wait for the latch to complete. 390 latch.await(); 391 392 thread.interruptIfNecessary(); 393 } 394 395 class StoreMockMaker extends StatefulStoreMockMaker { 396 public ArrayList<HStoreFile> compacting = new ArrayList<>(); 397 public ArrayList<HStoreFile> notCompacting = new ArrayList<>(); 398 private final ArrayList<Integer> results; 399 400 public StoreMockMaker(ArrayList<Integer> results) { 401 this.results = results; 402 } 403 404 public class TestCompactionContext extends CompactionContext { 405 406 private List<HStoreFile> selectedFiles; 407 408 public TestCompactionContext(List<HStoreFile> selectedFiles) { 409 super(); 410 this.selectedFiles = selectedFiles; 411 } 412 413 @Override 414 public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { 415 return new ArrayList<>(); 416 } 417 418 @Override 419 public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction, 420 boolean mayUseOffPeak, boolean forceMajor) throws IOException { 421 this.request = new CompactionRequestImpl(selectedFiles); 422 this.request.setPriority(getPriority()); 423 return true; 424 } 425 426 @Override 427 public List<Path> compact(ThroughputController throughputController, User user) 428 throws IOException { 429 finishCompaction(this.selectedFiles); 430 return new ArrayList<>(); 431 } 432 } 433 434 @Override 435 public synchronized Optional<CompactionContext> selectCompaction() { 436 CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting)); 437 compacting.addAll(notCompacting); 438 notCompacting.clear(); 439 try { 440 ctx.select(null, false, false, false); 441 } catch (IOException ex) { 442 fail("Shouldn't happen"); 443 } 444 return Optional.of(ctx); 445 } 446 447 @Override 448 public synchronized void cancelCompaction(Object object) { 449 TestCompactionContext ctx = (TestCompactionContext)object; 450 compacting.removeAll(ctx.selectedFiles); 451 notCompacting.addAll(ctx.selectedFiles); 452 } 453 454 public synchronized void finishCompaction(List<HStoreFile> sfs) { 455 if (sfs.isEmpty()) return; 456 synchronized (results) { 457 results.add(sfs.size()); 458 } 459 compacting.removeAll(sfs); 460 } 461 462 @Override 463 public int getPriority() { 464 return 7 - compacting.size() - notCompacting.size(); 465 } 466 } 467 468 public class BlockingStoreMockMaker extends StatefulStoreMockMaker { 469 BlockingCompactionContext blocked = null; 470 471 public class BlockingCompactionContext extends CompactionContext { 472 public volatile boolean isInCompact = false; 473 474 public void unblock() { 475 synchronized (this) { 476 this.notifyAll(); 477 } 478 } 479 480 @Override 481 public List<Path> compact(ThroughputController throughputController, User user) 482 throws IOException { 483 try { 484 isInCompact = true; 485 synchronized (this) { 486 this.wait(); 487 } 488 } catch (InterruptedException e) { 489 Assume.assumeNoException(e); 490 } 491 return new ArrayList<>(); 492 } 493 494 @Override 495 public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { 496 return new ArrayList<>(); 497 } 498 499 @Override 500 public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e) 501 throws IOException { 502 this.request = new CompactionRequestImpl(new ArrayList<>()); 503 return true; 504 } 505 } 506 507 @Override 508 public Optional<CompactionContext> selectCompaction() { 509 this.blocked = new BlockingCompactionContext(); 510 try { 511 this.blocked.select(null, false, false, false); 512 } catch (IOException ex) { 513 fail("Shouldn't happen"); 514 } 515 return Optional.of(blocked); 516 } 517 518 @Override 519 public void cancelCompaction(Object object) {} 520 521 @Override 522 public int getPriority() { 523 return Integer.MIN_VALUE; // some invalid value, see createStoreMock 524 } 525 526 public BlockingCompactionContext waitForBlocking() { 527 while (this.blocked == null || !this.blocked.isInCompact) { 528 Threads.sleepWithoutInterrupt(50); 529 } 530 BlockingCompactionContext ctx = this.blocked; 531 this.blocked = null; 532 return ctx; 533 } 534 535 @Override 536 public HStore createStoreMock(String name) throws Exception { 537 return createStoreMock(Integer.MIN_VALUE, name); 538 } 539 540 public HStore createStoreMock(int priority, String name) throws Exception { 541 // Override the mock to always return the specified priority. 542 HStore s = super.createStoreMock(name); 543 when(s.getCompactPriority()).thenReturn(priority); 544 return s; 545 } 546 } 547 548 /** Test compaction priority management and multiple compactions per store (HBASE-8665). */ 549 @Test 550 public void testCompactionQueuePriorities() throws Exception { 551 // Setup a compact/split thread on a mock server. 552 final Configuration conf = HBaseConfiguration.create(); 553 HRegionServer mockServer = mock(HRegionServer.class); 554 when(mockServer.isStopped()).thenReturn(false); 555 when(mockServer.getConfiguration()).thenReturn(conf); 556 when(mockServer.getChoreService()).thenReturn(new ChoreService("test")); 557 CompactSplit cst = new CompactSplit(mockServer); 558 when(mockServer.getCompactSplitThread()).thenReturn(cst); 559 //prevent large compaction thread pool stealing job from small compaction queue. 560 cst.shutdownLongCompactions(); 561 // Set up the region mock that redirects compactions. 562 HRegion r = mock(HRegion.class); 563 when( 564 r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() { 565 @Override 566 public Boolean answer(InvocationOnMock invocation) throws Throwable { 567 invocation.<CompactionContext>getArgument(0).compact(invocation.getArgument(2), null); 568 return true; 569 } 570 }); 571 572 // Set up store mocks for 2 "real" stores and the one we use for blocking CST. 573 ArrayList<Integer> results = new ArrayList<>(); 574 StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results); 575 HStore store = sm.createStoreMock("store1"); 576 HStore store2 = sm2.createStoreMock("store2"); 577 BlockingStoreMockMaker blocker = new BlockingStoreMockMaker(); 578 579 // First, block the compaction thread so that we could muck with queue. 580 cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1"); 581 BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking(); 582 583 // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively. 584 for (int i = 0; i < 4; ++i) { 585 sm.notCompacting.add(createFile()); 586 } 587 cst.requestSystemCompaction(r, store, "s1-pri3"); 588 for (int i = 0; i < 3; ++i) { 589 sm2.notCompacting.add(createFile()); 590 } 591 cst.requestSystemCompaction(r, store2, "s2-pri4"); 592 // Now add 2 more files to store1 and queue compaction - pri 1. 593 for (int i = 0; i < 2; ++i) { 594 sm.notCompacting.add(createFile()); 595 } 596 cst.requestSystemCompaction(r, store, "s1-pri1"); 597 // Finally add blocking compaction with priority 2. 598 cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2"); 599 600 // Unblock the blocking compaction; we should run pri1 and become block again in pri2. 601 currentBlock.unblock(); 602 currentBlock = blocker.waitForBlocking(); 603 // Pri1 should have "compacted" all 6 files. 604 assertEquals(1, results.size()); 605 assertEquals(6, results.get(0).intValue()); 606 // Add 2 files to store 1 (it has 2 files now). 607 for (int i = 0; i < 2; ++i) { 608 sm.notCompacting.add(createFile()); 609 } 610 // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority 611 // is 5, however, so it must not preempt store 2. Add blocking compaction at the end. 612 cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7"); 613 currentBlock.unblock(); 614 currentBlock = blocker.waitForBlocking(); 615 assertEquals(3, results.size()); 616 assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files. 617 assertEquals(2, results.get(2).intValue()); 618 619 currentBlock.unblock(); 620 cst.interruptIfNecessary(); 621 } 622 623 /** 624 * Firstly write 10 cells (with different time stamp) to a qualifier and flush 625 * to hfile1, then write 10 cells (with different time stamp) to the same 626 * qualifier and flush to hfile2. The latest cell (cell-A) in hfile1 and the 627 * oldest cell (cell-B) in hfile2 are with the same time stamp but different 628 * sequence id, and will get scanned successively during compaction. 629 * <p/> 630 * We set compaction.kv.max to 10 so compaction will scan 10 versions each 631 * round, meanwhile we set keepSeqIdPeriod=0 in {@link DummyCompactor} so all 632 * 10 versions of hfile2 will be written out with seqId cleaned (set to 0) 633 * including cell-B, then when scanner goes to cell-A it will cause a scan 634 * out-of-order assertion error before HBASE-16931 635 * 636 * @throws Exception 637 * if error occurs during the test 638 */ 639 @Test 640 public void testCompactionSeqId() throws Exception { 641 final byte[] ROW = Bytes.toBytes("row"); 642 final byte[] QUALIFIER = Bytes.toBytes("qualifier"); 643 644 long timestamp = 10000; 645 646 // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9 647 // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8 648 // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7 649 // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6 650 // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5 651 // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4 652 // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3 653 // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2 654 // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1 655 // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0 656 for (int i = 0; i < 10; i++) { 657 Put put = new Put(ROW); 658 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); 659 r.put(put); 660 } 661 r.flush(true); 662 663 // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18 664 // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17 665 // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16 666 // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15 667 // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14 668 // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13 669 // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12 670 // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11 671 // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10 672 // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9 673 for (int i = 18; i > 8; i--) { 674 Put put = new Put(ROW); 675 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); 676 r.put(put); 677 } 678 r.flush(true); 679 r.compact(true); 680 } 681 682 public static class DummyCompactor extends DefaultCompactor { 683 public DummyCompactor(Configuration conf, HStore store) { 684 super(conf, store); 685 this.keepSeqIdPeriod = 0; 686 } 687 } 688 689 private static HStoreFile createFile() throws Exception { 690 HStoreFile sf = mock(HStoreFile.class); 691 when(sf.getPath()).thenReturn(new Path("file")); 692 StoreFileReader r = mock(StoreFileReader.class); 693 when(r.length()).thenReturn(10L); 694 when(sf.getReader()).thenReturn(r); 695 return sf; 696 } 697 698 /** 699 * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction 700 * finishes. 701 */ 702 public static class Tracker implements CompactionLifeCycleTracker { 703 704 private final CountDownLatch done; 705 706 public Tracker(CountDownLatch done) { 707 this.done = done; 708 } 709 710 @Override 711 public void afterExecution(Store store) { 712 done.countDown(); 713 } 714 } 715}