/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Test compaction framework and common functions
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCompaction {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompaction.class);

  @Rule
  public TestName name = new TestName();
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  protected Configuration conf = UTIL.getConfiguration();

  private HRegion r = null;
  private TableDescriptor tableDescriptor = null;
  private static final byte[] COLUMN_FAMILY = fam1;
  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
  private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
  private int compactionThreshold;
  private byte[] secondRowBytes, thirdRowBytes;
  private static final long MAX_FILES_TO_COMPACT = 10;
  private final byte[] FAMILY = Bytes.toBytes("cf");
  /** constructor */
  public TestCompaction() {
    super();

    // Set cache flush size to 1MB
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
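    // Raise the block multiplier so heavy test loading never blocks on memstore size.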
    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      NoLimitThroughputController.class.getName());
    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);

    secondRowBytes = START_KEY_BYTES.clone();
    // Increment the least significant character so we get to the next row.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY_BYTES.clone();
    thirdRowBytes[START_KEY_BYTES.length - 1] =
      (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
  }

  @Before
  public void setUp() throws Exception {
    TableDescriptorBuilder builder = UTIL.createModifyableTableDescriptor(name.getMethodName());
    if (name.getMethodName().equals("testCompactionSeqId")) {
      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
      UTIL.getConfiguration().set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
        DummyCompactor.class.getName());
      ColumnFamilyDescriptor familyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(65536).build();
      builder.setColumnFamily(familyDescriptor);
    }
    this.tableDescriptor = builder.build();
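    // Null start/end keys: the region spans the entire key space.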
    this.r = UTIL.createLocalHRegion(tableDescriptor, null, null);
  }

  @After
  public void tearDown() throws Exception {
    WAL wal = r.getWAL();
    this.r.close();
    wal.close();
  }

  /**
   * Verify that you can stop a long-running compaction (used during RS shutdown)
   */
  @Test
  public void testInterruptCompactionBySize() throws Exception {
    assertEquals(0, count());

    // Lower the close-check size limit so the compaction polls for close every 10 KB written.
    conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */);

    try {
      // Create a few store files totaling ~15 KB (over the 10 KB close-check limit)
      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
      byte[] pad = new byte[1000]; // 1 KB chunk
      for (int i = 0; i < compactionThreshold; i++) {
        Table loader = new RegionAsTable(r);
        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
        p.setDurability(Durability.SKIP_WAL);
        for (int j = 0; j < jmax; j++) {
          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
        }
        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
        loader.put(p);
        r.flush(true);
      }

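      // Disabling writes from inside doRegionCompactionPrep() makes the region look like
      // it is closing, so the compaction's close checker aborts the compaction mid-run.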
      HRegion spyR = spy(r);
      doAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
          r.writestate.writesEnabled = false;
          return invocation.callRealMethod();
        }
      }).when(spyR).doRegionCompactionPrep();

      // force a minor compaction, but not before requesting a stop
      spyR.compactStores();

      // ensure that the compaction stopped, all old files are intact,
      HStore s = r.getStore(COLUMN_FAMILY);
      assertEquals(compactionThreshold, s.getStorefilesCount());
      assertTrue(s.getStorefilesSize() > 15 * 1000);
      // and no new store files persisted past compactStores()
      // only one empty dir exists in the temp dir
      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
      assertEquals(1, ls.length);
      Path storeTempDir =
        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
      assertTrue(r.getFilesystem().exists(storeTempDir));
      ls = r.getFilesystem().listStatus(storeTempDir);
      assertEquals(0, ls.length);
    } finally {
      // don't mess up future tests
      r.writestate.writesEnabled = true;
      conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);

      // Delete all store data now that we are done
      for (int i = 0; i < compactionThreshold; i++) {
        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
        byte[][] famAndQf = { COLUMN_FAMILY, null };
        delete.addFamily(famAndQf[0]);
        r.delete(delete);
      }
      r.flush(true);

      // Multiple versions allowed for an entry, so the delete isn't enough
      // Lower TTL and expire to ensure that all our entries have been wiped
      final int ttl = 1000;
      for (HStore store : this.r.stores.values()) {
        ScanInfo old = store.getScanInfo();
        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
        store.setScanInfo(si);
      }
      Thread.sleep(ttl);

      r.compact(true);
      assertEquals(0, count());
    }
  }

  @Test
  public void testInterruptCompactionByTime() throws Exception {
    assertEquals(0, count());

    // Lower the close-check time limit so the compaction polls for close every millisecond.
    conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */);

    try {
      // Create a few store files totaling ~15 KB
      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
      byte[] pad = new byte[1000]; // 1 KB chunk
      for (int i = 0; i < compactionThreshold; i++) {
        Table loader = new RegionAsTable(r);
        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
        p.setDurability(Durability.SKIP_WAL);
        for (int j = 0; j < jmax; j++) {
          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
        }
        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
        loader.put(p);
        r.flush(true);
      }

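      // As above: flipping writesEnabled inside doRegionCompactionPrep() makes the
      // close checker abort the compaction once the time limit elapses.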
      HRegion spyR = spy(r);
      doAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
          r.writestate.writesEnabled = false;
          return invocation.callRealMethod();
        }
      }).when(spyR).doRegionCompactionPrep();

      // force a minor compaction, but not before requesting a stop
      spyR.compactStores();

      // ensure that the compaction stopped, all old files are intact,
      HStore s = r.getStore(COLUMN_FAMILY);
      assertEquals(compactionThreshold, s.getStorefilesCount());
      assertTrue(s.getStorefilesSize() > 15 * 1000);
      // and no new store files persisted past compactStores()
      // only one empty dir exists in the temp dir
      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
      assertEquals(1, ls.length);
      Path storeTempDir =
        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
      assertTrue(r.getFilesystem().exists(storeTempDir));
      ls = r.getFilesystem().listStatus(storeTempDir);
      assertEquals(0, ls.length);
    } finally {
      // don't mess up future tests
      r.writestate.writesEnabled = true;
      conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);

      // Delete all store data now that we are done
      for (int i = 0; i < compactionThreshold; i++) {
        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
        byte[][] famAndQf = { COLUMN_FAMILY, null };
        delete.addFamily(famAndQf[0]);
        r.delete(delete);
      }
      r.flush(true);

      // Multiple versions allowed for an entry, so the delete isn't enough
      // Lower TTL and expire to ensure that all our entries have been wiped
      final int ttl = 1000;
      for (HStore store : this.r.stores.values()) {
        ScanInfo old = store.getScanInfo();
        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
        store.setScanInfo(si);
      }
      Thread.sleep(ttl);

      r.compact(true);
      assertEquals(0, count());
    }
  }

  private int count() throws IOException {
    int count = 0;
    for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      if (!scanner.seekTo()) {
        continue;
      }
      do {
        count++;
      } while (scanner.next());
    }
    return count;
  }

  private void createStoreFile(final HRegion region) throws IOException {
    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
  }

  private void createStoreFile(final HRegion region, String family) throws IOException {
    Table loader = new RegionAsTable(region);
    HTestConst.addContent(loader, family);
    region.flush(true);
  }

  @Test
  public void testCompactionWithCorruptResult() throws Exception {
    int nfiles = 10;
    for (int i = 0; i < nfiles; i++) {
      createStoreFile(r);
    }
    HStore store = r.getStore(COLUMN_FAMILY);

    Collection<HStoreFile> storeFiles = store.getStorefiles();
    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
    tool.compact(request, NoLimitThroughputController.INSTANCE, null);

    // Now let's corrupt the compacted file.
    FileSystem fs = store.getFileSystem();
    // default compaction policy created one and only one new compacted file
    Path tmpPath = store.getRegionFileSystem().createTempName();
    try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
      stream.writeChars("CORRUPT FILE!!!!");
    }
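    // Hand the corrupt file to the commit step as if it were the compaction output.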
    // The completion of the compaction should fail, and the corrupt file should remain
    // in the 'tmp' directory.
    assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
      EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath)));
    assertTrue(fs.exists(tmpPath));
  }

  /**
   * Create a custom compaction request and be sure that we can track it through the queue, knowing
   * when the compaction is completed.
   */
  @Test
  public void testTrackingCompactionRequest() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
      createStoreFile(r);
    }

    CountDownLatch latch = new CountDownLatch(1);
    Tracker tracker = new Tracker(latch);
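    // Tracker counts the latch down in afterExecution(), so await() returns only once
    // the queued compaction has actually finished.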
    thread.requestCompaction(r, store, "test custom compaction", PRIORITY_USER, tracker, null);
    // wait for the latch to complete.
    latch.await();

    thread.interruptIfNecessary();
  }

  @Test
  public void testCompactionFailure() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
      createStoreFile(r);
    }

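    // checkSplit() runs after a compaction completes; making it throw drives the
    // request into the "completed but failed" metrics path asserted below.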
    HRegion mockRegion = Mockito.spy(r);
    Mockito.when(mockRegion.checkSplit())
      .thenThrow(new RuntimeException("Thrown intentionally by test!"));

    try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) {

      long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
      long preFailedCount = metricsWrapper.getNumCompactionsFailed();

      CountDownLatch latch = new CountDownLatch(1);
      Tracker tracker = new Tracker(latch);
      thread.requestCompaction(mockRegion, store, "test custom compaction", PRIORITY_USER, tracker,
        null);
      // wait for the latch to complete.
      latch.await(120, TimeUnit.SECONDS);

      // compaction should have completed and been marked as failed due to error in split request
      long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
      long postFailedCount = metricsWrapper.getNumCompactionsFailed();

      assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post="
        + postCompletedCount + ")", postCompletedCount > preCompletedCount);
      assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post="
        + postFailedCount + ")", postFailedCount > preFailedCount);
    }
  }

  /**
   * Test that no new compaction requests are generated after compactions have been switched off
   */
  @Test
  public void testStopStartCompaction() throws IOException {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    final CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
      createStoreFile(r);
    }
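    // A request made while compactions are switched off should not be scheduled, so no
    // compaction task may become active or complete below.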
    thread.switchCompaction(false);
    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);
    assertFalse(thread.isCompactionsEnabled());
    int longCompactions = thread.getLongCompactions().getActiveCount();
    int shortCompactions = thread.getShortCompactions().getActiveCount();
    assertEquals(
      "longCompactions=" + longCompactions + "," + "shortCompactions=" + shortCompactions, 0,
      longCompactions + shortCompactions);
    thread.switchCompaction(true);
    assertTrue(thread.isCompactionsEnabled());
    // Make sure no compactions have run.
    assertEquals(0, thread.getLongCompactions().getCompletedTaskCount()
      + thread.getShortCompactions().getCompletedTaskCount());
    // Request a compaction and make sure it is submitted successfully.
    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);
    // Wait until the compaction finishes.
    Waiter.waitFor(UTIL.getConfiguration(), 5000,
      (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount()
        + thread.getShortCompactions().getCompletedTaskCount() == 1);
    // Make sure there are no compactions running.
    assertEquals(0,
      thread.getLongCompactions().getActiveCount() + thread.getShortCompactions().getActiveCount());
  }

  @Test
  public void testInterruptingRunningCompactions() throws Exception {
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      WaitThroughPutController.class.getName());
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);

    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    int jmax = (int) Math.ceil(15.0 / compactionThreshold);
    byte[] pad = new byte[1000]; // 1 KB chunk
    for (int i = 0; i < compactionThreshold; i++) {
      Table loader = new RegionAsTable(r);
      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
      p.setDurability(Durability.SKIP_WAL);
      for (int j = 0; j < jmax; j++) {
        p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
      }
      HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
      loader.put(p);
      r.flush(true);
    }
    HStore s = r.getStore(COLUMN_FAMILY);
    int initialFiles = s.getStorefilesCount();

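    // WaitThroughPutController parks inside control(), so the compaction requested
    // below is guaranteed to still be running when compactions are switched off.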
    thread.requestCompaction(r, store, "test custom compaction", PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);

    Thread.sleep(3000);
    thread.switchCompaction(false);
    assertEquals(initialFiles, s.getStorefilesCount());
    // don't mess up future tests
    thread.switchCompaction(true);
  }

  /**
   * HBASE-7947: regression test to ensure requests are added to the correct list in the
   * {@link CompactSplit}
   * @throws Exception on failure
   */
  @Test
  public void testMultipleCustomCompactionRequests() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    int numStores = r.getStores().size();
    CountDownLatch latch = new CountDownLatch(numStores);
    Tracker tracker = new Tracker(latch);
    // create some store files and setup requests for each store on which we want to do a
    // compaction
    for (HStore store : r.getStores()) {
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      thread.requestCompaction(r, store, "test multiple custom compactions", PRIORITY_USER, tracker,
        null);
    }
    // wait for the latch to complete.
    latch.await();

    thread.interruptIfNecessary();
  }

  class StoreMockMaker extends StatefulStoreMockMaker {
    public ArrayList<HStoreFile> compacting = new ArrayList<>();
    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
    private final ArrayList<Integer> results;

    public StoreMockMaker(ArrayList<Integer> results) {
      this.results = results;
    }

    public class TestCompactionContext extends CompactionContext {

      private List<HStoreFile> selectedFiles;

      public TestCompactionContext(List<HStoreFile> selectedFiles) {
        super();
        this.selectedFiles = selectedFiles;
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
        this.request = new CompactionRequestImpl(selectedFiles);
        this.request.setPriority(getPriority());
        return true;
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
        throws IOException {
        finishCompaction(this.selectedFiles);
        return new ArrayList<>();
      }
    }

    @Override
    public synchronized Optional<CompactionContext> selectCompaction() {
      CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
      compacting.addAll(notCompacting);
      notCompacting.clear();
      try {
        ctx.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(ctx);
    }

    @Override
    public synchronized void cancelCompaction(Object object) {
      TestCompactionContext ctx = (TestCompactionContext) object;
      compacting.removeAll(ctx.selectedFiles);
      notCompacting.addAll(ctx.selectedFiles);
    }

    public synchronized void finishCompaction(List<HStoreFile> sfs) {
      if (sfs.isEmpty()) return;
      synchronized (results) {
        results.add(sfs.size());
      }
      compacting.removeAll(sfs);
    }

    @Override
    public int getPriority() {
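      // Mimics a store's compact priority calculation: the more files pile up, the
      // smaller (more urgent) the returned value.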
      return 7 - compacting.size() - notCompacting.size();
    }
  }

  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
    BlockingCompactionContext blocked = null;

    public class BlockingCompactionContext extends CompactionContext {
      public volatile boolean isInCompact = false;

      public void unblock() {
        synchronized (this) {
          this.notifyAll();
        }
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
        throws IOException {
        try {
          isInCompact = true;
          synchronized (this) {
            this.wait();
          }
        } catch (InterruptedException e) {
          Assume.assumeNoException(e);
        }
        return new ArrayList<>();
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
        throws IOException {
        this.request = new CompactionRequestImpl(new ArrayList<>());
        return true;
      }
    }

    @Override
    public Optional<CompactionContext> selectCompaction() {
      this.blocked = new BlockingCompactionContext();
      try {
        this.blocked.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(blocked);
    }

    @Override
    public void cancelCompaction(Object object) {
    }

    @Override
    public int getPriority() {
      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
    }

    public BlockingCompactionContext waitForBlocking() {
      while (this.blocked == null || !this.blocked.isInCompact) {
        Threads.sleepWithoutInterrupt(50);
      }
      BlockingCompactionContext ctx = this.blocked;
      this.blocked = null;
      return ctx;
    }

    @Override
    public HStore createStoreMock(String name) throws Exception {
      return createStoreMock(Integer.MIN_VALUE, name);
    }

    public HStore createStoreMock(int priority, String name) throws Exception {
      // Override the mock to always return the specified priority.
      HStore s = super.createStoreMock(name);
      when(s.getCompactPriority()).thenReturn(priority);
      return s;
    }
  }

  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
  @Test
  public void testCompactionQueuePriorities() throws Exception {
    // Setup a compact/split thread on a mock server.
    final Configuration conf = HBaseConfiguration.create();
    HRegionServer mockServer = mock(HRegionServer.class);
    when(mockServer.isStopped()).thenReturn(false);
    when(mockServer.getConfiguration()).thenReturn(conf);
    when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
    CompactSplit cst = new CompactSplit(mockServer);
    when(mockServer.getCompactSplitThread()).thenReturn(cst);
    // Prevent the large compaction thread pool from stealing jobs from the small one.
    cst.shutdownLongCompactions();
    // Set up the region mock that redirects compactions.
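    // The mock hands compact() straight to the CompactionContext, bypassing real
    // region plumbing.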
    HRegion r = mock(HRegion.class);
    when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        invocation.<CompactionContext> getArgument(0).compact(invocation.getArgument(2), null);
        return true;
      }
    });

    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
    ArrayList<Integer> results = new ArrayList<>();
    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
    HStore store = sm.createStoreMock("store1");
    HStore store2 = sm2.createStoreMock("store2");
    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();

    // First, block the compaction thread so that we can muck with the queue.
    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
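    // With long compactions shut down and the lone short-compaction thread parked in
    // the blocker, every request below stacks up in the priority queue.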

    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
    for (int i = 0; i < 4; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri3");
    for (int i = 0; i < 3; ++i) {
      sm2.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store2, "s2-pri4");
    // Now add 2 more files to store1 and queue compaction - pri 1.
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri1");
    // Finally add blocking compaction with priority 2.
    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");

    // Unblock the blocking compaction; pri1 should run next, then we block again on pri2.
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    // Pri1 should have "compacted" all 6 files.
    assertEquals(1, results.size());
    assertEquals(6, results.get(0).intValue());
    // Add 2 files to store 1 (it has 2 files now).
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
    // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    assertEquals(3, results.size());
    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
    assertEquals(2, results.get(2).intValue());

    currentBlock.unblock();
    cst.interruptIfNecessary();
  }

  /**
   * First write 10 cells (with different timestamps) to a qualifier and flush to hfile1, then
   * write 10 cells (with different timestamps) to the same qualifier and flush to hfile2. The
   * latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 have the same timestamp
   * but different sequence ids, and will get scanned successively during compaction.
   * <p/>
   * We set compaction.kv.max to 10 so compaction will scan 10 versions each round; meanwhile we set
   * keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 will be written out
   * with seqId cleaned (set to 0), including cell-B. When the scanner then reaches cell-A, it would
   * trigger a scan out-of-order assertion error before HBASE-16931.
   */
  @Test
  public void testCompactionSeqId() throws Exception {
    final byte[] ROW = Bytes.toBytes("row");
    final byte[] QUALIFIER = Bytes.toBytes("qualifier");

    long timestamp = 10000;

    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
    for (int i = 0; i < 10; i++) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);

    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
    for (int i = 18; i > 8; i--) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);
    r.compact(true);
  }

  public static class DummyCompactor extends DefaultCompactor {
    public DummyCompactor(Configuration conf, HStore store) {
      super(conf, store);
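      // keepSeqIdPeriod == 0 lets the compactor clean (zero out) sequence ids for all
      // cells, which is what reproduces the pre-HBASE-16931 scan error described above.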
      this.keepSeqIdPeriod = 0;
    }
  }

  private static HStoreFile createFile() throws Exception {
    HStoreFile sf = mock(HStoreFile.class);
    when(sf.getPath()).thenReturn(new Path("file"));
    StoreFileReader r = mock(StoreFileReader.class);
    when(r.length()).thenReturn(10L);
    when(sf.getReader()).thenReturn(r);
    return sf;
  }

  /**
   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
   * finishes.
   */
  public static class Tracker implements CompactionLifeCycleTracker {

    private final CountDownLatch done;

    public Tracker(CountDownLatch done) {
      this.done = done;
    }

    @Override
    public void afterExecution(Store store) {
      done.countDown();
    }
  }

  /**
   * {@link ThroughputController} that parks inside {@link #control(String, long)} so that a
   * running compaction does not finish until its thread is interrupted.
   */
  public static class WaitThroughPutController extends NoLimitThroughputController {

    public WaitThroughPutController() {
    }

    @Override
    public long control(String compactionName, long size) throws InterruptedException {
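      // Park for 100 minutes; in practice the sleep ends only via interrupt.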
      Thread.sleep(6000000);
      return 6000000;
    }
  }
}