/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob.compactions;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.SKIP_RESET_SEQ_ID;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartitionId;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An implementation of {@link MobCompactor} that compacts the mob files in partitions.
 */
@InterfaceAudience.Private
public class PartitionedMobCompactor extends MobCompactor {

  private static final Logger LOG = LoggerFactory.getLogger(PartitionedMobCompactor.class);
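  /** The size threshold used to decide which mob files are small enough to be merged */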
  protected long mergeableSize;
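  /** The maximum number of del files allowed; when exceeded, the del files are merged first */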
  protected int delFileMaxCount;
  /** The number of files compacted in a batch */
  protected int compactionBatchSize;
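  /** The maximum number of cells returned per scanner.next() call during compaction */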
  protected int compactionKVMax;

  private final Path tempPath;
  private final Path bulkloadPath;
  private final CacheConfig compactionCacheConfig;
  private final byte[] refCellTags;
  private Encryption.Context cryptoContext = Encryption.Context.NONE;

  public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName,
                                 ColumnFamilyDescriptor column, ExecutorService pool) throws IOException {
    super(conf, fs, tableName, column, pool);
    mergeableSize = conf.getLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
      MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
    delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
      MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
    // default is 100
    compactionBatchSize = conf.getInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
      MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
    tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
    bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
      tableName.getNamespaceAsString(), tableName.getQualifierAsString())));
    compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX,
      HConstants.COMPACTION_KV_MAX_DEFAULT);
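    // Use a copy of the configuration, with the block cache disabled, for compaction readers.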
    Configuration copyOfConf = new Configuration(conf);
    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
    compactionCacheConfig = new CacheConfig(copyOfConf);
    List<Tag> tags = new ArrayList<>(2);
    tags.add(MobConstants.MOB_REF_TAG);
    Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
    tags.add(tableNameTag);
    this.refCellTags = TagUtil.fromList(tags);
    cryptoContext = EncryptionUtil.createEncryptionContext(copyOfConf, column);
  }

  @Override
  public List<Path> compact(List<FileStatus> files, boolean allFiles) throws IOException {
    if (files == null || files.isEmpty()) {
      LOG.info("No candidate mob files");
      return null;
    }
    LOG.info("allFiles: {}", allFiles);

    // find the files to compact.
    PartitionedMobCompactionRequest request = select(files, allFiles);
    // compact the files.
    return performCompaction(request);
  }

  /**
   * Selects the mob/del files to be compacted.
   * Iterates over the candidates to find all the del files and small mob files.
   * @param candidates All the candidates.
   * @param allFiles Whether to add all mob files into the compaction.
   * @return A compaction request.
   * @throws IOException if IO failure is encountered
   */
  protected PartitionedMobCompactionRequest select(List<FileStatus> candidates,
    boolean allFiles) throws IOException {
    final Map<CompactionPartitionId, CompactionPartition> filesToCompact = new HashMap<>();
    final CompactionPartitionId id = new CompactionPartitionId();
    final NavigableMap<CompactionDelPartitionId, CompactionDelPartition> delFilesToCompact =
        new TreeMap<>();
    final CompactionDelPartitionId delId = new CompactionDelPartitionId();
    final ArrayList<CompactionDelPartition> allDelPartitions = new ArrayList<>();
    int selectedFileCount = 0;
    int irrelevantFileCount = 0;
    int totalDelFiles = 0;
    MobCompactPartitionPolicy policy = column.getMobCompactPartitionPolicy();

    Calendar calendar = Calendar.getInstance();
    Date currentDate = new Date();
    Date firstDayOfCurrentMonth = null;
    Date firstDayOfCurrentWeek = null;

    if (policy == MobCompactPartitionPolicy.MONTHLY) {
      firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, currentDate);
      firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate);
    } else if (policy == MobCompactPartitionPolicy.WEEKLY) {
      firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate);
    }

    // First check whether there are any del files, so the logic below can be optimized.
    // If there are del files, each partition needs to read its startKey and endKey from its files.
    // If there are no del files, reading startKey and endKey from the files can be skipped,
    // which is an optimization.
    boolean withDelFiles = false;
    for (FileStatus file : candidates) {
      if (!file.isFile()) {
        continue;
      }
      // group the del files and small files.
      FileStatus linkedFile = file;
      if (HFileLink.isHFileLink(file.getPath())) {
        HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
        linkedFile = getLinkedFileStatus(link);
        if (linkedFile == null) {
          continue;
        }
      }
      if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
        withDelFiles = true;
        break;
      }
    }

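    // Classify each candidate: del files are grouped into del partitions keyed by their row-key
    // range, while mob files are grouped into compaction partitions by start key and date.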
    for (FileStatus file : candidates) {
      if (!file.isFile()) {
        irrelevantFileCount++;
        continue;
      }
      // group the del files and small files.
      FileStatus linkedFile = file;
      if (HFileLink.isHFileLink(file.getPath())) {
        HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
        linkedFile = getLinkedFileStatus(link);
        if (linkedFile == null) {
          // If the linked file cannot be found, regard it as an irrelevant file
          irrelevantFileCount++;
          continue;
        }
      }
      if (withDelFiles && StoreFileInfo.isDelFile(linkedFile.getPath())) {
        // File in the Del Partition List

        // Get delId from the file
        try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) {
          delId.setStartKey(reader.getFirstRowKey().get());
          delId.setEndKey(reader.getLastRowKey().get());
        }
        CompactionDelPartition delPartition = delFilesToCompact.get(delId);
        if (delPartition == null) {
          CompactionDelPartitionId newDelId =
              new CompactionDelPartitionId(delId.getStartKey(), delId.getEndKey());
          delPartition = new CompactionDelPartition(newDelId);
          delFilesToCompact.put(newDelId, delPartition);
        }
        delPartition.addDelFile(file);
        totalDelFiles++;
      } else {
        String fileName = linkedFile.getPath().getName();
        String date = MobFileName.getDateFromName(fileName);
        boolean skipCompaction = MobUtils
            .fillPartitionId(id, firstDayOfCurrentMonth, firstDayOfCurrentWeek, date, policy,
                calendar, mergeableSize);
        if (allFiles || (!skipCompaction && (linkedFile.getLen() < id.getThreshold()))) {
          // add all files if allFiles is true,
          // otherwise add the small files to the merge pool
          // filter out files which are not supposed to be compacted with the
          // current policy

          id.setStartKey(MobFileName.getStartKeyFromName(fileName));
          CompactionPartition compactionPartition = filesToCompact.get(id);
          if (compactionPartition == null) {
            CompactionPartitionId newId = new CompactionPartitionId(id.getStartKey(), id.getDate());
            compactionPartition = new CompactionPartition(newId);
            compactionPartition.addFile(file);
            filesToCompact.put(newId, compactionPartition);
            newId.updateLatestDate(date);
          } else {
            compactionPartition.addFile(file);
            compactionPartition.getPartitionId().updateLatestDate(date);
          }

          if (withDelFiles) {
            // get startKey and endKey from the file and update partition
            // TODO: is it possible to skip read of most hfiles?
            try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) {
              compactionPartition.setStartKey(reader.getFirstRowKey().get());
              compactionPartition.setEndKey(reader.getLastRowKey().get());
            }
          }

          selectedFileCount++;
        }
      }
    }

    /*
     * Merge del files so there are only non-overlapped del file lists
     */
    for (Map.Entry<CompactionDelPartitionId, CompactionDelPartition> entry
        : delFilesToCompact.entrySet()) {
      if (allDelPartitions.size() > 0) {
        // check if the current key range overlaps the previous one
        CompactionDelPartition prev = allDelPartitions.get(allDelPartitions.size() - 1);
        if (Bytes.compareTo(prev.getId().getEndKey(), entry.getKey().getStartKey()) >= 0) {
          // merge them together
          prev.getId().setEndKey(entry.getValue().getId().getEndKey());
          prev.addDelFileList(entry.getValue().listDelFiles());

        } else {
          allDelPartitions.add(entry.getValue());
        }
      } else {
        allDelPartitions.add(entry.getValue());
      }
    }

    PartitionedMobCompactionRequest request = new PartitionedMobCompactionRequest(
      filesToCompact.values(), allDelPartitions);
    if (candidates.size() == (totalDelFiles + selectedFileCount + irrelevantFileCount)) {
      // all the files are selected
      request.setCompactionType(CompactionType.ALL_FILES);
    }
    LOG.info("The compaction type is {}, the request has {} del files, {} selected files, and {} " +
        "irrelevant files for table '{}' and column '{}'", request.getCompactionType(),
        totalDelFiles, selectedFileCount, irrelevantFileCount, tableName, column.getNameAsString());
    return request;
  }

  /**
   * Performs the compaction on the selected files.
   * <ol>
   * <li>Compacts the del files.</li>
   * <li>Compacts the selected small mob files and all the del files.</li>
   * <li>If all the candidates are selected, delete the del files.</li>
   * </ol>
   * @param request The compaction request.
   * @return The paths of new mob files generated in the compaction.
   * @throws IOException if IO failure is encountered
   */
  protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
    throws IOException {

    // Merge the del files; this is done per del partition.
    for (CompactionDelPartition delPartition : request.getDelPartitions()) {
      if (delPartition.getDelFileCount() <= 1) continue;
      List<Path> newDelPaths = compactDelFiles(request, delPartition.listDelFiles());
      delPartition.cleanDelFiles();
      delPartition.addDelFileList(newDelPaths);
    }

    List<Path> paths = null;
    int totalDelFileCount = 0;
    try {
      for (CompactionDelPartition delPartition : request.getDelPartitions()) {
        for (Path newDelPath : delPartition.listDelFiles()) {
          HStoreFile sf =
              new HStoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE, true);
          // pre-create reader of a del file to avoid race condition when opening the reader in each
          // partition.
          sf.initReader();
          delPartition.addStoreFile(sf);
          totalDelFileCount++;
        }
      }
      LOG.info("After merging, there are {} del files. table='{}' column='{}'", totalDelFileCount,
          tableName, column.getNameAsString());
      // compact the mob files by partitions.
      paths = compactMobFiles(request);
      LOG.info("After compaction, there are {} mob files. table='{}' column='{}'", paths.size(),
          tableName, column.getNameAsString());
    } finally {
      for (CompactionDelPartition delPartition : request.getDelPartitions()) {
        closeStoreFileReaders(delPartition.getStoreFiles());
      }
    }

    // archive the del files if all the mob files are selected.
    if (request.type == CompactionType.ALL_FILES && !request.getDelPartitions().isEmpty()) {
      LOG.info("After a mob compaction with all files selected, archiving the del files for " +
          "table='{}' and column='{}'", tableName, column.getNameAsString());
      for (CompactionDelPartition delPartition : request.getDelPartitions()) {
        LOG.info(Objects.toString(delPartition.listDelFiles()));
        try {
          MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(),
            delPartition.getStoreFiles());
        } catch (IOException e) {
          LOG.error("Failed to archive the del files {} for partition {} table='{}' and " +
              "column='{}'", delPartition.getStoreFiles(), delPartition.getId(), tableName,
              column.getNameAsString(), e);
        }
      }
    }
    return paths;
  }

  static class DelPartitionComparator implements Comparator<CompactionDelPartition> {
    private boolean compareStartKey;

    DelPartitionComparator(boolean compareStartKey) {
      this.compareStartKey = compareStartKey;
    }

    public boolean getCompareStartKey() {
      return this.compareStartKey;
    }

    public void setCompareStartKey(final boolean compareStartKey) {
      this.compareStartKey = compareStartKey;
    }

    @Override
    public int compare(CompactionDelPartition o1, CompactionDelPartition o2) {

      if (compareStartKey) {
        return Bytes.compareTo(o1.getId().getStartKey(), o2.getId().getStartKey());
      } else {
        return Bytes.compareTo(o1.getId().getEndKey(), o2.getId().getEndKey());
      }
    }
  }

  @VisibleForTesting
  List<HStoreFile> getListOfDelFilesForPartition(final CompactionPartition partition,
      final List<CompactionDelPartition> delPartitions) {
    // Binary search for startKey and endKey
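    // The del partitions are sorted and do not overlap, so a binary search on end keys finds the
    // first del partition that may overlap this partition, and a second search on start keys
    // finds the last one.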

    List<HStoreFile> result = new ArrayList<>();

    DelPartitionComparator comparator = new DelPartitionComparator(false);
    CompactionDelPartitionId id = new CompactionDelPartitionId(null, partition.getStartKey());
    CompactionDelPartition target = new CompactionDelPartition(id);
    int start = Collections.binarySearch(delPartitions, target, comparator);

    // Get the start index for partition
    if (start < 0) {
      // Calculate the insert point
      start = (start + 1) * (-1);
      if (start == delPartitions.size()) {
        // no overlap
        return result;
      } else {
        // Check another case which has no overlap
        if (Bytes.compareTo(partition.getEndKey(),
            delPartitions.get(start).getId().getStartKey()) < 0) {
          return result;
        }
      }
    }

    // Search for end index for the partition
    comparator.setCompareStartKey(true);
    id.setStartKey(partition.getEndKey());
    int end = Collections.binarySearch(delPartitions, target, comparator);

    if (end < 0) {
      end = (end + 1) * (-1);
      if (end == 0) {
        return result;
      } else {
        --end;
        if (Bytes.compareTo(partition.getStartKey(),
            delPartitions.get(end).getId().getEndKey()) > 0) {
          return result;
        }
      }
    }

    for (int i = start; i <= end; ++i) {
      result.addAll(delPartitions.get(i).getStoreFiles());
    }

    return result;
  }

  /**
   * Compacts the selected small mob files and all the del files.
   * @param request The compaction request.
   * @return The paths of new mob files after compactions.
   * @throws IOException if IO failure is encountered
   */
  protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request)
      throws IOException {
    Collection<CompactionPartition> partitions = request.compactionPartitions;
    if (partitions == null || partitions.isEmpty()) {
      LOG.info("No partitions of mob files in table='{}' and column='{}'", tableName,
          column.getNameAsString());
      return Collections.emptyList();
    }
    List<Path> paths = new ArrayList<>();
    final Connection c = ConnectionFactory.createConnection(conf);
    final Table table = c.getTable(tableName);

    try {
      Map<CompactionPartitionId, Future<List<Path>>> results = new HashMap<>();
      // compact the mob files by partitions in parallel.
      for (final CompactionPartition partition : partitions) {

        // How do we efficiently come up with a list of del files for one partition?
        // Search the delPartitions and collect all the del files for the partition.
        // One possible optimization: if there are no del files, there is no need to
        // compute the startKey/endKey at all.
        List<HStoreFile> delFiles = getListOfDelFilesForPartition(partition,
            request.getDelPartitions());

        results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
          @Override
          public List<Path> call() throws Exception {
            LOG.info("Compacting mob files for partition {} for table='{}' and column='{}'",
                partition.getPartitionId(), tableName, column.getNameAsString());
            return compactMobFilePartition(request, partition, delFiles, c, table);
          }
        }));
      }
      // wait for the partition compactions to finish and collect the results.
      List<CompactionPartitionId> failedPartitions = new ArrayList<>();
      for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
        try {
          paths.addAll(result.getValue().get());
        } catch (Exception e) {
          // just log the error
          LOG.error("Failed to compact the partition {} for table='{}' and column='{}'",
              result.getKey(), tableName, column.getNameAsString(), e);
          failedPartitions.add(result.getKey());
        }
      }
      if (!failedPartitions.isEmpty()) {
        // if any partition fails in the compaction, directly throw an exception.
        throw new IOException("Failed to compact the partitions " + failedPartitions +
            " for table='" + tableName + "' column='" + column.getNameAsString() + "'");
      }
    } finally {
      try {
        table.close();
      } catch (IOException e) {
        LOG.error("Failed to close the Table", e);
      }
    }
    return paths;
  }

  /**
   * Compacts a partition of selected small mob files and all the del files.
   * @param request The compaction request.
   * @param partition A compaction partition.
   * @param delFiles The del files.
   * @param connection The connection to use.
   * @param table The current table.
   * @return The paths of new mob files after compactions.
   * @throws IOException if IO failure is encountered
   */
  private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request,
                                             CompactionPartition partition,
                                             List<HStoreFile> delFiles,
                                             Connection connection,
                                             Table table) throws IOException {
    if (MobUtils.isMobFileExpired(column, EnvironmentEdgeManager.currentTime(),
      partition.getPartitionId().getDate())) {
      // If the files in the partition are expired, do not compact them and directly
      // return an empty list.
      return Collections.emptyList();
    }
    List<Path> newFiles = new ArrayList<>();
    List<FileStatus> files = partition.listFiles();
    int offset = 0;
    Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
    Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
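    // Compact the partition's files in batches of at most compactionBatchSize files.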
    while (offset < files.size()) {
      int batch = compactionBatchSize;
      if (files.size() - offset < compactionBatchSize) {
        batch = files.size() - offset;
      }
      if (batch == 1 && delFiles.isEmpty()) {
        // only one file left and no del files, do not compact it,
        // and directly add it to the new files.
        newFiles.add(files.get(offset).getPath());
        offset++;
        continue;
      }
      // clean the bulkload directory to avoid loading old files.
      fs.delete(bulkloadPathOfPartition, true);
      // add the selected mob files and del files into filesToCompact
      List<HStoreFile> filesToCompact = new ArrayList<>();
      for (int i = offset; i < batch + offset; i++) {
        HStoreFile sf = new HStoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
            BloomType.NONE, true);
        filesToCompact.add(sf);
      }
      filesToCompact.addAll(delFiles);
      // compact the mob files in a batch.
      compactMobFilesInBatch(request, partition, connection, table, filesToCompact, batch,
        bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
      // move to the next batch.
      offset += batch;
    }
    LOG.info("Compaction is finished. The number of mob files is changed from {} to {} for " +
        "partition={} for table='{}' and column='{}'", files.size(), newFiles.size(),
        partition.getPartitionId(), tableName, column.getNameAsString());
    return newFiles;
  }

  /**
   * Closes the readers of store files.
   * @param storeFiles The store files to be closed.
   */
  private void closeStoreFileReaders(List<HStoreFile> storeFiles) {
    for (HStoreFile storeFile : storeFiles) {
      try {
        storeFile.closeStoreFile(true);
      } catch (IOException e) {
        LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e);
      }
    }
  }

  /**
   * Compacts a partition of selected small mob files and all the del files in a batch.
   * @param request The compaction request.
   * @param partition A compaction partition.
   * @param connection The connection to use.
   * @param table The current table.
   * @param filesToCompact The files to be compacted.
   * @param batch The number of mob files to be compacted in a batch.
   * @param bulkloadPathOfPartition The directory where the bulkload column of the current
   *   partition is saved.
   * @param bulkloadColumnPath The directory where the bulkload files of current partition
   *   are saved.
   * @param newFiles The paths of new mob files after compactions.
   * @throws IOException if IO failure is encountered
   */
  private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
                                      CompactionPartition partition,
                                      Connection connection, Table table,
                                      List<HStoreFile> filesToCompact, int batch,
                                      Path bulkloadPathOfPartition, Path bulkloadColumnPath,
                                      List<Path> newFiles)
      throws IOException {
    // open scanner to the selected mob files and del files.
    StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
    // the mob files to be compacted, not including the del files.
    List<HStoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
    // Pair(maxSeqId, cellsCount)
    Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
    // open writers for the mob files and new ref store files.
    StoreFileWriter writer = null;
    StoreFileWriter refFileWriter = null;
    Path filePath = null;
    long mobCells = 0;
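    // Flags that track which intermediate artifacts (temp mob file, bulkload directory,
    // committed mob file) must be cleaned up if the compaction fails part-way.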
    boolean cleanupTmpMobFile = false;
    boolean cleanupBulkloadDirOfPartition = false;
    boolean cleanupCommittedMobFile = false;
    boolean closeReaders = true;

    try {
      try {
        writer = MobUtils
            .createWriter(conf, fs, column, partition.getPartitionId().getLatestDate(), tempPath,
                Long.MAX_VALUE, column.getCompactionCompressionType(),
                partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext,
                true);
        cleanupTmpMobFile = true;
        filePath = writer.getPath();
        byte[] fileName = Bytes.toBytes(filePath.getName());
        // create a temp file and open a writer for it in the bulkloadPath
        refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath,
            fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext, true);
        cleanupBulkloadDirOfPartition = true;
        List<Cell> cells = new ArrayList<>();
        boolean hasMore;
        ScannerContext scannerContext =
            ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
        do {
          hasMore = scanner.next(cells, scannerContext);
          for (Cell cell : cells) {
            // write the mob cell to the mob file.
            writer.append(cell);
            // write the new reference cell to the store file.
            Cell reference = MobUtils.createMobRefCell(cell, fileName, this.refCellTags);
            refFileWriter.append(reference);
            mobCells++;
          }
          cells.clear();
        } while (hasMore);
      } finally {
        // close the scanner.
        scanner.close();

        if (cleanupTmpMobFile) {
          // append metadata to the mob file, and close the mob file writer.
          closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
        }

        if (cleanupBulkloadDirOfPartition) {
          // append metadata and bulkload info to the ref mob file, and close the writer.
          closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
        }
      }

      if (mobCells > 0) {
        // commit mob file
        MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
        cleanupTmpMobFile = false;
        cleanupCommittedMobFile = true;
        // bulkload the ref file
        LOG.info("start MOB ref bulkload for partition {} table='{}' column='{}'",
            partition.getPartitionId(), tableName, column.getNameAsString());
        bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
        cleanupCommittedMobFile = false;
        LOG.info("end MOB ref bulkload for partition {} table='{}' column='{}'",
            partition.getPartitionId(), tableName, column.getNameAsString());
        newFiles.add(new Path(mobFamilyDir, filePath.getName()));
      }

      // archive the old mob files, do not archive the del files.
      try {
        closeStoreFileReaders(mobFilesToCompact);
        closeReaders = false;
        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(),
            mobFilesToCompact);
      } catch (IOException e) {
        LOG.error("Failed to archive the files " + mobFilesToCompact, e);
      }
    } finally {
      if (closeReaders) {
        closeStoreFileReaders(mobFilesToCompact);
      }

      if (cleanupTmpMobFile) {
        deletePath(filePath);
      }

      if (cleanupBulkloadDirOfPartition) {
        // delete the bulkload files in bulkloadPath
        deletePath(bulkloadPathOfPartition);
      }

      if (cleanupCommittedMobFile) {
        LOG.error("failed MOB ref bulkload for partition {} table='{}' column='{}'",
            partition.getPartitionId(), tableName, column.getNameAsString());
        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(),
            Collections.singletonList(new HStoreFile(fs, new Path(mobFamilyDir, filePath.getName()),
            conf, compactionCacheConfig, BloomType.NONE, true)));
      }
    }
  }

  /**
   * Compacts the del files in batches, which avoids opening too many files.
   * @param request The compaction request.
   * @param delFilePaths Del file paths to compact.
   * @return The paths of new del files after merging or the original files if no merging
   *         is necessary.
   * @throws IOException if IO failure is encountered
   */
  protected List<Path> compactDelFiles(PartitionedMobCompactionRequest request,
    List<Path> delFilePaths) throws IOException {
    if (delFilePaths.size() <= delFileMaxCount) {
      return delFilePaths;
    }
    // When there are more del files than the number that is allowed, merge them first.
    int offset = 0;
    List<Path> paths = new ArrayList<>();
    while (offset < delFilePaths.size()) {
      // get the batch
      int batch = compactionBatchSize;
      if (delFilePaths.size() - offset < compactionBatchSize) {
        batch = delFilePaths.size() - offset;
      }
      List<HStoreFile> batchedDelFiles = new ArrayList<>();
      if (batch == 1) {
        // only one file left, do not compact it, directly add it to the new files.
        paths.add(delFilePaths.get(offset));
        offset++;
        continue;
      }
      for (int i = offset; i < batch + offset; i++) {
        batchedDelFiles.add(new HStoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
          BloomType.NONE, true));
      }
      // compact the del files in a batch.
      paths.add(compactDelFilesInBatch(request, batchedDelFiles));
      // move to the next batch.
      offset += batch;
    }
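    // The merged result may still contain more del files than delFileMaxCount, so recurse
    // until the count is within the limit.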
    return compactDelFiles(request, paths);
  }

  /**
   * Compacts the del files in a batch.
   * @param request The compaction request.
   * @param delFiles The del files.
   * @return The path of new del file after merging.
   * @throws IOException if IO failure is encountered
   */
  private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request,
    List<HStoreFile> delFiles) throws IOException {
    // create a scanner for the del files.
    StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
    StoreFileWriter writer = null;
    Path filePath = null;
    try {
      writer = MobUtils.createDelFileWriter(conf, fs, column,
        MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
        column.getCompactionCompressionType(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
        cryptoContext);
      filePath = writer.getPath();
      List<Cell> cells = new ArrayList<>();
      boolean hasMore;
      ScannerContext scannerContext =
          ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
      do {
        hasMore = scanner.next(cells, scannerContext);
        for (Cell cell : cells) {
          writer.append(cell);
        }
        cells.clear();
      } while (hasMore);
    } finally {
      scanner.close();
      if (writer != null) {
        try {
          writer.close();
        } catch (IOException e) {
          LOG.error("Failed to close the writer of the file " + filePath, e);
        }
      }
    }
    // commit the new del file
    Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
    // archive the old del files
    try {
      MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
    } catch (IOException e) {
      LOG.error("Failed to archive the old del files " + delFiles, e);
    }
    return path;
  }

  /**
   * Creates a store scanner.
   * @param filesToCompact The files to be compacted.
   * @param scanType The scan type.
   * @return The store scanner.
   * @throws IOException if IO failure is encountered
   */
  private StoreScanner createScanner(List<HStoreFile> filesToCompact, ScanType scanType)
      throws IOException {
    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
      false, true, false, false, HConstants.LATEST_TIMESTAMP);
    long ttl = HStore.determineTTLFromFamily(column);
    ScanInfo scanInfo = new ScanInfo(conf, column, ttl, 0, CellComparator.getInstance());
    return new StoreScanner(scanInfo, scanType, scanners);
  }

  /**
   * Bulkloads the current file.
   *
   * @param connection to use to get admin/RegionLocator
   * @param table The current table.
   * @param bulkloadDirectory The path of bulkload directory.
   * @param fileName The current file name.
   * @throws IOException if IO failure is encountered
   */
  private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory,
      String fileName)
      throws IOException {
    // bulkload the ref file
    try {
      LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
      bulkload.disableReplication();
      bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
          connection.getRegionLocator(table.getName()));
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /**
   * Closes the mob file writer.
   * @param writer The mob file writer.
   * @param maxSeqId Maximum sequence id.
   * @param mobCellsCount The number of mob cells.
   * @throws IOException if IO failure is encountered
   */
  private void closeMobFileWriter(StoreFileWriter writer, long maxSeqId, long mobCellsCount)
    throws IOException {
    if (writer != null) {
      writer.appendMetadata(maxSeqId, false, mobCellsCount);
      try {
        writer.close();
      } catch (IOException e) {
        LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
      }
    }
  }

  /**
   * Closes the ref file writer.
   * @param writer The ref file writer.
   * @param maxSeqId Maximum sequence id.
   * @param bulkloadTime The timestamp at which the bulk load file is created.
   * @throws IOException if IO failure is encountered
   */
  private void closeRefFileWriter(StoreFileWriter writer, long maxSeqId, long bulkloadTime)
    throws IOException {
    if (writer != null) {
      writer.appendMetadata(maxSeqId, false);
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
      writer.appendFileInfo(SKIP_RESET_SEQ_ID, Bytes.toBytes(true));
      try {
        writer.close();
      } catch (IOException e) {
        LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
      }
    }
  }

  /**
   * Gets the max seqId and number of cells of the store files.
   * @param storeFiles The store files.
   * @return The pair of the max seqId and number of cells of the store files.
   * @throws IOException if IO failure is encountered
   */
  private Pair<Long, Long> getFileInfo(List<HStoreFile> storeFiles) throws IOException {
    long maxSeqId = 0;
    long maxKeyCount = 0;
    for (HStoreFile sf : storeFiles) {
      // the readers will be closed later after the merge.
      maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
      sf.initReader();
      byte[] count = sf.getReader().loadFileInfo().get(MOB_CELLS_COUNT);
      if (count != null) {
        maxKeyCount += Bytes.toLong(count);
      }
    }
    return new Pair<>(maxSeqId, maxKeyCount);
  }

  /**
   * Deletes a file.
   * @param path The path of the file to be deleted.
   */
  private void deletePath(Path path) {
    LOG.debug("Cleanup, delete path '{}'", path);
    try {
      if (path != null) {
        fs.delete(path, true);
      }
    } catch (IOException e) {
      LOG.error("Failed to delete the file " + path, e);
    }
  }

  private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
    Path[] locations = link.getLocations();
    FileStatus file;
    for (Path location : locations) {
      if (location != null) {
        try {
          file = fs.getFileStatus(location);
          if (file != null) {
            return file;
          }
        } catch (FileNotFoundException e) {
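          // The link target does not exist at this location; try the next one.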
        }
      }
    }
    LOG.warn("The file that " + link + " links to cannot be found");
    return null;
  }
}