/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Test compaction framework and common functions
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCompaction {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompaction.class);

  @Rule
  public TestName name = new TestName();
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  protected Configuration conf = UTIL.getConfiguration();

  private HRegion r = null;
  private TableDescriptor tableDescriptor = null;
  private static final byte[] COLUMN_FAMILY = fam1;
  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
  private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
  private int compactionThreshold;
  private byte[] secondRowBytes, thirdRowBytes;
  private static final long MAX_FILES_TO_COMPACT = 10;
  private final byte[] FAMILY = Bytes.toBytes("cf");

  /** Upper bound, in seconds, for waiting on a compaction tracked through a latch. */
  private static final long COMPACTION_WAIT_SECONDS = 120;

  /** constructor */
  public TestCompaction() {
    super();

    // Set cache flush size to 1MB
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      NoLimitThroughputController.class.getName());
    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);

    secondRowBytes = START_KEY_BYTES.clone();
    // Increment the least significant character so we get to next row.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY_BYTES.clone();
    thirdRowBytes[START_KEY_BYTES.length - 1] =
      (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
  }

  /**
   * Creates a fresh local region named after the running test method. For
   * {@code testCompactionSeqId} the region additionally gets the {@code cf} family with 65536
   * max versions, and the shared configuration is switched to {@link DummyCompactor} with
   * {@code hbase.hstore.compaction.kv.max} set to 10.
   */
  @Before
  public void setUp() throws Exception {
    TableDescriptorBuilder builder = UTIL.createModifyableTableDescriptor(name.getMethodName());
    if (name.getMethodName().equals("testCompactionSeqId")) {
      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
      UTIL.getConfiguration().set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
        DummyCompactor.class.getName());
      ColumnFamilyDescriptor familyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(65536).build();
      builder.setColumnFamily(familyDescriptor);
    }
    this.tableDescriptor = builder.build();
    this.r = UTIL.createLocalHRegion(tableDescriptor, null, null);
  }

  /**
   * Closes the region and its WAL. The WAL is closed even when closing the region throws, and
   * nothing is done when {@link #setUp()} failed before the region was created.
   */
  @After
  public void tearDown() throws Exception {
    if (r == null) {
      // setUp() failed before the region existed; nothing to release.
      return;
    }
    WAL wal = r.getWAL();
    try {
      this.r.close();
    } finally {
      wal.close();
    }
  }

  /**
   * Verify that you can stop a long-running compaction (used during RS shutdown)
   */
  @Test
  public void testInterruptCompactionBySize() throws Exception {
    assertEquals(0, count());

    // lower the polling interval for this test
    conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */);

    try {
      assertCompactionStopsWhenWritesDisabled();
    } finally {
      // don't mess up future tests
      conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
      enableWritesAndPurgeRegion();
    }
  }

  /**
   * Same scenario as {@link #testInterruptCompactionBySize()}, but with the time-based close
   * check interval lowered instead of the size-based one.
   */
  @Test
  public void testInterruptCompactionByTime() throws Exception {
    assertEquals(0, count());

    // lower the polling interval for this test
    conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */);

    try {
      assertCompactionStopsWhenWritesDisabled();
    } finally {
      // don't mess up future tests
      conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);
      enableWritesAndPurgeRegion();
    }
  }

  /**
   * Flushes {@code compactionThreshold} store files into {@link #COLUMN_FAMILY}, each holding the
   * standard test content plus one row padded with 1 KB cells so that the files together carry
   * at least 15 KB of padding.
   */
  private void createPaddedStoreFiles() throws IOException {
    // Create a couple store files w/ 15KB (over 10KB interval)
    int jmax = (int) Math.ceil(15.0 / compactionThreshold);
    byte[] pad = new byte[1000]; // 1 KB chunk
    for (int i = 0; i < compactionThreshold; i++) {
      Table loader = new RegionAsTable(r);
      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
      p.setDurability(Durability.SKIP_WAL);
      for (int j = 0; j < jmax; j++) {
        p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
      }
      HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
      loader.put(p);
      r.flush(true);
    }
  }

  /**
   * Loads padded store files, runs a compaction that has writes disabled right before it starts,
   * and asserts that the compaction stopped without replacing or leaking any files.
   */
  private void assertCompactionStopsWhenWritesDisabled() throws IOException {
    createPaddedStoreFiles();

    // Disable writes on the real region just before the compaction prep runs, which is how a
    // stop gets requested while the compaction is about to start.
    HRegion spyR = spy(r);
    doAnswer(invocation -> {
      r.writestate.writesEnabled = false;
      return invocation.callRealMethod();
    }).when(spyR).doRegionCompactionPrep();

    // force a minor compaction, but not before requesting a stop
    spyR.compactStores();

    // ensure that the compaction stopped, all old files are intact,
    HStore s = r.getStore(COLUMN_FAMILY);
    assertEquals(compactionThreshold, s.getStorefilesCount());
    assertTrue(s.getStorefilesSize() > 15 * 1000);
    // and no new store files persisted past compactStores()
    // only one empty dir exists in temp dir
    FileSystem fs = r.getFilesystem();
    Path tempDir = r.getRegionFileSystem().getTempDir();
    FileStatus[] ls = fs.listStatus(tempDir);
    assertEquals(1, ls.length);
    Path storeTempDir = new Path(tempDir, Bytes.toString(COLUMN_FAMILY));
    assertTrue(fs.exists(storeTempDir));
    ls = fs.listStatus(storeTempDir);
    assertEquals(0, ls.length);
  }

  /**
   * Re-enables writes on the region, deletes the padded rows, expires every remaining cell via a
   * one second TTL, major-compacts, and asserts that the store is empty afterwards.
   */
  private void enableWritesAndPurgeRegion() throws Exception {
    r.writestate.writesEnabled = true;

    // Delete all Store information once done using
    for (int i = 0; i < compactionThreshold; i++) {
      Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
      delete.addFamily(COLUMN_FAMILY);
      r.delete(delete);
    }
    r.flush(true);

    // Multiple versions allowed for an entry, so the delete isn't enough
    // Lower TTL and expire to ensure that all our entries have been wiped
    final int ttl = 1000;
    for (HStore store : this.r.stores.values()) {
      ScanInfo old = store.getScanInfo();
      ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
      store.setScanInfo(si);
    }
    Thread.sleep(ttl);

    r.compact(true);
    assertEquals(0, count());
  }

  /**
   * Counts the cells in all store files of {@link #COLUMN_FAMILY_TEXT} by scanning each file from
   * its first entry; files whose scanner cannot seek to a first entry contribute nothing.
   */
  private int count() throws IOException {
    int count = 0;
    for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      if (!scanner.seekTo()) {
        continue;
      }
      do {
        count++;
      } while (scanner.next());
    }
    return count;
  }

  /** Adds the standard test content to {@link #COLUMN_FAMILY} and flushes it to a store file. */
  private void createStoreFile(final HRegion region) throws IOException {
    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
  }

  /** Adds the standard test content to the given family and flushes it to a store file. */
  private void createStoreFile(final HRegion region, String family) throws IOException {
    Table loader = new RegionAsTable(region);
    HTestConst.addContent(loader, family);
    region.flush(true);
  }

  /**
   * Completing a compaction with a corrupt result file must fail with an {@link IOException} and
   * leave the corrupt file where it was written.
   */
  @Test
  public void testCompactionWithCorruptResult() throws Exception {
    int nfiles = 10;
    for (int i = 0; i < nfiles; i++) {
      createStoreFile(r);
    }
    HStore store = r.getStore(COLUMN_FAMILY);

    Collection<HStoreFile> storeFiles = store.getStorefiles();
    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
    tool.compact(request, NoLimitThroughputController.INSTANCE, null);

    // Now lets corrupt the compacted file.
    FileSystem fs = store.getFileSystem();
    // default compaction policy created one and only one new compacted file
    Path tmpPath = store.getRegionFileSystem().createTempName();
    try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
      stream.writeChars("CORRUPT FILE!!!!");
    }
    // The complete compaction should fail and the corrupt file should remain
    // in the 'tmp' directory;
    assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
      EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath)));
    assertTrue(fs.exists(tmpPath));
  }

  /**
   * Create a custom compaction request and be sure that we can track it through the queue, knowing
   * when the compaction is completed.
   */
  @Test
  public void testTrackingCompactionRequest() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
      createStoreFile(r);
    }

    CountDownLatch latch = new CountDownLatch(1);
    Tracker tracker = new Tracker(latch);
    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker, null);
    // wait for the latch to complete, but fail instead of hanging if it never does.
    assertTrue("Compaction was not executed within " + COMPACTION_WAIT_SECONDS + "s",
      latch.await(COMPACTION_WAIT_SECONDS, TimeUnit.SECONDS));

    thread.interruptIfNecessary();
  }

  /**
   * A compaction whose follow-up split check throws must still be counted as completed and must
   * additionally be counted as failed in the region metrics.
   */
  @Test
  public void testCompactionFailure() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
      createStoreFile(r);
    }

    HRegion mockRegion = Mockito.spy(r);
    // doThrow(..).when(spy) stubs without invoking the real checkSplit() on the spied region.
    Mockito.doThrow(new RuntimeException("Thrown intentionally by test!")).when(mockRegion)
      .checkSplit();

    try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) {

      long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
      long preFailedCount = metricsWrapper.getNumCompactionsFailed();

      CountDownLatch latch = new CountDownLatch(1);
      Tracker tracker = new Tracker(latch);
      thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, tracker,
        null);
      // wait for the latch to complete.
      latch.await(COMPACTION_WAIT_SECONDS, TimeUnit.SECONDS);

      // compaction should have completed and been marked as failed due to error in split request
      long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
      long postFailedCount = metricsWrapper.getNumCompactionsFailed();

      assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post="
        + postCompletedCount + ")", postCompletedCount > preCompletedCount);
      assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post="
        + postFailedCount + ")", postFailedCount > preFailedCount);
    }
  }

  /**
   * Test no new Compaction requests are generated after calling stop compactions
   */
  @Test
  public void testStopStartCompaction() throws IOException {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    final CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
      createStoreFile(r);
    }
    thread.switchCompaction(false);
    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);
    assertFalse(thread.isCompactionsEnabled());
    int longCompactions = thread.getLongCompactions().getActiveCount();
    int shortCompactions = thread.getShortCompactions().getActiveCount();
    assertEquals(
      "longCompactions=" + longCompactions + "," + "shortCompactions=" + shortCompactions, 0,
      longCompactions + shortCompactions);
    thread.switchCompaction(true);
    assertTrue(thread.isCompactionsEnabled());
    // Make sure no compactions have run.
    assertEquals(0, thread.getLongCompactions().getCompletedTaskCount()
      + thread.getShortCompactions().getCompletedTaskCount());
    // Request a compaction and make sure it is submitted successfully.
    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);
    // Wait until the compaction finishes.
    Waiter.waitFor(UTIL.getConfiguration(), 5000,
      (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount()
        + thread.getShortCompactions().getCompletedTaskCount() == 1);
    // Make sure there are no compactions running.
    assertEquals(0,
      thread.getLongCompactions().getActiveCount() + thread.getShortCompactions().getActiveCount());
  }

  /**
   * Switching compactions off while one is parked in {@link WaitThroughPutController} must leave
   * the store with the same number of files it started with.
   */
  @Test
  public void testInterruptingRunningCompactions() throws Exception {
    // setup a compact/split thread on a mock server
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      WaitThroughPutController.class.getName());
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);

    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createPaddedStoreFiles();
    int initialFiles = store.getStorefilesCount();

    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);

    Thread.sleep(3000);
    thread.switchCompaction(false);
    assertEquals(initialFiles, store.getStorefilesCount());
    // don't mess up future tests
    thread.switchCompaction(true);
  }

  /**
   * HBASE-7947: Regression test to ensure adding to the correct list in the {@link CompactSplit}
   * @throws Exception on failure
   */
  @Test
  public void testMultipleCustomCompactionRequests() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    int numStores = r.getStores().size();
    CountDownLatch latch = new CountDownLatch(numStores);
    Tracker tracker = new Tracker(latch);
    // create some store files and setup requests for each store on which we want to do a
    // compaction
    for (HStore store : r.getStores()) {
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER, tracker,
        null);
    }
    // wait for the latch to complete, but fail instead of hanging if it never does.
    assertTrue("Not all compactions were executed within " + COMPACTION_WAIT_SECONDS + "s",
      latch.await(COMPACTION_WAIT_SECONDS, TimeUnit.SECONDS));

    thread.interruptIfNecessary();
  }

  /**
   * Store mock maker that tracks which mock files are being compacted and records, in the shared
   * {@code results} list, how many files each finished compaction covered.
   */
  class StoreMockMaker extends StatefulStoreMockMaker {
    public ArrayList<HStoreFile> compacting = new ArrayList<>();
    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
    private final ArrayList<Integer> results;

    public StoreMockMaker(ArrayList<Integer> results) {
      this.results = results;
    }

    /** Compaction context that "compacts" a fixed list of files by reporting them finished. */
    public class TestCompactionContext extends CompactionContext {

      private List<HStoreFile> selectedFiles;

      public TestCompactionContext(List<HStoreFile> selectedFiles) {
        super();
        this.selectedFiles = selectedFiles;
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
        this.request = new CompactionRequestImpl(selectedFiles);
        this.request.setPriority(getPriority());
        return true;
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
        throws IOException {
        finishCompaction(this.selectedFiles);
        return new ArrayList<>();
      }
    }

    /** Selects every file that is not yet compacting and moves it to the compacting list. */
    @Override
    public synchronized Optional<CompactionContext> selectCompaction() {
      CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
      compacting.addAll(notCompacting);
      notCompacting.clear();
      try {
        ctx.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(ctx);
    }

    /** Returns the files of a cancelled compaction to the not-compacting list. */
    @Override
    public synchronized void cancelCompaction(Object object) {
      TestCompactionContext ctx = (TestCompactionContext) object;
      compacting.removeAll(ctx.selectedFiles);
      notCompacting.addAll(ctx.selectedFiles);
    }

    /** Records the size of a non-empty finished compaction and drops its files. */
    public synchronized void finishCompaction(List<HStoreFile> sfs) {
      if (sfs.isEmpty()) {
        return;
      }
      synchronized (results) {
        results.add(sfs.size());
      }
      compacting.removeAll(sfs);
    }

    /** Priority drops by one for every file currently tracked by this store mock. */
    @Override
    public int getPriority() {
      return 7 - compacting.size() - notCompacting.size();
    }
  }

  /**
   * Store mock maker whose compactions park inside {@code compact()} until the test releases
   * them, which lets a test hold the compaction thread while it fills the queue.
   */
  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
    BlockingCompactionContext blocked = null;

    /** Compaction context whose {@code compact()} blocks until {@link #unblock()} is called. */
    public class BlockingCompactionContext extends CompactionContext {
      public volatile boolean isInCompact = false;

      /** Set once {@link #unblock()} has been called; guarded by this context's monitor. */
      private boolean unblocked = false;

      /** Releases the thread parked in {@code compact()}, now or whenever it gets there. */
      public void unblock() {
        synchronized (this) {
          // Remember the release so it is not lost if compact() has not reached wait() yet.
          unblocked = true;
          this.notifyAll();
        }
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
        throws IOException {
        try {
          isInCompact = true;
          synchronized (this) {
            // Loop on the flag: guards against spurious wakeups and against unblock() having
            // run between isInCompact being published and this thread entering the monitor.
            while (!unblocked) {
              this.wait();
            }
          }
        } catch (InterruptedException e) {
          Assume.assumeNoException(e);
        }
        return new ArrayList<>();
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
        throws IOException {
        this.request = new CompactionRequestImpl(new ArrayList<>());
        return true;
      }
    }

    @Override
    public Optional<CompactionContext> selectCompaction() {
      this.blocked = new BlockingCompactionContext();
      try {
        this.blocked.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(blocked);
    }

    @Override
    public void cancelCompaction(Object object) {
    }

    @Override
    public int getPriority() {
      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
    }

    /**
     * Polls every 50 ms until the most recently selected context has entered {@code compact()},
     * then hands that context to the caller and forgets it.
     */
    public BlockingCompactionContext waitForBlocking() {
      while (this.blocked == null || !this.blocked.isInCompact) {
        Threads.sleepWithoutInterrupt(50);
      }
      BlockingCompactionContext ctx = this.blocked;
      this.blocked = null;
      return ctx;
    }

    @Override
    public HStore createStoreMock(String name) throws Exception {
      return createStoreMock(Integer.MIN_VALUE, name);
    }

    public HStore createStoreMock(int priority, String name) throws Exception {
      // Override the mock to always return the specified priority.
      HStore s = super.createStoreMock(name);
      when(s.getCompactPriority()).thenReturn(priority);
      return s;
    }
  }

  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
  @Test
  public void testCompactionQueuePriorities() throws Exception {
    // Setup a compact/split thread on a mock server.
    final Configuration conf = HBaseConfiguration.create();
    HRegionServer mockServer = mock(HRegionServer.class);
    ChoreService choreService = new ChoreService("test");
    try {
      when(mockServer.isStopped()).thenReturn(false);
      when(mockServer.getConfiguration()).thenReturn(conf);
      when(mockServer.getChoreService()).thenReturn(choreService);
      CompactSplit cst = new CompactSplit(mockServer);
      when(mockServer.getCompactSplitThread()).thenReturn(cst);
      // prevent large compaction thread pool stealing job from small compaction queue.
      cst.shutdownLongCompactions();
      // Set up the region mock that redirects compactions.
      HRegion r = mock(HRegion.class);
      when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() {
        @Override
        public Boolean answer(InvocationOnMock invocation) throws Throwable {
          invocation.<CompactionContext> getArgument(0).compact(invocation.getArgument(2), null);
          return true;
        }
      });

      // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
      ArrayList<Integer> results = new ArrayList<>();
      StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
      HStore store = sm.createStoreMock("store1");
      HStore store2 = sm2.createStoreMock("store2");
      BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();

      // First, block the compaction thread so that we could muck with queue.
      cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
      BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();

      // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
      for (int i = 0; i < 4; ++i) {
        sm.notCompacting.add(createFile());
      }
      cst.requestSystemCompaction(r, store, "s1-pri3");
      for (int i = 0; i < 3; ++i) {
        sm2.notCompacting.add(createFile());
      }
      cst.requestSystemCompaction(r, store2, "s2-pri4");
      // Now add 2 more files to store1 and queue compaction - pri 1.
      for (int i = 0; i < 2; ++i) {
        sm.notCompacting.add(createFile());
      }
      cst.requestSystemCompaction(r, store, "s1-pri1");
      // Finally add blocking compaction with priority 2.
      cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");

      // Unblock the blocking compaction; we should run pri1 and become block again in pri2.
      currentBlock.unblock();
      currentBlock = blocker.waitForBlocking();
      // Pri1 should have "compacted" all 6 files.
      assertEquals(1, results.size());
      assertEquals(6, results.get(0).intValue());
      // Add 2 files to store 1 (it has 2 files now).
      for (int i = 0; i < 2; ++i) {
        sm.notCompacting.add(createFile());
      }
      // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
      // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
      cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
      currentBlock.unblock();
      currentBlock = blocker.waitForBlocking();
      assertEquals(3, results.size());
      assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
      assertEquals(2, results.get(2).intValue());

      currentBlock.unblock();
      cst.interruptIfNecessary();
    } finally {
      // Release the chore service created for the mock server.
      choreService.shutdown();
    }
  }

  /**
   * Firstly write 10 cells (with different time stamp) to a qualifier and flush to hfile1, then
   * write 10 cells (with different time stamp) to the same qualifier and flush to hfile2. The
   * latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 are with the same time
   * stamp but different sequence id, and will get scanned successively during compaction.
   * <p/>
   * We set compaction.kv.max to 10 so compaction will scan 10 versions each round, meanwhile we set
   * keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 will be written out
   * with seqId cleaned (set to 0) including cell-B, then when scanner goes to cell-A it will cause
   * a scan out-of-order assertion error before HBASE-16931.
   * @throws Exception if error occurs during the test
   */
  @Test
  public void testCompactionSeqId() throws Exception {
    final byte[] ROW = Bytes.toBytes("row");
    final byte[] QUALIFIER = Bytes.toBytes("qualifier");

    long timestamp = 10000;

    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
    for (int i = 0; i < 10; i++) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);

    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
    for (int i = 18; i > 8; i--) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);
    r.compact(true);
  }

  /** {@link DefaultCompactor} with {@code keepSeqIdPeriod} forced to 0. */
  public static class DummyCompactor extends DefaultCompactor {
    public DummyCompactor(Configuration conf, HStore store) {
      super(conf, store);
      this.keepSeqIdPeriod = 0;
    }
  }

  /** Creates a mock store file at path {@code file} whose mock reader reports a length of 10. */
  private static HStoreFile createFile() throws Exception {
    HStoreFile sf = mock(HStoreFile.class);
    when(sf.getPath()).thenReturn(new Path("file"));
    StoreFileReader r = mock(StoreFileReader.class);
    when(r.length()).thenReturn(10L);
    when(sf.getReader()).thenReturn(r);
    return sf;
  }

  /**
   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
   * finishes.
   */
  public static class Tracker implements CompactionLifeCycleTracker {

    private final CountDownLatch done;

    public Tracker(CountDownLatch done) {
      this.done = done;
    }

    @Override
    public void afterExecution(Store store) {
      done.countDown();
    }
  }

  /**
   * {@link NoLimitThroughputController} whose {@code control} call sleeps for 6,000,000 ms, so a
   * compaction that consults it stays parked there until its thread is interrupted.
   */
  public static class WaitThroughPutController extends NoLimitThroughputController {

    public WaitThroughPutController() {
    }

    @Override
    public long control(String compactionName, long size) throws InterruptedException {
      Thread.sleep(6000000);
      return 6000000;
    }
  }
}