/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Test compaction framework and common functions
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestCompaction {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestCompaction.class);

  @Rule public TestName name = new TestName();
  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
  protected Configuration conf = UTIL.getConfiguration();

  private HRegion r = null;
  private HTableDescriptor htd = null;
  private static final byte [] COLUMN_FAMILY = fam1;
  private final byte [] STARTROW = Bytes.toBytes(START_KEY);
  private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
  private int compactionThreshold;
  private byte[] secondRowBytes, thirdRowBytes;
  private static final long MAX_FILES_TO_COMPACT = 10;
  private final byte[] FAMILY = Bytes.toBytes("cf");

  /** constructor */
  public TestCompaction() {
    super();

    // Set cache flush size to 1MB
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
        NoLimitThroughputController.class.getName());
    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);

    secondRowBytes = START_KEY_BYTES.clone();
    // Increment the least significant character so we get to next row.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY_BYTES.clone();
    thirdRowBytes[START_KEY_BYTES.length - 1] =
        (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
  }

  @Before
  public void setUp() throws Exception {
    this.htd = UTIL.createTableDescriptor(name.getMethodName());
    if (name.getMethodName().equals("testCompactionSeqId")) {
      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
      UTIL.getConfiguration().set(
          DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
          DummyCompactor.class.getName());
      HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
      hcd.setMaxVersions(65536);
      this.htd.addFamily(hcd);
    }
    this.r = UTIL.createLocalHRegion(htd, null, null);
  }

  @After
  public void tearDown() throws Exception {
    WAL wal = r.getWAL();
    this.r.close();
    wal.close();
  }

  /**
   * Verify that a long-running compaction can be stopped mid-flight, as
   * happens during regionserver shutdown.
   */
  @Test
  public void testInterruptCompaction() throws Exception {
    assertEquals(0, count());

    // Lower the close-check interval so the compaction notices the stop
    // request sooner; the interval is measured in bytes written, not time.
    int origWI = HStore.closeCheckInterval;
    HStore.closeCheckInterval = 10 * 1000; // ~10 KB between checks

    try {
      // Create compactionThreshold store files totalling ~15 KB, past the
      // 10 KB close-check interval.
      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
      byte [] pad = new byte[1000]; // 1 KB chunk
      for (int i = 0; i < compactionThreshold; i++) {
        Table loader = new RegionAsTable(r);
        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
        p.setDurability(Durability.SKIP_WAL);
        for (int j = 0; j < jmax; j++) {
          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
        }
        HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
        loader.put(p);
        r.flush(true);
      }

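      // Spy on the region so that the compaction-prep hook disables writes
      // just before the compaction runs, simulating a stop request arriving
      // mid-compaction.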
      HRegion spyR = spy(r);
      doAnswer(new Answer() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
          r.writestate.writesEnabled = false;
          return invocation.callRealMethod();
        }
      }).when(spyR).doRegionCompactionPrep();

      // Force a minor compaction; the spy's prep hook above disables writes,
      // which serves as the stop request.
      spyR.compactStores();

      // ensure that the compaction stopped, all old files are intact,
      HStore s = r.getStore(COLUMN_FAMILY);
      assertEquals(compactionThreshold, s.getStorefilesCount());
      assertTrue(s.getStorefilesSize() > 15*1000);
      // and no new store files persisted past compactStores()
      // only one empty dir exists in temp dir
      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
      assertEquals(1, ls.length);
      Path storeTempDir =
          new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
      assertTrue(r.getFilesystem().exists(storeTempDir));
      ls = r.getFilesystem().listStatus(storeTempDir);
      assertEquals(0, ls.length);
    } finally {
      // don't mess up future tests
      r.writestate.writesEnabled = true;
      HStore.closeCheckInterval = origWI;

      // Delete everything we loaded so later assertions start from empty.
      for (int i = 0; i < compactionThreshold; i++) {
        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
        delete.addFamily(COLUMN_FAMILY);
        r.delete(delete);
      }
      r.flush(true);

      // Multiple versions allowed for an entry, so the delete isn't enough
      // Lower TTL and expire to ensure that all our entries have been wiped
      final int ttl = 1000;
      for (HStore store : this.r.stores.values()) {
        ScanInfo old = store.getScanInfo();
        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
        store.setScanInfo(si);
      }
      Thread.sleep(ttl);

      r.compact(true);
      assertEquals(0, count());
    }
  }

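  /** Count all cells across the store files of the default column family. */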
  private int count() throws IOException {
    int count = 0;
    for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      if (!scanner.seekTo()) {
        continue;
      }
      do {
        count++;
      } while (scanner.next());
    }
    return count;
  }

  private void createStoreFile(final HRegion region) throws IOException {
    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
  }

  private void createStoreFile(final HRegion region, String family) throws IOException {
    Table loader = new RegionAsTable(region);
    HBaseTestCase.addContent(loader, family);
    region.flush(true);
  }

  @Test
  public void testCompactionWithCorruptResult() throws Exception {
    int nfiles = 10;
    for (int i = 0; i < nfiles; i++) {
      createStoreFile(r);
    }
    HStore store = r.getStore(COLUMN_FAMILY);

    Collection<HStoreFile> storeFiles = store.getStorefiles();
    DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
    tool.compactForTesting(storeFiles, false);

    // Now let's corrupt the compacted file.
    FileSystem fs = store.getFileSystem();
    // default compaction policy created one and only one new compacted file
    Path dstPath = store.getRegionFileSystem().createTempName();
    FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, 1024L, null);
    stream.writeChars("CORRUPT FILE!!!!");
    stream.close();
    Path origPath = store.getRegionFileSystem().commitStoreFile(
      Bytes.toString(COLUMN_FAMILY), dstPath);

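    // moveFileIntoPlace validates the new file by opening it as an HFile,
    // which should fail on the garbage bytes written above.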
    try {
      ((HStore)store).moveFileIntoPlace(origPath);
    } catch (Exception e) {
      // The compaction-completion step should fail and the corrupt file
      // should remain in place.
      assertTrue(fs.exists(origPath));
      assertFalse(fs.exists(dstPath));
      System.out.println("testCompactionWithCorruptResult Passed");
      return;
    }
    fail("testCompactionWithCorruptResult failed since no exception was " +
        "thrown while completing a corrupt file");
  }

  /**
   * Create a custom compaction request and be sure that we can track it through the queue, knowing
   * when the compaction is completed.
   */
  @Test
  public void testTrackingCompactionRequest() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
      createStoreFile(r);
    }

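    // The Tracker counts the latch down from afterExecution(), so await()
    // returns only once the queued compaction has actually run.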
    CountDownLatch latch = new CountDownLatch(1);
    Tracker tracker = new Tracker(latch);
    thread.requestCompaction(r, store, "test custom compaction", PRIORITY_USER, tracker,
      null);
    // Wait for the compaction to finish.
    latch.await();

    thread.interruptIfNecessary();
  }

  @Test
  public void testCompactionFailure() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
      createStoreFile(r);
    }

    HRegion mockRegion = Mockito.spy(r);
    Mockito.when(mockRegion.checkSplit()).thenThrow(new IndexOutOfBoundsException());
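    // checkSplit() runs after a successful compaction; throwing from it makes
    // the request count as failed even though the compaction itself completed.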

    MetricsRegionWrapper metricsWrapper = new MetricsRegionWrapperImpl(r);

    long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
    long preFailedCount = metricsWrapper.getNumCompactionsFailed();

    CountDownLatch latch = new CountDownLatch(1);
    Tracker tracker = new Tracker(latch);
    thread.requestCompaction(mockRegion, store, "test custom compaction", PRIORITY_USER,
      tracker, null);
    // Wait up to two minutes for the compaction to finish.
    latch.await(120, TimeUnit.SECONDS);

    // The compaction should have completed and been marked as failed due to
    // the error in the split request.
    long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
    long postFailedCount = metricsWrapper.getNumCompactionsFailed();

    assertTrue("Completed count should have increased (pre=" + preCompletedCount +
        ", post=" + postCompletedCount + ")",
        postCompletedCount > preCompletedCount);
    assertTrue("Failed count should have increased (pre=" + preFailedCount +
        ", post=" + postFailedCount + ")",
        postFailedCount > preFailedCount);
  }

  /**
   * HBASE-7947: regression test to ensure requests are added to the correct
   * list in {@link CompactSplit}.
   * @throws Exception on failure
   */
  @Test
  public void testMultipleCustomCompactionRequests() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    int numStores = r.getStores().size();
    CountDownLatch latch = new CountDownLatch(numStores);
    Tracker tracker = new Tracker(latch);
    // create some store files and setup requests for each store on which we want to do a
    // compaction
    for (HStore store : r.getStores()) {
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      thread.requestCompaction(r, store, "test multiple custom compactions", PRIORITY_USER,
        tracker, null);
    }
    // Wait for all the compactions to finish.
    latch.await();

    thread.interruptIfNecessary();
  }

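  /**
   * Mock store whose compactions simply move files from {@code notCompacting}
   * to {@code compacting} and record each batch size in {@code results}.
   */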
  class StoreMockMaker extends StatefulStoreMockMaker {
    public ArrayList<HStoreFile> compacting = new ArrayList<>();
    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
    private final ArrayList<Integer> results;

    public StoreMockMaker(ArrayList<Integer> results) {
      this.results = results;
    }

    public class TestCompactionContext extends CompactionContext {

      private List<HStoreFile> selectedFiles;

      public TestCompactionContext(List<HStoreFile> selectedFiles) {
        super();
        this.selectedFiles = selectedFiles;
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
          boolean mayUseOffPeak, boolean forceMajor) throws IOException {
        this.request = new CompactionRequestImpl(selectedFiles);
        this.request.setPriority(getPriority());
        return true;
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
          throws IOException {
        finishCompaction(this.selectedFiles);
        return new ArrayList<>();
      }
    }

    @Override
    public synchronized Optional<CompactionContext> selectCompaction() {
      CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
      compacting.addAll(notCompacting);
      notCompacting.clear();
      try {
        ctx.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(ctx);
    }

    @Override
    public synchronized void cancelCompaction(Object object) {
      TestCompactionContext ctx = (TestCompactionContext)object;
      compacting.removeAll(ctx.selectedFiles);
      notCompacting.addAll(ctx.selectedFiles);
    }

    public synchronized void finishCompaction(List<HStoreFile> sfs) {
      if (sfs.isEmpty()) {
        return;
      }
      synchronized (results) {
        results.add(sfs.size());
      }
      compacting.removeAll(sfs);
    }

    @Override
    public int getPriority() {
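      // Mirror real store behavior: the more files outstanding, the smaller
      // (i.e. more urgent) the returned priority.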
      return 7 - compacting.size() - notCompacting.size();
    }
  }

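  /**
   * Mock store whose compaction blocks on a monitor until unblock() is
   * called, letting the test pin a compaction thread while it rearranges the
   * queue.
   */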
  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
    BlockingCompactionContext blocked = null;

    public class BlockingCompactionContext extends CompactionContext {
      public volatile boolean isInCompact = false;

      public void unblock() {
        synchronized (this) {
          this.notifyAll();
        }
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
          throws IOException {
        try {
          isInCompact = true;
          synchronized (this) {
            this.wait();
          }
        } catch (InterruptedException e) {
          Assume.assumeNoException(e);
        }
        return new ArrayList<>();
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
          throws IOException {
        this.request = new CompactionRequestImpl(new ArrayList<>());
        return true;
      }
    }

    @Override
    public Optional<CompactionContext> selectCompaction() {
      this.blocked = new BlockingCompactionContext();
      try {
        this.blocked.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(blocked);
    }

    @Override
    public void cancelCompaction(Object object) {}

    @Override
    public int getPriority() {
      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
    }

    public BlockingCompactionContext waitForBlocking() {
      while (this.blocked == null || !this.blocked.isInCompact) {
        Threads.sleepWithoutInterrupt(50);
      }
      BlockingCompactionContext ctx = this.blocked;
      this.blocked = null;
      return ctx;
    }

    @Override
    public HStore createStoreMock(String name) throws Exception {
      return createStoreMock(Integer.MIN_VALUE, name);
    }

    public HStore createStoreMock(int priority, String name) throws Exception {
      // Override the mock to always return the specified priority.
      HStore s = super.createStoreMock(name);
      when(s.getCompactPriority()).thenReturn(priority);
      return s;
    }
  }

  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
  @Test
  public void testCompactionQueuePriorities() throws Exception {
    // Setup a compact/split thread on a mock server.
    final Configuration conf = HBaseConfiguration.create();
    HRegionServer mockServer = mock(HRegionServer.class);
    when(mockServer.isStopped()).thenReturn(false);
    when(mockServer.getConfiguration()).thenReturn(conf);
    when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
    CompactSplit cst = new CompactSplit(mockServer);
    when(mockServer.getCompactSplitThread()).thenReturn(cst);
    // Prevent the large-compaction thread pool from stealing jobs from the
    // small-compaction queue.
    cst.shutdownLongCompactions();
    // Set up the region mock that redirects compactions.
    HRegion r = mock(HRegion.class);
    when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        invocation.<CompactionContext>getArgument(0).compact(invocation.getArgument(2), null);
        return true;
      }
    });

    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
    ArrayList<Integer> results = new ArrayList<>();
    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
    HStore store = sm.createStoreMock("store1");
    HStore store2 = sm2.createStoreMock("store2");
    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();

    // First, block the compaction thread so that we can muck with the queue.
    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();

    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
    for (int i = 0; i < 4; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri3");
    for (int i = 0; i < 3; ++i) {
      sm2.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store2, "s2-pri4");
    // Now add 2 more files to store1 and queue compaction - pri 1.
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri1");
    // Finally add blocking compaction with priority 2.
    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");

    // Unblock the blocking compaction; pri-1 should run, then we should block
    // again on pri-2.
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    // Pri1 should have "compacted" all 6 files.
    assertEquals(1, results.size());
    assertEquals(6, results.get(0).intValue());
    // Add 2 files to store 1 (it has 2 files now).
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
    // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    assertEquals(3, results.size());
    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
    assertEquals(2, results.get(2).intValue());

    currentBlock.unblock();
    cst.interruptIfNecessary();
  }

  /**
   * Regression test for HBASE-16931.
   * <p/>
   * First write 10 cells (each with a different timestamp) to a qualifier and
   * flush to hfile1, then write 10 more cells (again with different
   * timestamps) to the same qualifier and flush to hfile2. The latest cell
   * (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 share the same
   * timestamp but have different sequence ids, and are scanned back to back
   * during compaction.
   * <p/>
   * We set compaction.kv.max to 10 so each compaction round scans 10
   * versions, and set keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10
   * versions from hfile2, including cell-B, are written out with their
   * sequence ids cleaned (set to 0). When the scanner then reaches cell-A,
   * this triggered a scan out-of-order assertion error before HBASE-16931.
   * @throws Exception if an error occurs during the test
   */
  @Test
  public void testCompactionSeqId() throws Exception {
    final byte[] ROW = Bytes.toBytes("row");
    final byte[] QUALIFIER = Bytes.toBytes("qualifier");

    long timestamp = 10000;

    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
    for (int i = 0; i < 10; i++) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);

    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
    for (int i = 18; i > 8; i--) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);
    r.compact(true);
  }

  public static class DummyCompactor extends DefaultCompactor {
    public DummyCompactor(Configuration conf, HStore store) {
      super(conf, store);
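      // keepSeqIdPeriod == 0 lets the compactor clean (zero out) sequence ids
      // immediately, which is what sets up the HBASE-16931 scenario.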
      this.keepSeqIdPeriod = 0;
    }
  }

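  /** Create a mock store file with a fixed reader length for selection logic. */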
  private static HStoreFile createFile() throws Exception {
    HStoreFile sf = mock(HStoreFile.class);
    when(sf.getPath()).thenReturn(new Path("file"));
    StoreFileReader r = mock(StoreFileReader.class);
    when(r.length()).thenReturn(10L);
    when(sf.getReader()).thenReturn(r);
    return sf;
  }

  /**
   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
   * finishes.
   */
  public static class Tracker implements CompactionLifeCycleTracker {

    private final CountDownLatch done;

    public Tracker(CountDownLatch done) {
      this.done = done;
    }

    @Override
    public void afterExecution(Store store) {
      done.countDown();
    }
  }
}