001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob.compactions;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.text.ParseException;
025import java.util.ArrayList;
026import java.util.Calendar;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.Date;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033import java.util.Random;
034import java.util.UUID;
035import java.util.concurrent.ExecutorService;
036import java.util.concurrent.RejectedExecutionException;
037import java.util.concurrent.RejectedExecutionHandler;
038import java.util.concurrent.SynchronousQueue;
039import java.util.concurrent.ThreadPoolExecutor;
040import java.util.concurrent.TimeUnit;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.Cell;
046import org.apache.hadoop.hbase.CellComparatorImpl;
047import org.apache.hadoop.hbase.CellUtil;
048import org.apache.hadoop.hbase.HBaseClassTestRule;
049import org.apache.hadoop.hbase.HBaseTestingUtility;
050import org.apache.hadoop.hbase.HColumnDescriptor;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.KeyValue;
053import org.apache.hadoop.hbase.KeyValue.Type;
054import org.apache.hadoop.hbase.TableName;
055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
056import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
057import org.apache.hadoop.hbase.io.hfile.CacheConfig;
058import org.apache.hadoop.hbase.io.hfile.HFileContext;
059import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
060import org.apache.hadoop.hbase.mob.MobConstants;
061import org.apache.hadoop.hbase.mob.MobFileName;
062import org.apache.hadoop.hbase.mob.MobUtils;
063import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
064import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition;
065import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
066import org.apache.hadoop.hbase.regionserver.BloomType;
067import org.apache.hadoop.hbase.regionserver.HStore;
068import org.apache.hadoop.hbase.regionserver.HStoreFile;
069import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
070import org.apache.hadoop.hbase.regionserver.ScanInfo;
071import org.apache.hadoop.hbase.regionserver.ScanType;
072import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
073import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
074import org.apache.hadoop.hbase.regionserver.StoreScanner;
075import org.apache.hadoop.hbase.testclassification.LargeTests;
076import org.apache.hadoop.hbase.util.Bytes;
077import org.apache.hadoop.hbase.util.FSUtils;
078import org.apache.hadoop.hbase.util.Threads;
079import org.apache.hadoop.hdfs.DistributedFileSystem;
080import org.junit.AfterClass;
081import org.junit.BeforeClass;
082import org.junit.ClassRule;
083import org.junit.Rule;
084import org.junit.Test;
085import org.junit.experimental.categories.Category;
086import org.junit.rules.TestName;
087import org.slf4j.Logger;
088import org.slf4j.LoggerFactory;
089
090@Category(LargeTests.class)
091public class TestPartitionedMobCompactor {
092
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestPartitionedMobCompactor.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestPartitionedMobCompactor.class);
  // Mini-cluster utility shared by all tests; the cluster is started in setUpBeforeClass().
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Column family and qualifier used for every cell written by this test.
  private final static String family = "family";
  private final static String qf = "qf";
  // Milliseconds in one day; used to build dates in past weeks/months.
  private final long DAY_IN_MS = 1000 * 60 * 60 * 24;
  // One key byte per partition; createMobFile() writes one mob file per entry.
  private static byte[] KEYS = Bytes.toBytes("012");
  private HColumnDescriptor hcd = new HColumnDescriptor(family);
  // NOTE(review): presumably the same Configuration object the mini cluster uses, so values set
  // here can leak between tests unless resetConf() is called first — confirm.
  private Configuration conf = TEST_UTIL.getConfiguration();
  private CacheConfig cacheConf = new CacheConfig(conf);
  private FileSystem fs;
  // Filled by listFiles(): non-"_del" files, "_del" file paths, and every file under basePath.
  private List<FileStatus> mobFiles = new ArrayList<>();
  private List<Path> delFiles = new ArrayList<>();
  private List<FileStatus> allFiles = new ArrayList<>();
  // <root dir>/<mob dir>/<table>/<family>; set by init() (or directly by some tests).
  private Path basePath;
  // Random suffixes used in generated mob / del file names; regenerated by init().
  private String mobSuffix;
  private String delSuffix;
  // Executor handed to every compactor; created in setUpBeforeClass(), shut down afterwards.
  private static ExecutorService pool;

  @Rule
  public TestName name = new TestName();
117
118  @BeforeClass
119  public static void setUpBeforeClass() throws Exception {
120    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
121    // Inject our customized DistributedFileSystem
122    TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class,
123        DistributedFileSystem.class);
124    TEST_UTIL.startMiniCluster(1);
125    pool = createThreadPool();
126  }
127
128  @AfterClass
129  public static void tearDownAfterClass() throws Exception {
130    pool.shutdown();
131    TEST_UTIL.shutdownMiniCluster();
132  }
133
134  private void init(String tableName) throws Exception {
135    fs = FileSystem.get(conf);
136    Path testDir = FSUtils.getRootDir(conf);
137    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
138    basePath = new Path(new Path(mobTestDir, tableName), family);
139    mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
140    delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
141    allFiles.clear();
142    mobFiles.clear();
143    delFiles.clear();
144  }
145
146  @Test
147  public void testCompactionSelectAllFilesWeeklyPolicy() throws Exception {
148    String tableName = "testCompactionSelectAllFilesWeeklyPolicy";
149    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
150        CompactionType.ALL_FILES, false, false, new Date(), MobCompactPartitionPolicy.WEEKLY, 1);
151  }
152
153  @Test
154  public void testCompactionSelectPartFilesWeeklyPolicy() throws Exception {
155    String tableName = "testCompactionSelectPartFilesWeeklyPolicy";
156    testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false, false,
157        new Date(), MobCompactPartitionPolicy.WEEKLY, 1);
158  }
159
160  @Test
161  public void testCompactionSelectPartFilesWeeklyPolicyWithPastWeek() throws Exception {
162    String tableName = "testCompactionSelectPartFilesWeeklyPolicyWithPastWeek";
163    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
164    testCompactionAtMergeSize(tableName, 700, CompactionType.PART_FILES, false, false, dateLastWeek,
165        MobCompactPartitionPolicy.WEEKLY, 7);
166  }
167
168  @Test
169  public void testCompactionSelectAllFilesWeeklyPolicyWithPastWeek() throws Exception {
170    String tableName = "testCompactionSelectAllFilesWeeklyPolicyWithPastWeek";
171    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
172    testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES,
173        false, false, dateLastWeek, MobCompactPartitionPolicy.WEEKLY, 7);
174  }
175
176  @Test
177  public void testCompactionSelectAllFilesMonthlyPolicy() throws Exception {
178    String tableName = "testCompactionSelectAllFilesMonthlyPolicy";
179    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
180    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
181        CompactionType.ALL_FILES, false, false, dateLastWeek,
182        MobCompactPartitionPolicy.MONTHLY, 7);
183  }
184
185  @Test
186  public void testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy() throws Exception {
187    String tableName = "testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy";
188    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
189        CompactionType.PART_FILES, false, false, new Date(), MobCompactPartitionPolicy.MONTHLY, 1);
190  }
191
192  @Test
193  public void testCompactionSelectPartFilesMonthlyPolicy() throws Exception {
194    String tableName = "testCompactionSelectPartFilesMonthlyPolicy";
195    testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false, false,
196        new Date(), MobCompactPartitionPolicy.MONTHLY, 1);
197  }
198
199  @Test
200  public void testCompactionSelectPartFilesMonthlyPolicyWithPastWeek() throws Exception {
201    String tableName = "testCompactionSelectPartFilesMonthlyPolicyWithPastWeek";
202    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
203    Calendar calendar =  Calendar.getInstance();
204    Date firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, new Date());
205    CompactionType type = CompactionType.PART_FILES;
206    long mergeSizeMultiFactor = 7;
207
208
209    // The dateLastWeek may not really be last week, suppose that it runs at 2/1/2017, it is going
210    // to be last month and the monthly policy is going to be applied here.
211    if (dateLastWeek.before(firstDayOfCurrentMonth)) {
212      type = CompactionType.ALL_FILES;
213      mergeSizeMultiFactor *= 4;
214    }
215
216    testCompactionAtMergeSize(tableName, 700, type, false, false, dateLastWeek,
217        MobCompactPartitionPolicy.MONTHLY, mergeSizeMultiFactor);
218  }
219
220  @Test
221  public void testCompactionSelectAllFilesMonthlyPolicyWithPastWeek() throws Exception {
222    String tableName = "testCompactionSelectAllFilesMonthlyPolicyWithPastWeek";
223    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
224
225    testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES,
226        false, false, dateLastWeek, MobCompactPartitionPolicy.MONTHLY, 7);
227  }
228
229  @Test
230  public void testCompactionSelectPartFilesMonthlyPolicyWithPastMonth() throws Exception {
231    String tableName = "testCompactionSelectPartFilesMonthlyPolicyWithPastMonth";
232
233    // back 5 weeks, it is going to be a past month
234    Date dateLastMonth = new Date(System.currentTimeMillis() - (7 * 5 * DAY_IN_MS));
235    testCompactionAtMergeSize(tableName, 200, CompactionType.PART_FILES, false, false, dateLastMonth,
236        MobCompactPartitionPolicy.MONTHLY, 28);
237  }
238
239  @Test
240  public void testCompactionSelectAllFilesMonthlyPolicyWithPastMonth() throws Exception {
241    String tableName = "testCompactionSelectAllFilesMonthlyPolicyWithPastMonth";
242
243    // back 5 weeks, it is going to be a past month
244    Date dateLastMonth = new Date(System.currentTimeMillis() - (7 * 5 * DAY_IN_MS));
245    testCompactionAtMergeSize(tableName, 750, CompactionType.ALL_FILES,
246        false, false, dateLastMonth, MobCompactPartitionPolicy.MONTHLY, 28);
247  }
248
249  @Test
250  public void testCompactionSelectWithAllFiles() throws Exception {
251    String tableName = "testCompactionSelectWithAllFiles";
252    // If there is only 1 file, it will not be compacted with _del files, so
253    // It wont be CompactionType.ALL_FILES in this case, do not create with _del files.
254    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
255        CompactionType.ALL_FILES, false, false);
256  }
257
258  @Test
259  public void testCompactionSelectWithPartFiles() throws Exception {
260    String tableName = "testCompactionSelectWithPartFiles";
261    testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false);
262  }
263
264  @Test
265  public void testCompactionSelectWithForceAllFiles() throws Exception {
266    String tableName = "testCompactionSelectWithForceAllFiles";
267    testCompactionAtMergeSize(tableName, Long.MAX_VALUE, CompactionType.ALL_FILES, true);
268  }
269
270  private void testCompactionAtMergeSize(final String tableName,
271      final long mergeSize, final CompactionType type, final boolean isForceAllFiles)
272      throws Exception {
273    testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, true);
274  }
275
276  private void testCompactionAtMergeSize(final String tableName,
277      final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
278      final boolean createDelFiles)
279      throws Exception {
280    Date date = new Date();
281    testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date);
282  }
283
284  private void testCompactionAtMergeSize(final String tableName,
285      final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
286      final boolean createDelFiles, final Date date)
287      throws Exception {
288    testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date,
289        MobCompactPartitionPolicy.DAILY, 1);
290  }
291
292  private void testCompactionAtMergeSize(final String tableName,
293      final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
294      final boolean createDelFiles, final Date date, final MobCompactPartitionPolicy policy,
295      final long mergeSizeMultiFactor)
296      throws Exception {
297    resetConf();
298    init(tableName);
299    int count = 10;
300    // create 10 mob files.
301    createStoreFiles(basePath, family, qf, count, Type.Put, date);
302
303    if (createDelFiles) {
304      // create 10 del files
305      createStoreFiles(basePath, family, qf, count, Type.Delete, date);
306    }
307
308    Calendar calendar =  Calendar.getInstance();
309    Date firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, new Date());
310
311    listFiles();
312    List<String> expectedStartKeys = new ArrayList<>();
313    for(FileStatus file : mobFiles) {
314      if(file.getLen() < mergeSize * mergeSizeMultiFactor) {
315        String fileName = file.getPath().getName();
316        String startKey = fileName.substring(0, 32);
317
318        // If the policy is monthly and files are in current week, they will be skipped
319        // in minor compcation.
320        boolean skipCompaction = false;
321        if (policy == MobCompactPartitionPolicy.MONTHLY) {
322          String fileDateStr = MobFileName.getDateFromName(fileName);
323          Date fileDate;
324          try {
325            fileDate = MobUtils.parseDate(fileDateStr);
326          } catch (ParseException e)  {
327            LOG.warn("Failed to parse date " + fileDateStr, e);
328            fileDate = new Date();
329          }
330          if (!fileDate.before(firstDayOfCurrentWeek)) {
331            skipCompaction = true;
332          }
333        }
334
335        // If it is not an major mob compaction and del files are there,
336        // these mob files wont be compacted.
337        if (isForceAllFiles || (!createDelFiles && !skipCompaction)) {
338          expectedStartKeys.add(startKey);
339        }
340      }
341    }
342
343    // Set the policy
344    this.hcd.setMobCompactPartitionPolicy(policy);
345    // set the mob compaction mergeable threshold
346    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
347    testSelectFiles(tableName, type, isForceAllFiles, expectedStartKeys);
348    // go back to the default daily policy
349    this.hcd.setMobCompactPartitionPolicy(MobCompactPartitionPolicy.DAILY);
350  }
351
352  @Test
353  public void testCompactDelFilesWithDefaultBatchSize() throws Exception {
354    testCompactDelFilesAtBatchSize(name.getMethodName(), MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE,
355        MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
356  }
357
358  @Test
359  public void testCompactDelFilesWithSmallBatchSize() throws Exception {
360    testCompactDelFilesAtBatchSize(name.getMethodName(), 4, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
361  }
362
363  @Test
364  public void testCompactDelFilesChangeMaxDelFileCount() throws Exception {
365    testCompactDelFilesAtBatchSize(name.getMethodName(), 4, 2);
366  }
367
368  @Test
369  public void testCompactFilesWithDstDirFull() throws Exception {
370    String tableName = name.getMethodName();
371    fs = FileSystem.get(conf);
372    FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem)fs;
373    Path testDir = FSUtils.getRootDir(conf);
374    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
375    basePath = new Path(new Path(mobTestDir, tableName), family);
376
377    try {
378      int count = 2;
379      // create 2 mob files.
380      createStoreFiles(basePath, family, qf, count, Type.Put, true, new Date());
381      listFiles();
382
383      TableName tName = TableName.valueOf(tableName);
384      MobCompactor compactor = new PartitionedMobCompactor(conf, faultyFs, tName, hcd, pool);
385      faultyFs.setThrowException(true);
386      try {
387        compactor.compact(allFiles, true);
388      } catch (IOException e) {
389        System.out.println("Expected exception, ignore");
390      }
391
392      // Verify that all the files in tmp directory are cleaned up
393      Path tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
394      FileStatus[] ls = faultyFs.listStatus(tempPath);
395
396      // Only .bulkload under this directory
397      assertTrue(ls.length == 1);
398      assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName()));
399
400      Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
401          tName.getNamespaceAsString(), tName.getQualifierAsString())));
402
403      // Nothing in bulkLoad directory
404      FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath);
405      assertTrue(lsBulkload.length == 0);
406
407    } finally {
408      faultyFs.setThrowException(false);
409    }
410  }
411
412  /**
413   * Create mulitple partition files
414   */
415  private void createMobFile(Path basePath) throws IOException {
416    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
417    MobFileName mobFileName = null;
418    int ii = 0;
419    Date today = new Date();
420    for (byte k0 : KEYS) {
421      byte[] startRow = Bytes.toBytes(ii++);
422
423      mobFileName = MobFileName.create(startRow, MobUtils.formatDate(today), mobSuffix);
424
425      StoreFileWriter mobFileWriter =
426          new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta)
427              .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
428
429      long now = System.currentTimeMillis();
430      try {
431        for (int i = 0; i < 10; i++) {
432          byte[] key = Bytes.add(Bytes.toBytes(k0), Bytes.toBytes(i));
433          byte[] dummyData = new byte[5000];
434          new Random().nextBytes(dummyData);
435          mobFileWriter.append(
436              new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Put, dummyData));
437        }
438      } finally {
439        mobFileWriter.close();
440      }
441    }
442  }
443
444  /**
445   * Create mulitple partition delete files
446   */
447  private void createMobDelFile(Path basePath, int startKey) throws IOException {
448    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
449    MobFileName mobFileName = null;
450    Date today = new Date();
451
452    byte[] startRow = Bytes.toBytes(startKey);
453
454    mobFileName = MobFileName.create(startRow, MobUtils.formatDate(today), delSuffix);
455
456    StoreFileWriter mobFileWriter =
457        new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta)
458            .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
459
460    long now = System.currentTimeMillis();
461    try {
462      byte[] key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(0));
463      byte[] dummyData = new byte[5000];
464      new Random().nextBytes(dummyData);
465      mobFileWriter.append(
466          new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData));
467      key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(2));
468      mobFileWriter.append(
469          new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData));
470      key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(4));
471      mobFileWriter.append(
472          new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData));
473
474    } finally {
475      mobFileWriter.close();
476    }
477  }
478
479  @Test
480  public void testCompactFilesWithoutDelFile() throws Exception {
481    String tableName = "testCompactFilesWithoutDelFile";
482    resetConf();
483    init(tableName);
484
485    createMobFile(basePath);
486
487    listFiles();
488
489    PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
490        TableName.valueOf(tableName), hcd, pool) {
491      @Override
492      public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
493          throws IOException {
494        if (files == null || files.isEmpty()) {
495          return null;
496        }
497
498        PartitionedMobCompactionRequest request = select(files, isForceAllFiles);
499
500        // Make sure that there is no del Partitions
501        assertTrue(request.getDelPartitions().size() == 0);
502
503        // Make sure that when there is no startKey/endKey for partition.
504        for (CompactionPartition p : request.getCompactionPartitions()) {
505          assertTrue(p.getStartKey() == null);
506          assertTrue(p.getEndKey() == null);
507        }
508        return null;
509      }
510    };
511
512    compactor.compact(allFiles, true);
513  }
514
515  static class MyPartitionedMobCompactor extends PartitionedMobCompactor {
516    int delPartitionSize = 0;
517    int PartitionsIncludeDelFiles = 0;
518    CacheConfig cacheConfig = null;
519
520    MyPartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName,
521        ColumnFamilyDescriptor column, ExecutorService pool, final int delPartitionSize,
522        final CacheConfig cacheConf, final int PartitionsIncludeDelFiles)
523        throws IOException {
524      super(conf, fs, tableName, column, pool);
525      this.delPartitionSize = delPartitionSize;
526      this.cacheConfig = cacheConf;
527      this.PartitionsIncludeDelFiles = PartitionsIncludeDelFiles;
528    }
529
530    @Override public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
531        throws IOException {
532      if (files == null || files.isEmpty()) {
533        return null;
534      }
535      PartitionedMobCompactionRequest request = select(files, isForceAllFiles);
536
537      assertTrue(request.getDelPartitions().size() == delPartitionSize);
538      if (request.getDelPartitions().size() > 0) {
539        for (CompactionPartition p : request.getCompactionPartitions()) {
540          assertTrue(p.getStartKey() != null);
541          assertTrue(p.getEndKey() != null);
542        }
543      }
544
545      try {
546        for (CompactionDelPartition delPartition : request.getDelPartitions()) {
547          for (Path newDelPath : delPartition.listDelFiles()) {
548            HStoreFile sf =
549                new HStoreFile(fs, newDelPath, conf, this.cacheConfig, BloomType.NONE, true);
550            // pre-create reader of a del file to avoid race condition when opening the reader in
551            // each partition.
552            sf.initReader();
553            delPartition.addStoreFile(sf);
554          }
555        }
556
557        // Make sure that CompactionDelPartitions does not overlap
558        CompactionDelPartition prevDelP = null;
559        for (CompactionDelPartition delP : request.getDelPartitions()) {
560          assertTrue(
561              Bytes.compareTo(delP.getId().getStartKey(), delP.getId().getEndKey()) <= 0);
562
563          if (prevDelP != null) {
564            assertTrue(
565                Bytes.compareTo(prevDelP.getId().getEndKey(), delP.getId().getStartKey()) < 0);
566          }
567        }
568
569        int affectedPartitions = 0;
570
571        // Make sure that only del files within key range for a partition is included in compaction.
572        // compact the mob files by partitions in parallel.
573        for (CompactionPartition partition : request.getCompactionPartitions()) {
574          List<HStoreFile> delFiles = getListOfDelFilesForPartition(partition, request.getDelPartitions());
575          if (!request.getDelPartitions().isEmpty()) {
576            if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(),
577                partition.getEndKey()) > 0) || (Bytes.compareTo(
578                request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId()
579                    .getEndKey(), partition.getStartKey()) < 0))) {
580
581              if (delFiles.size() > 0) {
582                assertTrue(delFiles.size() == 1);
583                affectedPartitions += delFiles.size();
584                assertTrue(Bytes.compareTo(partition.getStartKey(),
585                  CellUtil.cloneRow(delFiles.get(0).getLastKey().get())) <= 0);
586                assertTrue(Bytes.compareTo(partition.getEndKey(),
587                  CellUtil.cloneRow(delFiles.get(delFiles.size() - 1).getFirstKey().get())) >= 0);
588              }
589            }
590          }
591        }
592        // The del file is only included in one partition
593        assertTrue(affectedPartitions == PartitionsIncludeDelFiles);
594      } finally {
595        for (CompactionDelPartition delPartition : request.getDelPartitions()) {
596          for (HStoreFile storeFile : delPartition.getStoreFiles()) {
597            try {
598              storeFile.closeStoreFile(true);
599            } catch (IOException e) {
600              LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e);
601            }
602          }
603        }
604      }
605
606      return null;
607    }
608  }
609
610  @Test
611  public void testCompactFilesWithOneDelFile() throws Exception {
612    String tableName = "testCompactFilesWithOneDelFile";
613    resetConf();
614    init(tableName);
615
616    // Create only del file.
617    createMobFile(basePath);
618    createMobDelFile(basePath, 2);
619
620    listFiles();
621
622    MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, fs,
623        TableName.valueOf(tableName), hcd, pool, 1, cacheConf, 1);
624
625    compactor.compact(allFiles, true);
626  }
627
628  @Test
629  public void testCompactFilesWithMultiDelFiles() throws Exception {
630    String tableName = "testCompactFilesWithMultiDelFiles";
631    resetConf();
632    init(tableName);
633
634    // Create only del file.
635    createMobFile(basePath);
636    createMobDelFile(basePath, 0);
637    createMobDelFile(basePath, 1);
638    createMobDelFile(basePath, 2);
639
640    listFiles();
641
642    MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, fs,
643        TableName.valueOf(tableName), hcd, pool, 3, cacheConf, 3);
644    compactor.compact(allFiles, true);
645  }
646
647  private void testCompactDelFilesAtBatchSize(String tableName, int batchSize,
648      int delfileMaxCount)  throws Exception {
649    resetConf();
650    init(tableName);
651    // create 20 mob files.
652    createStoreFiles(basePath, family, qf, 20, Type.Put, new Date());
653    // create 13 del files
654    createStoreFiles(basePath, family, qf, 13, Type.Delete, new Date());
655    listFiles();
656
657    // set the max del file count
658    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, delfileMaxCount);
659    // set the mob compaction batch size
660    conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, batchSize);
661    testCompactDelFiles(tableName, 1, 13, false);
662  }
663
664  /**
665   * Tests the selectFiles
666   * @param tableName the table name
667   * @param type the expected compaction type
668   * @param isForceAllFiles whether all the mob files are selected
669   * @param expected the expected start keys
670   */
671  private void testSelectFiles(String tableName, final CompactionType type,
672    final boolean isForceAllFiles, final List<String> expected) throws IOException {
673    PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
674      TableName.valueOf(tableName), hcd, pool) {
675      @Override
676      public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
677        throws IOException {
678        if (files == null || files.isEmpty()) {
679          return null;
680        }
681        PartitionedMobCompactionRequest request = select(files, isForceAllFiles);
682
683        // Make sure that when there is no del files, there will be no startKey/endKey for partition.
684        if (request.getDelPartitions().size() == 0) {
685          for (CompactionPartition p : request.getCompactionPartitions()) {
686            assertTrue(p.getStartKey() == null);
687            assertTrue(p.getEndKey() == null);
688          }
689        }
690
691        // Make sure that CompactionDelPartitions does not overlap
692        CompactionDelPartition prevDelP = null;
693        for (CompactionDelPartition delP : request.getDelPartitions()) {
694          assertTrue(Bytes.compareTo(delP.getId().getStartKey(),
695              delP.getId().getEndKey()) <= 0);
696
697          if (prevDelP != null) {
698            assertTrue(Bytes.compareTo(prevDelP.getId().getEndKey(),
699                delP.getId().getStartKey()) < 0);
700          }
701        }
702
703        // Make sure that only del files within key range for a partition is included in compaction.
704        // compact the mob files by partitions in parallel.
705        for (CompactionPartition partition : request.getCompactionPartitions()) {
706          List<HStoreFile> delFiles = getListOfDelFilesForPartition(partition, request.getDelPartitions());
707          if (!request.getDelPartitions().isEmpty()) {
708            if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(),
709                partition.getEndKey()) > 0) || (Bytes.compareTo(
710                request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId()
711                    .getEndKey(), partition.getStartKey()) < 0))) {
712              if (delFiles.size() > 0) {
713                assertTrue(Bytes.compareTo(partition.getStartKey(),
714                  delFiles.get(0).getFirstKey().get().getRowArray()) >= 0);
715                assertTrue(Bytes.compareTo(partition.getEndKey(),
716                  delFiles.get(delFiles.size() - 1).getLastKey().get().getRowArray()) <= 0);
717              }
718            }
719          }
720        }
721
722        // assert the compaction type
723        assertEquals(type, request.type);
724        // assert get the right partitions
725        compareCompactedPartitions(expected, request.compactionPartitions);
726        // assert get the right del files
727        compareDelFiles(request.getDelPartitions());
728        return null;
729      }
730    };
731    compactor.compact(allFiles, isForceAllFiles);
732  }
733
734  /**
735   * Tests the compacteDelFile
736   * @param tableName the table name
737   * @param expectedFileCount the expected file count
738   * @param expectedCellCount the expected cell count
739   * @param isForceAllFiles whether all the mob files are selected
740   */
741  private void testCompactDelFiles(String tableName, final int expectedFileCount,
742      final int expectedCellCount, boolean isForceAllFiles) throws IOException {
743    PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
744      TableName.valueOf(tableName), hcd, pool) {
745      @Override
746      protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
747          throws IOException {
748        List<Path> delFilePaths = new ArrayList<>();
749        for (CompactionDelPartition delPartition: request.getDelPartitions()) {
750          for (Path p : delPartition.listDelFiles()) {
751            delFilePaths.add(p);
752          }
753        }
754        List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
755        // assert the del files are merged.
756        assertEquals(expectedFileCount, newDelPaths.size());
757        assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths));
758        return null;
759      }
760    };
761    compactor.compact(allFiles, isForceAllFiles);
762  }
763
764  /**
765   * Lists the files in the path
766   */
767  private void listFiles() throws IOException {
768    for (FileStatus file : fs.listStatus(basePath)) {
769      allFiles.add(file);
770      if (file.getPath().getName().endsWith("_del")) {
771        delFiles.add(file.getPath());
772      } else {
773        mobFiles.add(file);
774      }
775    }
776  }
777
778  /**
779   * Compares the compacted partitions.
780   * @param partitions the collection of CompactedPartitions
781   */
782  private void compareCompactedPartitions(List<String> expected,
783      Collection<CompactionPartition> partitions) {
784    List<String> actualKeys = new ArrayList<>();
785    for (CompactionPartition partition : partitions) {
786      actualKeys.add(partition.getPartitionId().getStartKey());
787    }
788    Collections.sort(expected);
789    Collections.sort(actualKeys);
790    assertEquals(expected.size(), actualKeys.size());
791    for (int i = 0; i < expected.size(); i++) {
792      assertEquals(expected.get(i), actualKeys.get(i));
793    }
794  }
795
796  /**
797   * Compares the del files.
798   * @param delPartitions all del partitions
799   */
800  private void compareDelFiles(List<CompactionDelPartition> delPartitions) {
801    Map<Path, Path> delMap = new HashMap<>();
802    for (CompactionDelPartition delPartition : delPartitions) {
803      for (Path f : delPartition.listDelFiles()) {
804        delMap.put(f, f);
805      }
806    }
807    for (Path f : delFiles) {
808      assertTrue(delMap.containsKey(f));
809    }
810  }
811
812  /**
813   * Creates store files.
814   * @param basePath the path to create file
815   * @family the family name
816   * @qualifier the column qualifier
817   * @count the store file number
818   * @type the key type
819   */
820  private void createStoreFiles(Path basePath, String family, String qualifier, int count,
821      Type type, final Date date) throws IOException {
822    createStoreFiles(basePath, family, qualifier, count, type, false, date);
823  }
824
825  private void createStoreFiles(Path basePath, String family, String qualifier, int count,
826      Type type, boolean sameStartKey, final Date date) throws IOException {
827    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
828    String startKey = "row_";
829    MobFileName mobFileName = null;
830    for (int i = 0; i < count; i++) {
831      byte[] startRow;
832      if (sameStartKey) {
833        // When creating multiple files under one partition, suffix needs to be different.
834        startRow = Bytes.toBytes(startKey);
835        mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
836        delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
837      } else {
838        startRow = Bytes.toBytes(startKey + i);
839      }
840      if(type.equals(Type.Delete)) {
841        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), delSuffix);
842      }
843      if(type.equals(Type.Put)){
844        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), mobSuffix);
845      }
846      StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
847      .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build();
848      writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier),
849          type, (i+1)*1000);
850    }
851  }
852
853  /**
854   * Writes data to store file.
855   * @param writer the store file writer
856   * @param row the row key
857   * @param family the family name
858   * @param qualifier the column qualifier
859   * @param type the key type
860   * @param size the size of value
861   */
862  private static void writeStoreFile(final StoreFileWriter writer, byte[]row, byte[] family,
863      byte[] qualifier, Type type, int size) throws IOException {
864    long now = System.currentTimeMillis();
865    try {
866      byte[] dummyData = new byte[size];
867      new Random().nextBytes(dummyData);
868      writer.append(new KeyValue(row, family, qualifier, now, type, dummyData));
869    } finally {
870      writer.close();
871    }
872  }
873
874  /**
875   * Gets the number of del cell in the del files
876   * @param paths the del file paths
877   * @return the cell size
878   */
879  private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
880    List<HStoreFile> sfs = new ArrayList<>();
881    int size = 0;
882    for (Path path : paths) {
883      HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
884      sfs.add(sf);
885    }
886    List<KeyValueScanner> scanners = new ArrayList<>(StoreFileScanner.getScannersForStoreFiles(sfs,
887      false, true, false, false, HConstants.LATEST_TIMESTAMP));
888    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
889    long ttl = HStore.determineTTLFromFamily(hcd);
890    ScanInfo scanInfo = new ScanInfo(conf, hcd, ttl, timeToPurgeDeletes, CellComparatorImpl.COMPARATOR);
891    StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_RETAIN_DELETES, scanners);
892    List<Cell> results = new ArrayList<>();
893    boolean hasMore = true;
894
895    while (hasMore) {
896      hasMore = scanner.next(results);
897      size += results.size();
898      results.clear();
899    }
900    scanner.close();
901    return size;
902  }
903
904  private static ExecutorService createThreadPool() {
905    int maxThreads = 10;
906    long keepAliveTime = 60;
907    final SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
908    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
909      TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
910      new RejectedExecutionHandler() {
911        @Override
912        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
913          try {
914            // waiting for a thread to pick up instead of throwing exceptions.
915            queue.put(r);
916          } catch (InterruptedException e) {
917            throw new RejectedExecutionException(e);
918          }
919        }
920      });
921    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
922    return pool;
923  }
924
925  /**
926   * Resets the configuration.
927   */
928  private void resetConf() {
929    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
930      MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
931    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
932    conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
933      MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
934  }
935
936  /**
937   * The customized Distributed File System Implementation
938   */
939  static class FaultyDistributedFileSystem extends DistributedFileSystem {
940    private volatile boolean throwException = false;
941
942    public FaultyDistributedFileSystem() {
943      super();
944    }
945
946    public void setThrowException(boolean throwException) {
947      this.throwException = throwException;
948    }
949
950    @Override
951    public boolean rename(Path src, Path dst) throws IOException {
952      if (throwException) {
953        throw new IOException("No more files allowed");
954      }
955      return super.rename(src, dst);
956    }
957  }
958}