001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
023import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028import static org.mockito.Matchers.any;
029import static org.mockito.Mockito.doAnswer;
030import static org.mockito.Mockito.mock;
031import static org.mockito.Mockito.spy;
032import static org.mockito.Mockito.when;
033
034import java.io.IOException;
035import java.util.ArrayList;
036import java.util.Collection;
037import java.util.List;
038import java.util.Optional;
039import java.util.concurrent.CountDownLatch;
040import java.util.concurrent.TimeUnit;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FSDataOutputStream;
043import org.apache.hadoop.fs.FileStatus;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.hbase.ChoreService;
047import org.apache.hadoop.hbase.HBaseClassTestRule;
048import org.apache.hadoop.hbase.HBaseConfiguration;
049import org.apache.hadoop.hbase.HBaseTestCase;
050import org.apache.hadoop.hbase.HBaseTestingUtility;
051import org.apache.hadoop.hbase.HColumnDescriptor;
052import org.apache.hadoop.hbase.HConstants;
053import org.apache.hadoop.hbase.HTableDescriptor;
054import org.apache.hadoop.hbase.Waiter;
055import org.apache.hadoop.hbase.client.Delete;
056import org.apache.hadoop.hbase.client.Durability;
057import org.apache.hadoop.hbase.client.Put;
058import org.apache.hadoop.hbase.client.Table;
059import org.apache.hadoop.hbase.io.hfile.HFileScanner;
060import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
061import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
062import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
063import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
064import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
065import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
066import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
067import org.apache.hadoop.hbase.security.User;
068import org.apache.hadoop.hbase.testclassification.MediumTests;
069import org.apache.hadoop.hbase.testclassification.RegionServerTests;
070import org.apache.hadoop.hbase.util.Bytes;
071import org.apache.hadoop.hbase.util.Threads;
072import org.apache.hadoop.hbase.wal.WAL;
073import org.junit.After;
074import org.junit.Assume;
075import org.junit.Before;
076import org.junit.ClassRule;
077import org.junit.Rule;
078import org.junit.Test;
079import org.junit.experimental.categories.Category;
080import org.junit.rules.TestName;
081import org.mockito.Mockito;
082import org.mockito.invocation.InvocationOnMock;
083import org.mockito.stubbing.Answer;
084
085/**
086 * Test compaction framework and common functions
087 */
088@Category({RegionServerTests.class, MediumTests.class})
089public class TestCompaction {
090
091  @ClassRule
092  public static final HBaseClassTestRule CLASS_RULE =
093      HBaseClassTestRule.forClass(TestCompaction.class);
094
095  @Rule public TestName name = new TestName();
096  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
097  protected Configuration conf = UTIL.getConfiguration();
098
099  private HRegion r = null;
100  private HTableDescriptor htd = null;
101  private static final byte [] COLUMN_FAMILY = fam1;
102  private final byte [] STARTROW = Bytes.toBytes(START_KEY);
103  private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
104  private int compactionThreshold;
105  private byte[] secondRowBytes, thirdRowBytes;
106  private static final long MAX_FILES_TO_COMPACT = 10;
107  private final byte[] FAMILY = Bytes.toBytes("cf");
108
109  /** constructor */
110  public TestCompaction() {
111    super();
112
113    // Set cache flush size to 1MB
114    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
115    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
116    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
117        NoLimitThroughputController.class.getName());
118    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
119
120    secondRowBytes = START_KEY_BYTES.clone();
121    // Increment the least significant character so we get to next row.
122    secondRowBytes[START_KEY_BYTES.length - 1]++;
123    thirdRowBytes = START_KEY_BYTES.clone();
124    thirdRowBytes[START_KEY_BYTES.length - 1] =
125        (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
126  }
127
128  @Before
129  public void setUp() throws Exception {
130    this.htd = UTIL.createTableDescriptor(name.getMethodName());
131    if (name.getMethodName().equals("testCompactionSeqId")) {
132      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
133      UTIL.getConfiguration().set(
134          DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
135          DummyCompactor.class.getName());
136      HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
137      hcd.setMaxVersions(65536);
138      this.htd.addFamily(hcd);
139    }
140    this.r = UTIL.createLocalHRegion(htd, null, null);
141  }
142
143  @After
144  public void tearDown() throws Exception {
145    WAL wal = r.getWAL();
146    this.r.close();
147    wal.close();
148  }
149
150  /**
151   * Verify that you can stop a long-running compaction
152   * (used during RS shutdown)
153   * @throws Exception
154   */
155  @Test
156  public void testInterruptCompaction() throws Exception {
157    assertEquals(0, count());
158
159    // lower the polling interval for this test
160    int origWI = HStore.closeCheckInterval;
161    HStore.closeCheckInterval = 10*1000; // 10 KB
162
163    try {
164      // Create a couple store files w/ 15KB (over 10KB interval)
165      int jmax = (int) Math.ceil(15.0/compactionThreshold);
166      byte [] pad = new byte[1000]; // 1 KB chunk
167      for (int i = 0; i < compactionThreshold; i++) {
168        Table loader = new RegionAsTable(r);
169        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
170        p.setDurability(Durability.SKIP_WAL);
171        for (int j = 0; j < jmax; j++) {
172          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
173        }
174        HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
175        loader.put(p);
176        r.flush(true);
177      }
178
179      HRegion spyR = spy(r);
180      doAnswer(new Answer() {
181        @Override
182        public Object answer(InvocationOnMock invocation) throws Throwable {
183          r.writestate.writesEnabled = false;
184          return invocation.callRealMethod();
185        }
186      }).when(spyR).doRegionCompactionPrep();
187
188      // force a minor compaction, but not before requesting a stop
189      spyR.compactStores();
190
191      // ensure that the compaction stopped, all old files are intact,
192      HStore s = r.getStore(COLUMN_FAMILY);
193      assertEquals(compactionThreshold, s.getStorefilesCount());
194      assertTrue(s.getStorefilesSize() > 15*1000);
195      // and no new store files persisted past compactStores()
196      // only one empty dir exists in temp dir
197      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
198      assertEquals(1, ls.length);
199      Path storeTempDir = new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
200      assertTrue(r.getFilesystem().exists(storeTempDir));
201      ls = r.getFilesystem().listStatus(storeTempDir);
202      assertEquals(0, ls.length);
203    } finally {
204      // don't mess up future tests
205      r.writestate.writesEnabled = true;
206      HStore.closeCheckInterval = origWI;
207
208      // Delete all Store information once done using
209      for (int i = 0; i < compactionThreshold; i++) {
210        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
211        byte [][] famAndQf = {COLUMN_FAMILY, null};
212        delete.addFamily(famAndQf[0]);
213        r.delete(delete);
214      }
215      r.flush(true);
216
217      // Multiple versions allowed for an entry, so the delete isn't enough
218      // Lower TTL and expire to ensure that all our entries have been wiped
219      final int ttl = 1000;
220      for (HStore store : this.r.stores.values()) {
221        ScanInfo old = store.getScanInfo();
222        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
223        store.setScanInfo(si);
224      }
225      Thread.sleep(ttl);
226
227      r.compact(true);
228      assertEquals(0, count());
229    }
230  }
231
232  private int count() throws IOException {
233    int count = 0;
234    for (HStoreFile f: this.r.stores.
235        get(COLUMN_FAMILY_TEXT).getStorefiles()) {
236      HFileScanner scanner = f.getReader().getScanner(false, false);
237      if (!scanner.seekTo()) {
238        continue;
239      }
240      do {
241        count++;
242      } while(scanner.next());
243    }
244    return count;
245  }
246
247  private void createStoreFile(final HRegion region) throws IOException {
248    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
249  }
250
251  private void createStoreFile(final HRegion region, String family) throws IOException {
252    Table loader = new RegionAsTable(region);
253    HBaseTestCase.addContent(loader, family);
254    region.flush(true);
255  }
256
257  @Test
258  public void testCompactionWithCorruptResult() throws Exception {
259    int nfiles = 10;
260    for (int i = 0; i < nfiles; i++) {
261      createStoreFile(r);
262    }
263    HStore store = r.getStore(COLUMN_FAMILY);
264
265    Collection<HStoreFile> storeFiles = store.getStorefiles();
266    DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
267    tool.compactForTesting(storeFiles, false);
268
269    // Now lets corrupt the compacted file.
270    FileSystem fs = store.getFileSystem();
271    // default compaction policy created one and only one new compacted file
272    Path dstPath = store.getRegionFileSystem().createTempName();
273    FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, 1024L, null);
274    stream.writeChars("CORRUPT FILE!!!!");
275    stream.close();
276    Path origPath = store.getRegionFileSystem().commitStoreFile(
277      Bytes.toString(COLUMN_FAMILY), dstPath);
278
279    try {
280      ((HStore)store).moveFileIntoPlace(origPath);
281    } catch (Exception e) {
282      // The complete compaction should fail and the corrupt file should remain
283      // in the 'tmp' directory;
284      assertTrue(fs.exists(origPath));
285      assertFalse(fs.exists(dstPath));
286      System.out.println("testCompactionWithCorruptResult Passed");
287      return;
288    }
289    fail("testCompactionWithCorruptResult failed since no exception was" +
290        "thrown while completing a corrupt file");
291  }
292
293  /**
294   * Create a custom compaction request and be sure that we can track it through the queue, knowing
295   * when the compaction is completed.
296   */
297  @Test
298  public void testTrackingCompactionRequest() throws Exception {
299    // setup a compact/split thread on a mock server
300    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
301    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
302    CompactSplit thread = new CompactSplit(mockServer);
303    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
304
305    // setup a region/store with some files
306    HStore store = r.getStore(COLUMN_FAMILY);
307    createStoreFile(r);
308    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
309      createStoreFile(r);
310    }
311
312    CountDownLatch latch = new CountDownLatch(1);
313    Tracker tracker = new Tracker(latch);
314    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker,
315      null);
316    // wait for the latch to complete.
317    latch.await();
318
319    thread.interruptIfNecessary();
320  }
321
322  @Test
323  public void testCompactionFailure() throws Exception {
324    // setup a compact/split thread on a mock server
325    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
326    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
327    CompactSplit thread = new CompactSplit(mockServer);
328    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
329
330    // setup a region/store with some files
331    HStore store = r.getStore(COLUMN_FAMILY);
332    createStoreFile(r);
333    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
334      createStoreFile(r);
335    }
336
337    HRegion mockRegion = Mockito.spy(r);
338    Mockito.when(mockRegion.checkSplit()).
339      thenThrow(new RuntimeException("Thrown intentionally by test!"));
340
341    MetricsRegionWrapper metricsWrapper = new MetricsRegionWrapperImpl(r);
342
343    long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
344    long preFailedCount = metricsWrapper.getNumCompactionsFailed();
345
346    CountDownLatch latch = new CountDownLatch(1);
347    Tracker tracker = new Tracker(latch);
348    thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER,
349      tracker, null);
350    // wait for the latch to complete.
351    latch.await(120, TimeUnit.SECONDS);
352
353    // compaction should have completed and been marked as failed due to error in split request
354    long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
355    long postFailedCount = metricsWrapper.getNumCompactionsFailed();
356
357    assertTrue("Completed count should have increased (pre=" + preCompletedCount +
358        ", post="+postCompletedCount+")",
359        postCompletedCount > preCompletedCount);
360    assertTrue("Failed count should have increased (pre=" + preFailedCount +
361        ", post=" + postFailedCount + ")",
362        postFailedCount > preFailedCount);
363  }
364
365  /**
366   * Test no new Compaction requests are generated after calling stop compactions
367   */
368  @Test
369  public void testStopStartCompaction() throws IOException {
370    // setup a compact/split thread on a mock server
371    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
372    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
373    final CompactSplit thread = new CompactSplit(mockServer);
374    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
375    // setup a region/store with some files
376    HStore store = r.getStore(COLUMN_FAMILY);
377    createStoreFile(r);
378    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
379      createStoreFile(r);
380    }
381    thread.switchCompaction(false);
382    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
383      CompactionLifeCycleTracker.DUMMY, null);
384    assertFalse(thread.isCompactionsEnabled());
385    int longCompactions = thread.getLongCompactions().getActiveCount();
386    int shortCompactions = thread.getShortCompactions().getActiveCount();
387    assertEquals("longCompactions=" + longCompactions + "," +
388        "shortCompactions=" + shortCompactions, 0, longCompactions + shortCompactions);
389    thread.switchCompaction(true);
390    assertTrue(thread.isCompactionsEnabled());
391    // Make sure no compactions have run.
392    assertEquals(0, thread.getLongCompactions().getCompletedTaskCount() +
393        thread.getShortCompactions().getCompletedTaskCount());
394    // Request a compaction and make sure it is submitted successfully.
395    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
396        CompactionLifeCycleTracker.DUMMY, null);
397    // Wait until the compaction finishes.
398    Waiter.waitFor(UTIL.getConfiguration(), 5000,
399        (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount() +
400        thread.getShortCompactions().getCompletedTaskCount() == 1);
401    // Make sure there are no compactions running.
402    assertEquals(0, thread.getLongCompactions().getActiveCount()
403        + thread.getShortCompactions().getActiveCount());
404  }
405
406  @Test public void testInterruptingRunningCompactions() throws Exception {
407    // setup a compact/split thread on a mock server
408    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
409        WaitThroughPutController.class.getName());
410    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
411    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
412    CompactSplit thread = new CompactSplit(mockServer);
413
414    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
415
416    // setup a region/store with some files
417    HStore store = r.getStore(COLUMN_FAMILY);
418    int jmax = (int) Math.ceil(15.0 / compactionThreshold);
419    byte[] pad = new byte[1000]; // 1 KB chunk
420    for (int i = 0; i < compactionThreshold; i++) {
421      Table loader = new RegionAsTable(r);
422      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
423      p.setDurability(Durability.SKIP_WAL);
424      for (int j = 0; j < jmax; j++) {
425        p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
426      }
427      HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
428      loader.put(p);
429      r.flush(true);
430    }
431    HStore s = r.getStore(COLUMN_FAMILY);
432    int initialFiles = s.getStorefilesCount();
433
434    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER,
435        CompactionLifeCycleTracker.DUMMY, null);
436
437    Thread.sleep(3000);
438    thread.switchCompaction(false);
439    assertEquals(initialFiles, s.getStorefilesCount());
440    //don't mess up future tests
441    thread.switchCompaction(true);
442  }
443
444  /**
445   * HBASE-7947: Regression test to ensure adding to the correct list in the
446   * {@link CompactSplit}
447   * @throws Exception on failure
448   */
449  @Test
450  public void testMultipleCustomCompactionRequests() throws Exception {
451    // setup a compact/split thread on a mock server
452    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
453    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
454    CompactSplit thread = new CompactSplit(mockServer);
455    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
456
457    // setup a region/store with some files
458    int numStores = r.getStores().size();
459    CountDownLatch latch = new CountDownLatch(numStores);
460    Tracker tracker = new Tracker(latch);
461    // create some store files and setup requests for each store on which we want to do a
462    // compaction
463    for (HStore store : r.getStores()) {
464      createStoreFile(r, store.getColumnFamilyName());
465      createStoreFile(r, store.getColumnFamilyName());
466      createStoreFile(r, store.getColumnFamilyName());
467      thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER,
468        tracker, null);
469    }
470    // wait for the latch to complete.
471    latch.await();
472
473    thread.interruptIfNecessary();
474  }
475
476  class StoreMockMaker extends StatefulStoreMockMaker {
477    public ArrayList<HStoreFile> compacting = new ArrayList<>();
478    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
479    private final ArrayList<Integer> results;
480
481    public StoreMockMaker(ArrayList<Integer> results) {
482      this.results = results;
483    }
484
485    public class TestCompactionContext extends CompactionContext {
486
487      private List<HStoreFile> selectedFiles;
488
489      public TestCompactionContext(List<HStoreFile> selectedFiles) {
490        super();
491        this.selectedFiles = selectedFiles;
492      }
493
494      @Override
495      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
496        return new ArrayList<>();
497      }
498
499      @Override
500      public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
501          boolean mayUseOffPeak, boolean forceMajor) throws IOException {
502        this.request = new CompactionRequestImpl(selectedFiles);
503        this.request.setPriority(getPriority());
504        return true;
505      }
506
507      @Override
508      public List<Path> compact(ThroughputController throughputController, User user)
509          throws IOException {
510        finishCompaction(this.selectedFiles);
511        return new ArrayList<>();
512      }
513    }
514
515    @Override
516    public synchronized Optional<CompactionContext> selectCompaction() {
517      CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
518      compacting.addAll(notCompacting);
519      notCompacting.clear();
520      try {
521        ctx.select(null, false, false, false);
522      } catch (IOException ex) {
523        fail("Shouldn't happen");
524      }
525      return Optional.of(ctx);
526    }
527
528    @Override
529    public synchronized void cancelCompaction(Object object) {
530      TestCompactionContext ctx = (TestCompactionContext)object;
531      compacting.removeAll(ctx.selectedFiles);
532      notCompacting.addAll(ctx.selectedFiles);
533    }
534
535    public synchronized void finishCompaction(List<HStoreFile> sfs) {
536      if (sfs.isEmpty()) return;
537      synchronized (results) {
538        results.add(sfs.size());
539      }
540      compacting.removeAll(sfs);
541    }
542
543    @Override
544    public int getPriority() {
545      return 7 - compacting.size() - notCompacting.size();
546    }
547  }
548
549  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
550    BlockingCompactionContext blocked = null;
551
552    public class BlockingCompactionContext extends CompactionContext {
553      public volatile boolean isInCompact = false;
554
555      public void unblock() {
556        synchronized (this) {
557          this.notifyAll();
558        }
559      }
560
561      @Override
562      public List<Path> compact(ThroughputController throughputController, User user)
563          throws IOException {
564        try {
565          isInCompact = true;
566          synchronized (this) {
567            this.wait();
568          }
569        } catch (InterruptedException e) {
570          Assume.assumeNoException(e);
571        }
572        return new ArrayList<>();
573      }
574
575      @Override
576      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
577        return new ArrayList<>();
578      }
579
580      @Override
581      public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
582          throws IOException {
583        this.request = new CompactionRequestImpl(new ArrayList<>());
584        return true;
585      }
586    }
587
588    @Override
589    public Optional<CompactionContext> selectCompaction() {
590      this.blocked = new BlockingCompactionContext();
591      try {
592        this.blocked.select(null, false, false, false);
593      } catch (IOException ex) {
594        fail("Shouldn't happen");
595      }
596      return Optional.of(blocked);
597    }
598
599    @Override
600    public void cancelCompaction(Object object) {}
601
602    @Override
603    public int getPriority() {
604      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
605    }
606
607    public BlockingCompactionContext waitForBlocking() {
608      while (this.blocked == null || !this.blocked.isInCompact) {
609        Threads.sleepWithoutInterrupt(50);
610      }
611      BlockingCompactionContext ctx = this.blocked;
612      this.blocked = null;
613      return ctx;
614    }
615
616    @Override
617    public HStore createStoreMock(String name) throws Exception {
618      return createStoreMock(Integer.MIN_VALUE, name);
619    }
620
621    public HStore createStoreMock(int priority, String name) throws Exception {
622      // Override the mock to always return the specified priority.
623      HStore s = super.createStoreMock(name);
624      when(s.getCompactPriority()).thenReturn(priority);
625      return s;
626    }
627  }
628
629  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
630  @Test
631  public void testCompactionQueuePriorities() throws Exception {
632    // Setup a compact/split thread on a mock server.
633    final Configuration conf = HBaseConfiguration.create();
634    HRegionServer mockServer = mock(HRegionServer.class);
635    when(mockServer.isStopped()).thenReturn(false);
636    when(mockServer.getConfiguration()).thenReturn(conf);
637    when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
638    CompactSplit cst = new CompactSplit(mockServer);
639    when(mockServer.getCompactSplitThread()).thenReturn(cst);
640    //prevent large compaction thread pool stealing job from small compaction queue.
641    cst.shutdownLongCompactions();
642    // Set up the region mock that redirects compactions.
643    HRegion r = mock(HRegion.class);
644    when(
645      r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() {
646        @Override
647        public Boolean answer(InvocationOnMock invocation) throws Throwable {
648          invocation.<CompactionContext>getArgument(0).compact(invocation.getArgument(2), null);
649          return true;
650        }
651    });
652
653    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
654    ArrayList<Integer> results = new ArrayList<>();
655    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
656    HStore store = sm.createStoreMock("store1");
657    HStore store2 = sm2.createStoreMock("store2");
658    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
659
660    // First, block the compaction thread so that we could muck with queue.
661    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
662    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
663
664    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
665    for (int i = 0; i < 4; ++i) {
666      sm.notCompacting.add(createFile());
667    }
668    cst.requestSystemCompaction(r, store, "s1-pri3");
669    for (int i = 0; i < 3; ++i) {
670      sm2.notCompacting.add(createFile());
671    }
672    cst.requestSystemCompaction(r, store2, "s2-pri4");
673    // Now add 2 more files to store1 and queue compaction - pri 1.
674    for (int i = 0; i < 2; ++i) {
675      sm.notCompacting.add(createFile());
676    }
677    cst.requestSystemCompaction(r, store, "s1-pri1");
678    // Finally add blocking compaction with priority 2.
679    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");
680
681    // Unblock the blocking compaction; we should run pri1 and become block again in pri2.
682    currentBlock.unblock();
683    currentBlock = blocker.waitForBlocking();
684    // Pri1 should have "compacted" all 6 files.
685    assertEquals(1, results.size());
686    assertEquals(6, results.get(0).intValue());
687    // Add 2 files to store 1 (it has 2 files now).
688    for (int i = 0; i < 2; ++i) {
689      sm.notCompacting.add(createFile());
690    }
691    // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
692    // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
693    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
694    currentBlock.unblock();
695    currentBlock = blocker.waitForBlocking();
696    assertEquals(3, results.size());
697    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
698    assertEquals(2, results.get(2).intValue());
699
700    currentBlock.unblock();
701    cst.interruptIfNecessary();
702  }
703
704  /**
705   * Firstly write 10 cells (with different time stamp) to a qualifier and flush
706   * to hfile1, then write 10 cells (with different time stamp) to the same
707   * qualifier and flush to hfile2. The latest cell (cell-A) in hfile1 and the
708   * oldest cell (cell-B) in hfile2 are with the same time stamp but different
709   * sequence id, and will get scanned successively during compaction.
710   * <p/>
711   * We set compaction.kv.max to 10 so compaction will scan 10 versions each
712   * round, meanwhile we set keepSeqIdPeriod=0 in {@link DummyCompactor} so all
713   * 10 versions of hfile2 will be written out with seqId cleaned (set to 0)
714   * including cell-B, then when scanner goes to cell-A it will cause a scan
715   * out-of-order assertion error before HBASE-16931
716   *
717   * @throws Exception
718   *           if error occurs during the test
719   */
720  @Test
721  public void testCompactionSeqId() throws Exception {
722    final byte[] ROW = Bytes.toBytes("row");
723    final byte[] QUALIFIER = Bytes.toBytes("qualifier");
724
725    long timestamp = 10000;
726
727    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
728    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
729    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
730    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
731    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
732    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
733    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
734    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
735    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
736    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
737    for (int i = 0; i < 10; i++) {
738      Put put = new Put(ROW);
739      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
740      r.put(put);
741    }
742    r.flush(true);
743
744    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
745    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
746    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
747    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
748    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
749    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
750    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
751    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
752    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
753    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
754    for (int i = 18; i > 8; i--) {
755      Put put = new Put(ROW);
756      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
757      r.put(put);
758    }
759    r.flush(true);
760    r.compact(true);
761  }
762
763  public static class DummyCompactor extends DefaultCompactor {
764    public DummyCompactor(Configuration conf, HStore store) {
765      super(conf, store);
766      this.keepSeqIdPeriod = 0;
767    }
768  }
769
770  private static HStoreFile createFile() throws Exception {
771    HStoreFile sf = mock(HStoreFile.class);
772    when(sf.getPath()).thenReturn(new Path("file"));
773    StoreFileReader r = mock(StoreFileReader.class);
774    when(r.length()).thenReturn(10L);
775    when(sf.getReader()).thenReturn(r);
776    return sf;
777  }
778
779  /**
780   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
781   * finishes.
782   */
783  public static class Tracker implements CompactionLifeCycleTracker {
784
785    private final CountDownLatch done;
786
787    public Tracker(CountDownLatch done) {
788      this.done = done;
789    }
790
791    @Override
792    public void afterExecution(Store store) {
793      done.countDown();
794    }
795  }
796
797  /**
798   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
799   * finishes.
800   */
801  public static class WaitThroughPutController extends NoLimitThroughputController{
802
803    public WaitThroughPutController() {
804    }
805
806    @Override
807    public long control(String compactionName, long size) throws InterruptedException {
808      Thread.sleep(6000000);
809      return 6000000;
810    }
811  }
812}