001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY; 021import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES; 022import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1; 023import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 024import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY; 025import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY; 026import static org.hamcrest.MatcherAssert.assertThat; 027import static org.hamcrest.Matchers.allOf; 028import static org.hamcrest.Matchers.containsString; 029import static org.hamcrest.Matchers.hasProperty; 030import static org.hamcrest.Matchers.instanceOf; 031import static org.hamcrest.Matchers.notNullValue; 032import static org.junit.Assert.assertEquals; 033import static org.junit.Assert.assertFalse; 034import static org.junit.Assert.assertThrows; 035import static org.junit.Assert.assertTrue; 036import static org.junit.Assert.fail; 037import static org.mockito.ArgumentMatchers.any; 038import static org.mockito.Mockito.doAnswer; 039import static org.mockito.Mockito.mock; 040import static org.mockito.Mockito.spy; 041import static org.mockito.Mockito.when; 042 043import java.io.File; 044import java.io.FileOutputStream; 045import java.io.IOException; 046import java.io.InputStream; 047import java.io.OutputStream; 048import java.util.ArrayList; 049import java.util.Collection; 050import java.util.Collections; 051import java.util.List; 052import java.util.Objects; 053import java.util.Optional; 054import java.util.concurrent.CountDownLatch; 055import java.util.concurrent.TimeUnit; 056import java.util.zip.GZIPInputStream; 057import java.util.zip.GZIPOutputStream; 058import org.apache.hadoop.conf.Configuration; 059import org.apache.hadoop.fs.FSDataOutputStream; 060import org.apache.hadoop.fs.FileStatus; 061import org.apache.hadoop.fs.FileSystem; 062import org.apache.hadoop.fs.Path; 063import org.apache.hadoop.hbase.ChoreService; 064import org.apache.hadoop.hbase.HBaseClassTestRule; 065import org.apache.hadoop.hbase.HBaseConfiguration; 066import org.apache.hadoop.hbase.HBaseTestingUtil; 067import org.apache.hadoop.hbase.HConstants; 068import org.apache.hadoop.hbase.HTestConst; 069import org.apache.hadoop.hbase.KeyValue; 070import org.apache.hadoop.hbase.Waiter; 071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 072import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 073import org.apache.hadoop.hbase.client.Delete; 074import org.apache.hadoop.hbase.client.Durability; 075import org.apache.hadoop.hbase.client.Put; 076import org.apache.hadoop.hbase.client.Table; 077import org.apache.hadoop.hbase.client.TableDescriptor; 078import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 079import org.apache.hadoop.hbase.io.compress.Compression; 080import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 081import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 082import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 083import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 084import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 085import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 086import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 087import org.apache.hadoop.hbase.security.User; 088import org.apache.hadoop.hbase.testclassification.LargeTests; 089import org.apache.hadoop.hbase.testclassification.RegionServerTests; 090import org.apache.hadoop.hbase.util.Bytes; 091import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 092import org.apache.hadoop.hbase.util.Threads; 093import org.apache.hadoop.hbase.wal.WAL; 094import org.apache.hadoop.io.IOUtils; 095import org.junit.After; 096import org.junit.Assume; 097import org.junit.Before; 098import org.junit.ClassRule; 099import org.junit.Ignore; 100import org.junit.Rule; 101import org.junit.Test; 102import org.junit.experimental.categories.Category; 103import org.junit.rules.TestName; 104import org.mockito.Mockito; 105import org.mockito.invocation.InvocationOnMock; 106import org.mockito.stubbing.Answer; 107import org.slf4j.LoggerFactory; 108 109/** 110 * Test compaction framework and common functions 111 */ 112@Category({ RegionServerTests.class, LargeTests.class }) 113public class TestCompaction { 114 115 @ClassRule 116 public static final HBaseClassTestRule CLASS_RULE = 117 HBaseClassTestRule.forClass(TestCompaction.class); 118 119 @Rule 120 public TestName name = new TestName(); 121 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 122 protected Configuration conf = UTIL.getConfiguration(); 123 124 private HRegion r = null; 125 private TableDescriptor tableDescriptor = null; 126 private static final byte[] COLUMN_FAMILY = fam1; 127 private final byte[] STARTROW = Bytes.toBytes(START_KEY); 128 private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY; 129 private int compactionThreshold; 130 private byte[] secondRowBytes, thirdRowBytes; 131 private static final long MAX_FILES_TO_COMPACT = 10; 132 private final byte[] FAMILY = Bytes.toBytes("cf"); 133 134 /** constructor */ 135 public TestCompaction() { 136 // Set cache flush size to 1MB 137 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); 138 conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100); 139 conf.setLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, 10L); 140 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 141 NoLimitThroughputController.class.getName()); 142 compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); 143 144 secondRowBytes = START_KEY_BYTES.clone(); 145 // Increment the least significant character so we get to next row. 146 secondRowBytes[START_KEY_BYTES.length - 1]++; 147 thirdRowBytes = START_KEY_BYTES.clone(); 148 thirdRowBytes[START_KEY_BYTES.length - 1] = 149 (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2); 150 } 151 152 @Before 153 public void setUp() throws Exception { 154 TableDescriptorBuilder builder = UTIL.createModifyableTableDescriptor(name.getMethodName()); 155 if (name.getMethodName().equals("testCompactionSeqId")) { 156 UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10"); 157 UTIL.getConfiguration().set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, 158 DummyCompactor.class.getName()); 159 ColumnFamilyDescriptor familyDescriptor = 160 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(65536).build(); 161 builder.setColumnFamily(familyDescriptor); 162 } 163 if ( 164 name.getMethodName().equals("testCompactionWithCorruptBlock") 165 || name.getMethodName().equals("generateHFileForCorruptBlockTest") 166 ) { 167 UTIL.getConfiguration().setBoolean("hbase.hstore.validate.read_fully", true); 168 ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY) 169 .setCompressionType(Compression.Algorithm.GZ).build(); 170 builder.setColumnFamily(familyDescriptor); 171 } 172 this.tableDescriptor = builder.build(); 173 this.r = UTIL.createLocalHRegion(tableDescriptor, null, null); 174 } 175 176 @After 177 public void tearDown() throws Exception { 178 WAL wal = r.getWAL(); 179 this.r.close(); 180 wal.close(); 181 } 182 183 /** 184 * Verify that you can stop a long-running compaction (used during RS shutdown) 185 */ 186 @Test 187 public void testInterruptCompactionBySize() throws Exception { 188 assertEquals(0, count()); 189 190 // lower the polling interval for this test 191 conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */); 192 193 try { 194 // Create a couple store files w/ 15KB (over 10KB interval) 195 int jmax = (int) Math.ceil(15.0 / compactionThreshold); 196 byte[] pad = new byte[1000]; // 1 KB chunk 197 for (int i = 0; i < compactionThreshold; i++) { 198 Table loader = new RegionAsTable(r); 199 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 200 p.setDurability(Durability.SKIP_WAL); 201 for (int j = 0; j < jmax; j++) { 202 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 203 } 204 HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 205 loader.put(p); 206 r.flush(true); 207 } 208 209 HRegion spyR = spy(r); 210 doAnswer(new Answer<Object>() { 211 @Override 212 public Object answer(InvocationOnMock invocation) throws Throwable { 213 r.writestate.writesEnabled = false; 214 return invocation.callRealMethod(); 215 } 216 }).when(spyR).doRegionCompactionPrep(); 217 218 // force a minor compaction, but not before requesting a stop 219 spyR.compactStores(); 220 221 // ensure that the compaction stopped, all old files are intact, 222 HStore s = r.getStore(COLUMN_FAMILY); 223 assertEquals(compactionThreshold, s.getStorefilesCount()); 224 assertTrue(s.getStorefilesSize() > 15 * 1000); 225 // and no new store files persisted past compactStores() 226 // only one empty dir exists in temp dir 227 FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir()); 228 assertEquals(1, ls.length); 229 Path storeTempDir = 230 new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY)); 231 assertTrue(r.getFilesystem().exists(storeTempDir)); 232 ls = r.getFilesystem().listStatus(storeTempDir); 233 assertEquals(0, ls.length); 234 } finally { 235 // don't mess up future tests 236 r.writestate.writesEnabled = true; 237 conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */); 238 239 // Delete all Store information once done using 240 for (int i = 0; i < compactionThreshold; i++) { 241 Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i))); 242 byte[][] famAndQf = { COLUMN_FAMILY, null }; 243 delete.addFamily(famAndQf[0]); 244 r.delete(delete); 245 } 246 r.flush(true); 247 248 // Multiple versions allowed for an entry, so the delete isn't enough 249 // Lower TTL and expire to ensure that all our entries have been wiped 250 final int ttl = 1000; 251 for (HStore store : this.r.stores.values()) { 252 ScanInfo old = store.getScanInfo(); 253 ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells()); 254 store.setScanInfo(si); 255 } 256 Thread.sleep(ttl); 257 258 r.compact(true); 259 assertEquals(0, count()); 260 } 261 } 262 263 @Test 264 public void testInterruptCompactionByTime() throws Exception { 265 assertEquals(0, count()); 266 267 // lower the polling interval for this test 268 conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */); 269 270 try { 271 // Create a couple store files w/ 15KB (over 10KB interval) 272 int jmax = (int) Math.ceil(15.0 / compactionThreshold); 273 byte[] pad = new byte[1000]; // 1 KB chunk 274 for (int i = 0; i < compactionThreshold; i++) { 275 Table loader = new RegionAsTable(r); 276 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 277 p.setDurability(Durability.SKIP_WAL); 278 for (int j = 0; j < jmax; j++) { 279 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 280 } 281 HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 282 loader.put(p); 283 r.flush(true); 284 } 285 286 HRegion spyR = spy(r); 287 doAnswer(new Answer<Object>() { 288 @Override 289 public Object answer(InvocationOnMock invocation) throws Throwable { 290 r.writestate.writesEnabled = false; 291 return invocation.callRealMethod(); 292 } 293 }).when(spyR).doRegionCompactionPrep(); 294 295 // force a minor compaction, but not before requesting a stop 296 spyR.compactStores(); 297 298 // ensure that the compaction stopped, all old files are intact, 299 HStore s = r.getStore(COLUMN_FAMILY); 300 assertEquals(compactionThreshold, s.getStorefilesCount()); 301 assertTrue(s.getStorefilesSize() > 15 * 1000); 302 // and no new store files persisted past compactStores() 303 // only one empty dir exists in temp dir 304 FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir()); 305 assertEquals(1, ls.length); 306 Path storeTempDir = 307 new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY)); 308 assertTrue(r.getFilesystem().exists(storeTempDir)); 309 ls = r.getFilesystem().listStatus(storeTempDir); 310 assertEquals(0, ls.length); 311 } finally { 312 // don't mess up future tests 313 r.writestate.writesEnabled = true; 314 conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */); 315 316 // Delete all Store information once done using 317 for (int i = 0; i < compactionThreshold; i++) { 318 Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i))); 319 byte[][] famAndQf = { COLUMN_FAMILY, null }; 320 delete.addFamily(famAndQf[0]); 321 r.delete(delete); 322 } 323 r.flush(true); 324 325 // Multiple versions allowed for an entry, so the delete isn't enough 326 // Lower TTL and expire to ensure that all our entries have been wiped 327 final int ttl = 1000; 328 for (HStore store : this.r.stores.values()) { 329 ScanInfo old = store.getScanInfo(); 330 ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells()); 331 store.setScanInfo(si); 332 } 333 Thread.sleep(ttl); 334 335 r.compact(true); 336 assertEquals(0, count()); 337 } 338 } 339 340 private int count() throws IOException { 341 int count = 0; 342 for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) { 343 f.initReader(); 344 try (StoreFileScanner scanner = f.getPreadScanner(false, Long.MAX_VALUE, 0, false)) { 345 scanner.seek(KeyValue.LOWESTKEY); 346 while (scanner.next() != null) { 347 count++; 348 } 349 } 350 } 351 return count; 352 } 353 354 private void createStoreFile(final HRegion region) throws IOException { 355 createStoreFile(region, Bytes.toString(COLUMN_FAMILY)); 356 } 357 358 private void createStoreFile(final HRegion region, String family) throws IOException { 359 Table loader = new RegionAsTable(region); 360 HTestConst.addContent(loader, family); 361 region.flush(true); 362 } 363 364 @Test 365 public void testCompactionWithCorruptResult() throws Exception { 366 int nfiles = 10; 367 for (int i = 0; i < nfiles; i++) { 368 createStoreFile(r); 369 } 370 HStore store = r.getStore(COLUMN_FAMILY); 371 372 Collection<HStoreFile> storeFiles = store.getStorefiles(); 373 DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor(); 374 CompactionRequestImpl request = new CompactionRequestImpl(storeFiles); 375 tool.compact(request, NoLimitThroughputController.INSTANCE, null); 376 377 // Now lets corrupt the compacted file. 378 FileSystem fs = store.getFileSystem(); 379 // default compaction policy created one and only one new compacted file 380 Path tmpPath = store.getRegionFileSystem().createTempName(); 381 try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) { 382 stream.writeChars("CORRUPT FILE!!!!"); 383 } 384 385 // The complete compaction should fail and the corrupt file should remain 386 // in the 'tmp' directory; 387 assertThrows(IOException.class, () -> store.doCompaction(null, null, null, 388 EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath))); 389 assertTrue(fs.exists(tmpPath)); 390 } 391 392 /** 393 * Generates the HFile used by {@link #testCompactionWithCorruptBlock()}. Run this method to 394 * regenerate the test resource file after changes to the HFile format. The output file must then 395 * be hand-edited to corrupt the first data block (zero out the GZip magic bytes at offset 33) 396 * before being placed into the test resources directory. 397 */ 398 @Ignore("Not a test; utility for regenerating testCompactionWithCorruptBlock resource file") 399 @Test 400 public void generateHFileForCorruptBlockTest() throws Exception { 401 createStoreFile(r, Bytes.toString(FAMILY)); 402 createStoreFile(r, Bytes.toString(FAMILY)); 403 HStore store = r.getStore(FAMILY); 404 405 Collection<HStoreFile> storeFiles = store.getStorefiles(); 406 DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor(); 407 CompactionRequestImpl request = new CompactionRequestImpl(storeFiles); 408 List<Path> paths = tool.compact(request, NoLimitThroughputController.INSTANCE, null); 409 410 FileSystem fs = store.getFileSystem(); 411 Path hfilePath = paths.get(0); 412 File outFile = new File("/tmp/TestCompaction_HFileWithCorruptBlock.gz"); 413 try (InputStream in = fs.open(hfilePath); 414 GZIPOutputStream gzOut = new GZIPOutputStream(new FileOutputStream(outFile))) { 415 IOUtils.copyBytes(in, gzOut, 4096); 416 } 417 LoggerFactory.getLogger(TestCompaction.class) 418 .info("Wrote HFile to {}. Now hex-edit offset 33 (0x21): zero out bytes 1f 8b.", outFile); 419 } 420 421 /** 422 * This test uses a hand-modified HFile, which is loaded in from the resources' path. That file 423 * was generated from {@link #generateHFileForCorruptBlockTest()} and then edited to corrupt the 424 * GZ-encoded block by zeroing-out the first two bytes of the GZip header, the "standard 425 * declaration" of {@code 1f 8b}, found at offset 33 in the file. I'm not sure why, but it seems 426 * that in this test context we do not enforce CRC checksums. Thus, this corruption manifests in 427 * the Decompressor rather than in the reader when it loads the block bytes and compares vs. the 428 * header. 429 */ 430 @Test 431 public void testCompactionWithCorruptBlock() throws Exception { 432 createStoreFile(r, Bytes.toString(FAMILY)); 433 createStoreFile(r, Bytes.toString(FAMILY)); 434 HStore store = r.getStore(FAMILY); 435 436 Collection<HStoreFile> storeFiles = store.getStorefiles(); 437 DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor(); 438 CompactionRequestImpl request = new CompactionRequestImpl(storeFiles); 439 tool.compact(request, NoLimitThroughputController.INSTANCE, null); 440 441 // insert the hfile with a corrupted data block into the region's tmp directory, where 442 // compaction output is collected. 443 FileSystem fs = store.getFileSystem(); 444 Path tmpPath = store.getRegionFileSystem().createTempName(); 445 try ( 446 InputStream inputStream = 447 getClass().getResourceAsStream("TestCompaction_HFileWithCorruptBlock.gz"); 448 GZIPInputStream gzipInputStream = new GZIPInputStream(Objects.requireNonNull(inputStream)); 449 OutputStream outputStream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) { 450 assertThat(gzipInputStream, notNullValue()); 451 assertThat(outputStream, notNullValue()); 452 IOUtils.copyBytes(gzipInputStream, outputStream, 512); 453 } 454 LoggerFactory.getLogger(TestCompaction.class).info("Wrote corrupted HFile to {}", tmpPath); 455 456 // The complete compaction should fail and the corrupt file should remain 457 // in the 'tmp' directory; 458 try { 459 store.doCompaction(request, storeFiles, null, EnvironmentEdgeManager.currentTime(), 460 Collections.singletonList(tmpPath)); 461 } catch (IOException e) { 462 Throwable rootCause = e; 463 while (rootCause.getCause() != null) { 464 rootCause = rootCause.getCause(); 465 } 466 assertThat(rootCause, allOf(instanceOf(IOException.class), 467 hasProperty("message", containsString("not a gzip file")))); 468 assertTrue(fs.exists(tmpPath)); 469 return; 470 } 471 fail("Compaction should have failed due to corrupt block"); 472 } 473 474 /** 475 * Create a custom compaction request and be sure that we can track it through the queue, knowing 476 * when the compaction is completed. 477 */ 478 @Test 479 public void testTrackingCompactionRequest() throws Exception { 480 // setup a compact/split thread on a mock server 481 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 482 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 483 CompactSplit thread = new CompactSplit(mockServer); 484 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 485 486 // setup a region/store with some files 487 HStore store = r.getStore(COLUMN_FAMILY); 488 createStoreFile(r); 489 for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) { 490 createStoreFile(r); 491 } 492 493 CountDownLatch latch = new CountDownLatch(1); 494 Tracker tracker = new Tracker(latch); 495 thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker, null); 496 // wait for the latch to complete. 497 latch.await(); 498 499 thread.interruptIfNecessary(); 500 } 501 502 @Test 503 public void testCompactionFailure() throws Exception { 504 // setup a compact/split thread on a mock server 505 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 506 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 507 CompactSplit thread = new CompactSplit(mockServer); 508 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 509 510 // setup a region/store with some files 511 HStore store = r.getStore(COLUMN_FAMILY); 512 createStoreFile(r); 513 for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { 514 createStoreFile(r); 515 } 516 517 HRegion mockRegion = Mockito.spy(r); 518 Mockito.when(mockRegion.checkSplit()) 519 .thenThrow(new RuntimeException("Thrown intentionally by test!")); 520 521 try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) { 522 523 long preCompletedCount = metricsWrapper.getNumCompactionsCompleted(); 524 long preFailedCount = metricsWrapper.getNumCompactionsFailed(); 525 526 CountDownLatch latch = new CountDownLatch(1); 527 Tracker tracker = new Tracker(latch); 528 thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, tracker, 529 null); 530 // wait for the latch to complete. 531 latch.await(120, TimeUnit.SECONDS); 532 533 // compaction should have completed and been marked as failed due to error in split request 534 long postCompletedCount = metricsWrapper.getNumCompactionsCompleted(); 535 long postFailedCount = metricsWrapper.getNumCompactionsFailed(); 536 537 assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post=" 538 + postCompletedCount + ")", postCompletedCount > preCompletedCount); 539 assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post=" 540 + postFailedCount + ")", postFailedCount > preFailedCount); 541 } 542 } 543 544 /** 545 * Test no new Compaction requests are generated after calling stop compactions 546 */ 547 @Test 548 public void testStopStartCompaction() throws IOException { 549 // setup a compact/split thread on a mock server 550 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 551 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 552 final CompactSplit thread = new CompactSplit(mockServer); 553 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 554 // setup a region/store with some files 555 HStore store = r.getStore(COLUMN_FAMILY); 556 createStoreFile(r); 557 for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { 558 createStoreFile(r); 559 } 560 thread.switchCompaction(false); 561 thread.requestCompaction(r, store, "test", Store.PRIORITY_USER, 562 CompactionLifeCycleTracker.DUMMY, null); 563 assertFalse(thread.isCompactionsEnabled()); 564 int longCompactions = thread.getLongCompactions().getActiveCount(); 565 int shortCompactions = thread.getShortCompactions().getActiveCount(); 566 assertEquals( 567 "longCompactions=" + longCompactions + "," + "shortCompactions=" + shortCompactions, 0, 568 longCompactions + shortCompactions); 569 thread.switchCompaction(true); 570 assertTrue(thread.isCompactionsEnabled()); 571 // Make sure no compactions have run. 572 assertEquals(0, thread.getLongCompactions().getCompletedTaskCount() 573 + thread.getShortCompactions().getCompletedTaskCount()); 574 // Request a compaction and make sure it is submitted successfully. 575 thread.requestCompaction(r, store, "test", Store.PRIORITY_USER, 576 CompactionLifeCycleTracker.DUMMY, null); 577 // Wait until the compaction finishes. 578 Waiter.waitFor(UTIL.getConfiguration(), 5000, 579 (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount() 580 + thread.getShortCompactions().getCompletedTaskCount() == 1); 581 // Make sure there are no compactions running. 582 assertEquals(0, 583 thread.getLongCompactions().getActiveCount() + thread.getShortCompactions().getActiveCount()); 584 } 585 586 @Test 587 public void testInterruptingRunningCompactions() throws Exception { 588 // setup a compact/split thread on a mock server 589 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 590 WaitThroughPutController.class.getName()); 591 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 592 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 593 CompactSplit thread = new CompactSplit(mockServer); 594 595 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 596 597 // setup a region/store with some files 598 HStore store = r.getStore(COLUMN_FAMILY); 599 int jmax = (int) Math.ceil(15.0 / compactionThreshold); 600 byte[] pad = new byte[1000]; // 1 KB chunk 601 for (int i = 0; i < compactionThreshold; i++) { 602 Table loader = new RegionAsTable(r); 603 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 604 p.setDurability(Durability.SKIP_WAL); 605 for (int j = 0; j < jmax; j++) { 606 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 607 } 608 HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 609 loader.put(p); 610 r.flush(true); 611 } 612 HStore s = r.getStore(COLUMN_FAMILY); 613 int initialFiles = s.getStorefilesCount(); 614 615 thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, 616 CompactionLifeCycleTracker.DUMMY, null); 617 618 Thread.sleep(3000); 619 thread.switchCompaction(false); 620 assertEquals(initialFiles, s.getStorefilesCount()); 621 // don't mess up future tests 622 thread.switchCompaction(true); 623 } 624 625 /** 626 * HBASE-7947: Regression test to ensure adding to the correct list in the {@link CompactSplit} 627 * @throws Exception on failure 628 */ 629 @Test 630 public void testMultipleCustomCompactionRequests() throws Exception { 631 // setup a compact/split thread on a mock server 632 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 633 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 634 CompactSplit thread = new CompactSplit(mockServer); 635 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 636 637 // setup a region/store with some files 638 int numStores = r.getStores().size(); 639 CountDownLatch latch = new CountDownLatch(numStores); 640 Tracker tracker = new Tracker(latch); 641 // create some store files and setup requests for each store on which we want to do a 642 // compaction 643 for (HStore store : r.getStores()) { 644 createStoreFile(r, store.getColumnFamilyName()); 645 createStoreFile(r, store.getColumnFamilyName()); 646 createStoreFile(r, store.getColumnFamilyName()); 647 thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER, tracker, 648 null); 649 } 650 // wait for the latch to complete. 651 latch.await(); 652 653 thread.interruptIfNecessary(); 654 } 655 656 class StoreMockMaker extends StatefulStoreMockMaker { 657 public ArrayList<HStoreFile> compacting = new ArrayList<>(); 658 public ArrayList<HStoreFile> notCompacting = new ArrayList<>(); 659 private final ArrayList<Integer> results; 660 661 public StoreMockMaker(ArrayList<Integer> results) { 662 this.results = results; 663 } 664 665 public class TestCompactionContext extends CompactionContext { 666 667 private List<HStoreFile> selectedFiles; 668 669 public TestCompactionContext(List<HStoreFile> selectedFiles) { 670 super(); 671 this.selectedFiles = selectedFiles; 672 } 673 674 @Override 675 public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { 676 return new ArrayList<>(); 677 } 678 679 @Override 680 public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction, 681 boolean mayUseOffPeak, boolean forceMajor) throws IOException { 682 this.request = new CompactionRequestImpl(selectedFiles); 683 this.request.setPriority(getPriority()); 684 return true; 685 } 686 687 @Override 688 public List<Path> compact(ThroughputController throughputController, User user) 689 throws IOException { 690 finishCompaction(this.selectedFiles); 691 return new ArrayList<>(); 692 } 693 } 694 695 @Override 696 public synchronized Optional<CompactionContext> selectCompaction() { 697 CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting)); 698 compacting.addAll(notCompacting); 699 notCompacting.clear(); 700 try { 701 ctx.select(null, false, false, false); 702 } catch (IOException ex) { 703 fail("Shouldn't happen"); 704 } 705 return Optional.of(ctx); 706 } 707 708 @Override 709 public synchronized void cancelCompaction(Object object) { 710 TestCompactionContext ctx = (TestCompactionContext) object; 711 compacting.removeAll(ctx.selectedFiles); 712 notCompacting.addAll(ctx.selectedFiles); 713 } 714 715 public synchronized void finishCompaction(List<HStoreFile> sfs) { 716 if (sfs.isEmpty()) return; 717 synchronized (results) { 718 results.add(sfs.size()); 719 } 720 compacting.removeAll(sfs); 721 } 722 723 @Override 724 public int getPriority() { 725 return 7 - compacting.size() - notCompacting.size(); 726 } 727 } 728 729 public class BlockingStoreMockMaker extends StatefulStoreMockMaker { 730 BlockingCompactionContext blocked = null; 731 732 public class BlockingCompactionContext extends CompactionContext { 733 public volatile boolean isInCompact = false; 734 735 public void unblock() { 736 synchronized (this) { 737 this.notifyAll(); 738 } 739 } 740 741 @Override 742 public List<Path> compact(ThroughputController throughputController, User user) 743 throws IOException { 744 try { 745 isInCompact = true; 746 synchronized (this) { 747 this.wait(); 748 } 749 } catch (InterruptedException e) { 750 Assume.assumeNoException(e); 751 } 752 return new ArrayList<>(); 753 } 754 755 @Override 756 public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { 757 return new ArrayList<>(); 758 } 759 760 @Override 761 public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e) 762 throws IOException { 763 this.request = new CompactionRequestImpl(new ArrayList<>()); 764 return true; 765 } 766 } 767 768 @Override 769 public Optional<CompactionContext> selectCompaction() { 770 this.blocked = new BlockingCompactionContext(); 771 try { 772 this.blocked.select(null, false, false, false); 773 } catch (IOException ex) { 774 fail("Shouldn't happen"); 775 } 776 return Optional.of(blocked); 777 } 778 779 @Override 780 public void cancelCompaction(Object object) { 781 } 782 783 @Override 784 public int getPriority() { 785 return Integer.MIN_VALUE; // some invalid value, see createStoreMock 786 } 787 788 public BlockingCompactionContext waitForBlocking() { 789 while (this.blocked == null || !this.blocked.isInCompact) { 790 Threads.sleepWithoutInterrupt(50); 791 } 792 BlockingCompactionContext ctx = this.blocked; 793 this.blocked = null; 794 return ctx; 795 } 796 797 @Override 798 public HStore createStoreMock(String name) throws Exception { 799 return createStoreMock(Integer.MIN_VALUE, name); 800 } 801 802 public HStore createStoreMock(int priority, String name) throws Exception { 803 // Override the mock to always return the specified priority. 804 HStore s = super.createStoreMock(name); 805 when(s.getCompactPriority()).thenReturn(priority); 806 return s; 807 } 808 } 809 810 /** Test compaction priority management and multiple compactions per store (HBASE-8665). */ 811 @Test 812 public void testCompactionQueuePriorities() throws Exception { 813 // Setup a compact/split thread on a mock server. 814 final Configuration conf = HBaseConfiguration.create(); 815 HRegionServer mockServer = mock(HRegionServer.class); 816 when(mockServer.isStopped()).thenReturn(false); 817 when(mockServer.getConfiguration()).thenReturn(conf); 818 when(mockServer.getChoreService()).thenReturn(new ChoreService("test")); 819 CompactSplit cst = new CompactSplit(mockServer); 820 when(mockServer.getCompactSplitThread()).thenReturn(cst); 821 // prevent large compaction thread pool stealing job from small compaction queue. 822 cst.shutdownLongCompactions(); 823 // Set up the region mock that redirects compactions. 824 HRegion r = mock(HRegion.class); 825 when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() { 826 @Override 827 public Boolean answer(InvocationOnMock invocation) throws Throwable { 828 invocation.<CompactionContext> getArgument(0).compact(invocation.getArgument(2), null); 829 return true; 830 } 831 }); 832 833 // Set up store mocks for 2 "real" stores and the one we use for blocking CST. 834 ArrayList<Integer> results = new ArrayList<>(); 835 StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results); 836 HStore store = sm.createStoreMock("store1"); 837 HStore store2 = sm2.createStoreMock("store2"); 838 BlockingStoreMockMaker blocker = new BlockingStoreMockMaker(); 839 840 // First, block the compaction thread so that we could muck with queue. 841 cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1"); 842 BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking(); 843 844 // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively. 845 for (int i = 0; i < 4; ++i) { 846 sm.notCompacting.add(createFile()); 847 } 848 cst.requestSystemCompaction(r, store, "s1-pri3"); 849 for (int i = 0; i < 3; ++i) { 850 sm2.notCompacting.add(createFile()); 851 } 852 cst.requestSystemCompaction(r, store2, "s2-pri4"); 853 // Now add 2 more files to store1 and queue compaction - pri 1. 854 for (int i = 0; i < 2; ++i) { 855 sm.notCompacting.add(createFile()); 856 } 857 cst.requestSystemCompaction(r, store, "s1-pri1"); 858 // Finally add blocking compaction with priority 2. 859 cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2"); 860 861 // Unblock the blocking compaction; we should run pri1 and become block again in pri2. 862 currentBlock.unblock(); 863 currentBlock = blocker.waitForBlocking(); 864 // Pri1 should have "compacted" all 6 files. 865 assertEquals(1, results.size()); 866 assertEquals(6, results.get(0).intValue()); 867 // Add 2 files to store 1 (it has 2 files now). 868 for (int i = 0; i < 2; ++i) { 869 sm.notCompacting.add(createFile()); 870 } 871 // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority 872 // is 5, however, so it must not preempt store 2. Add blocking compaction at the end. 873 cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7"); 874 currentBlock.unblock(); 875 currentBlock = blocker.waitForBlocking(); 876 assertEquals(3, results.size()); 877 assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files. 878 assertEquals(2, results.get(2).intValue()); 879 880 currentBlock.unblock(); 881 cst.interruptIfNecessary(); 882 } 883 884 /** 885 * Firstly write 10 cells (with different time stamp) to a qualifier and flush to hfile1, then 886 * write 10 cells (with different time stamp) to the same qualifier and flush to hfile2. The 887 * latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 are with the same time 888 * stamp but different sequence id, and will get scanned successively during compaction. 889 * <p/> 890 * We set compaction.kv.max to 10 so compaction will scan 10 versions each round, meanwhile we set 891 * keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 will be written out 892 * with seqId cleaned (set to 0) including cell-B, then when scanner goes to cell-A it will cause 893 * a scan out-of-order assertion error before HBASE-16931 if error occurs during the test 894 */ 895 @Test 896 public void testCompactionSeqId() throws Exception { 897 final byte[] ROW = Bytes.toBytes("row"); 898 final byte[] QUALIFIER = Bytes.toBytes("qualifier"); 899 900 long timestamp = 10000; 901 902 // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9 903 // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8 904 // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7 905 // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6 906 // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5 907 // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4 908 // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3 909 // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2 910 // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1 911 // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0 912 for (int i = 0; i < 10; i++) { 913 Put put = new Put(ROW); 914 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); 915 r.put(put); 916 } 917 r.flush(true); 918 919 // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18 920 // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17 921 // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16 922 // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15 923 // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14 924 // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13 925 // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12 926 // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11 927 // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10 928 // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9 929 for (int i = 18; i > 8; i--) { 930 Put put = new Put(ROW); 931 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); 932 r.put(put); 933 } 934 r.flush(true); 935 r.compact(true); 936 } 937 938 public static class DummyCompactor extends DefaultCompactor { 939 public DummyCompactor(Configuration conf, HStore store) { 940 super(conf, store); 941 this.keepSeqIdPeriod = 0; 942 } 943 } 944 945 private static HStoreFile createFile() throws Exception { 946 HStoreFile sf = mock(HStoreFile.class); 947 when(sf.getPath()).thenReturn(new Path("file")); 948 StoreFileReader r = mock(StoreFileReader.class); 949 when(r.length()).thenReturn(10L); 950 when(sf.getReader()).thenReturn(r); 951 return sf; 952 } 953 954 /** 955 * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction 956 * finishes. 957 */ 958 public static class Tracker implements CompactionLifeCycleTracker { 959 960 private final CountDownLatch done; 961 962 public Tracker(CountDownLatch done) { 963 this.done = done; 964 } 965 966 @Override 967 public void afterExecution(Store store) { 968 done.countDown(); 969 } 970 } 971 972 /** 973 * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction 974 * finishes. 975 */ 976 public static class WaitThroughPutController extends NoLimitThroughputController { 977 978 public WaitThroughPutController() { 979 } 980 981 @Override 982 public long control(String compactionName, long size) throws InterruptedException { 983 Thread.sleep(6000000); 984 return 6000000; 985 } 986 } 987}