001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY; 021import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES; 022import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1; 023import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 024import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY; 025import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY; 026import static org.hamcrest.MatcherAssert.assertThat; 027import static org.hamcrest.Matchers.allOf; 028import static org.hamcrest.Matchers.containsString; 029import static org.hamcrest.Matchers.hasProperty; 030import static org.hamcrest.Matchers.instanceOf; 031import static org.hamcrest.Matchers.notNullValue; 032import static org.junit.jupiter.api.Assertions.assertEquals; 033import static org.junit.jupiter.api.Assertions.assertFalse; 034import static org.junit.jupiter.api.Assertions.assertThrows; 035import static org.junit.jupiter.api.Assertions.assertTrue; 036import static org.junit.jupiter.api.Assertions.fail; 037import static org.mockito.ArgumentMatchers.any; 038import static org.mockito.Mockito.doAnswer; 039import static org.mockito.Mockito.mock; 040import static org.mockito.Mockito.spy; 041import static org.mockito.Mockito.when; 042 043import java.io.File; 044import java.io.FileOutputStream; 045import java.io.IOException; 046import java.io.InputStream; 047import java.io.OutputStream; 048import java.util.ArrayList; 049import java.util.Collection; 050import java.util.Collections; 051import java.util.List; 052import java.util.Objects; 053import java.util.Optional; 054import java.util.concurrent.CountDownLatch; 055import java.util.concurrent.TimeUnit; 056import java.util.zip.GZIPInputStream; 057import java.util.zip.GZIPOutputStream; 058import org.apache.hadoop.conf.Configuration; 059import org.apache.hadoop.fs.FSDataOutputStream; 060import org.apache.hadoop.fs.FileStatus; 061import org.apache.hadoop.fs.FileSystem; 062import org.apache.hadoop.fs.Path; 063import org.apache.hadoop.hbase.ChoreService; 064import org.apache.hadoop.hbase.HBaseConfiguration; 065import org.apache.hadoop.hbase.HBaseTestingUtil; 066import org.apache.hadoop.hbase.HConstants; 067import org.apache.hadoop.hbase.HTestConst; 068import org.apache.hadoop.hbase.KeyValue; 069import org.apache.hadoop.hbase.Waiter; 070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 072import org.apache.hadoop.hbase.client.Delete; 073import org.apache.hadoop.hbase.client.Durability; 074import org.apache.hadoop.hbase.client.Put; 075import org.apache.hadoop.hbase.client.Table; 076import org.apache.hadoop.hbase.client.TableDescriptor; 077import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 078import org.apache.hadoop.hbase.io.compress.Compression; 079import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 080import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 081import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 082import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 083import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 084import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 085import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 086import org.apache.hadoop.hbase.security.User; 087import org.apache.hadoop.hbase.testclassification.LargeTests; 088import org.apache.hadoop.hbase.testclassification.RegionServerTests; 089import org.apache.hadoop.hbase.util.Bytes; 090import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 091import org.apache.hadoop.hbase.util.Threads; 092import org.apache.hadoop.hbase.wal.WAL; 093import org.apache.hadoop.io.IOUtils; 094import org.junit.jupiter.api.AfterEach; 095import org.junit.jupiter.api.Assumptions; 096import org.junit.jupiter.api.BeforeEach; 097import org.junit.jupiter.api.Disabled; 098import org.junit.jupiter.api.Tag; 099import org.junit.jupiter.api.Test; 100import org.junit.jupiter.api.TestInfo; 101import org.mockito.Mockito; 102import org.mockito.invocation.InvocationOnMock; 103import org.mockito.stubbing.Answer; 104import org.slf4j.LoggerFactory; 105 106/** 107 * Test compaction framework and common functions 108 */ 109@Tag(RegionServerTests.TAG) 110@Tag(LargeTests.TAG) 111public class TestCompaction { 112 113 private String name; 114 115 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 116 protected Configuration conf = UTIL.getConfiguration(); 117 118 private HRegion r = null; 119 private TableDescriptor tableDescriptor = null; 120 private static final byte[] COLUMN_FAMILY = fam1; 121 private final byte[] STARTROW = Bytes.toBytes(START_KEY); 122 private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY; 123 private int compactionThreshold; 124 private byte[] secondRowBytes, thirdRowBytes; 125 private static final long MAX_FILES_TO_COMPACT = 10; 126 private final byte[] FAMILY = Bytes.toBytes("cf"); 127 128 /** constructor */ 129 public TestCompaction() { 130 // Set cache flush size to 1MB 131 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); 132 conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100); 133 conf.setLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, 10L); 134 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 135 NoLimitThroughputController.class.getName()); 136 compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); 137 138 secondRowBytes = START_KEY_BYTES.clone(); 139 // Increment the least significant character so we get to next row. 140 secondRowBytes[START_KEY_BYTES.length - 1]++; 141 thirdRowBytes = START_KEY_BYTES.clone(); 142 thirdRowBytes[START_KEY_BYTES.length - 1] = 143 (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2); 144 } 145 146 @BeforeEach 147 public void setUp(TestInfo testInfo) throws Exception { 148 this.name = testInfo.getTestMethod().get().getName(); 149 TableDescriptorBuilder builder = UTIL.createModifyableTableDescriptor(name); 150 if (name.equals("testCompactionSeqId")) { 151 UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10"); 152 UTIL.getConfiguration().set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, 153 DummyCompactor.class.getName()); 154 ColumnFamilyDescriptor familyDescriptor = 155 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(65536).build(); 156 builder.setColumnFamily(familyDescriptor); 157 } 158 if ( 159 name.equals("testCompactionWithCorruptBlock") 160 || name.equals("generateHFileForCorruptBlockTest") 161 ) { 162 UTIL.getConfiguration().setBoolean("hbase.hstore.validate.read_fully", true); 163 ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY) 164 .setCompressionType(Compression.Algorithm.GZ).build(); 165 builder.setColumnFamily(familyDescriptor); 166 } 167 this.tableDescriptor = builder.build(); 168 this.r = UTIL.createLocalHRegion(tableDescriptor, null, null); 169 } 170 171 @AfterEach 172 public void tearDown() throws Exception { 173 WAL wal = r.getWAL(); 174 this.r.close(); 175 wal.close(); 176 } 177 178 /** 179 * Verify that you can stop a long-running compaction (used during RS shutdown) 180 */ 181 @Test 182 public void testInterruptCompactionBySize() throws Exception { 183 assertEquals(0, count()); 184 185 // lower the polling interval for this test 186 conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */); 187 188 try { 189 // Create a couple store files w/ 15KB (over 10KB interval) 190 int jmax = (int) Math.ceil(15.0 / compactionThreshold); 191 byte[] pad = new byte[1000]; // 1 KB chunk 192 for (int i = 0; i < compactionThreshold; i++) { 193 Table loader = new RegionAsTable(r); 194 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 195 p.setDurability(Durability.SKIP_WAL); 196 for (int j = 0; j < jmax; j++) { 197 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 198 } 199 HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 200 loader.put(p); 201 r.flush(true); 202 } 203 204 HRegion spyR = spy(r); 205 doAnswer(new Answer<Object>() { 206 @Override 207 public Object answer(InvocationOnMock invocation) throws Throwable { 208 r.writestate.writesEnabled = false; 209 return invocation.callRealMethod(); 210 } 211 }).when(spyR).doRegionCompactionPrep(); 212 213 // force a minor compaction, but not before requesting a stop 214 spyR.compactStores(); 215 216 // ensure that the compaction stopped, all old files are intact, 217 HStore s = r.getStore(COLUMN_FAMILY); 218 assertEquals(compactionThreshold, s.getStorefilesCount()); 219 assertTrue(s.getStorefilesSize() > 15 * 1000); 220 // and no new store files persisted past compactStores() 221 // only one empty dir exists in temp dir 222 FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir()); 223 assertEquals(1, ls.length); 224 Path storeTempDir = 225 new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY)); 226 assertTrue(r.getFilesystem().exists(storeTempDir)); 227 ls = r.getFilesystem().listStatus(storeTempDir); 228 assertEquals(0, ls.length); 229 } finally { 230 // don't mess up future tests 231 r.writestate.writesEnabled = true; 232 conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */); 233 234 // Delete all Store information once done using 235 for (int i = 0; i < compactionThreshold; i++) { 236 Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i))); 237 byte[][] famAndQf = { COLUMN_FAMILY, null }; 238 delete.addFamily(famAndQf[0]); 239 r.delete(delete); 240 } 241 r.flush(true); 242 243 // Multiple versions allowed for an entry, so the delete isn't enough 244 // Lower TTL and expire to ensure that all our entries have been wiped 245 final int ttl = 1000; 246 for (HStore store : this.r.stores.values()) { 247 ScanInfo old = store.getScanInfo(); 248 ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells()); 249 store.setScanInfo(si); 250 } 251 Thread.sleep(ttl); 252 253 r.compact(true); 254 assertEquals(0, count()); 255 } 256 } 257 258 @Test 259 public void testInterruptCompactionByTime() throws Exception { 260 assertEquals(0, count()); 261 262 // lower the polling interval for this test 263 conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */); 264 265 try { 266 // Create a couple store files w/ 15KB (over 10KB interval) 267 int jmax = (int) Math.ceil(15.0 / compactionThreshold); 268 byte[] pad = new byte[1000]; // 1 KB chunk 269 for (int i = 0; i < compactionThreshold; i++) { 270 Table loader = new RegionAsTable(r); 271 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 272 p.setDurability(Durability.SKIP_WAL); 273 for (int j = 0; j < jmax; j++) { 274 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 275 } 276 HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 277 loader.put(p); 278 r.flush(true); 279 } 280 281 HRegion spyR = spy(r); 282 doAnswer(new Answer<Object>() { 283 @Override 284 public Object answer(InvocationOnMock invocation) throws Throwable { 285 r.writestate.writesEnabled = false; 286 return invocation.callRealMethod(); 287 } 288 }).when(spyR).doRegionCompactionPrep(); 289 290 // force a minor compaction, but not before requesting a stop 291 spyR.compactStores(); 292 293 // ensure that the compaction stopped, all old files are intact, 294 HStore s = r.getStore(COLUMN_FAMILY); 295 assertEquals(compactionThreshold, s.getStorefilesCount()); 296 assertTrue(s.getStorefilesSize() > 15 * 1000); 297 // and no new store files persisted past compactStores() 298 // only one empty dir exists in temp dir 299 FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir()); 300 assertEquals(1, ls.length); 301 Path storeTempDir = 302 new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY)); 303 assertTrue(r.getFilesystem().exists(storeTempDir)); 304 ls = r.getFilesystem().listStatus(storeTempDir); 305 assertEquals(0, ls.length); 306 } finally { 307 // don't mess up future tests 308 r.writestate.writesEnabled = true; 309 conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */); 310 311 // Delete all Store information once done using 312 for (int i = 0; i < compactionThreshold; i++) { 313 Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i))); 314 byte[][] famAndQf = { COLUMN_FAMILY, null }; 315 delete.addFamily(famAndQf[0]); 316 r.delete(delete); 317 } 318 r.flush(true); 319 320 // Multiple versions allowed for an entry, so the delete isn't enough 321 // Lower TTL and expire to ensure that all our entries have been wiped 322 final int ttl = 1000; 323 for (HStore store : this.r.stores.values()) { 324 ScanInfo old = store.getScanInfo(); 325 ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells()); 326 store.setScanInfo(si); 327 } 328 Thread.sleep(ttl); 329 330 r.compact(true); 331 assertEquals(0, count()); 332 } 333 } 334 335 private int count() throws IOException { 336 int count = 0; 337 for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) { 338 f.initReader(); 339 try (StoreFileScanner scanner = f.getPreadScanner(false, Long.MAX_VALUE, 0, false)) { 340 scanner.seek(KeyValue.LOWESTKEY); 341 while (scanner.next() != null) { 342 count++; 343 } 344 } 345 } 346 return count; 347 } 348 349 private void createStoreFile(final HRegion region) throws IOException { 350 createStoreFile(region, Bytes.toString(COLUMN_FAMILY)); 351 } 352 353 private void createStoreFile(final HRegion region, String family) throws IOException { 354 Table loader = new RegionAsTable(region); 355 HTestConst.addContent(loader, family); 356 region.flush(true); 357 } 358 359 @Test 360 public void testCompactionWithCorruptResult() throws Exception { 361 int nfiles = 10; 362 for (int i = 0; i < nfiles; i++) { 363 createStoreFile(r); 364 } 365 HStore store = r.getStore(COLUMN_FAMILY); 366 367 Collection<HStoreFile> storeFiles = store.getStorefiles(); 368 DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor(); 369 CompactionRequestImpl request = new CompactionRequestImpl(storeFiles); 370 tool.compact(request, NoLimitThroughputController.INSTANCE, null); 371 372 // Now lets corrupt the compacted file. 373 FileSystem fs = store.getFileSystem(); 374 // default compaction policy created one and only one new compacted file 375 Path tmpPath = store.getRegionFileSystem().createTempName(); 376 try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) { 377 stream.writeChars("CORRUPT FILE!!!!"); 378 } 379 380 // The complete compaction should fail and the corrupt file should remain 381 // in the 'tmp' directory; 382 assertThrows(IOException.class, () -> store.doCompaction(null, null, null, 383 EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath))); 384 assertTrue(fs.exists(tmpPath)); 385 } 386 387 /** 388 * Generates the HFile used by {@link #testCompactionWithCorruptBlock()}. Run this method to 389 * regenerate the test resource file after changes to the HFile format. The output file must then 390 * be hand-edited to corrupt the first data block (zero out the GZip magic bytes at offset 33) 391 * before being placed into the test resources directory. 392 */ 393 @Disabled("Not a test; utility for regenerating testCompactionWithCorruptBlock resource file") 394 @Test 395 public void generateHFileForCorruptBlockTest() throws Exception { 396 createStoreFile(r, Bytes.toString(FAMILY)); 397 createStoreFile(r, Bytes.toString(FAMILY)); 398 HStore store = r.getStore(FAMILY); 399 400 Collection<HStoreFile> storeFiles = store.getStorefiles(); 401 DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor(); 402 CompactionRequestImpl request = new CompactionRequestImpl(storeFiles); 403 List<Path> paths = tool.compact(request, NoLimitThroughputController.INSTANCE, null); 404 405 FileSystem fs = store.getFileSystem(); 406 Path hfilePath = paths.get(0); 407 File outFile = new File("/tmp/TestCompaction_HFileWithCorruptBlock.gz"); 408 try (InputStream in = fs.open(hfilePath); 409 GZIPOutputStream gzOut = new GZIPOutputStream(new FileOutputStream(outFile))) { 410 IOUtils.copyBytes(in, gzOut, 4096); 411 } 412 LoggerFactory.getLogger(TestCompaction.class) 413 .info("Wrote HFile to {}. Now hex-edit offset 33 (0x21): zero out bytes 1f 8b.", outFile); 414 } 415 416 /** 417 * This test uses a hand-modified HFile, which is loaded in from the resources' path. That file 418 * was generated from {@link #generateHFileForCorruptBlockTest()} and then edited to corrupt the 419 * GZ-encoded block by zeroing-out the first two bytes of the GZip header, the "standard 420 * declaration" of {@code 1f 8b}, found at offset 33 in the file. I'm not sure why, but it seems 421 * that in this test context we do not enforce CRC checksums. Thus, this corruption manifests in 422 * the Decompressor rather than in the reader when it loads the block bytes and compares vs. the 423 * header. 424 */ 425 @Test 426 public void testCompactionWithCorruptBlock() throws Exception { 427 createStoreFile(r, Bytes.toString(FAMILY)); 428 createStoreFile(r, Bytes.toString(FAMILY)); 429 HStore store = r.getStore(FAMILY); 430 431 Collection<HStoreFile> storeFiles = store.getStorefiles(); 432 DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor(); 433 CompactionRequestImpl request = new CompactionRequestImpl(storeFiles); 434 tool.compact(request, NoLimitThroughputController.INSTANCE, null); 435 436 // insert the hfile with a corrupted data block into the region's tmp directory, where 437 // compaction output is collected. 438 FileSystem fs = store.getFileSystem(); 439 Path tmpPath = store.getRegionFileSystem().createTempName(); 440 try ( 441 InputStream inputStream = 442 getClass().getResourceAsStream("TestCompaction_HFileWithCorruptBlock.gz"); 443 GZIPInputStream gzipInputStream = new GZIPInputStream(Objects.requireNonNull(inputStream)); 444 OutputStream outputStream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) { 445 assertThat(gzipInputStream, notNullValue()); 446 assertThat(outputStream, notNullValue()); 447 IOUtils.copyBytes(gzipInputStream, outputStream, 512); 448 } 449 LoggerFactory.getLogger(TestCompaction.class).info("Wrote corrupted HFile to {}", tmpPath); 450 451 // The complete compaction should fail and the corrupt file should remain 452 // in the 'tmp' directory; 453 try { 454 store.doCompaction(request, storeFiles, null, EnvironmentEdgeManager.currentTime(), 455 Collections.singletonList(tmpPath)); 456 } catch (IOException e) { 457 Throwable rootCause = e; 458 while (rootCause.getCause() != null) { 459 rootCause = rootCause.getCause(); 460 } 461 assertThat(rootCause, allOf(instanceOf(IOException.class), 462 hasProperty("message", containsString("not a gzip file")))); 463 assertTrue(fs.exists(tmpPath)); 464 return; 465 } 466 fail("Compaction should have failed due to corrupt block"); 467 } 468 469 /** 470 * Create a custom compaction request and be sure that we can track it through the queue, knowing 471 * when the compaction is completed. 472 */ 473 @Test 474 public void testTrackingCompactionRequest() throws Exception { 475 // setup a compact/split thread on a mock server 476 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 477 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 478 CompactSplit thread = new CompactSplit(mockServer); 479 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 480 481 // setup a region/store with some files 482 HStore store = r.getStore(COLUMN_FAMILY); 483 createStoreFile(r); 484 for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) { 485 createStoreFile(r); 486 } 487 488 CountDownLatch latch = new CountDownLatch(1); 489 Tracker tracker = new Tracker(latch); 490 thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker, null); 491 // wait for the latch to complete. 492 latch.await(); 493 494 thread.interruptIfNecessary(); 495 } 496 497 @Test 498 public void testCompactionFailure() throws Exception { 499 // setup a compact/split thread on a mock server 500 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 501 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 502 CompactSplit thread = new CompactSplit(mockServer); 503 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 504 505 // setup a region/store with some files 506 HStore store = r.getStore(COLUMN_FAMILY); 507 createStoreFile(r); 508 for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { 509 createStoreFile(r); 510 } 511 512 HRegion mockRegion = Mockito.spy(r); 513 Mockito.when(mockRegion.checkSplit()) 514 .thenThrow(new RuntimeException("Thrown intentionally by test!")); 515 516 try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) { 517 518 long preCompletedCount = metricsWrapper.getNumCompactionsCompleted(); 519 long preFailedCount = metricsWrapper.getNumCompactionsFailed(); 520 521 CountDownLatch latch = new CountDownLatch(1); 522 Tracker tracker = new Tracker(latch); 523 thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, tracker, 524 null); 525 // wait for the latch to complete. 526 latch.await(120, TimeUnit.SECONDS); 527 528 // compaction should have completed and been marked as failed due to error in split request 529 long postCompletedCount = metricsWrapper.getNumCompactionsCompleted(); 530 long postFailedCount = metricsWrapper.getNumCompactionsFailed(); 531 532 assertTrue(postCompletedCount > preCompletedCount, 533 "Completed count should have increased (pre=" + preCompletedCount + ", post=" 534 + postCompletedCount + ")"); 535 assertTrue(postFailedCount > preFailedCount, "Failed count should have increased (pre=" 536 + preFailedCount + ", post=" + postFailedCount + ")"); 537 } 538 } 539 540 /** 541 * Test no new Compaction requests are generated after calling stop compactions 542 */ 543 @Test 544 public void testStopStartCompaction() throws IOException { 545 // setup a compact/split thread on a mock server 546 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 547 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 548 final CompactSplit thread = new CompactSplit(mockServer); 549 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 550 // setup a region/store with some files 551 HStore store = r.getStore(COLUMN_FAMILY); 552 createStoreFile(r); 553 for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { 554 createStoreFile(r); 555 } 556 thread.switchCompaction(false); 557 thread.requestCompaction(r, store, "test", Store.PRIORITY_USER, 558 CompactionLifeCycleTracker.DUMMY, null); 559 assertFalse(thread.isCompactionsEnabled()); 560 int longCompactions = thread.getLongCompactions().getActiveCount(); 561 int shortCompactions = thread.getShortCompactions().getActiveCount(); 562 assertEquals(0, longCompactions + shortCompactions, 563 "longCompactions=" + longCompactions + "," + "shortCompactions=" + shortCompactions); 564 thread.switchCompaction(true); 565 assertTrue(thread.isCompactionsEnabled()); 566 // Make sure no compactions have run. 567 assertEquals(0, thread.getLongCompactions().getCompletedTaskCount() 568 + thread.getShortCompactions().getCompletedTaskCount()); 569 // Request a compaction and make sure it is submitted successfully. 570 thread.requestCompaction(r, store, "test", Store.PRIORITY_USER, 571 CompactionLifeCycleTracker.DUMMY, null); 572 // Wait until the compaction finishes. 573 Waiter.waitFor(UTIL.getConfiguration(), 5000, 574 (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount() 575 + thread.getShortCompactions().getCompletedTaskCount() == 1); 576 // Make sure there are no compactions running. 577 assertEquals(0, 578 thread.getLongCompactions().getActiveCount() + thread.getShortCompactions().getActiveCount()); 579 } 580 581 @Test 582 public void testInterruptingRunningCompactions() throws Exception { 583 // setup a compact/split thread on a mock server 584 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 585 WaitThroughPutController.class.getName()); 586 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 587 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 588 CompactSplit thread = new CompactSplit(mockServer); 589 590 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 591 592 // setup a region/store with some files 593 HStore store = r.getStore(COLUMN_FAMILY); 594 int jmax = (int) Math.ceil(15.0 / compactionThreshold); 595 byte[] pad = new byte[1000]; // 1 KB chunk 596 for (int i = 0; i < compactionThreshold; i++) { 597 Table loader = new RegionAsTable(r); 598 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 599 p.setDurability(Durability.SKIP_WAL); 600 for (int j = 0; j < jmax; j++) { 601 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 602 } 603 HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 604 loader.put(p); 605 r.flush(true); 606 } 607 HStore s = r.getStore(COLUMN_FAMILY); 608 int initialFiles = s.getStorefilesCount(); 609 610 thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, 611 CompactionLifeCycleTracker.DUMMY, null); 612 613 Thread.sleep(3000); 614 thread.switchCompaction(false); 615 assertEquals(initialFiles, s.getStorefilesCount()); 616 // don't mess up future tests 617 thread.switchCompaction(true); 618 } 619 620 /** 621 * HBASE-7947: Regression test to ensure adding to the correct list in the {@link CompactSplit} 622 * @throws Exception on failure 623 */ 624 @Test 625 public void testMultipleCustomCompactionRequests() throws Exception { 626 // setup a compact/split thread on a mock server 627 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 628 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 629 CompactSplit thread = new CompactSplit(mockServer); 630 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 631 632 // setup a region/store with some files 633 int numStores = r.getStores().size(); 634 CountDownLatch latch = new CountDownLatch(numStores); 635 Tracker tracker = new Tracker(latch); 636 // create some store files and setup requests for each store on which we want to do a 637 // compaction 638 for (HStore store : r.getStores()) { 639 createStoreFile(r, store.getColumnFamilyName()); 640 createStoreFile(r, store.getColumnFamilyName()); 641 createStoreFile(r, store.getColumnFamilyName()); 642 thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER, tracker, 643 null); 644 } 645 // wait for the latch to complete. 646 latch.await(); 647 648 thread.interruptIfNecessary(); 649 } 650 651 class StoreMockMaker extends StatefulStoreMockMaker { 652 public ArrayList<HStoreFile> compacting = new ArrayList<>(); 653 public ArrayList<HStoreFile> notCompacting = new ArrayList<>(); 654 private final ArrayList<Integer> results; 655 656 public StoreMockMaker(ArrayList<Integer> results) { 657 this.results = results; 658 } 659 660 public class TestCompactionContext extends CompactionContext { 661 662 private List<HStoreFile> selectedFiles; 663 664 public TestCompactionContext(List<HStoreFile> selectedFiles) { 665 super(); 666 this.selectedFiles = selectedFiles; 667 } 668 669 @Override 670 public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { 671 return new ArrayList<>(); 672 } 673 674 @Override 675 public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction, 676 boolean mayUseOffPeak, boolean forceMajor) throws IOException { 677 this.request = new CompactionRequestImpl(selectedFiles); 678 this.request.setPriority(getPriority()); 679 return true; 680 } 681 682 @Override 683 public List<Path> compact(ThroughputController throughputController, User user) 684 throws IOException { 685 finishCompaction(this.selectedFiles); 686 return new ArrayList<>(); 687 } 688 } 689 690 @Override 691 public synchronized Optional<CompactionContext> selectCompaction() { 692 CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting)); 693 compacting.addAll(notCompacting); 694 notCompacting.clear(); 695 try { 696 ctx.select(null, false, false, false); 697 } catch (IOException ex) { 698 fail("Shouldn't happen"); 699 } 700 return Optional.of(ctx); 701 } 702 703 @Override 704 public synchronized void cancelCompaction(Object object) { 705 TestCompactionContext ctx = (TestCompactionContext) object; 706 compacting.removeAll(ctx.selectedFiles); 707 notCompacting.addAll(ctx.selectedFiles); 708 } 709 710 public synchronized void finishCompaction(List<HStoreFile> sfs) { 711 if (sfs.isEmpty()) return; 712 synchronized (results) { 713 results.add(sfs.size()); 714 } 715 compacting.removeAll(sfs); 716 } 717 718 @Override 719 public int getPriority() { 720 return 7 - compacting.size() - notCompacting.size(); 721 } 722 } 723 724 public class BlockingStoreMockMaker extends StatefulStoreMockMaker { 725 BlockingCompactionContext blocked = null; 726 727 public class BlockingCompactionContext extends CompactionContext { 728 public volatile boolean isInCompact = false; 729 730 public void unblock() { 731 synchronized (this) { 732 this.notifyAll(); 733 } 734 } 735 736 @Override 737 public List<Path> compact(ThroughputController throughputController, User user) 738 throws IOException { 739 try { 740 isInCompact = true; 741 synchronized (this) { 742 this.wait(); 743 } 744 } catch (InterruptedException e) { 745 Assumptions.abort(e.getMessage()); 746 } 747 return new ArrayList<>(); 748 } 749 750 @Override 751 public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { 752 return new ArrayList<>(); 753 } 754 755 @Override 756 public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e) 757 throws IOException { 758 this.request = new CompactionRequestImpl(new ArrayList<>()); 759 return true; 760 } 761 } 762 763 @Override 764 public Optional<CompactionContext> selectCompaction() { 765 this.blocked = new BlockingCompactionContext(); 766 try { 767 this.blocked.select(null, false, false, false); 768 } catch (IOException ex) { 769 fail("Shouldn't happen"); 770 } 771 return Optional.of(blocked); 772 } 773 774 @Override 775 public void cancelCompaction(Object object) { 776 } 777 778 @Override 779 public int getPriority() { 780 return Integer.MIN_VALUE; // some invalid value, see createStoreMock 781 } 782 783 public BlockingCompactionContext waitForBlocking() { 784 while (this.blocked == null || !this.blocked.isInCompact) { 785 Threads.sleepWithoutInterrupt(50); 786 } 787 BlockingCompactionContext ctx = this.blocked; 788 this.blocked = null; 789 return ctx; 790 } 791 792 @Override 793 public HStore createStoreMock(String name) throws Exception { 794 return createStoreMock(Integer.MIN_VALUE, name); 795 } 796 797 public HStore createStoreMock(int priority, String name) throws Exception { 798 // Override the mock to always return the specified priority. 799 HStore s = super.createStoreMock(name); 800 when(s.getCompactPriority()).thenReturn(priority); 801 return s; 802 } 803 } 804 805 /** Test compaction priority management and multiple compactions per store (HBASE-8665). */ 806 @Test 807 public void testCompactionQueuePriorities() throws Exception { 808 // Setup a compact/split thread on a mock server. 809 final Configuration conf = HBaseConfiguration.create(); 810 HRegionServer mockServer = mock(HRegionServer.class); 811 when(mockServer.isStopped()).thenReturn(false); 812 when(mockServer.getConfiguration()).thenReturn(conf); 813 when(mockServer.getChoreService()).thenReturn(new ChoreService("test")); 814 CompactSplit cst = new CompactSplit(mockServer); 815 when(mockServer.getCompactSplitThread()).thenReturn(cst); 816 // prevent large compaction thread pool stealing job from small compaction queue. 817 cst.shutdownLongCompactions(); 818 // Set up the region mock that redirects compactions. 819 HRegion r = mock(HRegion.class); 820 when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() { 821 @Override 822 public Boolean answer(InvocationOnMock invocation) throws Throwable { 823 invocation.<CompactionContext> getArgument(0).compact(invocation.getArgument(2), null); 824 return true; 825 } 826 }); 827 828 // Set up store mocks for 2 "real" stores and the one we use for blocking CST. 829 ArrayList<Integer> results = new ArrayList<>(); 830 StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results); 831 HStore store = sm.createStoreMock("store1"); 832 HStore store2 = sm2.createStoreMock("store2"); 833 BlockingStoreMockMaker blocker = new BlockingStoreMockMaker(); 834 835 // First, block the compaction thread so that we could muck with queue. 836 cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1"); 837 BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking(); 838 839 // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively. 840 for (int i = 0; i < 4; ++i) { 841 sm.notCompacting.add(createFile()); 842 } 843 cst.requestSystemCompaction(r, store, "s1-pri3"); 844 for (int i = 0; i < 3; ++i) { 845 sm2.notCompacting.add(createFile()); 846 } 847 cst.requestSystemCompaction(r, store2, "s2-pri4"); 848 // Now add 2 more files to store1 and queue compaction - pri 1. 849 for (int i = 0; i < 2; ++i) { 850 sm.notCompacting.add(createFile()); 851 } 852 cst.requestSystemCompaction(r, store, "s1-pri1"); 853 // Finally add blocking compaction with priority 2. 854 cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2"); 855 856 // Unblock the blocking compaction; we should run pri1 and become block again in pri2. 857 currentBlock.unblock(); 858 currentBlock = blocker.waitForBlocking(); 859 // Pri1 should have "compacted" all 6 files. 860 assertEquals(1, results.size()); 861 assertEquals(6, results.get(0).intValue()); 862 // Add 2 files to store 1 (it has 2 files now). 863 for (int i = 0; i < 2; ++i) { 864 sm.notCompacting.add(createFile()); 865 } 866 // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority 867 // is 5, however, so it must not preempt store 2. Add blocking compaction at the end. 868 cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7"); 869 currentBlock.unblock(); 870 currentBlock = blocker.waitForBlocking(); 871 assertEquals(3, results.size()); 872 assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files. 873 assertEquals(2, results.get(2).intValue()); 874 875 currentBlock.unblock(); 876 cst.interruptIfNecessary(); 877 } 878 879 /** 880 * Firstly write 10 cells (with different time stamp) to a qualifier and flush to hfile1, then 881 * write 10 cells (with different time stamp) to the same qualifier and flush to hfile2. The 882 * latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 are with the same time 883 * stamp but different sequence id, and will get scanned successively during compaction. 884 * <p/> 885 * We set compaction.kv.max to 10 so compaction will scan 10 versions each round, meanwhile we set 886 * keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 will be written out 887 * with seqId cleaned (set to 0) including cell-B, then when scanner goes to cell-A it will cause 888 * a scan out-of-order assertion error before HBASE-16931 if error occurs during the test 889 */ 890 @Test 891 public void testCompactionSeqId() throws Exception { 892 final byte[] ROW = Bytes.toBytes("row"); 893 final byte[] QUALIFIER = Bytes.toBytes("qualifier"); 894 895 long timestamp = 10000; 896 897 // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9 898 // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8 899 // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7 900 // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6 901 // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5 902 // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4 903 // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3 904 // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2 905 // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1 906 // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0 907 for (int i = 0; i < 10; i++) { 908 Put put = new Put(ROW); 909 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); 910 r.put(put); 911 } 912 r.flush(true); 913 914 // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18 915 // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17 916 // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16 917 // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15 918 // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14 919 // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13 920 // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12 921 // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11 922 // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10 923 // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9 924 for (int i = 18; i > 8; i--) { 925 Put put = new Put(ROW); 926 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); 927 r.put(put); 928 } 929 r.flush(true); 930 r.compact(true); 931 } 932 933 public static class DummyCompactor extends DefaultCompactor { 934 public DummyCompactor(Configuration conf, HStore store) { 935 super(conf, store); 936 this.keepSeqIdPeriod = 0; 937 } 938 } 939 940 private static HStoreFile createFile() throws Exception { 941 HStoreFile sf = mock(HStoreFile.class); 942 when(sf.getPath()).thenReturn(new Path("file")); 943 StoreFileReader r = mock(StoreFileReader.class); 944 when(r.length()).thenReturn(10L); 945 when(sf.getReader()).thenReturn(r); 946 return sf; 947 } 948 949 /** 950 * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction 951 * finishes. 952 */ 953 public static class Tracker implements CompactionLifeCycleTracker { 954 955 private final CountDownLatch done; 956 957 public Tracker(CountDownLatch done) { 958 this.done = done; 959 } 960 961 @Override 962 public void afterExecution(Store store) { 963 done.countDown(); 964 } 965 } 966 967 /** 968 * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction 969 * finishes. 970 */ 971 public static class WaitThroughPutController extends NoLimitThroughputController { 972 973 public WaitThroughPutController() { 974 } 975 976 @Override 977 public long control(String compactionName, long size) throws InterruptedException { 978 Thread.sleep(6000000); 979 return 6000000; 980 } 981 } 982}