001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY;
021import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES;
022import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
023import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
024import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
025import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
026import static org.hamcrest.MatcherAssert.assertThat;
027import static org.hamcrest.Matchers.allOf;
028import static org.hamcrest.Matchers.containsString;
029import static org.hamcrest.Matchers.hasProperty;
030import static org.hamcrest.Matchers.instanceOf;
031import static org.hamcrest.Matchers.notNullValue;
032import static org.junit.jupiter.api.Assertions.assertEquals;
033import static org.junit.jupiter.api.Assertions.assertFalse;
034import static org.junit.jupiter.api.Assertions.assertThrows;
035import static org.junit.jupiter.api.Assertions.assertTrue;
036import static org.junit.jupiter.api.Assertions.fail;
037import static org.mockito.ArgumentMatchers.any;
038import static org.mockito.Mockito.doAnswer;
039import static org.mockito.Mockito.mock;
040import static org.mockito.Mockito.spy;
041import static org.mockito.Mockito.when;
042
043import java.io.File;
044import java.io.FileOutputStream;
045import java.io.IOException;
046import java.io.InputStream;
047import java.io.OutputStream;
048import java.util.ArrayList;
049import java.util.Collection;
050import java.util.Collections;
051import java.util.List;
052import java.util.Objects;
053import java.util.Optional;
054import java.util.concurrent.CountDownLatch;
055import java.util.concurrent.TimeUnit;
056import java.util.zip.GZIPInputStream;
057import java.util.zip.GZIPOutputStream;
058import org.apache.hadoop.conf.Configuration;
059import org.apache.hadoop.fs.FSDataOutputStream;
060import org.apache.hadoop.fs.FileStatus;
061import org.apache.hadoop.fs.FileSystem;
062import org.apache.hadoop.fs.Path;
063import org.apache.hadoop.hbase.ChoreService;
064import org.apache.hadoop.hbase.HBaseConfiguration;
065import org.apache.hadoop.hbase.HBaseTestingUtil;
066import org.apache.hadoop.hbase.HConstants;
067import org.apache.hadoop.hbase.HTestConst;
068import org.apache.hadoop.hbase.KeyValue;
069import org.apache.hadoop.hbase.Waiter;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
072import org.apache.hadoop.hbase.client.Delete;
073import org.apache.hadoop.hbase.client.Durability;
074import org.apache.hadoop.hbase.client.Put;
075import org.apache.hadoop.hbase.client.Table;
076import org.apache.hadoop.hbase.client.TableDescriptor;
077import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
078import org.apache.hadoop.hbase.io.compress.Compression;
079import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
080import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
081import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
082import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
083import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
084import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
085import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
086import org.apache.hadoop.hbase.security.User;
087import org.apache.hadoop.hbase.testclassification.LargeTests;
088import org.apache.hadoop.hbase.testclassification.RegionServerTests;
089import org.apache.hadoop.hbase.util.Bytes;
090import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
091import org.apache.hadoop.hbase.util.Threads;
092import org.apache.hadoop.hbase.wal.WAL;
093import org.apache.hadoop.io.IOUtils;
094import org.junit.jupiter.api.AfterEach;
095import org.junit.jupiter.api.Assumptions;
096import org.junit.jupiter.api.BeforeEach;
097import org.junit.jupiter.api.Disabled;
098import org.junit.jupiter.api.Tag;
099import org.junit.jupiter.api.Test;
100import org.junit.jupiter.api.TestInfo;
101import org.mockito.Mockito;
102import org.mockito.invocation.InvocationOnMock;
103import org.mockito.stubbing.Answer;
104import org.slf4j.LoggerFactory;
105
106/**
107 * Test compaction framework and common functions
108 */
109@Tag(RegionServerTests.TAG)
110@Tag(LargeTests.TAG)
111public class TestCompaction {
112
113  private String name;
114
115  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
116  protected Configuration conf = UTIL.getConfiguration();
117
118  private HRegion r = null;
119  private TableDescriptor tableDescriptor = null;
120  private static final byte[] COLUMN_FAMILY = fam1;
121  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
122  private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
123  private int compactionThreshold;
124  private byte[] secondRowBytes, thirdRowBytes;
125  private static final long MAX_FILES_TO_COMPACT = 10;
126  private final byte[] FAMILY = Bytes.toBytes("cf");
127
128  /** constructor */
129  public TestCompaction() {
130    // Set cache flush size to 1MB
131    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
132    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
133    conf.setLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, 10L);
134    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
135      NoLimitThroughputController.class.getName());
136    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
137
138    secondRowBytes = START_KEY_BYTES.clone();
139    // Increment the least significant character so we get to next row.
140    secondRowBytes[START_KEY_BYTES.length - 1]++;
141    thirdRowBytes = START_KEY_BYTES.clone();
142    thirdRowBytes[START_KEY_BYTES.length - 1] =
143      (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
144  }
145
146  @BeforeEach
147  public void setUp(TestInfo testInfo) throws Exception {
148    this.name = testInfo.getTestMethod().get().getName();
149    TableDescriptorBuilder builder = UTIL.createModifyableTableDescriptor(name);
150    if (name.equals("testCompactionSeqId")) {
151      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
152      UTIL.getConfiguration().set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
153        DummyCompactor.class.getName());
154      ColumnFamilyDescriptor familyDescriptor =
155        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(65536).build();
156      builder.setColumnFamily(familyDescriptor);
157    }
158    if (
159      name.equals("testCompactionWithCorruptBlock")
160        || name.equals("generateHFileForCorruptBlockTest")
161    ) {
162      UTIL.getConfiguration().setBoolean("hbase.hstore.validate.read_fully", true);
163      ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY)
164        .setCompressionType(Compression.Algorithm.GZ).build();
165      builder.setColumnFamily(familyDescriptor);
166    }
167    this.tableDescriptor = builder.build();
168    this.r = UTIL.createLocalHRegion(tableDescriptor, null, null);
169  }
170
171  @AfterEach
172  public void tearDown() throws Exception {
173    WAL wal = r.getWAL();
174    this.r.close();
175    wal.close();
176  }
177
178  /**
179   * Verify that you can stop a long-running compaction (used during RS shutdown)
180   */
181  @Test
182  public void testInterruptCompactionBySize() throws Exception {
183    assertEquals(0, count());
184
185    // lower the polling interval for this test
186    conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */);
187
188    try {
189      // Create a couple store files w/ 15KB (over 10KB interval)
190      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
191      byte[] pad = new byte[1000]; // 1 KB chunk
192      for (int i = 0; i < compactionThreshold; i++) {
193        Table loader = new RegionAsTable(r);
194        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
195        p.setDurability(Durability.SKIP_WAL);
196        for (int j = 0; j < jmax; j++) {
197          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
198        }
199        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
200        loader.put(p);
201        r.flush(true);
202      }
203
204      HRegion spyR = spy(r);
205      doAnswer(new Answer<Object>() {
206        @Override
207        public Object answer(InvocationOnMock invocation) throws Throwable {
208          r.writestate.writesEnabled = false;
209          return invocation.callRealMethod();
210        }
211      }).when(spyR).doRegionCompactionPrep();
212
213      // force a minor compaction, but not before requesting a stop
214      spyR.compactStores();
215
216      // ensure that the compaction stopped, all old files are intact,
217      HStore s = r.getStore(COLUMN_FAMILY);
218      assertEquals(compactionThreshold, s.getStorefilesCount());
219      assertTrue(s.getStorefilesSize() > 15 * 1000);
220      // and no new store files persisted past compactStores()
221      // only one empty dir exists in temp dir
222      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
223      assertEquals(1, ls.length);
224      Path storeTempDir =
225        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
226      assertTrue(r.getFilesystem().exists(storeTempDir));
227      ls = r.getFilesystem().listStatus(storeTempDir);
228      assertEquals(0, ls.length);
229    } finally {
230      // don't mess up future tests
231      r.writestate.writesEnabled = true;
232      conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
233
234      // Delete all Store information once done using
235      for (int i = 0; i < compactionThreshold; i++) {
236        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
237        byte[][] famAndQf = { COLUMN_FAMILY, null };
238        delete.addFamily(famAndQf[0]);
239        r.delete(delete);
240      }
241      r.flush(true);
242
243      // Multiple versions allowed for an entry, so the delete isn't enough
244      // Lower TTL and expire to ensure that all our entries have been wiped
245      final int ttl = 1000;
246      for (HStore store : this.r.stores.values()) {
247        ScanInfo old = store.getScanInfo();
248        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
249        store.setScanInfo(si);
250      }
251      Thread.sleep(ttl);
252
253      r.compact(true);
254      assertEquals(0, count());
255    }
256  }
257
258  @Test
259  public void testInterruptCompactionByTime() throws Exception {
260    assertEquals(0, count());
261
262    // lower the polling interval for this test
263    conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */);
264
265    try {
266      // Create a couple store files w/ 15KB (over 10KB interval)
267      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
268      byte[] pad = new byte[1000]; // 1 KB chunk
269      for (int i = 0; i < compactionThreshold; i++) {
270        Table loader = new RegionAsTable(r);
271        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
272        p.setDurability(Durability.SKIP_WAL);
273        for (int j = 0; j < jmax; j++) {
274          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
275        }
276        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
277        loader.put(p);
278        r.flush(true);
279      }
280
281      HRegion spyR = spy(r);
282      doAnswer(new Answer<Object>() {
283        @Override
284        public Object answer(InvocationOnMock invocation) throws Throwable {
285          r.writestate.writesEnabled = false;
286          return invocation.callRealMethod();
287        }
288      }).when(spyR).doRegionCompactionPrep();
289
290      // force a minor compaction, but not before requesting a stop
291      spyR.compactStores();
292
293      // ensure that the compaction stopped, all old files are intact,
294      HStore s = r.getStore(COLUMN_FAMILY);
295      assertEquals(compactionThreshold, s.getStorefilesCount());
296      assertTrue(s.getStorefilesSize() > 15 * 1000);
297      // and no new store files persisted past compactStores()
298      // only one empty dir exists in temp dir
299      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
300      assertEquals(1, ls.length);
301      Path storeTempDir =
302        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
303      assertTrue(r.getFilesystem().exists(storeTempDir));
304      ls = r.getFilesystem().listStatus(storeTempDir);
305      assertEquals(0, ls.length);
306    } finally {
307      // don't mess up future tests
308      r.writestate.writesEnabled = true;
309      conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);
310
311      // Delete all Store information once done using
312      for (int i = 0; i < compactionThreshold; i++) {
313        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
314        byte[][] famAndQf = { COLUMN_FAMILY, null };
315        delete.addFamily(famAndQf[0]);
316        r.delete(delete);
317      }
318      r.flush(true);
319
320      // Multiple versions allowed for an entry, so the delete isn't enough
321      // Lower TTL and expire to ensure that all our entries have been wiped
322      final int ttl = 1000;
323      for (HStore store : this.r.stores.values()) {
324        ScanInfo old = store.getScanInfo();
325        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
326        store.setScanInfo(si);
327      }
328      Thread.sleep(ttl);
329
330      r.compact(true);
331      assertEquals(0, count());
332    }
333  }
334
335  private int count() throws IOException {
336    int count = 0;
337    for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
338      f.initReader();
339      try (StoreFileScanner scanner = f.getPreadScanner(false, Long.MAX_VALUE, 0, false)) {
340        scanner.seek(KeyValue.LOWESTKEY);
341        while (scanner.next() != null) {
342          count++;
343        }
344      }
345    }
346    return count;
347  }
348
349  private void createStoreFile(final HRegion region) throws IOException {
350    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
351  }
352
353  private void createStoreFile(final HRegion region, String family) throws IOException {
354    Table loader = new RegionAsTable(region);
355    HTestConst.addContent(loader, family);
356    region.flush(true);
357  }
358
359  @Test
360  public void testCompactionWithCorruptResult() throws Exception {
361    int nfiles = 10;
362    for (int i = 0; i < nfiles; i++) {
363      createStoreFile(r);
364    }
365    HStore store = r.getStore(COLUMN_FAMILY);
366
367    Collection<HStoreFile> storeFiles = store.getStorefiles();
368    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
369    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
370    tool.compact(request, NoLimitThroughputController.INSTANCE, null);
371
372    // Now lets corrupt the compacted file.
373    FileSystem fs = store.getFileSystem();
374    // default compaction policy created one and only one new compacted file
375    Path tmpPath = store.getRegionFileSystem().createTempName();
376    try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
377      stream.writeChars("CORRUPT FILE!!!!");
378    }
379
380    // The complete compaction should fail and the corrupt file should remain
381    // in the 'tmp' directory;
382    assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
383      EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath)));
384    assertTrue(fs.exists(tmpPath));
385  }
386
387  /**
388   * Generates the HFile used by {@link #testCompactionWithCorruptBlock()}. Run this method to
389   * regenerate the test resource file after changes to the HFile format. The output file must then
390   * be hand-edited to corrupt the first data block (zero out the GZip magic bytes at offset 33)
391   * before being placed into the test resources directory.
392   */
393  @Disabled("Not a test; utility for regenerating testCompactionWithCorruptBlock resource file")
394  @Test
395  public void generateHFileForCorruptBlockTest() throws Exception {
396    createStoreFile(r, Bytes.toString(FAMILY));
397    createStoreFile(r, Bytes.toString(FAMILY));
398    HStore store = r.getStore(FAMILY);
399
400    Collection<HStoreFile> storeFiles = store.getStorefiles();
401    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
402    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
403    List<Path> paths = tool.compact(request, NoLimitThroughputController.INSTANCE, null);
404
405    FileSystem fs = store.getFileSystem();
406    Path hfilePath = paths.get(0);
407    File outFile = new File("/tmp/TestCompaction_HFileWithCorruptBlock.gz");
408    try (InputStream in = fs.open(hfilePath);
409      GZIPOutputStream gzOut = new GZIPOutputStream(new FileOutputStream(outFile))) {
410      IOUtils.copyBytes(in, gzOut, 4096);
411    }
412    LoggerFactory.getLogger(TestCompaction.class)
413      .info("Wrote HFile to {}. Now hex-edit offset 33 (0x21): zero out bytes 1f 8b.", outFile);
414  }
415
416  /**
417   * This test uses a hand-modified HFile, which is loaded in from the resources' path. That file
418   * was generated from {@link #generateHFileForCorruptBlockTest()} and then edited to corrupt the
419   * GZ-encoded block by zeroing-out the first two bytes of the GZip header, the "standard
420   * declaration" of {@code 1f 8b}, found at offset 33 in the file. I'm not sure why, but it seems
421   * that in this test context we do not enforce CRC checksums. Thus, this corruption manifests in
422   * the Decompressor rather than in the reader when it loads the block bytes and compares vs. the
423   * header.
424   */
425  @Test
426  public void testCompactionWithCorruptBlock() throws Exception {
427    createStoreFile(r, Bytes.toString(FAMILY));
428    createStoreFile(r, Bytes.toString(FAMILY));
429    HStore store = r.getStore(FAMILY);
430
431    Collection<HStoreFile> storeFiles = store.getStorefiles();
432    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
433    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
434    tool.compact(request, NoLimitThroughputController.INSTANCE, null);
435
436    // insert the hfile with a corrupted data block into the region's tmp directory, where
437    // compaction output is collected.
438    FileSystem fs = store.getFileSystem();
439    Path tmpPath = store.getRegionFileSystem().createTempName();
440    try (
441      InputStream inputStream =
442        getClass().getResourceAsStream("TestCompaction_HFileWithCorruptBlock.gz");
443      GZIPInputStream gzipInputStream = new GZIPInputStream(Objects.requireNonNull(inputStream));
444      OutputStream outputStream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
445      assertThat(gzipInputStream, notNullValue());
446      assertThat(outputStream, notNullValue());
447      IOUtils.copyBytes(gzipInputStream, outputStream, 512);
448    }
449    LoggerFactory.getLogger(TestCompaction.class).info("Wrote corrupted HFile to {}", tmpPath);
450
451    // The complete compaction should fail and the corrupt file should remain
452    // in the 'tmp' directory;
453    try {
454      store.doCompaction(request, storeFiles, null, EnvironmentEdgeManager.currentTime(),
455        Collections.singletonList(tmpPath));
456    } catch (IOException e) {
457      Throwable rootCause = e;
458      while (rootCause.getCause() != null) {
459        rootCause = rootCause.getCause();
460      }
461      assertThat(rootCause, allOf(instanceOf(IOException.class),
462        hasProperty("message", containsString("not a gzip file"))));
463      assertTrue(fs.exists(tmpPath));
464      return;
465    }
466    fail("Compaction should have failed due to corrupt block");
467  }
468
469  /**
470   * Create a custom compaction request and be sure that we can track it through the queue, knowing
471   * when the compaction is completed.
472   */
473  @Test
474  public void testTrackingCompactionRequest() throws Exception {
475    // setup a compact/split thread on a mock server
476    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
477    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
478    CompactSplit thread = new CompactSplit(mockServer);
479    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
480
481    // setup a region/store with some files
482    HStore store = r.getStore(COLUMN_FAMILY);
483    createStoreFile(r);
484    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
485      createStoreFile(r);
486    }
487
488    CountDownLatch latch = new CountDownLatch(1);
489    Tracker tracker = new Tracker(latch);
490    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker, null);
491    // wait for the latch to complete.
492    latch.await();
493
494    thread.interruptIfNecessary();
495  }
496
497  @Test
498  public void testCompactionFailure() throws Exception {
499    // setup a compact/split thread on a mock server
500    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
501    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
502    CompactSplit thread = new CompactSplit(mockServer);
503    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
504
505    // setup a region/store with some files
506    HStore store = r.getStore(COLUMN_FAMILY);
507    createStoreFile(r);
508    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
509      createStoreFile(r);
510    }
511
512    HRegion mockRegion = Mockito.spy(r);
513    Mockito.when(mockRegion.checkSplit())
514      .thenThrow(new RuntimeException("Thrown intentionally by test!"));
515
516    try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) {
517
518      long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
519      long preFailedCount = metricsWrapper.getNumCompactionsFailed();
520
521      CountDownLatch latch = new CountDownLatch(1);
522      Tracker tracker = new Tracker(latch);
523      thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, tracker,
524        null);
525      // wait for the latch to complete.
526      latch.await(120, TimeUnit.SECONDS);
527
528      // compaction should have completed and been marked as failed due to error in split request
529      long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
530      long postFailedCount = metricsWrapper.getNumCompactionsFailed();
531
532      assertTrue(postCompletedCount > preCompletedCount,
533        "Completed count should have increased (pre=" + preCompletedCount + ", post="
534          + postCompletedCount + ")");
535      assertTrue(postFailedCount > preFailedCount, "Failed count should have increased (pre="
536        + preFailedCount + ", post=" + postFailedCount + ")");
537    }
538  }
539
540  /**
541   * Test no new Compaction requests are generated after calling stop compactions
542   */
543  @Test
544  public void testStopStartCompaction() throws IOException {
545    // setup a compact/split thread on a mock server
546    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
547    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
548    final CompactSplit thread = new CompactSplit(mockServer);
549    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
550    // setup a region/store with some files
551    HStore store = r.getStore(COLUMN_FAMILY);
552    createStoreFile(r);
553    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
554      createStoreFile(r);
555    }
556    thread.switchCompaction(false);
557    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
558      CompactionLifeCycleTracker.DUMMY, null);
559    assertFalse(thread.isCompactionsEnabled());
560    int longCompactions = thread.getLongCompactions().getActiveCount();
561    int shortCompactions = thread.getShortCompactions().getActiveCount();
562    assertEquals(0, longCompactions + shortCompactions,
563      "longCompactions=" + longCompactions + "," + "shortCompactions=" + shortCompactions);
564    thread.switchCompaction(true);
565    assertTrue(thread.isCompactionsEnabled());
566    // Make sure no compactions have run.
567    assertEquals(0, thread.getLongCompactions().getCompletedTaskCount()
568      + thread.getShortCompactions().getCompletedTaskCount());
569    // Request a compaction and make sure it is submitted successfully.
570    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
571      CompactionLifeCycleTracker.DUMMY, null);
572    // Wait until the compaction finishes.
573    Waiter.waitFor(UTIL.getConfiguration(), 5000,
574      (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount()
575        + thread.getShortCompactions().getCompletedTaskCount() == 1);
576    // Make sure there are no compactions running.
577    assertEquals(0,
578      thread.getLongCompactions().getActiveCount() + thread.getShortCompactions().getActiveCount());
579  }
580
581  @Test
582  public void testInterruptingRunningCompactions() throws Exception {
583    // setup a compact/split thread on a mock server
584    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
585      WaitThroughPutController.class.getName());
586    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
587    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
588    CompactSplit thread = new CompactSplit(mockServer);
589
590    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
591
592    // setup a region/store with some files
593    HStore store = r.getStore(COLUMN_FAMILY);
594    int jmax = (int) Math.ceil(15.0 / compactionThreshold);
595    byte[] pad = new byte[1000]; // 1 KB chunk
596    for (int i = 0; i < compactionThreshold; i++) {
597      Table loader = new RegionAsTable(r);
598      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
599      p.setDurability(Durability.SKIP_WAL);
600      for (int j = 0; j < jmax; j++) {
601        p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
602      }
603      HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
604      loader.put(p);
605      r.flush(true);
606    }
607    HStore s = r.getStore(COLUMN_FAMILY);
608    int initialFiles = s.getStorefilesCount();
609
610    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER,
611      CompactionLifeCycleTracker.DUMMY, null);
612
613    Thread.sleep(3000);
614    thread.switchCompaction(false);
615    assertEquals(initialFiles, s.getStorefilesCount());
616    // don't mess up future tests
617    thread.switchCompaction(true);
618  }
619
620  /**
621   * HBASE-7947: Regression test to ensure adding to the correct list in the {@link CompactSplit}
622   * @throws Exception on failure
623   */
624  @Test
625  public void testMultipleCustomCompactionRequests() throws Exception {
626    // setup a compact/split thread on a mock server
627    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
628    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
629    CompactSplit thread = new CompactSplit(mockServer);
630    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
631
632    // setup a region/store with some files
633    int numStores = r.getStores().size();
634    CountDownLatch latch = new CountDownLatch(numStores);
635    Tracker tracker = new Tracker(latch);
636    // create some store files and setup requests for each store on which we want to do a
637    // compaction
638    for (HStore store : r.getStores()) {
639      createStoreFile(r, store.getColumnFamilyName());
640      createStoreFile(r, store.getColumnFamilyName());
641      createStoreFile(r, store.getColumnFamilyName());
642      thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER, tracker,
643        null);
644    }
645    // wait for the latch to complete.
646    latch.await();
647
648    thread.interruptIfNecessary();
649  }
650
651  class StoreMockMaker extends StatefulStoreMockMaker {
652    public ArrayList<HStoreFile> compacting = new ArrayList<>();
653    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
654    private final ArrayList<Integer> results;
655
656    public StoreMockMaker(ArrayList<Integer> results) {
657      this.results = results;
658    }
659
660    public class TestCompactionContext extends CompactionContext {
661
662      private List<HStoreFile> selectedFiles;
663
664      public TestCompactionContext(List<HStoreFile> selectedFiles) {
665        super();
666        this.selectedFiles = selectedFiles;
667      }
668
669      @Override
670      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
671        return new ArrayList<>();
672      }
673
674      @Override
675      public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
676        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
677        this.request = new CompactionRequestImpl(selectedFiles);
678        this.request.setPriority(getPriority());
679        return true;
680      }
681
682      @Override
683      public List<Path> compact(ThroughputController throughputController, User user)
684        throws IOException {
685        finishCompaction(this.selectedFiles);
686        return new ArrayList<>();
687      }
688    }
689
690    @Override
691    public synchronized Optional<CompactionContext> selectCompaction() {
692      CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
693      compacting.addAll(notCompacting);
694      notCompacting.clear();
695      try {
696        ctx.select(null, false, false, false);
697      } catch (IOException ex) {
698        fail("Shouldn't happen");
699      }
700      return Optional.of(ctx);
701    }
702
703    @Override
704    public synchronized void cancelCompaction(Object object) {
705      TestCompactionContext ctx = (TestCompactionContext) object;
706      compacting.removeAll(ctx.selectedFiles);
707      notCompacting.addAll(ctx.selectedFiles);
708    }
709
710    public synchronized void finishCompaction(List<HStoreFile> sfs) {
711      if (sfs.isEmpty()) return;
712      synchronized (results) {
713        results.add(sfs.size());
714      }
715      compacting.removeAll(sfs);
716    }
717
718    @Override
719    public int getPriority() {
720      return 7 - compacting.size() - notCompacting.size();
721    }
722  }
723
724  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
725    BlockingCompactionContext blocked = null;
726
727    public class BlockingCompactionContext extends CompactionContext {
728      public volatile boolean isInCompact = false;
729
730      public void unblock() {
731        synchronized (this) {
732          this.notifyAll();
733        }
734      }
735
736      @Override
737      public List<Path> compact(ThroughputController throughputController, User user)
738        throws IOException {
739        try {
740          isInCompact = true;
741          synchronized (this) {
742            this.wait();
743          }
744        } catch (InterruptedException e) {
745          Assumptions.abort(e.getMessage());
746        }
747        return new ArrayList<>();
748      }
749
750      @Override
751      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
752        return new ArrayList<>();
753      }
754
755      @Override
756      public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
757        throws IOException {
758        this.request = new CompactionRequestImpl(new ArrayList<>());
759        return true;
760      }
761    }
762
763    @Override
764    public Optional<CompactionContext> selectCompaction() {
765      this.blocked = new BlockingCompactionContext();
766      try {
767        this.blocked.select(null, false, false, false);
768      } catch (IOException ex) {
769        fail("Shouldn't happen");
770      }
771      return Optional.of(blocked);
772    }
773
774    @Override
775    public void cancelCompaction(Object object) {
776    }
777
778    @Override
779    public int getPriority() {
780      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
781    }
782
783    public BlockingCompactionContext waitForBlocking() {
784      while (this.blocked == null || !this.blocked.isInCompact) {
785        Threads.sleepWithoutInterrupt(50);
786      }
787      BlockingCompactionContext ctx = this.blocked;
788      this.blocked = null;
789      return ctx;
790    }
791
792    @Override
793    public HStore createStoreMock(String name) throws Exception {
794      return createStoreMock(Integer.MIN_VALUE, name);
795    }
796
797    public HStore createStoreMock(int priority, String name) throws Exception {
798      // Override the mock to always return the specified priority.
799      HStore s = super.createStoreMock(name);
800      when(s.getCompactPriority()).thenReturn(priority);
801      return s;
802    }
803  }
804
805  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
806  @Test
807  public void testCompactionQueuePriorities() throws Exception {
808    // Setup a compact/split thread on a mock server.
809    final Configuration conf = HBaseConfiguration.create();
810    HRegionServer mockServer = mock(HRegionServer.class);
811    when(mockServer.isStopped()).thenReturn(false);
812    when(mockServer.getConfiguration()).thenReturn(conf);
813    when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
814    CompactSplit cst = new CompactSplit(mockServer);
815    when(mockServer.getCompactSplitThread()).thenReturn(cst);
816    // prevent large compaction thread pool stealing job from small compaction queue.
817    cst.shutdownLongCompactions();
818    // Set up the region mock that redirects compactions.
819    HRegion r = mock(HRegion.class);
820    when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() {
821      @Override
822      public Boolean answer(InvocationOnMock invocation) throws Throwable {
823        invocation.<CompactionContext> getArgument(0).compact(invocation.getArgument(2), null);
824        return true;
825      }
826    });
827
828    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
829    ArrayList<Integer> results = new ArrayList<>();
830    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
831    HStore store = sm.createStoreMock("store1");
832    HStore store2 = sm2.createStoreMock("store2");
833    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
834
835    // First, block the compaction thread so that we could muck with queue.
836    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
837    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
838
839    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
840    for (int i = 0; i < 4; ++i) {
841      sm.notCompacting.add(createFile());
842    }
843    cst.requestSystemCompaction(r, store, "s1-pri3");
844    for (int i = 0; i < 3; ++i) {
845      sm2.notCompacting.add(createFile());
846    }
847    cst.requestSystemCompaction(r, store2, "s2-pri4");
848    // Now add 2 more files to store1 and queue compaction - pri 1.
849    for (int i = 0; i < 2; ++i) {
850      sm.notCompacting.add(createFile());
851    }
852    cst.requestSystemCompaction(r, store, "s1-pri1");
853    // Finally add blocking compaction with priority 2.
854    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");
855
856    // Unblock the blocking compaction; we should run pri1 and become block again in pri2.
857    currentBlock.unblock();
858    currentBlock = blocker.waitForBlocking();
859    // Pri1 should have "compacted" all 6 files.
860    assertEquals(1, results.size());
861    assertEquals(6, results.get(0).intValue());
862    // Add 2 files to store 1 (it has 2 files now).
863    for (int i = 0; i < 2; ++i) {
864      sm.notCompacting.add(createFile());
865    }
866    // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
867    // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
868    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
869    currentBlock.unblock();
870    currentBlock = blocker.waitForBlocking();
871    assertEquals(3, results.size());
872    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
873    assertEquals(2, results.get(2).intValue());
874
875    currentBlock.unblock();
876    cst.interruptIfNecessary();
877  }
878
879  /**
880   * Firstly write 10 cells (with different time stamp) to a qualifier and flush to hfile1, then
881   * write 10 cells (with different time stamp) to the same qualifier and flush to hfile2. The
882   * latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 are with the same time
883   * stamp but different sequence id, and will get scanned successively during compaction.
884   * <p/>
885   * We set compaction.kv.max to 10 so compaction will scan 10 versions each round, meanwhile we set
886   * keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 will be written out
887   * with seqId cleaned (set to 0) including cell-B, then when scanner goes to cell-A it will cause
888   * a scan out-of-order assertion error before HBASE-16931 if error occurs during the test
889   */
890  @Test
891  public void testCompactionSeqId() throws Exception {
892    final byte[] ROW = Bytes.toBytes("row");
893    final byte[] QUALIFIER = Bytes.toBytes("qualifier");
894
895    long timestamp = 10000;
896
897    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
898    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
899    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
900    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
901    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
902    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
903    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
904    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
905    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
906    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
907    for (int i = 0; i < 10; i++) {
908      Put put = new Put(ROW);
909      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
910      r.put(put);
911    }
912    r.flush(true);
913
914    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
915    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
916    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
917    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
918    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
919    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
920    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
921    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
922    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
923    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
924    for (int i = 18; i > 8; i--) {
925      Put put = new Put(ROW);
926      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
927      r.put(put);
928    }
929    r.flush(true);
930    r.compact(true);
931  }
932
933  public static class DummyCompactor extends DefaultCompactor {
934    public DummyCompactor(Configuration conf, HStore store) {
935      super(conf, store);
936      this.keepSeqIdPeriod = 0;
937    }
938  }
939
940  private static HStoreFile createFile() throws Exception {
941    HStoreFile sf = mock(HStoreFile.class);
942    when(sf.getPath()).thenReturn(new Path("file"));
943    StoreFileReader r = mock(StoreFileReader.class);
944    when(r.length()).thenReturn(10L);
945    when(sf.getReader()).thenReturn(r);
946    return sf;
947  }
948
949  /**
950   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
951   * finishes.
952   */
953  public static class Tracker implements CompactionLifeCycleTracker {
954
955    private final CountDownLatch done;
956
957    public Tracker(CountDownLatch done) {
958      this.done = done;
959    }
960
961    @Override
962    public void afterExecution(Store store) {
963      done.countDown();
964    }
965  }
966
967  /**
968   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
969   * finishes.
970   */
971  public static class WaitThroughPutController extends NoLimitThroughputController {
972
973    public WaitThroughPutController() {
974    }
975
976    @Override
977    public long control(String compactionName, long size) throws InterruptedException {
978      Thread.sleep(6000000);
979      return 6000000;
980    }
981  }
982}