001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY;
021import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES;
022import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
023import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
024import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
025import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
026import static org.hamcrest.MatcherAssert.assertThat;
027import static org.hamcrest.Matchers.allOf;
028import static org.hamcrest.Matchers.containsString;
029import static org.hamcrest.Matchers.hasProperty;
030import static org.hamcrest.Matchers.instanceOf;
031import static org.hamcrest.Matchers.notNullValue;
032import static org.junit.Assert.assertEquals;
033import static org.junit.Assert.assertFalse;
034import static org.junit.Assert.assertThrows;
035import static org.junit.Assert.assertTrue;
036import static org.junit.Assert.fail;
037import static org.mockito.ArgumentMatchers.any;
038import static org.mockito.Mockito.doAnswer;
039import static org.mockito.Mockito.mock;
040import static org.mockito.Mockito.spy;
041import static org.mockito.Mockito.when;
042
043import java.io.File;
044import java.io.FileOutputStream;
045import java.io.IOException;
046import java.io.InputStream;
047import java.io.OutputStream;
048import java.util.ArrayList;
049import java.util.Collection;
050import java.util.Collections;
051import java.util.List;
052import java.util.Objects;
053import java.util.Optional;
054import java.util.concurrent.CountDownLatch;
055import java.util.concurrent.TimeUnit;
056import java.util.zip.GZIPInputStream;
057import java.util.zip.GZIPOutputStream;
058import org.apache.hadoop.conf.Configuration;
059import org.apache.hadoop.fs.FSDataOutputStream;
060import org.apache.hadoop.fs.FileStatus;
061import org.apache.hadoop.fs.FileSystem;
062import org.apache.hadoop.fs.Path;
063import org.apache.hadoop.hbase.ChoreService;
064import org.apache.hadoop.hbase.HBaseClassTestRule;
065import org.apache.hadoop.hbase.HBaseConfiguration;
066import org.apache.hadoop.hbase.HBaseTestingUtil;
067import org.apache.hadoop.hbase.HConstants;
068import org.apache.hadoop.hbase.HTestConst;
069import org.apache.hadoop.hbase.KeyValue;
070import org.apache.hadoop.hbase.Waiter;
071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
072import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
073import org.apache.hadoop.hbase.client.Delete;
074import org.apache.hadoop.hbase.client.Durability;
075import org.apache.hadoop.hbase.client.Put;
076import org.apache.hadoop.hbase.client.Table;
077import org.apache.hadoop.hbase.client.TableDescriptor;
078import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
079import org.apache.hadoop.hbase.io.compress.Compression;
080import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
081import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
082import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
083import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
084import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
085import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
086import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
087import org.apache.hadoop.hbase.security.User;
088import org.apache.hadoop.hbase.testclassification.LargeTests;
089import org.apache.hadoop.hbase.testclassification.RegionServerTests;
090import org.apache.hadoop.hbase.util.Bytes;
091import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
092import org.apache.hadoop.hbase.util.Threads;
093import org.apache.hadoop.hbase.wal.WAL;
094import org.apache.hadoop.io.IOUtils;
095import org.junit.After;
096import org.junit.Assume;
097import org.junit.Before;
098import org.junit.ClassRule;
099import org.junit.Ignore;
100import org.junit.Rule;
101import org.junit.Test;
102import org.junit.experimental.categories.Category;
103import org.junit.rules.TestName;
104import org.mockito.Mockito;
105import org.mockito.invocation.InvocationOnMock;
106import org.mockito.stubbing.Answer;
107import org.slf4j.LoggerFactory;
108
109/**
110 * Test compaction framework and common functions
111 */
112@Category({ RegionServerTests.class, LargeTests.class })
113public class TestCompaction {
114
115  @ClassRule
116  public static final HBaseClassTestRule CLASS_RULE =
117    HBaseClassTestRule.forClass(TestCompaction.class);
118
119  @Rule
120  public TestName name = new TestName();
121  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
122  protected Configuration conf = UTIL.getConfiguration();
123
124  private HRegion r = null;
125  private TableDescriptor tableDescriptor = null;
126  private static final byte[] COLUMN_FAMILY = fam1;
127  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
128  private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
129  private int compactionThreshold;
130  private byte[] secondRowBytes, thirdRowBytes;
131  private static final long MAX_FILES_TO_COMPACT = 10;
132  private final byte[] FAMILY = Bytes.toBytes("cf");
133
134  /** constructor */
135  public TestCompaction() {
136    // Set cache flush size to 1MB
137    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
138    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
139    conf.setLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, 10L);
140    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
141      NoLimitThroughputController.class.getName());
142    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
143
144    secondRowBytes = START_KEY_BYTES.clone();
145    // Increment the least significant character so we get to next row.
146    secondRowBytes[START_KEY_BYTES.length - 1]++;
147    thirdRowBytes = START_KEY_BYTES.clone();
148    thirdRowBytes[START_KEY_BYTES.length - 1] =
149      (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
150  }
151
152  @Before
153  public void setUp() throws Exception {
154    TableDescriptorBuilder builder = UTIL.createModifyableTableDescriptor(name.getMethodName());
155    if (name.getMethodName().equals("testCompactionSeqId")) {
156      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
157      UTIL.getConfiguration().set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
158        DummyCompactor.class.getName());
159      ColumnFamilyDescriptor familyDescriptor =
160        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(65536).build();
161      builder.setColumnFamily(familyDescriptor);
162    }
163    if (
164      name.getMethodName().equals("testCompactionWithCorruptBlock")
165        || name.getMethodName().equals("generateHFileForCorruptBlockTest")
166    ) {
167      UTIL.getConfiguration().setBoolean("hbase.hstore.validate.read_fully", true);
168      ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY)
169        .setCompressionType(Compression.Algorithm.GZ).build();
170      builder.setColumnFamily(familyDescriptor);
171    }
172    this.tableDescriptor = builder.build();
173    this.r = UTIL.createLocalHRegion(tableDescriptor, null, null);
174  }
175
176  @After
177  public void tearDown() throws Exception {
178    WAL wal = r.getWAL();
179    this.r.close();
180    wal.close();
181  }
182
183  /**
184   * Verify that you can stop a long-running compaction (used during RS shutdown)
185   */
186  @Test
187  public void testInterruptCompactionBySize() throws Exception {
188    assertEquals(0, count());
189
190    // lower the polling interval for this test
191    conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */);
192
193    try {
194      // Create a couple store files w/ 15KB (over 10KB interval)
195      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
196      byte[] pad = new byte[1000]; // 1 KB chunk
197      for (int i = 0; i < compactionThreshold; i++) {
198        Table loader = new RegionAsTable(r);
199        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
200        p.setDurability(Durability.SKIP_WAL);
201        for (int j = 0; j < jmax; j++) {
202          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
203        }
204        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
205        loader.put(p);
206        r.flush(true);
207      }
208
209      HRegion spyR = spy(r);
210      doAnswer(new Answer<Object>() {
211        @Override
212        public Object answer(InvocationOnMock invocation) throws Throwable {
213          r.writestate.writesEnabled = false;
214          return invocation.callRealMethod();
215        }
216      }).when(spyR).doRegionCompactionPrep();
217
218      // force a minor compaction, but not before requesting a stop
219      spyR.compactStores();
220
221      // ensure that the compaction stopped, all old files are intact,
222      HStore s = r.getStore(COLUMN_FAMILY);
223      assertEquals(compactionThreshold, s.getStorefilesCount());
224      assertTrue(s.getStorefilesSize() > 15 * 1000);
225      // and no new store files persisted past compactStores()
226      // only one empty dir exists in temp dir
227      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
228      assertEquals(1, ls.length);
229      Path storeTempDir =
230        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
231      assertTrue(r.getFilesystem().exists(storeTempDir));
232      ls = r.getFilesystem().listStatus(storeTempDir);
233      assertEquals(0, ls.length);
234    } finally {
235      // don't mess up future tests
236      r.writestate.writesEnabled = true;
237      conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
238
239      // Delete all Store information once done using
240      for (int i = 0; i < compactionThreshold; i++) {
241        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
242        byte[][] famAndQf = { COLUMN_FAMILY, null };
243        delete.addFamily(famAndQf[0]);
244        r.delete(delete);
245      }
246      r.flush(true);
247
248      // Multiple versions allowed for an entry, so the delete isn't enough
249      // Lower TTL and expire to ensure that all our entries have been wiped
250      final int ttl = 1000;
251      for (HStore store : this.r.stores.values()) {
252        ScanInfo old = store.getScanInfo();
253        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
254        store.setScanInfo(si);
255      }
256      Thread.sleep(ttl);
257
258      r.compact(true);
259      assertEquals(0, count());
260    }
261  }
262
  /**
   * Same scenario as {@link #testInterruptCompactionBySize()}, but the in-flight compaction is
   * aborted via the time-based close-check limit ({@code TIME_LIMIT_KEY}) instead of the
   * size-based one.
   */
  @Test
  public void testInterruptCompactionByTime() throws Exception {
    assertEquals(0, count());

    // lower the polling interval for this test
    conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */);

    try {
      // Create a couple store files w/ 15KB (over 10KB interval)
      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
      byte[] pad = new byte[1000]; // 1 KB chunk
      for (int i = 0; i < compactionThreshold; i++) {
        Table loader = new RegionAsTable(r);
        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
        p.setDurability(Durability.SKIP_WAL);
        for (int j = 0; j < jmax; j++) {
          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
        }
        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
        loader.put(p);
        r.flush(true);
      }

      // Disable writes from inside the compaction-prep hook so the time-based close
      // check (TIME_LIMIT_KEY above) aborts the compaction mid-flight.
      HRegion spyR = spy(r);
      doAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
          r.writestate.writesEnabled = false;
          return invocation.callRealMethod();
        }
      }).when(spyR).doRegionCompactionPrep();

      // force a minor compaction, but not before requesting a stop
      spyR.compactStores();

      // ensure that the compaction stopped, all old files are intact,
      HStore s = r.getStore(COLUMN_FAMILY);
      assertEquals(compactionThreshold, s.getStorefilesCount());
      assertTrue(s.getStorefilesSize() > 15 * 1000);
      // and no new store files persisted past compactStores()
      // only one empty dir exists in temp dir
      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
      assertEquals(1, ls.length);
      Path storeTempDir =
        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
      assertTrue(r.getFilesystem().exists(storeTempDir));
      ls = r.getFilesystem().listStatus(storeTempDir);
      assertEquals(0, ls.length);
    } finally {
      // don't mess up future tests
      r.writestate.writesEnabled = true;
      conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);

      // Delete all Store information once done using
      for (int i = 0; i < compactionThreshold; i++) {
        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
        byte[][] famAndQf = { COLUMN_FAMILY, null };
        delete.addFamily(famAndQf[0]);
        r.delete(delete);
      }
      r.flush(true);

      // Multiple versions allowed for an entry, so the delete isn't enough
      // Lower TTL and expire to ensure that all our entries have been wiped
      final int ttl = 1000;
      for (HStore store : this.r.stores.values()) {
        ScanInfo old = store.getScanInfo();
        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
        store.setScanInfo(si);
      }
      Thread.sleep(ttl);

      // Major-compact away the now-expired cells and confirm the region is empty again.
      r.compact(true);
      assertEquals(0, count());
    }
  }
339
340  private int count() throws IOException {
341    int count = 0;
342    for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
343      f.initReader();
344      try (StoreFileScanner scanner = f.getPreadScanner(false, Long.MAX_VALUE, 0, false)) {
345        scanner.seek(KeyValue.LOWESTKEY);
346        while (scanner.next() != null) {
347          count++;
348        }
349      }
350    }
351    return count;
352  }
353
  /** Flushes one store file of default test content into {@code region}'s default family. */
  private void createStoreFile(final HRegion region) throws IOException {
    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
  }
357
358  private void createStoreFile(final HRegion region, String family) throws IOException {
359    Table loader = new RegionAsTable(region);
360    HTestConst.addContent(loader, family);
361    region.flush(true);
362  }
363
364  @Test
365  public void testCompactionWithCorruptResult() throws Exception {
366    int nfiles = 10;
367    for (int i = 0; i < nfiles; i++) {
368      createStoreFile(r);
369    }
370    HStore store = r.getStore(COLUMN_FAMILY);
371
372    Collection<HStoreFile> storeFiles = store.getStorefiles();
373    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
374    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
375    tool.compact(request, NoLimitThroughputController.INSTANCE, null);
376
377    // Now lets corrupt the compacted file.
378    FileSystem fs = store.getFileSystem();
379    // default compaction policy created one and only one new compacted file
380    Path tmpPath = store.getRegionFileSystem().createTempName();
381    try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
382      stream.writeChars("CORRUPT FILE!!!!");
383    }
384
385    // The complete compaction should fail and the corrupt file should remain
386    // in the 'tmp' directory;
387    assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
388      EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath)));
389    assertTrue(fs.exists(tmpPath));
390  }
391
392  /**
393   * Generates the HFile used by {@link #testCompactionWithCorruptBlock()}. Run this method to
394   * regenerate the test resource file after changes to the HFile format. The output file must then
395   * be hand-edited to corrupt the first data block (zero out the GZip magic bytes at offset 33)
396   * before being placed into the test resources directory.
397   */
398  @Ignore("Not a test; utility for regenerating testCompactionWithCorruptBlock resource file")
399  @Test
400  public void generateHFileForCorruptBlockTest() throws Exception {
401    createStoreFile(r, Bytes.toString(FAMILY));
402    createStoreFile(r, Bytes.toString(FAMILY));
403    HStore store = r.getStore(FAMILY);
404
405    Collection<HStoreFile> storeFiles = store.getStorefiles();
406    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
407    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
408    List<Path> paths = tool.compact(request, NoLimitThroughputController.INSTANCE, null);
409
410    FileSystem fs = store.getFileSystem();
411    Path hfilePath = paths.get(0);
412    File outFile = new File("/tmp/TestCompaction_HFileWithCorruptBlock.gz");
413    try (InputStream in = fs.open(hfilePath);
414      GZIPOutputStream gzOut = new GZIPOutputStream(new FileOutputStream(outFile))) {
415      IOUtils.copyBytes(in, gzOut, 4096);
416    }
417    LoggerFactory.getLogger(TestCompaction.class)
418      .info("Wrote HFile to {}. Now hex-edit offset 33 (0x21): zero out bytes 1f 8b.", outFile);
419  }
420
421  /**
422   * This test uses a hand-modified HFile, which is loaded in from the resources' path. That file
423   * was generated from {@link #generateHFileForCorruptBlockTest()} and then edited to corrupt the
424   * GZ-encoded block by zeroing-out the first two bytes of the GZip header, the "standard
425   * declaration" of {@code 1f 8b}, found at offset 33 in the file. I'm not sure why, but it seems
426   * that in this test context we do not enforce CRC checksums. Thus, this corruption manifests in
427   * the Decompressor rather than in the reader when it loads the block bytes and compares vs. the
428   * header.
429   */
430  @Test
431  public void testCompactionWithCorruptBlock() throws Exception {
432    createStoreFile(r, Bytes.toString(FAMILY));
433    createStoreFile(r, Bytes.toString(FAMILY));
434    HStore store = r.getStore(FAMILY);
435
436    Collection<HStoreFile> storeFiles = store.getStorefiles();
437    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
438    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
439    tool.compact(request, NoLimitThroughputController.INSTANCE, null);
440
441    // insert the hfile with a corrupted data block into the region's tmp directory, where
442    // compaction output is collected.
443    FileSystem fs = store.getFileSystem();
444    Path tmpPath = store.getRegionFileSystem().createTempName();
445    try (
446      InputStream inputStream =
447        getClass().getResourceAsStream("TestCompaction_HFileWithCorruptBlock.gz");
448      GZIPInputStream gzipInputStream = new GZIPInputStream(Objects.requireNonNull(inputStream));
449      OutputStream outputStream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
450      assertThat(gzipInputStream, notNullValue());
451      assertThat(outputStream, notNullValue());
452      IOUtils.copyBytes(gzipInputStream, outputStream, 512);
453    }
454    LoggerFactory.getLogger(TestCompaction.class).info("Wrote corrupted HFile to {}", tmpPath);
455
456    // The complete compaction should fail and the corrupt file should remain
457    // in the 'tmp' directory;
458    try {
459      store.doCompaction(request, storeFiles, null, EnvironmentEdgeManager.currentTime(),
460        Collections.singletonList(tmpPath));
461    } catch (IOException e) {
462      Throwable rootCause = e;
463      while (rootCause.getCause() != null) {
464        rootCause = rootCause.getCause();
465      }
466      assertThat(rootCause, allOf(instanceOf(IOException.class),
467        hasProperty("message", containsString("not a gzip file"))));
468      assertTrue(fs.exists(tmpPath));
469      return;
470    }
471    fail("Compaction should have failed due to corrupt block");
472  }
473
474  /**
475   * Create a custom compaction request and be sure that we can track it through the queue, knowing
476   * when the compaction is completed.
477   */
478  @Test
479  public void testTrackingCompactionRequest() throws Exception {
480    // setup a compact/split thread on a mock server
481    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
482    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
483    CompactSplit thread = new CompactSplit(mockServer);
484    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
485
486    // setup a region/store with some files
487    HStore store = r.getStore(COLUMN_FAMILY);
488    createStoreFile(r);
489    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
490      createStoreFile(r);
491    }
492
493    CountDownLatch latch = new CountDownLatch(1);
494    Tracker tracker = new Tracker(latch);
495    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker, null);
496    // wait for the latch to complete.
497    latch.await();
498
499    thread.interruptIfNecessary();
500  }
501
502  @Test
503  public void testCompactionFailure() throws Exception {
504    // setup a compact/split thread on a mock server
505    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
506    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
507    CompactSplit thread = new CompactSplit(mockServer);
508    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
509
510    // setup a region/store with some files
511    HStore store = r.getStore(COLUMN_FAMILY);
512    createStoreFile(r);
513    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
514      createStoreFile(r);
515    }
516
517    HRegion mockRegion = Mockito.spy(r);
518    Mockito.when(mockRegion.checkSplit())
519      .thenThrow(new RuntimeException("Thrown intentionally by test!"));
520
521    try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) {
522
523      long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
524      long preFailedCount = metricsWrapper.getNumCompactionsFailed();
525
526      CountDownLatch latch = new CountDownLatch(1);
527      Tracker tracker = new Tracker(latch);
528      thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, tracker,
529        null);
530      // wait for the latch to complete.
531      latch.await(120, TimeUnit.SECONDS);
532
533      // compaction should have completed and been marked as failed due to error in split request
534      long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
535      long postFailedCount = metricsWrapper.getNumCompactionsFailed();
536
537      assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post="
538        + postCompletedCount + ")", postCompletedCount > preCompletedCount);
539      assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post="
540        + postFailedCount + ")", postFailedCount > preFailedCount);
541    }
542  }
543
544  /**
545   * Test no new Compaction requests are generated after calling stop compactions
546   */
547  @Test
548  public void testStopStartCompaction() throws IOException {
549    // setup a compact/split thread on a mock server
550    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
551    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
552    final CompactSplit thread = new CompactSplit(mockServer);
553    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
554    // setup a region/store with some files
555    HStore store = r.getStore(COLUMN_FAMILY);
556    createStoreFile(r);
557    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
558      createStoreFile(r);
559    }
560    thread.switchCompaction(false);
561    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
562      CompactionLifeCycleTracker.DUMMY, null);
563    assertFalse(thread.isCompactionsEnabled());
564    int longCompactions = thread.getLongCompactions().getActiveCount();
565    int shortCompactions = thread.getShortCompactions().getActiveCount();
566    assertEquals(
567      "longCompactions=" + longCompactions + "," + "shortCompactions=" + shortCompactions, 0,
568      longCompactions + shortCompactions);
569    thread.switchCompaction(true);
570    assertTrue(thread.isCompactionsEnabled());
571    // Make sure no compactions have run.
572    assertEquals(0, thread.getLongCompactions().getCompletedTaskCount()
573      + thread.getShortCompactions().getCompletedTaskCount());
574    // Request a compaction and make sure it is submitted successfully.
575    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
576      CompactionLifeCycleTracker.DUMMY, null);
577    // Wait until the compaction finishes.
578    Waiter.waitFor(UTIL.getConfiguration(), 5000,
579      (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount()
580        + thread.getShortCompactions().getCompletedTaskCount() == 1);
581    // Make sure there are no compactions running.
582    assertEquals(0,
583      thread.getLongCompactions().getActiveCount() + thread.getShortCompactions().getActiveCount());
584  }
585
  /**
   * Verify that switching compactions off while one is in flight interrupts it before a new
   * store file is committed (the store file count is unchanged afterwards).
   */
  @Test
  public void testInterruptingRunningCompactions() throws Exception {
    // setup a compact/split thread on a mock server
    // WaitThroughPutController presumably delays the compaction so it is still running when
    // we flip the switch below -- see its definition elsewhere in this file to confirm.
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      WaitThroughPutController.class.getName());
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);

    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    int jmax = (int) Math.ceil(15.0 / compactionThreshold);
    byte[] pad = new byte[1000]; // 1 KB chunk
    for (int i = 0; i < compactionThreshold; i++) {
      Table loader = new RegionAsTable(r);
      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
      p.setDurability(Durability.SKIP_WAL);
      for (int j = 0; j < jmax; j++) {
        p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
      }
      HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
      loader.put(p);
      r.flush(true);
    }
    HStore s = r.getStore(COLUMN_FAMILY);
    int initialFiles = s.getStorefilesCount();

    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);

    // NOTE(review): fixed sleep assumes the compaction has started but not yet finished
    // after 3s -- timing-sensitive; a latch-based handshake would be more robust.
    Thread.sleep(3000);
    thread.switchCompaction(false);
    // The interrupted compaction must not have committed a new store file.
    assertEquals(initialFiles, s.getStorefilesCount());
    // don't mess up future tests
    thread.switchCompaction(true);
  }
624
625  /**
626   * HBASE-7947: Regression test to ensure adding to the correct list in the {@link CompactSplit}
627   * @throws Exception on failure
628   */
629  @Test
630  public void testMultipleCustomCompactionRequests() throws Exception {
631    // setup a compact/split thread on a mock server
632    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
633    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
634    CompactSplit thread = new CompactSplit(mockServer);
635    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
636
637    // setup a region/store with some files
638    int numStores = r.getStores().size();
639    CountDownLatch latch = new CountDownLatch(numStores);
640    Tracker tracker = new Tracker(latch);
641    // create some store files and setup requests for each store on which we want to do a
642    // compaction
643    for (HStore store : r.getStores()) {
644      createStoreFile(r, store.getColumnFamilyName());
645      createStoreFile(r, store.getColumnFamilyName());
646      createStoreFile(r, store.getColumnFamilyName());
647      thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER, tracker,
648        null);
649    }
650    // wait for the latch to complete.
651    latch.await();
652
653    thread.interruptIfNecessary();
654  }
655
  /**
   * Mock-store factory whose "compactions" do no I/O: files merely move between the
   * {@code compacting} and {@code notCompacting} lists, and each finished compaction records
   * its file count into a results list shared with the test.
   */
  class StoreMockMaker extends StatefulStoreMockMaker {
    // Files currently claimed by an outstanding compaction context.
    public ArrayList<HStoreFile> compacting = new ArrayList<>();
    // Files available for the next selectCompaction() call.
    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
    // Shared with the test thread; guarded by synchronizing on the list itself.
    private final ArrayList<Integer> results;

    public StoreMockMaker(ArrayList<Integer> results) {
      this.results = results;
    }

    public class TestCompactionContext extends CompactionContext {

      // The files this particular context will "compact".
      private List<HStoreFile> selectedFiles;

      public TestCompactionContext(List<HStoreFile> selectedFiles) {
        super();
        this.selectedFiles = selectedFiles;
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        // Nothing is pre-selected for this mock.
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
        // Always succeeds with the files captured at construction time.
        this.request = new CompactionRequestImpl(selectedFiles);
        this.request.setPriority(getPriority());
        return true;
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
        throws IOException {
        // No real I/O: just record the compaction and release the files.
        finishCompaction(this.selectedFiles);
        return new ArrayList<>();
      }
    }

    @Override
    public synchronized Optional<CompactionContext> selectCompaction() {
      // Claim everything not yet compacting into a fresh context.
      CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
      compacting.addAll(notCompacting);
      notCompacting.clear();
      try {
        ctx.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(ctx);
    }

    @Override
    public synchronized void cancelCompaction(Object object) {
      // Return the cancelled context's files to the not-compacting pool.
      TestCompactionContext ctx = (TestCompactionContext) object;
      compacting.removeAll(ctx.selectedFiles);
      notCompacting.addAll(ctx.selectedFiles);
    }

    public synchronized void finishCompaction(List<HStoreFile> sfs) {
      if (sfs.isEmpty()) return;
      // Record how many files this compaction covered, then release them.
      synchronized (results) {
        results.add(sfs.size());
      }
      compacting.removeAll(sfs);
    }

    @Override
    public int getPriority() {
      // Priority decreases as more files accumulate: 7 minus the total file count.
      return 7 - compacting.size() - notCompacting.size();
    }
  }
728
729  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
730    BlockingCompactionContext blocked = null;
731
732    public class BlockingCompactionContext extends CompactionContext {
733      public volatile boolean isInCompact = false;
734
735      public void unblock() {
736        synchronized (this) {
737          this.notifyAll();
738        }
739      }
740
741      @Override
742      public List<Path> compact(ThroughputController throughputController, User user)
743        throws IOException {
744        try {
745          isInCompact = true;
746          synchronized (this) {
747            this.wait();
748          }
749        } catch (InterruptedException e) {
750          Assume.assumeNoException(e);
751        }
752        return new ArrayList<>();
753      }
754
755      @Override
756      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
757        return new ArrayList<>();
758      }
759
760      @Override
761      public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
762        throws IOException {
763        this.request = new CompactionRequestImpl(new ArrayList<>());
764        return true;
765      }
766    }
767
768    @Override
769    public Optional<CompactionContext> selectCompaction() {
770      this.blocked = new BlockingCompactionContext();
771      try {
772        this.blocked.select(null, false, false, false);
773      } catch (IOException ex) {
774        fail("Shouldn't happen");
775      }
776      return Optional.of(blocked);
777    }
778
779    @Override
780    public void cancelCompaction(Object object) {
781    }
782
783    @Override
784    public int getPriority() {
785      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
786    }
787
788    public BlockingCompactionContext waitForBlocking() {
789      while (this.blocked == null || !this.blocked.isInCompact) {
790        Threads.sleepWithoutInterrupt(50);
791      }
792      BlockingCompactionContext ctx = this.blocked;
793      this.blocked = null;
794      return ctx;
795    }
796
797    @Override
798    public HStore createStoreMock(String name) throws Exception {
799      return createStoreMock(Integer.MIN_VALUE, name);
800    }
801
802    public HStore createStoreMock(int priority, String name) throws Exception {
803      // Override the mock to always return the specified priority.
804      HStore s = super.createStoreMock(name);
805      when(s.getCompactPriority()).thenReturn(priority);
806      return s;
807    }
808  }
809
810  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
  @Test
  public void testCompactionQueuePriorities() throws Exception {
    // Setup a compact/split thread on a mock server.
    final Configuration conf = HBaseConfiguration.create();
    HRegionServer mockServer = mock(HRegionServer.class);
    when(mockServer.isStopped()).thenReturn(false);
    when(mockServer.getConfiguration()).thenReturn(conf);
    when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
    CompactSplit cst = new CompactSplit(mockServer);
    when(mockServer.getCompactSplitThread()).thenReturn(cst);
    // prevent large compaction thread pool stealing job from small compaction queue.
    cst.shutdownLongCompactions();
    // Set up the region mock that redirects compactions.
    HRegion r = mock(HRegion.class);
    when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        // Delegate straight to the CompactionContext so the mock makers' contexts execute.
        invocation.<CompactionContext> getArgument(0).compact(invocation.getArgument(2), null);
        return true;
      }
    });

    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
    ArrayList<Integer> results = new ArrayList<>();
    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
    HStore store = sm.createStoreMock("store1");
    HStore store2 = sm2.createStoreMock("store2");
    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();

    // First, block the compaction thread so that we could muck with queue.
    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();

    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
    for (int i = 0; i < 4; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri3");
    for (int i = 0; i < 3; ++i) {
      sm2.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store2, "s2-pri4");
    // Now add 2 more files to store1 and queue compaction - pri 1.
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri1");
    // Finally add blocking compaction with priority 2.
    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");

    // Unblock the blocking compaction; we should run pri1 and become block again in pri2.
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    // Pri1 should have "compacted" all 6 files.
    assertEquals(1, results.size());
    assertEquals(6, results.get(0).intValue());
    // Add 2 files to store 1 (it has 2 files now).
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
    // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    assertEquals(3, results.size());
    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
    assertEquals(2, results.get(2).intValue());

    // Release the final blocker and stop the compaction threads.
    currentBlock.unblock();
    cst.interruptIfNecessary();
  }
883
884  /**
885   * Firstly write 10 cells (with different time stamp) to a qualifier and flush to hfile1, then
886   * write 10 cells (with different time stamp) to the same qualifier and flush to hfile2. The
887   * latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 are with the same time
888   * stamp but different sequence id, and will get scanned successively during compaction.
889   * <p/>
890   * We set compaction.kv.max to 10 so compaction will scan 10 versions each round, meanwhile we set
891   * keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 will be written out
892   * with seqId cleaned (set to 0) including cell-B, then when scanner goes to cell-A it will cause
893   * a scan out-of-order assertion error before HBASE-16931 if error occurs during the test
894   */
895  @Test
896  public void testCompactionSeqId() throws Exception {
897    final byte[] ROW = Bytes.toBytes("row");
898    final byte[] QUALIFIER = Bytes.toBytes("qualifier");
899
900    long timestamp = 10000;
901
902    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
903    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
904    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
905    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
906    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
907    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
908    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
909    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
910    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
911    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
912    for (int i = 0; i < 10; i++) {
913      Put put = new Put(ROW);
914      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
915      r.put(put);
916    }
917    r.flush(true);
918
919    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
920    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
921    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
922    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
923    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
924    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
925    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
926    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
927    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
928    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
929    for (int i = 18; i > 8; i--) {
930      Put put = new Put(ROW);
931      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
932      r.put(put);
933    }
934    r.flush(true);
935    r.compact(true);
936  }
937
  /**
   * Compactor configured with keepSeqIdPeriod=0 so compaction output does not preserve any
   * recent sequence ids (used to reproduce the HBASE-16931 scan-order scenario).
   */
  public static class DummyCompactor extends DefaultCompactor {
    public DummyCompactor(Configuration conf, HStore store) {
      super(conf, store);
      // Zero means no window of recent seqIds is retained in the compacted output.
      this.keepSeqIdPeriod = 0;
    }
  }
944
945  private static HStoreFile createFile() throws Exception {
946    HStoreFile sf = mock(HStoreFile.class);
947    when(sf.getPath()).thenReturn(new Path("file"));
948    StoreFileReader r = mock(StoreFileReader.class);
949    when(r.length()).thenReturn(10L);
950    when(sf.getReader()).thenReturn(r);
951    return sf;
952  }
953
954  /**
955   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
956   * finishes.
957   */
958  public static class Tracker implements CompactionLifeCycleTracker {
959
960    private final CountDownLatch done;
961
962    public Tracker(CountDownLatch done) {
963      this.done = done;
964    }
965
966    @Override
967    public void afterExecution(Store store) {
968      done.countDown();
969    }
970  }
971
972  /**
973   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
974   * finishes.
975   */
976  public static class WaitThroughPutController extends NoLimitThroughputController {
977
978    public WaitThroughPutController() {
979    }
980
981    @Override
982    public long control(String compactionName, long size) throws InterruptedException {
983      Thread.sleep(6000000);
984      return 6000000;
985    }
986  }
987}