001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
023import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
024import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
025import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
026import static org.junit.Assert.assertEquals;
027import static org.junit.Assert.assertFalse;
028import static org.junit.Assert.assertThrows;
029import static org.junit.Assert.assertTrue;
030import static org.junit.Assert.fail;
031import static org.mockito.ArgumentMatchers.any;
032import static org.mockito.Mockito.doAnswer;
033import static org.mockito.Mockito.mock;
034import static org.mockito.Mockito.spy;
035import static org.mockito.Mockito.when;
036
037import java.io.IOException;
038import java.util.ArrayList;
039import java.util.Collection;
040import java.util.Collections;
041import java.util.List;
042import java.util.Optional;
043import java.util.concurrent.CountDownLatch;
044import java.util.concurrent.TimeUnit;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.fs.FSDataOutputStream;
047import org.apache.hadoop.fs.FileStatus;
048import org.apache.hadoop.fs.FileSystem;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.ChoreService;
051import org.apache.hadoop.hbase.HBaseClassTestRule;
052import org.apache.hadoop.hbase.HBaseConfiguration;
053import org.apache.hadoop.hbase.HBaseTestingUtility;
054import org.apache.hadoop.hbase.HColumnDescriptor;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.HTableDescriptor;
057import org.apache.hadoop.hbase.HTestConst;
058import org.apache.hadoop.hbase.Waiter;
059import org.apache.hadoop.hbase.client.Delete;
060import org.apache.hadoop.hbase.client.Durability;
061import org.apache.hadoop.hbase.client.Put;
062import org.apache.hadoop.hbase.client.Table;
063import org.apache.hadoop.hbase.io.hfile.HFileScanner;
064import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
065import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
066import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
067import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
068import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
069import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
070import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
071import org.apache.hadoop.hbase.security.User;
072import org.apache.hadoop.hbase.testclassification.MediumTests;
073import org.apache.hadoop.hbase.testclassification.RegionServerTests;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
076import org.apache.hadoop.hbase.util.Threads;
077import org.apache.hadoop.hbase.wal.WAL;
078import org.junit.After;
079import org.junit.Assume;
080import org.junit.Before;
081import org.junit.ClassRule;
082import org.junit.Rule;
083import org.junit.Test;
084import org.junit.experimental.categories.Category;
085import org.junit.rules.TestName;
086import org.mockito.Mockito;
087import org.mockito.invocation.InvocationOnMock;
088import org.mockito.stubbing.Answer;
089
090/**
091 * Test compaction framework and common functions
092 */
093@Category({ RegionServerTests.class, MediumTests.class })
094public class TestCompaction {
095
096  @ClassRule
097  public static final HBaseClassTestRule CLASS_RULE =
098    HBaseClassTestRule.forClass(TestCompaction.class);
099
100  @Rule
101  public TestName name = new TestName();
102  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
103  protected Configuration conf = UTIL.getConfiguration();
104
105  private HRegion r = null;
106  private HTableDescriptor htd = null;
107  private static final byte[] COLUMN_FAMILY = fam1;
108  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
109  private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
110  private int compactionThreshold;
111  private byte[] secondRowBytes, thirdRowBytes;
112  private static final long MAX_FILES_TO_COMPACT = 10;
113  private final byte[] FAMILY = Bytes.toBytes("cf");
114
115  /** constructor */
116  public TestCompaction() {
117    super();
118
119    // Set cache flush size to 1MB
120    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
121    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
122    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
123      NoLimitThroughputController.class.getName());
124    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
125
126    secondRowBytes = START_KEY_BYTES.clone();
127    // Increment the least significant character so we get to next row.
128    secondRowBytes[START_KEY_BYTES.length - 1]++;
129    thirdRowBytes = START_KEY_BYTES.clone();
130    thirdRowBytes[START_KEY_BYTES.length - 1] =
131      (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
132  }
133
134  @Before
135  public void setUp() throws Exception {
136    this.htd = UTIL.createTableDescriptor(name.getMethodName());
137    if (name.getMethodName().equals("testCompactionSeqId")) {
138      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
139      UTIL.getConfiguration().set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
140        DummyCompactor.class.getName());
141      HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
142      hcd.setMaxVersions(65536);
143      this.htd.addFamily(hcd);
144    }
145    this.r = UTIL.createLocalHRegion(htd, null, null);
146  }
147
148  @After
149  public void tearDown() throws Exception {
150    WAL wal = r.getWAL();
151    this.r.close();
152    wal.close();
153  }
154
155  /**
156   * Verify that you can stop a long-running compaction (used during RS shutdown)
157   */
158  @Test
159  public void testInterruptCompactionBySize() throws Exception {
160    assertEquals(0, count());
161
162    // lower the polling interval for this test
163    conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */);
164
165    try {
166      // Create a couple store files w/ 15KB (over 10KB interval)
167      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
168      byte[] pad = new byte[1000]; // 1 KB chunk
169      for (int i = 0; i < compactionThreshold; i++) {
170        Table loader = new RegionAsTable(r);
171        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
172        p.setDurability(Durability.SKIP_WAL);
173        for (int j = 0; j < jmax; j++) {
174          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
175        }
176        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
177        loader.put(p);
178        r.flush(true);
179      }
180
181      HRegion spyR = spy(r);
182      doAnswer(new Answer<Object>() {
183        @Override
184        public Object answer(InvocationOnMock invocation) throws Throwable {
185          r.writestate.writesEnabled = false;
186          return invocation.callRealMethod();
187        }
188      }).when(spyR).doRegionCompactionPrep();
189
190      // force a minor compaction, but not before requesting a stop
191      spyR.compactStores();
192
193      // ensure that the compaction stopped, all old files are intact,
194      HStore s = r.getStore(COLUMN_FAMILY);
195      assertEquals(compactionThreshold, s.getStorefilesCount());
196      assertTrue(s.getStorefilesSize() > 15 * 1000);
197      // and no new store files persisted past compactStores()
198      // only one empty dir exists in temp dir
199      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
200      assertEquals(1, ls.length);
201      Path storeTempDir =
202        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
203      assertTrue(r.getFilesystem().exists(storeTempDir));
204      ls = r.getFilesystem().listStatus(storeTempDir);
205      assertEquals(0, ls.length);
206    } finally {
207      // don't mess up future tests
208      r.writestate.writesEnabled = true;
209      conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
210
211      // Delete all Store information once done using
212      for (int i = 0; i < compactionThreshold; i++) {
213        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
214        byte[][] famAndQf = { COLUMN_FAMILY, null };
215        delete.addFamily(famAndQf[0]);
216        r.delete(delete);
217      }
218      r.flush(true);
219
220      // Multiple versions allowed for an entry, so the delete isn't enough
221      // Lower TTL and expire to ensure that all our entries have been wiped
222      final int ttl = 1000;
223      for (HStore store : this.r.stores.values()) {
224        ScanInfo old = store.getScanInfo();
225        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
226        store.setScanInfo(si);
227      }
228      Thread.sleep(ttl);
229
230      r.compact(true);
231      assertEquals(0, count());
232    }
233  }
234
235  @Test
236  public void testInterruptCompactionByTime() throws Exception {
237    assertEquals(0, count());
238
239    // lower the polling interval for this test
240    conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */);
241
242    try {
243      // Create a couple store files w/ 15KB (over 10KB interval)
244      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
245      byte[] pad = new byte[1000]; // 1 KB chunk
246      for (int i = 0; i < compactionThreshold; i++) {
247        Table loader = new RegionAsTable(r);
248        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
249        p.setDurability(Durability.SKIP_WAL);
250        for (int j = 0; j < jmax; j++) {
251          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
252        }
253        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
254        loader.put(p);
255        r.flush(true);
256      }
257
258      HRegion spyR = spy(r);
259      doAnswer(new Answer() {
260        @Override
261        public Object answer(InvocationOnMock invocation) throws Throwable {
262          r.writestate.writesEnabled = false;
263          return invocation.callRealMethod();
264        }
265      }).when(spyR).doRegionCompactionPrep();
266
267      // force a minor compaction, but not before requesting a stop
268      spyR.compactStores();
269
270      // ensure that the compaction stopped, all old files are intact,
271      HStore s = r.getStore(COLUMN_FAMILY);
272      assertEquals(compactionThreshold, s.getStorefilesCount());
273      assertTrue(s.getStorefilesSize() > 15 * 1000);
274      // and no new store files persisted past compactStores()
275      // only one empty dir exists in temp dir
276      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
277      assertEquals(1, ls.length);
278      Path storeTempDir =
279        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
280      assertTrue(r.getFilesystem().exists(storeTempDir));
281      ls = r.getFilesystem().listStatus(storeTempDir);
282      assertEquals(0, ls.length);
283    } finally {
284      // don't mess up future tests
285      r.writestate.writesEnabled = true;
286      conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);
287
288      // Delete all Store information once done using
289      for (int i = 0; i < compactionThreshold; i++) {
290        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
291        byte[][] famAndQf = { COLUMN_FAMILY, null };
292        delete.addFamily(famAndQf[0]);
293        r.delete(delete);
294      }
295      r.flush(true);
296
297      // Multiple versions allowed for an entry, so the delete isn't enough
298      // Lower TTL and expire to ensure that all our entries have been wiped
299      final int ttl = 1000;
300      for (HStore store : this.r.stores.values()) {
301        ScanInfo old = store.getScanInfo();
302        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
303        store.setScanInfo(si);
304      }
305      Thread.sleep(ttl);
306
307      r.compact(true);
308      assertEquals(0, count());
309    }
310  }
311
312  private int count() throws IOException {
313    int count = 0;
314    for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
315      HFileScanner scanner = f.getReader().getScanner(false, false);
316      if (!scanner.seekTo()) {
317        continue;
318      }
319      do {
320        count++;
321      } while (scanner.next());
322    }
323    return count;
324  }
325
326  private void createStoreFile(final HRegion region) throws IOException {
327    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
328  }
329
330  private void createStoreFile(final HRegion region, String family) throws IOException {
331    Table loader = new RegionAsTable(region);
332    HTestConst.addContent(loader, family);
333    region.flush(true);
334  }
335
336  @Test
337  public void testCompactionWithCorruptResult() throws Exception {
338    int nfiles = 10;
339    for (int i = 0; i < nfiles; i++) {
340      createStoreFile(r);
341    }
342    HStore store = r.getStore(COLUMN_FAMILY);
343
344    Collection<HStoreFile> storeFiles = store.getStorefiles();
345    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
346    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
347    tool.compact(request, NoLimitThroughputController.INSTANCE, null);
348
349    // Now lets corrupt the compacted file.
350    FileSystem fs = store.getFileSystem();
351    // default compaction policy created one and only one new compacted file
352    Path tmpPath = store.getRegionFileSystem().createTempName();
353    try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
354      stream.writeChars("CORRUPT FILE!!!!");
355    }
356    // The complete compaction should fail and the corrupt file should remain
357    // in the 'tmp' directory;
358    assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
359      EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath)));
360    assertTrue(fs.exists(tmpPath));
361  }
362
363  /**
364   * Create a custom compaction request and be sure that we can track it through the queue, knowing
365   * when the compaction is completed.
366   */
367  @Test
368  public void testTrackingCompactionRequest() throws Exception {
369    // setup a compact/split thread on a mock server
370    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
371    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
372    CompactSplit thread = new CompactSplit(mockServer);
373    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
374
375    // setup a region/store with some files
376    HStore store = r.getStore(COLUMN_FAMILY);
377    createStoreFile(r);
378    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
379      createStoreFile(r);
380    }
381
382    CountDownLatch latch = new CountDownLatch(1);
383    Tracker tracker = new Tracker(latch);
384    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker, null);
385    // wait for the latch to complete.
386    latch.await();
387
388    thread.interruptIfNecessary();
389  }
390
391  @Test
392  public void testCompactionFailure() throws Exception {
393    // setup a compact/split thread on a mock server
394    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
395    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
396    CompactSplit thread = new CompactSplit(mockServer);
397    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
398
399    // setup a region/store with some files
400    HStore store = r.getStore(COLUMN_FAMILY);
401    createStoreFile(r);
402    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
403      createStoreFile(r);
404    }
405
406    HRegion mockRegion = Mockito.spy(r);
407    Mockito.when(mockRegion.checkSplit())
408      .thenThrow(new RuntimeException("Thrown intentionally by test!"));
409
410    try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) {
411
412      long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
413      long preFailedCount = metricsWrapper.getNumCompactionsFailed();
414
415      CountDownLatch latch = new CountDownLatch(1);
416      Tracker tracker = new Tracker(latch);
417      thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, tracker,
418        null);
419      // wait for the latch to complete.
420      latch.await(120, TimeUnit.SECONDS);
421
422      // compaction should have completed and been marked as failed due to error in split request
423      long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
424      long postFailedCount = metricsWrapper.getNumCompactionsFailed();
425
426      assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post="
427        + postCompletedCount + ")", postCompletedCount > preCompletedCount);
428      assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post="
429        + postFailedCount + ")", postFailedCount > preFailedCount);
430    }
431  }
432
433  /**
434   * Test no new Compaction requests are generated after calling stop compactions
435   */
436  @Test
437  public void testStopStartCompaction() throws IOException {
438    // setup a compact/split thread on a mock server
439    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
440    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
441    final CompactSplit thread = new CompactSplit(mockServer);
442    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
443    // setup a region/store with some files
444    HStore store = r.getStore(COLUMN_FAMILY);
445    createStoreFile(r);
446    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
447      createStoreFile(r);
448    }
449    thread.switchCompaction(false);
450    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
451      CompactionLifeCycleTracker.DUMMY, null);
452    assertFalse(thread.isCompactionsEnabled());
453    int longCompactions = thread.getLongCompactions().getActiveCount();
454    int shortCompactions = thread.getShortCompactions().getActiveCount();
455    assertEquals(
456      "longCompactions=" + longCompactions + "," + "shortCompactions=" + shortCompactions, 0,
457      longCompactions + shortCompactions);
458    thread.switchCompaction(true);
459    assertTrue(thread.isCompactionsEnabled());
460    // Make sure no compactions have run.
461    assertEquals(0, thread.getLongCompactions().getCompletedTaskCount()
462      + thread.getShortCompactions().getCompletedTaskCount());
463    // Request a compaction and make sure it is submitted successfully.
464    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
465      CompactionLifeCycleTracker.DUMMY, null);
466    // Wait until the compaction finishes.
467    Waiter.waitFor(UTIL.getConfiguration(), 5000,
468      (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount()
469        + thread.getShortCompactions().getCompletedTaskCount() == 1);
470    // Make sure there are no compactions running.
471    assertEquals(0,
472      thread.getLongCompactions().getActiveCount() + thread.getShortCompactions().getActiveCount());
473  }
474
475  @Test
476  public void testInterruptingRunningCompactions() throws Exception {
477    // setup a compact/split thread on a mock server
478    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
479      WaitThroughPutController.class.getName());
480    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
481    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
482    CompactSplit thread = new CompactSplit(mockServer);
483
484    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
485
486    // setup a region/store with some files
487    HStore store = r.getStore(COLUMN_FAMILY);
488    int jmax = (int) Math.ceil(15.0 / compactionThreshold);
489    byte[] pad = new byte[1000]; // 1 KB chunk
490    for (int i = 0; i < compactionThreshold; i++) {
491      Table loader = new RegionAsTable(r);
492      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
493      p.setDurability(Durability.SKIP_WAL);
494      for (int j = 0; j < jmax; j++) {
495        p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
496      }
497      HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
498      loader.put(p);
499      r.flush(true);
500    }
501    HStore s = r.getStore(COLUMN_FAMILY);
502    int initialFiles = s.getStorefilesCount();
503
504    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER,
505      CompactionLifeCycleTracker.DUMMY, null);
506
507    Thread.sleep(3000);
508    thread.switchCompaction(false);
509    assertEquals(initialFiles, s.getStorefilesCount());
510    // don't mess up future tests
511    thread.switchCompaction(true);
512  }
513
514  /**
515   * HBASE-7947: Regression test to ensure adding to the correct list in the {@link CompactSplit}
516   * @throws Exception on failure
517   */
518  @Test
519  public void testMultipleCustomCompactionRequests() throws Exception {
520    // setup a compact/split thread on a mock server
521    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
522    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
523    CompactSplit thread = new CompactSplit(mockServer);
524    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
525
526    // setup a region/store with some files
527    int numStores = r.getStores().size();
528    CountDownLatch latch = new CountDownLatch(numStores);
529    Tracker tracker = new Tracker(latch);
530    // create some store files and setup requests for each store on which we want to do a
531    // compaction
532    for (HStore store : r.getStores()) {
533      createStoreFile(r, store.getColumnFamilyName());
534      createStoreFile(r, store.getColumnFamilyName());
535      createStoreFile(r, store.getColumnFamilyName());
536      thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER, tracker,
537        null);
538    }
539    // wait for the latch to complete.
540    latch.await();
541
542    thread.interruptIfNecessary();
543  }
544
545  class StoreMockMaker extends StatefulStoreMockMaker {
546    public ArrayList<HStoreFile> compacting = new ArrayList<>();
547    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
548    private final ArrayList<Integer> results;
549
550    public StoreMockMaker(ArrayList<Integer> results) {
551      this.results = results;
552    }
553
554    public class TestCompactionContext extends CompactionContext {
555
556      private List<HStoreFile> selectedFiles;
557
558      public TestCompactionContext(List<HStoreFile> selectedFiles) {
559        super();
560        this.selectedFiles = selectedFiles;
561      }
562
563      @Override
564      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
565        return new ArrayList<>();
566      }
567
568      @Override
569      public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
570        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
571        this.request = new CompactionRequestImpl(selectedFiles);
572        this.request.setPriority(getPriority());
573        return true;
574      }
575
576      @Override
577      public List<Path> compact(ThroughputController throughputController, User user)
578        throws IOException {
579        finishCompaction(this.selectedFiles);
580        return new ArrayList<>();
581      }
582    }
583
584    @Override
585    public synchronized Optional<CompactionContext> selectCompaction() {
586      CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
587      compacting.addAll(notCompacting);
588      notCompacting.clear();
589      try {
590        ctx.select(null, false, false, false);
591      } catch (IOException ex) {
592        fail("Shouldn't happen");
593      }
594      return Optional.of(ctx);
595    }
596
597    @Override
598    public synchronized void cancelCompaction(Object object) {
599      TestCompactionContext ctx = (TestCompactionContext) object;
600      compacting.removeAll(ctx.selectedFiles);
601      notCompacting.addAll(ctx.selectedFiles);
602    }
603
604    public synchronized void finishCompaction(List<HStoreFile> sfs) {
605      if (sfs.isEmpty()) return;
606      synchronized (results) {
607        results.add(sfs.size());
608      }
609      compacting.removeAll(sfs);
610    }
611
612    @Override
613    public int getPriority() {
614      return 7 - compacting.size() - notCompacting.size();
615    }
616  }
617
618  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
619    BlockingCompactionContext blocked = null;
620
621    public class BlockingCompactionContext extends CompactionContext {
622      public volatile boolean isInCompact = false;
623
624      public void unblock() {
625        synchronized (this) {
626          this.notifyAll();
627        }
628      }
629
630      @Override
631      public List<Path> compact(ThroughputController throughputController, User user)
632        throws IOException {
633        try {
634          isInCompact = true;
635          synchronized (this) {
636            this.wait();
637          }
638        } catch (InterruptedException e) {
639          Assume.assumeNoException(e);
640        }
641        return new ArrayList<>();
642      }
643
644      @Override
645      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
646        return new ArrayList<>();
647      }
648
649      @Override
650      public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
651        throws IOException {
652        this.request = new CompactionRequestImpl(new ArrayList<>());
653        return true;
654      }
655    }
656
657    @Override
658    public Optional<CompactionContext> selectCompaction() {
659      this.blocked = new BlockingCompactionContext();
660      try {
661        this.blocked.select(null, false, false, false);
662      } catch (IOException ex) {
663        fail("Shouldn't happen");
664      }
665      return Optional.of(blocked);
666    }
667
668    @Override
669    public void cancelCompaction(Object object) {
670    }
671
672    @Override
673    public int getPriority() {
674      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
675    }
676
677    public BlockingCompactionContext waitForBlocking() {
678      while (this.blocked == null || !this.blocked.isInCompact) {
679        Threads.sleepWithoutInterrupt(50);
680      }
681      BlockingCompactionContext ctx = this.blocked;
682      this.blocked = null;
683      return ctx;
684    }
685
686    @Override
687    public HStore createStoreMock(String name) throws Exception {
688      return createStoreMock(Integer.MIN_VALUE, name);
689    }
690
691    public HStore createStoreMock(int priority, String name) throws Exception {
692      // Override the mock to always return the specified priority.
693      HStore s = super.createStoreMock(name);
694      when(s.getCompactPriority()).thenReturn(priority);
695      return s;
696    }
697  }
698
699  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
700  @Test
701  public void testCompactionQueuePriorities() throws Exception {
702    // Setup a compact/split thread on a mock server.
703    final Configuration conf = HBaseConfiguration.create();
704    HRegionServer mockServer = mock(HRegionServer.class);
705    when(mockServer.isStopped()).thenReturn(false);
706    when(mockServer.getConfiguration()).thenReturn(conf);
707    when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
708    CompactSplit cst = new CompactSplit(mockServer);
709    when(mockServer.getCompactSplitThread()).thenReturn(cst);
710    // prevent large compaction thread pool stealing job from small compaction queue.
711    cst.shutdownLongCompactions();
712    // Set up the region mock that redirects compactions.
713    HRegion r = mock(HRegion.class);
714    when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() {
715      @Override
716      public Boolean answer(InvocationOnMock invocation) throws Throwable {
717        invocation.<CompactionContext> getArgument(0).compact(invocation.getArgument(2), null);
718        return true;
719      }
720    });
721
722    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
723    ArrayList<Integer> results = new ArrayList<>();
724    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
725    HStore store = sm.createStoreMock("store1");
726    HStore store2 = sm2.createStoreMock("store2");
727    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
728
729    // First, block the compaction thread so that we could muck with queue.
730    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
731    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
732
733    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
734    for (int i = 0; i < 4; ++i) {
735      sm.notCompacting.add(createFile());
736    }
737    cst.requestSystemCompaction(r, store, "s1-pri3");
738    for (int i = 0; i < 3; ++i) {
739      sm2.notCompacting.add(createFile());
740    }
741    cst.requestSystemCompaction(r, store2, "s2-pri4");
742    // Now add 2 more files to store1 and queue compaction - pri 1.
743    for (int i = 0; i < 2; ++i) {
744      sm.notCompacting.add(createFile());
745    }
746    cst.requestSystemCompaction(r, store, "s1-pri1");
747    // Finally add blocking compaction with priority 2.
748    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");
749
750    // Unblock the blocking compaction; we should run pri1 and become block again in pri2.
751    currentBlock.unblock();
752    currentBlock = blocker.waitForBlocking();
753    // Pri1 should have "compacted" all 6 files.
754    assertEquals(1, results.size());
755    assertEquals(6, results.get(0).intValue());
756    // Add 2 files to store 1 (it has 2 files now).
757    for (int i = 0; i < 2; ++i) {
758      sm.notCompacting.add(createFile());
759    }
760    // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
761    // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
762    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
763    currentBlock.unblock();
764    currentBlock = blocker.waitForBlocking();
765    assertEquals(3, results.size());
766    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
767    assertEquals(2, results.get(2).intValue());
768
769    currentBlock.unblock();
770    cst.interruptIfNecessary();
771  }
772
773  /**
   * First write 10 cells (with different timestamps) to a qualifier and flush to hfile1, then
   * write 10 cells (with different timestamps) to the same qualifier and flush to hfile2. The
   * latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 have the same timestamp
   * but different sequence ids, and will get scanned successively during compaction.
778   * <p/>
779   * We set compaction.kv.max to 10 so compaction will scan 10 versions each round, meanwhile we set
780   * keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 will be written out
781   * with seqId cleaned (set to 0) including cell-B, then when scanner goes to cell-A it will cause
   * a scan out-of-order assertion error before HBASE-16931.
   * @throws Exception if error occurs during the test
783   */
784  @Test
785  public void testCompactionSeqId() throws Exception {
786    final byte[] ROW = Bytes.toBytes("row");
787    final byte[] QUALIFIER = Bytes.toBytes("qualifier");
788
789    long timestamp = 10000;
790
791    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
792    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
793    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
794    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
795    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
796    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
797    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
798    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
799    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
800    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
801    for (int i = 0; i < 10; i++) {
802      Put put = new Put(ROW);
803      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
804      r.put(put);
805    }
806    r.flush(true);
807
808    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
809    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
810    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
811    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
812    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
813    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
814    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
815    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
816    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
817    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
818    for (int i = 18; i > 8; i--) {
819      Put put = new Put(ROW);
820      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
821      r.put(put);
822    }
823    r.flush(true);
824    r.compact(true);
825  }
826
827  public static class DummyCompactor extends DefaultCompactor {
828    public DummyCompactor(Configuration conf, HStore store) {
829      super(conf, store);
830      this.keepSeqIdPeriod = 0;
831    }
832  }
833
834  private static HStoreFile createFile() throws Exception {
835    HStoreFile sf = mock(HStoreFile.class);
836    when(sf.getPath()).thenReturn(new Path("file"));
837    StoreFileReader r = mock(StoreFileReader.class);
838    when(r.length()).thenReturn(10L);
839    when(sf.getReader()).thenReturn(r);
840    return sf;
841  }
842
843  /**
844   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
845   * finishes.
846   */
847  public static class Tracker implements CompactionLifeCycleTracker {
848
849    private final CountDownLatch done;
850
851    public Tracker(CountDownLatch done) {
852      this.done = done;
853    }
854
855    @Override
856    public void afterExecution(Store store) {
857      done.countDown();
858    }
859  }
860
861  /**
862   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
863   * finishes.
864   */
865  public static class WaitThroughPutController extends NoLimitThroughputController {
866
867    public WaitThroughPutController() {
868    }
869
870    @Override
871    public long control(String compactionName, long size) throws InterruptedException {
872      Thread.sleep(6000000);
873      return 6000000;
874    }
875  }
876}