001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob.compactions;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.text.ParseException;
025import java.util.ArrayList;
026import java.util.Calendar;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.Date;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033import java.util.Random;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.RejectedExecutionException;
036import java.util.concurrent.RejectedExecutionHandler;
037import java.util.concurrent.SynchronousQueue;
038import java.util.concurrent.ThreadPoolExecutor;
039import java.util.concurrent.TimeUnit;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileStatus;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.CellComparatorImpl;
046import org.apache.hadoop.hbase.CellUtil;
047import org.apache.hadoop.hbase.HBaseClassTestRule;
048import org.apache.hadoop.hbase.HBaseTestingUtility;
049import org.apache.hadoop.hbase.HColumnDescriptor;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.hadoop.hbase.KeyValue;
052import org.apache.hadoop.hbase.KeyValue.Type;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
055import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
056import org.apache.hadoop.hbase.io.hfile.CacheConfig;
057import org.apache.hadoop.hbase.io.hfile.HFileContext;
058import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
059import org.apache.hadoop.hbase.mob.MobConstants;
060import org.apache.hadoop.hbase.mob.MobFileName;
061import org.apache.hadoop.hbase.mob.MobUtils;
062import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
063import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition;
064import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
065import org.apache.hadoop.hbase.regionserver.BloomType;
066import org.apache.hadoop.hbase.regionserver.HStore;
067import org.apache.hadoop.hbase.regionserver.HStoreFile;
068import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
069import org.apache.hadoop.hbase.regionserver.ScanInfo;
070import org.apache.hadoop.hbase.regionserver.ScanType;
071import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
072import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
073import org.apache.hadoop.hbase.regionserver.StoreScanner;
074import org.apache.hadoop.hbase.testclassification.LargeTests;
075import org.apache.hadoop.hbase.util.Bytes;
076import org.apache.hadoop.hbase.util.CommonFSUtils;
077import org.apache.hadoop.hbase.util.Threads;
078import org.apache.hadoop.hdfs.DistributedFileSystem;
079import org.junit.AfterClass;
080import org.junit.BeforeClass;
081import org.junit.ClassRule;
082import org.junit.Rule;
083import org.junit.Test;
084import org.junit.experimental.categories.Category;
085import org.junit.rules.TestName;
086import org.slf4j.Logger;
087import org.slf4j.LoggerFactory;
088
089@Category(LargeTests.class)
090public class TestPartitionedMobCompactor {
091
092  @ClassRule
093  public static final HBaseClassTestRule CLASS_RULE =
094      HBaseClassTestRule.forClass(TestPartitionedMobCompactor.class);
095
096  private static final Logger LOG = LoggerFactory.getLogger(TestPartitionedMobCompactor.class);
097  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
098  private final static String family = "family";
099  private final static String qf = "qf";
100  private final long DAY_IN_MS = 1000 * 60 * 60 * 24;
101  private static byte[] KEYS = Bytes.toBytes("012");
102  private HColumnDescriptor hcd = new HColumnDescriptor(family);
103  private Configuration conf = TEST_UTIL.getConfiguration();
104  private CacheConfig cacheConf = new CacheConfig(conf);
105  private FileSystem fs;
106  private List<FileStatus> mobFiles = new ArrayList<>();
107  private List<Path> delFiles = new ArrayList<>();
108  private List<FileStatus> allFiles = new ArrayList<>();
109  private Path basePath;
110  private String mobSuffix;
111  private String delSuffix;
112  private static ExecutorService pool;
113
114  @Rule
115  public TestName name = new TestName();
116
117  @BeforeClass
118  public static void setUpBeforeClass() throws Exception {
119    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
120    // Inject our customized DistributedFileSystem
121    TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class,
122        DistributedFileSystem.class);
123    TEST_UTIL.startMiniCluster(1);
124    pool = createThreadPool();
125  }
126
127  @AfterClass
128  public static void tearDownAfterClass() throws Exception {
129    pool.shutdown();
130    TEST_UTIL.shutdownMiniCluster();
131  }
132
133  private void init(String tableName) throws Exception {
134    fs = FileSystem.get(conf);
135    Path testDir = CommonFSUtils.getRootDir(conf);
136    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
137    basePath = new Path(new Path(mobTestDir, tableName), family);
138    mobSuffix = TEST_UTIL.getRandomUUID().toString().replaceAll("-", "");
139    delSuffix = TEST_UTIL.getRandomUUID().toString().replaceAll("-", "") + "_del";
140    allFiles.clear();
141    mobFiles.clear();
142    delFiles.clear();
143  }
144
145  @Test
146  public void testCompactionSelectAllFilesWeeklyPolicy() throws Exception {
147    String tableName = "testCompactionSelectAllFilesWeeklyPolicy";
148    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
149        CompactionType.ALL_FILES, false, false, new Date(), MobCompactPartitionPolicy.WEEKLY, 1);
150  }
151
152  @Test
153  public void testCompactionSelectPartFilesWeeklyPolicy() throws Exception {
154    String tableName = "testCompactionSelectPartFilesWeeklyPolicy";
155    testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false, false,
156        new Date(), MobCompactPartitionPolicy.WEEKLY, 1);
157  }
158
159  @Test
160  public void testCompactionSelectPartFilesWeeklyPolicyWithPastWeek() throws Exception {
161    String tableName = "testCompactionSelectPartFilesWeeklyPolicyWithPastWeek";
162    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
163    testCompactionAtMergeSize(tableName, 700, CompactionType.PART_FILES, false, false, dateLastWeek,
164        MobCompactPartitionPolicy.WEEKLY, 7);
165  }
166
167  @Test
168  public void testCompactionSelectAllFilesWeeklyPolicyWithPastWeek() throws Exception {
169    String tableName = "testCompactionSelectAllFilesWeeklyPolicyWithPastWeek";
170    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
171    testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES,
172        false, false, dateLastWeek, MobCompactPartitionPolicy.WEEKLY, 7);
173  }
174
175  @Test
176  public void testCompactionSelectAllFilesMonthlyPolicy() throws Exception {
177    String tableName = "testCompactionSelectAllFilesMonthlyPolicy";
178    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
179    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
180        CompactionType.ALL_FILES, false, false, dateLastWeek,
181        MobCompactPartitionPolicy.MONTHLY, 7);
182  }
183
184  @Test
185  public void testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy() throws Exception {
186    String tableName = "testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy";
187    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
188        CompactionType.PART_FILES, false, false, new Date(), MobCompactPartitionPolicy.MONTHLY, 1);
189  }
190
191  @Test
192  public void testCompactionSelectPartFilesMonthlyPolicy() throws Exception {
193    String tableName = "testCompactionSelectPartFilesMonthlyPolicy";
194    testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false, false,
195        new Date(), MobCompactPartitionPolicy.MONTHLY, 1);
196  }
197
198  @Test
199  public void testCompactionSelectPartFilesMonthlyPolicyWithPastWeek() throws Exception {
200    String tableName = "testCompactionSelectPartFilesMonthlyPolicyWithPastWeek";
201    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
202    Calendar calendar =  Calendar.getInstance();
203    Date firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, new Date());
204    CompactionType type = CompactionType.PART_FILES;
205    long mergeSizeMultiFactor = 7;
206
207
208    // The dateLastWeek may not really be last week, suppose that it runs at 2/1/2017, it is going
209    // to be last month and the monthly policy is going to be applied here.
210    if (dateLastWeek.before(firstDayOfCurrentMonth)) {
211      type = CompactionType.ALL_FILES;
212      mergeSizeMultiFactor *= 4;
213    }
214
215    testCompactionAtMergeSize(tableName, 700, type, false, false, dateLastWeek,
216        MobCompactPartitionPolicy.MONTHLY, mergeSizeMultiFactor);
217  }
218
219  @Test
220  public void testCompactionSelectAllFilesMonthlyPolicyWithPastWeek() throws Exception {
221    String tableName = "testCompactionSelectAllFilesMonthlyPolicyWithPastWeek";
222    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
223
224    testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES,
225        false, false, dateLastWeek, MobCompactPartitionPolicy.MONTHLY, 7);
226  }
227
228  @Test
229  public void testCompactionSelectPartFilesMonthlyPolicyWithPastMonth() throws Exception {
230    String tableName = "testCompactionSelectPartFilesMonthlyPolicyWithPastMonth";
231
232    // back 5 weeks, it is going to be a past month
233    Date dateLastMonth = new Date(System.currentTimeMillis() - (7 * 5 * DAY_IN_MS));
234    testCompactionAtMergeSize(tableName, 200, CompactionType.PART_FILES, false, false, dateLastMonth,
235        MobCompactPartitionPolicy.MONTHLY, 28);
236  }
237
238  @Test
239  public void testCompactionSelectAllFilesMonthlyPolicyWithPastMonth() throws Exception {
240    String tableName = "testCompactionSelectAllFilesMonthlyPolicyWithPastMonth";
241
242    // back 5 weeks, it is going to be a past month
243    Date dateLastMonth = new Date(System.currentTimeMillis() - (7 * 5 * DAY_IN_MS));
244    testCompactionAtMergeSize(tableName, 750, CompactionType.ALL_FILES,
245        false, false, dateLastMonth, MobCompactPartitionPolicy.MONTHLY, 28);
246  }
247
248  @Test
249  public void testCompactionSelectWithAllFiles() throws Exception {
250    String tableName = "testCompactionSelectWithAllFiles";
251    // If there is only 1 file, it will not be compacted with _del files, so
252    // It wont be CompactionType.ALL_FILES in this case, do not create with _del files.
253    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
254        CompactionType.ALL_FILES, false, false);
255  }
256
257  @Test
258  public void testCompactionSelectWithPartFiles() throws Exception {
259    String tableName = "testCompactionSelectWithPartFiles";
260    testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false);
261  }
262
263  @Test
264  public void testCompactionSelectWithForceAllFiles() throws Exception {
265    String tableName = "testCompactionSelectWithForceAllFiles";
266    testCompactionAtMergeSize(tableName, Long.MAX_VALUE, CompactionType.ALL_FILES, true);
267  }
268
269  private void testCompactionAtMergeSize(final String tableName,
270      final long mergeSize, final CompactionType type, final boolean isForceAllFiles)
271      throws Exception {
272    testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, true);
273  }
274
275  private void testCompactionAtMergeSize(final String tableName,
276      final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
277      final boolean createDelFiles)
278      throws Exception {
279    Date date = new Date();
280    testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date);
281  }
282
283  private void testCompactionAtMergeSize(final String tableName,
284      final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
285      final boolean createDelFiles, final Date date)
286      throws Exception {
287    testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date,
288        MobCompactPartitionPolicy.DAILY, 1);
289  }
290
291  private void testCompactionAtMergeSize(final String tableName,
292      final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
293      final boolean createDelFiles, final Date date, final MobCompactPartitionPolicy policy,
294      final long mergeSizeMultiFactor)
295      throws Exception {
296    resetConf();
297    init(tableName);
298    int count = 10;
299    // create 10 mob files.
300    createStoreFiles(basePath, family, qf, count, Type.Put, date);
301
302    if (createDelFiles) {
303      // create 10 del files
304      createStoreFiles(basePath, family, qf, count, Type.Delete, date);
305    }
306
307    Calendar calendar =  Calendar.getInstance();
308    Date firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, new Date());
309
310    listFiles();
311    List<String> expectedStartKeys = new ArrayList<>();
312    for(FileStatus file : mobFiles) {
313      if(file.getLen() < mergeSize * mergeSizeMultiFactor) {
314        String fileName = file.getPath().getName();
315        String startKey = fileName.substring(0, 32);
316
317        // If the policy is monthly and files are in current week, they will be skipped
318        // in minor compcation.
319        boolean skipCompaction = false;
320        if (policy == MobCompactPartitionPolicy.MONTHLY) {
321          String fileDateStr = MobFileName.getDateFromName(fileName);
322          Date fileDate;
323          try {
324            fileDate = MobUtils.parseDate(fileDateStr);
325          } catch (ParseException e)  {
326            LOG.warn("Failed to parse date " + fileDateStr, e);
327            fileDate = new Date();
328          }
329          if (!fileDate.before(firstDayOfCurrentWeek)) {
330            skipCompaction = true;
331          }
332        }
333
334        // If it is not an major mob compaction and del files are there,
335        // these mob files wont be compacted.
336        if (isForceAllFiles || (!createDelFiles && !skipCompaction)) {
337          expectedStartKeys.add(startKey);
338        }
339      }
340    }
341
342    // Set the policy
343    this.hcd.setMobCompactPartitionPolicy(policy);
344    // set the mob compaction mergeable threshold
345    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
346    testSelectFiles(tableName, type, isForceAllFiles, expectedStartKeys);
347    // go back to the default daily policy
348    this.hcd.setMobCompactPartitionPolicy(MobCompactPartitionPolicy.DAILY);
349  }
350
351  @Test
352  public void testCompactDelFilesWithDefaultBatchSize() throws Exception {
353    testCompactDelFilesAtBatchSize(name.getMethodName(), MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE,
354        MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
355  }
356
357  @Test
358  public void testCompactDelFilesWithSmallBatchSize() throws Exception {
359    testCompactDelFilesAtBatchSize(name.getMethodName(), 4, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
360  }
361
362  @Test
363  public void testCompactDelFilesChangeMaxDelFileCount() throws Exception {
364    testCompactDelFilesAtBatchSize(name.getMethodName(), 4, 2);
365  }
366
367  @Test
368  public void testCompactFilesWithDstDirFull() throws Exception {
369    String tableName = name.getMethodName();
370    fs = FileSystem.get(conf);
371    FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem)fs;
372    Path testDir = CommonFSUtils.getRootDir(conf);
373    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
374    basePath = new Path(new Path(mobTestDir, tableName), family);
375
376    try {
377      int count = 2;
378      // create 2 mob files.
379      createStoreFiles(basePath, family, qf, count, Type.Put, true, new Date());
380      listFiles();
381
382      TableName tName = TableName.valueOf(tableName);
383      MobCompactor compactor = new PartitionedMobCompactor(conf, faultyFs, tName, hcd, pool);
384      faultyFs.setThrowException(true);
385      try {
386        compactor.compact(allFiles, true);
387      } catch (IOException e) {
388        System.out.println("Expected exception, ignore");
389      }
390
391      // Verify that all the files in tmp directory are cleaned up
392      Path tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
393      FileStatus[] ls = faultyFs.listStatus(tempPath);
394
395      // Only .bulkload under this directory
396      assertTrue(ls.length == 1);
397      assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName()));
398
399      Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
400          tName.getNamespaceAsString(), tName.getQualifierAsString())));
401
402      // Nothing in bulkLoad directory
403      FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath);
404      assertTrue(lsBulkload.length == 0);
405
406    } finally {
407      faultyFs.setThrowException(false);
408    }
409  }
410
411  /**
412   * Create mulitple partition files
413   */
414  private void createMobFile(Path basePath) throws IOException {
415    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
416    MobFileName mobFileName = null;
417    int ii = 0;
418    Date today = new Date();
419    for (byte k0 : KEYS) {
420      byte[] startRow = Bytes.toBytes(ii++);
421
422      mobFileName = MobFileName.create(startRow, MobUtils.formatDate(today), mobSuffix);
423
424      StoreFileWriter mobFileWriter =
425          new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta)
426              .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
427
428      long now = System.currentTimeMillis();
429      try {
430        for (int i = 0; i < 10; i++) {
431          byte[] key = Bytes.add(Bytes.toBytes(k0), Bytes.toBytes(i));
432          byte[] dummyData = new byte[5000];
433          new Random().nextBytes(dummyData);
434          mobFileWriter.append(
435              new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Put, dummyData));
436        }
437      } finally {
438        mobFileWriter.close();
439      }
440    }
441  }
442
443  /**
444   * Create mulitple partition delete files
445   */
446  private void createMobDelFile(Path basePath, int startKey) throws IOException {
447    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
448    MobFileName mobFileName = null;
449    Date today = new Date();
450
451    byte[] startRow = Bytes.toBytes(startKey);
452
453    mobFileName = MobFileName.create(startRow, MobUtils.formatDate(today), delSuffix);
454
455    StoreFileWriter mobFileWriter =
456        new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta)
457            .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
458
459    long now = System.currentTimeMillis();
460    try {
461      byte[] key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(0));
462      byte[] dummyData = new byte[5000];
463      new Random().nextBytes(dummyData);
464      mobFileWriter.append(
465          new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData));
466      key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(2));
467      mobFileWriter.append(
468          new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData));
469      key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(4));
470      mobFileWriter.append(
471          new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData));
472
473    } finally {
474      mobFileWriter.close();
475    }
476  }
477
478  @Test
479  public void testCompactFilesWithoutDelFile() throws Exception {
480    String tableName = "testCompactFilesWithoutDelFile";
481    resetConf();
482    init(tableName);
483
484    createMobFile(basePath);
485
486    listFiles();
487
488    PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
489        TableName.valueOf(tableName), hcd, pool) {
490      @Override
491      public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
492          throws IOException {
493        if (files == null || files.isEmpty()) {
494          return null;
495        }
496
497        PartitionedMobCompactionRequest request = select(files, isForceAllFiles);
498
499        // Make sure that there is no del Partitions
500        assertTrue(request.getDelPartitions().size() == 0);
501
502        // Make sure that when there is no startKey/endKey for partition.
503        for (CompactionPartition p : request.getCompactionPartitions()) {
504          assertTrue(p.getStartKey() == null);
505          assertTrue(p.getEndKey() == null);
506        }
507        return null;
508      }
509    };
510
511    compactor.compact(allFiles, true);
512  }
513
514  static class MyPartitionedMobCompactor extends PartitionedMobCompactor {
515    int delPartitionSize = 0;
516    int PartitionsIncludeDelFiles = 0;
517    CacheConfig cacheConfig = null;
518
519    MyPartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName,
520        ColumnFamilyDescriptor column, ExecutorService pool, final int delPartitionSize,
521        final CacheConfig cacheConf, final int PartitionsIncludeDelFiles)
522        throws IOException {
523      super(conf, fs, tableName, column, pool);
524      this.delPartitionSize = delPartitionSize;
525      this.cacheConfig = cacheConf;
526      this.PartitionsIncludeDelFiles = PartitionsIncludeDelFiles;
527    }
528
529    @Override public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
530        throws IOException {
531      if (files == null || files.isEmpty()) {
532        return null;
533      }
534      PartitionedMobCompactionRequest request = select(files, isForceAllFiles);
535
536      assertTrue(request.getDelPartitions().size() == delPartitionSize);
537      if (request.getDelPartitions().size() > 0) {
538        for (CompactionPartition p : request.getCompactionPartitions()) {
539          assertTrue(p.getStartKey() != null);
540          assertTrue(p.getEndKey() != null);
541        }
542      }
543
544      try {
545        for (CompactionDelPartition delPartition : request.getDelPartitions()) {
546          for (Path newDelPath : delPartition.listDelFiles()) {
547            HStoreFile sf =
548                new HStoreFile(fs, newDelPath, conf, this.cacheConfig, BloomType.NONE, true);
549            // pre-create reader of a del file to avoid race condition when opening the reader in
550            // each partition.
551            sf.initReader();
552            delPartition.addStoreFile(sf);
553          }
554        }
555
556        // Make sure that CompactionDelPartitions does not overlap
557        CompactionDelPartition prevDelP = null;
558        for (CompactionDelPartition delP : request.getDelPartitions()) {
559          assertTrue(
560              Bytes.compareTo(delP.getId().getStartKey(), delP.getId().getEndKey()) <= 0);
561
562          if (prevDelP != null) {
563            assertTrue(
564                Bytes.compareTo(prevDelP.getId().getEndKey(), delP.getId().getStartKey()) < 0);
565          }
566        }
567
568        int affectedPartitions = 0;
569
570        // Make sure that only del files within key range for a partition is included in compaction.
571        // compact the mob files by partitions in parallel.
572        for (CompactionPartition partition : request.getCompactionPartitions()) {
573          List<HStoreFile> delFiles = getListOfDelFilesForPartition(partition, request.getDelPartitions());
574          if (!request.getDelPartitions().isEmpty()) {
575            if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(),
576                partition.getEndKey()) > 0) || (Bytes.compareTo(
577                request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId()
578                    .getEndKey(), partition.getStartKey()) < 0))) {
579
580              if (delFiles.size() > 0) {
581                assertTrue(delFiles.size() == 1);
582                affectedPartitions += delFiles.size();
583                assertTrue(Bytes.compareTo(partition.getStartKey(),
584                  CellUtil.cloneRow(delFiles.get(0).getLastKey().get())) <= 0);
585                assertTrue(Bytes.compareTo(partition.getEndKey(),
586                  CellUtil.cloneRow(delFiles.get(delFiles.size() - 1).getFirstKey().get())) >= 0);
587              }
588            }
589          }
590        }
591        // The del file is only included in one partition
592        assertTrue(affectedPartitions == PartitionsIncludeDelFiles);
593      } finally {
594        for (CompactionDelPartition delPartition : request.getDelPartitions()) {
595          for (HStoreFile storeFile : delPartition.getStoreFiles()) {
596            try {
597              storeFile.closeStoreFile(true);
598            } catch (IOException e) {
599              LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e);
600            }
601          }
602        }
603      }
604
605      return null;
606    }
607  }
608
609  @Test
610  public void testCompactFilesWithOneDelFile() throws Exception {
611    String tableName = "testCompactFilesWithOneDelFile";
612    resetConf();
613    init(tableName);
614
615    // Create only del file.
616    createMobFile(basePath);
617    createMobDelFile(basePath, 2);
618
619    listFiles();
620
621    MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, fs,
622        TableName.valueOf(tableName), hcd, pool, 1, cacheConf, 1);
623
624    compactor.compact(allFiles, true);
625  }
626
627  @Test
628  public void testCompactFilesWithMultiDelFiles() throws Exception {
629    String tableName = "testCompactFilesWithMultiDelFiles";
630    resetConf();
631    init(tableName);
632
633    // Create only del file.
634    createMobFile(basePath);
635    createMobDelFile(basePath, 0);
636    createMobDelFile(basePath, 1);
637    createMobDelFile(basePath, 2);
638
639    listFiles();
640
641    MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, fs,
642        TableName.valueOf(tableName), hcd, pool, 3, cacheConf, 3);
643    compactor.compact(allFiles, true);
644  }
645
646  private void testCompactDelFilesAtBatchSize(String tableName, int batchSize,
647      int delfileMaxCount)  throws Exception {
648    resetConf();
649    init(tableName);
650    // create 20 mob files.
651    createStoreFiles(basePath, family, qf, 20, Type.Put, new Date());
652    // create 13 del files
653    createStoreFiles(basePath, family, qf, 13, Type.Delete, new Date());
654    listFiles();
655
656    // set the max del file count
657    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, delfileMaxCount);
658    // set the mob compaction batch size
659    conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, batchSize);
660    testCompactDelFiles(tableName, 1, 13, false);
661  }
662
663  /**
664   * Tests the selectFiles
665   * @param tableName the table name
666   * @param type the expected compaction type
667   * @param isForceAllFiles whether all the mob files are selected
668   * @param expected the expected start keys
669   */
670  private void testSelectFiles(String tableName, final CompactionType type,
671    final boolean isForceAllFiles, final List<String> expected) throws IOException {
672    PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
673      TableName.valueOf(tableName), hcd, pool) {
674      @Override
675      public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
676        throws IOException {
677        if (files == null || files.isEmpty()) {
678          return null;
679        }
680        PartitionedMobCompactionRequest request = select(files, isForceAllFiles);
681
682        // Make sure that when there is no del files, there will be no startKey/endKey for partition.
683        if (request.getDelPartitions().size() == 0) {
684          for (CompactionPartition p : request.getCompactionPartitions()) {
685            assertTrue(p.getStartKey() == null);
686            assertTrue(p.getEndKey() == null);
687          }
688        }
689
690        // Make sure that CompactionDelPartitions does not overlap
691        CompactionDelPartition prevDelP = null;
692        for (CompactionDelPartition delP : request.getDelPartitions()) {
693          assertTrue(Bytes.compareTo(delP.getId().getStartKey(),
694              delP.getId().getEndKey()) <= 0);
695
696          if (prevDelP != null) {
697            assertTrue(Bytes.compareTo(prevDelP.getId().getEndKey(),
698                delP.getId().getStartKey()) < 0);
699          }
700        }
701
702        // Make sure that only del files within key range for a partition is included in compaction.
703        // compact the mob files by partitions in parallel.
704        for (CompactionPartition partition : request.getCompactionPartitions()) {
705          List<HStoreFile> delFiles = getListOfDelFilesForPartition(partition, request.getDelPartitions());
706          if (!request.getDelPartitions().isEmpty()) {
707            if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(),
708                partition.getEndKey()) > 0) || (Bytes.compareTo(
709                request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId()
710                    .getEndKey(), partition.getStartKey()) < 0))) {
711              if (delFiles.size() > 0) {
712                assertTrue(Bytes.compareTo(partition.getStartKey(),
713                  delFiles.get(0).getFirstKey().get().getRowArray()) >= 0);
714                assertTrue(Bytes.compareTo(partition.getEndKey(),
715                  delFiles.get(delFiles.size() - 1).getLastKey().get().getRowArray()) <= 0);
716              }
717            }
718          }
719        }
720
721        // assert the compaction type
722        assertEquals(type, request.type);
723        // assert get the right partitions
724        compareCompactedPartitions(expected, request.compactionPartitions);
725        // assert get the right del files
726        compareDelFiles(request.getDelPartitions());
727        return null;
728      }
729    };
730    compactor.compact(allFiles, isForceAllFiles);
731  }
732
733  /**
734   * Tests the compacteDelFile
735   * @param tableName the table name
736   * @param expectedFileCount the expected file count
737   * @param expectedCellCount the expected cell count
738   * @param isForceAllFiles whether all the mob files are selected
739   */
740  private void testCompactDelFiles(String tableName, final int expectedFileCount,
741      final int expectedCellCount, boolean isForceAllFiles) throws IOException {
742    PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
743      TableName.valueOf(tableName), hcd, pool) {
744      @Override
745      protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
746          throws IOException {
747        List<Path> delFilePaths = new ArrayList<>();
748        for (CompactionDelPartition delPartition: request.getDelPartitions()) {
749          for (Path p : delPartition.listDelFiles()) {
750            delFilePaths.add(p);
751          }
752        }
753        List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
754        // assert the del files are merged.
755        assertEquals(expectedFileCount, newDelPaths.size());
756        assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths));
757        return null;
758      }
759    };
760    compactor.compact(allFiles, isForceAllFiles);
761  }
762
763  /**
764   * Lists the files in the path
765   */
766  private void listFiles() throws IOException {
767    for (FileStatus file : fs.listStatus(basePath)) {
768      allFiles.add(file);
769      if (file.getPath().getName().endsWith("_del")) {
770        delFiles.add(file.getPath());
771      } else {
772        mobFiles.add(file);
773      }
774    }
775  }
776
777  /**
778   * Compares the compacted partitions.
779   * @param partitions the collection of CompactedPartitions
780   */
781  private void compareCompactedPartitions(List<String> expected,
782      Collection<CompactionPartition> partitions) {
783    List<String> actualKeys = new ArrayList<>();
784    for (CompactionPartition partition : partitions) {
785      actualKeys.add(partition.getPartitionId().getStartKey());
786    }
787    Collections.sort(expected);
788    Collections.sort(actualKeys);
789    assertEquals(expected.size(), actualKeys.size());
790    for (int i = 0; i < expected.size(); i++) {
791      assertEquals(expected.get(i), actualKeys.get(i));
792    }
793  }
794
795  /**
796   * Compares the del files.
797   * @param delPartitions all del partitions
798   */
799  private void compareDelFiles(List<CompactionDelPartition> delPartitions) {
800    Map<Path, Path> delMap = new HashMap<>();
801    for (CompactionDelPartition delPartition : delPartitions) {
802      for (Path f : delPartition.listDelFiles()) {
803        delMap.put(f, f);
804      }
805    }
806    for (Path f : delFiles) {
807      assertTrue(delMap.containsKey(f));
808    }
809  }
810
811  /**
812   * Creates store files.
813   * @param basePath the path to create file
814   * @family the family name
815   * @qualifier the column qualifier
816   * @count the store file number
817   * @type the key type
818   */
819  private void createStoreFiles(Path basePath, String family, String qualifier, int count,
820      Type type, final Date date) throws IOException {
821    createStoreFiles(basePath, family, qualifier, count, type, false, date);
822  }
823
824  private void createStoreFiles(Path basePath, String family, String qualifier, int count,
825      Type type, boolean sameStartKey, final Date date) throws IOException {
826    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
827    String startKey = "row_";
828    MobFileName mobFileName = null;
829    for (int i = 0; i < count; i++) {
830      byte[] startRow;
831      if (sameStartKey) {
832        // When creating multiple files under one partition, suffix needs to be different.
833        startRow = Bytes.toBytes(startKey);
834        mobSuffix = TEST_UTIL.getRandomUUID().toString().replaceAll("-", "");
835        delSuffix = TEST_UTIL.getRandomUUID().toString().replaceAll("-", "") + "_del";
836      } else {
837        startRow = Bytes.toBytes(startKey + i);
838      }
839      if(type.equals(Type.Delete)) {
840        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), delSuffix);
841      }
842      if(type.equals(Type.Put)){
843        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), mobSuffix);
844      }
845      StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
846      .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build();
847      writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier),
848          type, (i+1)*1000);
849    }
850  }
851
852  /**
853   * Writes data to store file.
854   * @param writer the store file writer
855   * @param row the row key
856   * @param family the family name
857   * @param qualifier the column qualifier
858   * @param type the key type
859   * @param size the size of value
860   */
861  private static void writeStoreFile(final StoreFileWriter writer, byte[]row, byte[] family,
862      byte[] qualifier, Type type, int size) throws IOException {
863    long now = System.currentTimeMillis();
864    try {
865      byte[] dummyData = new byte[size];
866      new Random().nextBytes(dummyData);
867      writer.append(new KeyValue(row, family, qualifier, now, type, dummyData));
868    } finally {
869      writer.close();
870    }
871  }
872
873  /**
874   * Gets the number of del cell in the del files
875   * @param paths the del file paths
876   * @return the cell size
877   */
878  private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
879    List<HStoreFile> sfs = new ArrayList<>();
880    int size = 0;
881    for (Path path : paths) {
882      HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
883      sfs.add(sf);
884    }
885    List<KeyValueScanner> scanners = new ArrayList<>(StoreFileScanner.getScannersForStoreFiles(sfs,
886      false, true, false, false, HConstants.LATEST_TIMESTAMP));
887    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
888    long ttl = HStore.determineTTLFromFamily(hcd);
889    ScanInfo scanInfo = new ScanInfo(conf, hcd, ttl, timeToPurgeDeletes, CellComparatorImpl.COMPARATOR);
890    StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_RETAIN_DELETES, scanners);
891    List<Cell> results = new ArrayList<>();
892    boolean hasMore = true;
893
894    while (hasMore) {
895      hasMore = scanner.next(results);
896      size += results.size();
897      results.clear();
898    }
899    scanner.close();
900    return size;
901  }
902
903  private static ExecutorService createThreadPool() {
904    int maxThreads = 10;
905    long keepAliveTime = 60;
906    final SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
907    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
908      TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
909      new RejectedExecutionHandler() {
910        @Override
911        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
912          try {
913            // waiting for a thread to pick up instead of throwing exceptions.
914            queue.put(r);
915          } catch (InterruptedException e) {
916            throw new RejectedExecutionException(e);
917          }
918        }
919      });
920    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
921    return pool;
922  }
923
924  /**
925   * Resets the configuration.
926   */
927  private void resetConf() {
928    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
929      MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
930    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
931    conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
932      MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
933  }
934
935  /**
936   * The customized Distributed File System Implementation
937   */
938  static class FaultyDistributedFileSystem extends DistributedFileSystem {
939    private volatile boolean throwException = false;
940
941    public FaultyDistributedFileSystem() {
942      super();
943    }
944
945    public void setThrowException(boolean throwException) {
946      this.throwException = throwException;
947    }
948
949    @Override
950    public boolean rename(Path src, Path dst) throws IOException {
951      if (throwException) {
952        throw new IOException("No more files allowed");
953      }
954      return super.rename(src, dst);
955    }
956  }
957}