001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
023import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028import static org.mockito.Matchers.any;
029import static org.mockito.Mockito.doAnswer;
030import static org.mockito.Mockito.mock;
031import static org.mockito.Mockito.spy;
032import static org.mockito.Mockito.when;
033
034import java.io.IOException;
035import java.util.ArrayList;
036import java.util.Collection;
037import java.util.List;
038import java.util.Optional;
039import java.util.concurrent.CountDownLatch;
040import java.util.concurrent.TimeUnit;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FSDataOutputStream;
043import org.apache.hadoop.fs.FileStatus;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.hbase.ChoreService;
047import org.apache.hadoop.hbase.HBaseClassTestRule;
048import org.apache.hadoop.hbase.HBaseConfiguration;
049import org.apache.hadoop.hbase.HBaseTestCase;
050import org.apache.hadoop.hbase.HBaseTestingUtility;
051import org.apache.hadoop.hbase.HColumnDescriptor;
052import org.apache.hadoop.hbase.HConstants;
053import org.apache.hadoop.hbase.HTableDescriptor;
054import org.apache.hadoop.hbase.client.Delete;
055import org.apache.hadoop.hbase.client.Durability;
056import org.apache.hadoop.hbase.client.Put;
057import org.apache.hadoop.hbase.client.Table;
058import org.apache.hadoop.hbase.io.hfile.HFileScanner;
059import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
060import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
061import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
062import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
063import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
064import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
065import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
066import org.apache.hadoop.hbase.security.User;
067import org.apache.hadoop.hbase.testclassification.MediumTests;
068import org.apache.hadoop.hbase.testclassification.RegionServerTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.Threads;
071import org.apache.hadoop.hbase.wal.WAL;
072import org.junit.After;
073import org.junit.Assume;
074import org.junit.Before;
075import org.junit.ClassRule;
076import org.junit.Rule;
077import org.junit.Test;
078import org.junit.experimental.categories.Category;
079import org.junit.rules.TestName;
080import org.mockito.Mockito;
081import org.mockito.invocation.InvocationOnMock;
082import org.mockito.stubbing.Answer;
083
084/**
085 * Test compaction framework and common functions
086 */
087@Category({RegionServerTests.class, MediumTests.class})
088public class TestCompaction {
089
090  @ClassRule
091  public static final HBaseClassTestRule CLASS_RULE =
092      HBaseClassTestRule.forClass(TestCompaction.class);
093
094  @Rule public TestName name = new TestName();
095  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
096  protected Configuration conf = UTIL.getConfiguration();
097
098  private HRegion r = null;
099  private HTableDescriptor htd = null;
100  private static final byte [] COLUMN_FAMILY = fam1;
101  private final byte [] STARTROW = Bytes.toBytes(START_KEY);
102  private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
103  private int compactionThreshold;
104  private byte[] secondRowBytes, thirdRowBytes;
105  private static final long MAX_FILES_TO_COMPACT = 10;
106  private final byte[] FAMILY = Bytes.toBytes("cf");
107
108  /** constructor */
109  public TestCompaction() {
110    super();
111
112    // Set cache flush size to 1MB
113    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
114    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
115    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
116        NoLimitThroughputController.class.getName());
117    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
118
119    secondRowBytes = START_KEY_BYTES.clone();
120    // Increment the least significant character so we get to next row.
121    secondRowBytes[START_KEY_BYTES.length - 1]++;
122    thirdRowBytes = START_KEY_BYTES.clone();
123    thirdRowBytes[START_KEY_BYTES.length - 1] =
124        (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
125  }
126
127  @Before
128  public void setUp() throws Exception {
129    this.htd = UTIL.createTableDescriptor(name.getMethodName());
130    if (name.getMethodName().equals("testCompactionSeqId")) {
131      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
132      UTIL.getConfiguration().set(
133          DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
134          DummyCompactor.class.getName());
135      HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
136      hcd.setMaxVersions(65536);
137      this.htd.addFamily(hcd);
138    }
139    this.r = UTIL.createLocalHRegion(htd, null, null);
140  }
141
142  @After
143  public void tearDown() throws Exception {
144    WAL wal = r.getWAL();
145    this.r.close();
146    wal.close();
147  }
148
149  /**
150   * Verify that you can stop a long-running compaction
151   * (used during RS shutdown)
152   * @throws Exception
153   */
154  @Test
155  public void testInterruptCompaction() throws Exception {
156    assertEquals(0, count());
157
158    // lower the polling interval for this test
159    int origWI = HStore.closeCheckInterval;
160    HStore.closeCheckInterval = 10*1000; // 10 KB
161
162    try {
163      // Create a couple store files w/ 15KB (over 10KB interval)
164      int jmax = (int) Math.ceil(15.0/compactionThreshold);
165      byte [] pad = new byte[1000]; // 1 KB chunk
166      for (int i = 0; i < compactionThreshold; i++) {
167        Table loader = new RegionAsTable(r);
168        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
169        p.setDurability(Durability.SKIP_WAL);
170        for (int j = 0; j < jmax; j++) {
171          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
172        }
173        HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
174        loader.put(p);
175        r.flush(true);
176      }
177
178      HRegion spyR = spy(r);
179      doAnswer(new Answer() {
180        @Override
181        public Object answer(InvocationOnMock invocation) throws Throwable {
182          r.writestate.writesEnabled = false;
183          return invocation.callRealMethod();
184        }
185      }).when(spyR).doRegionCompactionPrep();
186
187      // force a minor compaction, but not before requesting a stop
188      spyR.compactStores();
189
190      // ensure that the compaction stopped, all old files are intact,
191      HStore s = r.getStore(COLUMN_FAMILY);
192      assertEquals(compactionThreshold, s.getStorefilesCount());
193      assertTrue(s.getStorefilesSize() > 15*1000);
194      // and no new store files persisted past compactStores()
195      // only one empty dir exists in temp dir
196      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
197      assertEquals(1, ls.length);
198      Path storeTempDir = new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
199      assertTrue(r.getFilesystem().exists(storeTempDir));
200      ls = r.getFilesystem().listStatus(storeTempDir);
201      assertEquals(0, ls.length);
202    } finally {
203      // don't mess up future tests
204      r.writestate.writesEnabled = true;
205      HStore.closeCheckInterval = origWI;
206
207      // Delete all Store information once done using
208      for (int i = 0; i < compactionThreshold; i++) {
209        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
210        byte [][] famAndQf = {COLUMN_FAMILY, null};
211        delete.addFamily(famAndQf[0]);
212        r.delete(delete);
213      }
214      r.flush(true);
215
216      // Multiple versions allowed for an entry, so the delete isn't enough
217      // Lower TTL and expire to ensure that all our entries have been wiped
218      final int ttl = 1000;
219      for (HStore store : this.r.stores.values()) {
220        ScanInfo old = store.getScanInfo();
221        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
222        store.setScanInfo(si);
223      }
224      Thread.sleep(ttl);
225
226      r.compact(true);
227      assertEquals(0, count());
228    }
229  }
230
231  private int count() throws IOException {
232    int count = 0;
233    for (HStoreFile f: this.r.stores.
234        get(COLUMN_FAMILY_TEXT).getStorefiles()) {
235      HFileScanner scanner = f.getReader().getScanner(false, false);
236      if (!scanner.seekTo()) {
237        continue;
238      }
239      do {
240        count++;
241      } while(scanner.next());
242    }
243    return count;
244  }
245
246  private void createStoreFile(final HRegion region) throws IOException {
247    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
248  }
249
250  private void createStoreFile(final HRegion region, String family) throws IOException {
251    Table loader = new RegionAsTable(region);
252    HBaseTestCase.addContent(loader, family);
253    region.flush(true);
254  }
255
256  @Test
257  public void testCompactionWithCorruptResult() throws Exception {
258    int nfiles = 10;
259    for (int i = 0; i < nfiles; i++) {
260      createStoreFile(r);
261    }
262    HStore store = r.getStore(COLUMN_FAMILY);
263
264    Collection<HStoreFile> storeFiles = store.getStorefiles();
265    DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
266    tool.compactForTesting(storeFiles, false);
267
268    // Now lets corrupt the compacted file.
269    FileSystem fs = store.getFileSystem();
270    // default compaction policy created one and only one new compacted file
271    Path dstPath = store.getRegionFileSystem().createTempName();
272    FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, 1024L, null);
273    stream.writeChars("CORRUPT FILE!!!!");
274    stream.close();
275    Path origPath = store.getRegionFileSystem().commitStoreFile(
276      Bytes.toString(COLUMN_FAMILY), dstPath);
277
278    try {
279      ((HStore)store).moveFileIntoPlace(origPath);
280    } catch (Exception e) {
281      // The complete compaction should fail and the corrupt file should remain
282      // in the 'tmp' directory;
283      assertTrue(fs.exists(origPath));
284      assertFalse(fs.exists(dstPath));
285      System.out.println("testCompactionWithCorruptResult Passed");
286      return;
287    }
288    fail("testCompactionWithCorruptResult failed since no exception was" +
289        "thrown while completing a corrupt file");
290  }
291
292  /**
293   * Create a custom compaction request and be sure that we can track it through the queue, knowing
294   * when the compaction is completed.
295   */
296  @Test
297  public void testTrackingCompactionRequest() throws Exception {
298    // setup a compact/split thread on a mock server
299    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
300    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
301    CompactSplit thread = new CompactSplit(mockServer);
302    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
303
304    // setup a region/store with some files
305    HStore store = r.getStore(COLUMN_FAMILY);
306    createStoreFile(r);
307    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
308      createStoreFile(r);
309    }
310
311    CountDownLatch latch = new CountDownLatch(1);
312    Tracker tracker = new Tracker(latch);
313    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker,
314      null);
315    // wait for the latch to complete.
316    latch.await();
317
318    thread.interruptIfNecessary();
319  }
320
321  @Test
322  public void testCompactionFailure() throws Exception {
323    // setup a compact/split thread on a mock server
324    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
325    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
326    CompactSplit thread = new CompactSplit(mockServer);
327    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
328
329    // setup a region/store with some files
330    HStore store = r.getStore(COLUMN_FAMILY);
331    createStoreFile(r);
332    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
333      createStoreFile(r);
334    }
335
336    HRegion mockRegion = Mockito.spy(r);
337    Mockito.when(mockRegion.checkSplit()).thenThrow(new IndexOutOfBoundsException());
338
339    MetricsRegionWrapper metricsWrapper = new MetricsRegionWrapperImpl(r);
340
341    long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
342    long preFailedCount = metricsWrapper.getNumCompactionsFailed();
343
344    CountDownLatch latch = new CountDownLatch(1);
345    Tracker tracker = new Tracker(latch);
346    thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER,
347      tracker, null);
348    // wait for the latch to complete.
349    latch.await(120, TimeUnit.SECONDS);
350
351    // compaction should have completed and been marked as failed due to error in split request
352    long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
353    long postFailedCount = metricsWrapper.getNumCompactionsFailed();
354
355    assertTrue("Completed count should have increased (pre=" + preCompletedCount +
356        ", post="+postCompletedCount+")",
357        postCompletedCount > preCompletedCount);
358    assertTrue("Failed count should have increased (pre=" + preFailedCount +
359        ", post=" + postFailedCount + ")",
360        postFailedCount > preFailedCount);
361  }
362
363  /**
364   * Test no new Compaction requests are generated after calling stop compactions
365   */
366  @Test public void testStopStartCompaction() throws IOException {
367    // setup a compact/split thread on a mock server
368    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
369    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
370    CompactSplit thread = new CompactSplit(mockServer);
371    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
372    // setup a region/store with some files
373    HStore store = r.getStore(COLUMN_FAMILY);
374    createStoreFile(r);
375    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
376      createStoreFile(r);
377    }
378    thread.switchCompaction(false);
379    thread
380        .requestCompaction(r, store, "test", Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY,
381            null);
382    assertEquals(false, thread.isCompactionsEnabled());
383    assertEquals(0, thread.getLongCompactions().getActiveCount() + thread.getShortCompactions()
384        .getActiveCount());
385    thread.switchCompaction(true);
386    assertEquals(true, thread.isCompactionsEnabled());
387    thread
388        .requestCompaction(r, store, "test", Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY,
389            null);
390    assertEquals(1, thread.getLongCompactions().getActiveCount() + thread.getShortCompactions()
391        .getActiveCount());
392  }
393
394  @Test public void testInterruptingRunningCompactions() throws Exception {
395    // setup a compact/split thread on a mock server
396    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
397        WaitThroughPutController.class.getName());
398    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
399    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
400    CompactSplit thread = new CompactSplit(mockServer);
401
402    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
403
404    // setup a region/store with some files
405    HStore store = r.getStore(COLUMN_FAMILY);
406    int jmax = (int) Math.ceil(15.0 / compactionThreshold);
407    byte[] pad = new byte[1000]; // 1 KB chunk
408    for (int i = 0; i < compactionThreshold; i++) {
409      Table loader = new RegionAsTable(r);
410      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
411      p.setDurability(Durability.SKIP_WAL);
412      for (int j = 0; j < jmax; j++) {
413        p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
414      }
415      HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
416      loader.put(p);
417      r.flush(true);
418    }
419    HStore s = r.getStore(COLUMN_FAMILY);
420    int initialFiles = s.getStorefilesCount();
421
422    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER,
423        CompactionLifeCycleTracker.DUMMY, null);
424
425    Thread.sleep(3000);
426    thread.switchCompaction(false);
427    assertEquals(initialFiles, s.getStorefilesCount());
428    //don't mess up future tests
429    thread.switchCompaction(true);
430  }
431
432  /**
433   * HBASE-7947: Regression test to ensure adding to the correct list in the
434   * {@link CompactSplit}
435   * @throws Exception on failure
436   */
437  @Test
438  public void testMultipleCustomCompactionRequests() throws Exception {
439    // setup a compact/split thread on a mock server
440    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
441    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
442    CompactSplit thread = new CompactSplit(mockServer);
443    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
444
445    // setup a region/store with some files
446    int numStores = r.getStores().size();
447    CountDownLatch latch = new CountDownLatch(numStores);
448    Tracker tracker = new Tracker(latch);
449    // create some store files and setup requests for each store on which we want to do a
450    // compaction
451    for (HStore store : r.getStores()) {
452      createStoreFile(r, store.getColumnFamilyName());
453      createStoreFile(r, store.getColumnFamilyName());
454      createStoreFile(r, store.getColumnFamilyName());
455      thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER,
456        tracker, null);
457    }
458    // wait for the latch to complete.
459    latch.await();
460
461    thread.interruptIfNecessary();
462  }
463
464  class StoreMockMaker extends StatefulStoreMockMaker {
465    public ArrayList<HStoreFile> compacting = new ArrayList<>();
466    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
467    private final ArrayList<Integer> results;
468
469    public StoreMockMaker(ArrayList<Integer> results) {
470      this.results = results;
471    }
472
473    public class TestCompactionContext extends CompactionContext {
474
475      private List<HStoreFile> selectedFiles;
476
477      public TestCompactionContext(List<HStoreFile> selectedFiles) {
478        super();
479        this.selectedFiles = selectedFiles;
480      }
481
482      @Override
483      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
484        return new ArrayList<>();
485      }
486
487      @Override
488      public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
489          boolean mayUseOffPeak, boolean forceMajor) throws IOException {
490        this.request = new CompactionRequestImpl(selectedFiles);
491        this.request.setPriority(getPriority());
492        return true;
493      }
494
495      @Override
496      public List<Path> compact(ThroughputController throughputController, User user)
497          throws IOException {
498        finishCompaction(this.selectedFiles);
499        return new ArrayList<>();
500      }
501    }
502
503    @Override
504    public synchronized Optional<CompactionContext> selectCompaction() {
505      CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
506      compacting.addAll(notCompacting);
507      notCompacting.clear();
508      try {
509        ctx.select(null, false, false, false);
510      } catch (IOException ex) {
511        fail("Shouldn't happen");
512      }
513      return Optional.of(ctx);
514    }
515
516    @Override
517    public synchronized void cancelCompaction(Object object) {
518      TestCompactionContext ctx = (TestCompactionContext)object;
519      compacting.removeAll(ctx.selectedFiles);
520      notCompacting.addAll(ctx.selectedFiles);
521    }
522
523    public synchronized void finishCompaction(List<HStoreFile> sfs) {
524      if (sfs.isEmpty()) return;
525      synchronized (results) {
526        results.add(sfs.size());
527      }
528      compacting.removeAll(sfs);
529    }
530
531    @Override
532    public int getPriority() {
533      return 7 - compacting.size() - notCompacting.size();
534    }
535  }
536
537  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
538    BlockingCompactionContext blocked = null;
539
540    public class BlockingCompactionContext extends CompactionContext {
541      public volatile boolean isInCompact = false;
542
543      public void unblock() {
544        synchronized (this) {
545          this.notifyAll();
546        }
547      }
548
549      @Override
550      public List<Path> compact(ThroughputController throughputController, User user)
551          throws IOException {
552        try {
553          isInCompact = true;
554          synchronized (this) {
555            this.wait();
556          }
557        } catch (InterruptedException e) {
558          Assume.assumeNoException(e);
559        }
560        return new ArrayList<>();
561      }
562
563      @Override
564      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
565        return new ArrayList<>();
566      }
567
568      @Override
569      public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
570          throws IOException {
571        this.request = new CompactionRequestImpl(new ArrayList<>());
572        return true;
573      }
574    }
575
576    @Override
577    public Optional<CompactionContext> selectCompaction() {
578      this.blocked = new BlockingCompactionContext();
579      try {
580        this.blocked.select(null, false, false, false);
581      } catch (IOException ex) {
582        fail("Shouldn't happen");
583      }
584      return Optional.of(blocked);
585    }
586
587    @Override
588    public void cancelCompaction(Object object) {}
589
590    @Override
591    public int getPriority() {
592      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
593    }
594
595    public BlockingCompactionContext waitForBlocking() {
596      while (this.blocked == null || !this.blocked.isInCompact) {
597        Threads.sleepWithoutInterrupt(50);
598      }
599      BlockingCompactionContext ctx = this.blocked;
600      this.blocked = null;
601      return ctx;
602    }
603
604    @Override
605    public HStore createStoreMock(String name) throws Exception {
606      return createStoreMock(Integer.MIN_VALUE, name);
607    }
608
609    public HStore createStoreMock(int priority, String name) throws Exception {
610      // Override the mock to always return the specified priority.
611      HStore s = super.createStoreMock(name);
612      when(s.getCompactPriority()).thenReturn(priority);
613      return s;
614    }
615  }
616
617  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
618  @Test
619  public void testCompactionQueuePriorities() throws Exception {
620    // Setup a compact/split thread on a mock server.
621    final Configuration conf = HBaseConfiguration.create();
622    HRegionServer mockServer = mock(HRegionServer.class);
623    when(mockServer.isStopped()).thenReturn(false);
624    when(mockServer.getConfiguration()).thenReturn(conf);
625    when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
626    CompactSplit cst = new CompactSplit(mockServer);
627    when(mockServer.getCompactSplitThread()).thenReturn(cst);
628    //prevent large compaction thread pool stealing job from small compaction queue.
629    cst.shutdownLongCompactions();
630    // Set up the region mock that redirects compactions.
631    HRegion r = mock(HRegion.class);
632    when(
633      r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() {
634        @Override
635        public Boolean answer(InvocationOnMock invocation) throws Throwable {
636          invocation.<CompactionContext>getArgument(0).compact(invocation.getArgument(2), null);
637          return true;
638        }
639    });
640
641    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
642    ArrayList<Integer> results = new ArrayList<>();
643    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
644    HStore store = sm.createStoreMock("store1");
645    HStore store2 = sm2.createStoreMock("store2");
646    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
647
648    // First, block the compaction thread so that we could muck with queue.
649    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
650    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
651
652    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
653    for (int i = 0; i < 4; ++i) {
654      sm.notCompacting.add(createFile());
655    }
656    cst.requestSystemCompaction(r, store, "s1-pri3");
657    for (int i = 0; i < 3; ++i) {
658      sm2.notCompacting.add(createFile());
659    }
660    cst.requestSystemCompaction(r, store2, "s2-pri4");
661    // Now add 2 more files to store1 and queue compaction - pri 1.
662    for (int i = 0; i < 2; ++i) {
663      sm.notCompacting.add(createFile());
664    }
665    cst.requestSystemCompaction(r, store, "s1-pri1");
666    // Finally add blocking compaction with priority 2.
667    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");
668
669    // Unblock the blocking compaction; we should run pri1 and become block again in pri2.
670    currentBlock.unblock();
671    currentBlock = blocker.waitForBlocking();
672    // Pri1 should have "compacted" all 6 files.
673    assertEquals(1, results.size());
674    assertEquals(6, results.get(0).intValue());
675    // Add 2 files to store 1 (it has 2 files now).
676    for (int i = 0; i < 2; ++i) {
677      sm.notCompacting.add(createFile());
678    }
679    // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
680    // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
681    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
682    currentBlock.unblock();
683    currentBlock = blocker.waitForBlocking();
684    assertEquals(3, results.size());
685    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
686    assertEquals(2, results.get(2).intValue());
687
688    currentBlock.unblock();
689    cst.interruptIfNecessary();
690  }
691
692  /**
693   * Firstly write 10 cells (with different time stamp) to a qualifier and flush
694   * to hfile1, then write 10 cells (with different time stamp) to the same
695   * qualifier and flush to hfile2. The latest cell (cell-A) in hfile1 and the
696   * oldest cell (cell-B) in hfile2 are with the same time stamp but different
697   * sequence id, and will get scanned successively during compaction.
698   * <p/>
699   * We set compaction.kv.max to 10 so compaction will scan 10 versions each
700   * round, meanwhile we set keepSeqIdPeriod=0 in {@link DummyCompactor} so all
701   * 10 versions of hfile2 will be written out with seqId cleaned (set to 0)
702   * including cell-B, then when scanner goes to cell-A it will cause a scan
703   * out-of-order assertion error before HBASE-16931
704   *
705   * @throws Exception
706   *           if error occurs during the test
707   */
708  @Test
709  public void testCompactionSeqId() throws Exception {
710    final byte[] ROW = Bytes.toBytes("row");
711    final byte[] QUALIFIER = Bytes.toBytes("qualifier");
712
713    long timestamp = 10000;
714
715    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
716    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
717    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
718    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
719    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
720    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
721    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
722    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
723    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
724    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
725    for (int i = 0; i < 10; i++) {
726      Put put = new Put(ROW);
727      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
728      r.put(put);
729    }
730    r.flush(true);
731
732    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
733    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
734    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
735    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
736    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
737    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
738    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
739    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
740    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
741    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
742    for (int i = 18; i > 8; i--) {
743      Put put = new Put(ROW);
744      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
745      r.put(put);
746    }
747    r.flush(true);
748    r.compact(true);
749  }
750
751  public static class DummyCompactor extends DefaultCompactor {
752    public DummyCompactor(Configuration conf, HStore store) {
753      super(conf, store);
754      this.keepSeqIdPeriod = 0;
755    }
756  }
757
758  private static HStoreFile createFile() throws Exception {
759    HStoreFile sf = mock(HStoreFile.class);
760    when(sf.getPath()).thenReturn(new Path("file"));
761    StoreFileReader r = mock(StoreFileReader.class);
762    when(r.length()).thenReturn(10L);
763    when(sf.getReader()).thenReturn(r);
764    return sf;
765  }
766
767  /**
768   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
769   * finishes.
770   */
771  public static class Tracker implements CompactionLifeCycleTracker {
772
773    private final CountDownLatch done;
774
775    public Tracker(CountDownLatch done) {
776      this.done = done;
777    }
778
779    @Override
780    public void afterExecution(Store store) {
781      done.countDown();
782    }
783  }
784
785  /**
786   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
787   * finishes.
788   */
789  public static class WaitThroughPutController extends NoLimitThroughputController{
790
791    public WaitThroughPutController() {
792    }
793
794    @Override
795    public long control(String compactionName, long size) throws InterruptedException {
796      Thread.sleep(6000000);
797      return 6000000;
798    }
799  }
800}