001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mob.compactions;
020
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
022import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
023import static org.apache.hadoop.hbase.regionserver.HStoreFile.SKIP_RESET_SEQ_ID;
024
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Calendar;
029import java.util.Collection;
030import java.util.Collections;
031import java.util.Comparator;
032import java.util.Date;
033import java.util.HashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.Map.Entry;
037import java.util.NavigableMap;
038import java.util.Objects;
039import java.util.TreeMap;
040import java.util.concurrent.Callable;
041import java.util.concurrent.ExecutorService;
042import java.util.concurrent.Future;
043
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.FileStatus;
046import org.apache.hadoop.fs.FileSystem;
047import org.apache.hadoop.fs.Path;
048import org.apache.hadoop.hbase.ArrayBackedTag;
049import org.apache.hadoop.hbase.Cell;
050import org.apache.hadoop.hbase.CellComparator;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.Tag;
054import org.apache.hadoop.hbase.TagType;
055import org.apache.hadoop.hbase.TagUtil;
056import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
057import org.apache.hadoop.hbase.client.Connection;
058import org.apache.hadoop.hbase.client.ConnectionFactory;
059import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
060import org.apache.hadoop.hbase.client.Table;
061import org.apache.hadoop.hbase.io.HFileLink;
062import org.apache.hadoop.hbase.io.crypto.Encryption;
063import org.apache.hadoop.hbase.io.hfile.CacheConfig;
064import org.apache.hadoop.hbase.io.hfile.HFile;
065import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
066import org.apache.hadoop.hbase.mob.MobConstants;
067import org.apache.hadoop.hbase.mob.MobFileName;
068import org.apache.hadoop.hbase.mob.MobUtils;
069import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
070import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition;
071import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartitionId;
072import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
073import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
074import org.apache.hadoop.hbase.regionserver.BloomType;
075import org.apache.hadoop.hbase.regionserver.HStore;
076import org.apache.hadoop.hbase.regionserver.HStoreFile;
077import org.apache.hadoop.hbase.regionserver.ScanInfo;
078import org.apache.hadoop.hbase.regionserver.ScanType;
079import org.apache.hadoop.hbase.regionserver.ScannerContext;
080import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
081import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
082import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
083import org.apache.hadoop.hbase.regionserver.StoreScanner;
084import org.apache.hadoop.hbase.security.EncryptionUtil;
085import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
086import org.apache.hadoop.hbase.util.Bytes;
087import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
088import org.apache.hadoop.hbase.util.Pair;
089import org.apache.yetus.audience.InterfaceAudience;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093/**
094 * An implementation of {@link MobCompactor} that compacts the mob files in partitions.
095 */
096@InterfaceAudience.Private
097public class PartitionedMobCompactor extends MobCompactor {
098
  /** Logger shared by every instance of this compactor. */
  private static final Logger LOG = LoggerFactory.getLogger(PartitionedMobCompactor.class);
  /**
   * Value of {@link MobConstants#MOB_COMPACTION_MERGEABLE_THRESHOLD}; handed to
   * {@code MobUtils.fillPartitionId} when the per-partition size threshold is computed.
   */
  protected long mergeableSize;
  /** Value of {@link MobConstants#MOB_DELFILE_MAX_COUNT}. */
  protected int delFileMaxCount;
  /** The number of files compacted in a batch */
  protected int compactionBatchSize;
  /** Value of {@link HConstants#COMPACTION_KV_MAX}; used as the scanner batch limit. */
  protected int compactionKVMax;

  /** {@link MobConstants#TEMP_DIR_NAME} under the mob home; new mob files are written here. */
  private final Path tempPath;
  /** Bulkload directory of this table, located under {@link #tempPath}. */
  private final Path bulkloadPath;
  /** Cache config built from a configuration copy whose block cache size is set to 0. */
  private final CacheConfig compactionCacheConfig;
  /** Serialized tags (mob reference tag + table name tag) attached to new reference cells. */
  private final byte[] refCellTags;
  /** Encryption context for the column family; replaced in the constructor. */
  private Encryption.Context cryptoContext = Encryption.Context.NONE;
111
112  public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName,
113                                 ColumnFamilyDescriptor column, ExecutorService pool) throws IOException {
114    super(conf, fs, tableName, column, pool);
115    mergeableSize = conf.getLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
116      MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
117    delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
118      MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
119    // default is 100
120    compactionBatchSize = conf.getInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
121      MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
122    tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
123    bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
124      tableName.getNamespaceAsString(), tableName.getQualifierAsString())));
125    compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX,
126      HConstants.COMPACTION_KV_MAX_DEFAULT);
127    Configuration copyOfConf = new Configuration(conf);
128    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
129    compactionCacheConfig = new CacheConfig(copyOfConf);
130    List<Tag> tags = new ArrayList<>(2);
131    tags.add(MobConstants.MOB_REF_TAG);
132    Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
133    tags.add(tableNameTag);
134    this.refCellTags = TagUtil.fromList(tags);
135    cryptoContext = EncryptionUtil.createEncryptionContext(copyOfConf, column);
136  }
137
138  @Override
139  public List<Path> compact(List<FileStatus> files, boolean allFiles) throws IOException {
140    if (files == null || files.isEmpty()) {
141      LOG.info("No candidate mob files");
142      return null;
143    }
144    LOG.info("is allFiles: " + allFiles);
145
146    // find the files to compact.
147    PartitionedMobCompactionRequest request = select(files, allFiles);
148    // compact the files.
149    return performCompaction(request);
150  }
151
152  /**
153   * Selects the compacted mob/del files.
154   * Iterates the candidates to find out all the del files and small mob files.
155   * @param candidates All the candidates.
156   * @param allFiles Whether add all mob files into the compaction.
157   * @return A compaction request.
158   * @throws IOException if IO failure is encountered
159   */
160  protected PartitionedMobCompactionRequest select(List<FileStatus> candidates,
161    boolean allFiles) throws IOException {
162    final Map<CompactionPartitionId, CompactionPartition> filesToCompact = new HashMap<>();
163    final CompactionPartitionId id = new CompactionPartitionId();
164    final NavigableMap<CompactionDelPartitionId, CompactionDelPartition> delFilesToCompact = new TreeMap<>();
165    final CompactionDelPartitionId delId = new CompactionDelPartitionId();
166    final ArrayList<CompactionDelPartition> allDelPartitions = new ArrayList<>();
167    int selectedFileCount = 0;
168    int irrelevantFileCount = 0;
169    int totalDelFiles = 0;
170    MobCompactPartitionPolicy policy = column.getMobCompactPartitionPolicy();
171
172    Calendar calendar =  Calendar.getInstance();
173    Date currentDate = new Date();
174    Date firstDayOfCurrentMonth = null;
175    Date firstDayOfCurrentWeek = null;
176
177    if (policy == MobCompactPartitionPolicy.MONTHLY) {
178      firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, currentDate);
179      firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate);
180    } else if (policy == MobCompactPartitionPolicy.WEEKLY) {
181      firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate);
182    }
183
184    // We check if there is any del files so the logic can be optimized for the following processing
185    // First step is to check if there is any delete files. If there is any delete files,
186    // For each Partition, it needs to read its startKey and endKey from files.
187    // If there is no delete file, there is no need to read startKey and endKey from files, this
188    // is an optimization.
189    boolean withDelFiles = false;
190    for (FileStatus file : candidates) {
191      if (!file.isFile()) {
192        continue;
193      }
194      // group the del files and small files.
195      FileStatus linkedFile = file;
196      if (HFileLink.isHFileLink(file.getPath())) {
197        HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
198        linkedFile = getLinkedFileStatus(link);
199        if (linkedFile == null) {
200          continue;
201        }
202      }
203      if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
204        withDelFiles = true;
205        break;
206      }
207    }
208
209    for (FileStatus file : candidates) {
210      if (!file.isFile()) {
211        irrelevantFileCount++;
212        continue;
213      }
214      // group the del files and small files.
215      FileStatus linkedFile = file;
216      if (HFileLink.isHFileLink(file.getPath())) {
217        HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
218        linkedFile = getLinkedFileStatus(link);
219        if (linkedFile == null) {
220          // If the linked file cannot be found, regard it as an irrelevantFileCount file
221          irrelevantFileCount++;
222          continue;
223        }
224      }
225      if (withDelFiles && StoreFileInfo.isDelFile(linkedFile.getPath())) {
226        // File in the Del Partition List
227
228        // Get delId from the file
229        try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) {
230          delId.setStartKey(reader.getFirstRowKey().get());
231          delId.setEndKey(reader.getLastRowKey().get());
232        }
233        CompactionDelPartition delPartition = delFilesToCompact.get(delId);
234        if (delPartition == null) {
235          CompactionDelPartitionId newDelId =
236              new CompactionDelPartitionId(delId.getStartKey(), delId.getEndKey());
237          delPartition = new CompactionDelPartition(newDelId);
238          delFilesToCompact.put(newDelId, delPartition);
239        }
240        delPartition.addDelFile(file);
241        totalDelFiles ++;
242      } else {
243        String fileName = linkedFile.getPath().getName();
244        String date = MobFileName.getDateFromName(fileName);
245        boolean skipCompaction = MobUtils
246            .fillPartitionId(id, firstDayOfCurrentMonth, firstDayOfCurrentWeek, date, policy,
247                calendar, mergeableSize);
248        if (allFiles || (!skipCompaction && (linkedFile.getLen() < id.getThreshold()))) {
249          // add all files if allFiles is true,
250          // otherwise add the small files to the merge pool
251          // filter out files which are not supposed to be compacted with the
252          // current policy
253
254          id.setStartKey(MobFileName.getStartKeyFromName(fileName));
255          CompactionPartition compactionPartition = filesToCompact.get(id);
256          if (compactionPartition == null) {
257            CompactionPartitionId newId = new CompactionPartitionId(id.getStartKey(), id.getDate());
258            compactionPartition = new CompactionPartition(newId);
259            compactionPartition.addFile(file);
260            filesToCompact.put(newId, compactionPartition);
261            newId.updateLatestDate(date);
262          } else {
263            compactionPartition.addFile(file);
264            compactionPartition.getPartitionId().updateLatestDate(date);
265          }
266
267          if (withDelFiles) {
268            // get startKey and endKey from the file and update partition
269            // TODO: is it possible to skip read of most hfiles?
270            try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) {
271              compactionPartition.setStartKey(reader.getFirstRowKey().get());
272              compactionPartition.setEndKey(reader.getLastRowKey().get());
273            }
274          }
275
276          selectedFileCount++;
277        }
278      }
279    }
280
281    /*
282     * Merge del files so there are only non-overlapped del file lists
283     */
284    for(Map.Entry<CompactionDelPartitionId, CompactionDelPartition> entry : delFilesToCompact.entrySet()) {
285      if (allDelPartitions.size() > 0) {
286        // check if the current key range overlaps the previous one
287        CompactionDelPartition prev = allDelPartitions.get(allDelPartitions.size() - 1);
288        if (Bytes.compareTo(prev.getId().getEndKey(), entry.getKey().getStartKey()) >= 0) {
289          // merge them together
290          prev.getId().setEndKey(entry.getValue().getId().getEndKey());
291          prev.addDelFileList(entry.getValue().listDelFiles());
292
293        } else {
294          allDelPartitions.add(entry.getValue());
295        }
296      } else {
297        allDelPartitions.add(entry.getValue());
298      }
299    }
300
301    PartitionedMobCompactionRequest request = new PartitionedMobCompactionRequest(
302      filesToCompact.values(), allDelPartitions);
303    if (candidates.size() == (totalDelFiles + selectedFileCount + irrelevantFileCount)) {
304      // all the files are selected
305      request.setCompactionType(CompactionType.ALL_FILES);
306    }
307    LOG.info("The compaction type is {}, the request has {} del files, {} selected files, and {} " +
308        "irrelevant files table '{}' and column '{}'", request.getCompactionType(), totalDelFiles,
309        selectedFileCount, irrelevantFileCount, tableName, column.getNameAsString());
310    return request;
311  }
312
313  /**
314   * Performs the compaction on the selected files.
315   * <ol>
316   * <li>Compacts the del files.</li>
317   * <li>Compacts the selected small mob files and all the del files.</li>
318   * <li>If all the candidates are selected, delete the del files.</li>
319   * </ol>
320   * @param request The compaction request.
321   * @return The paths of new mob files generated in the compaction.
322   * @throws IOException if IO failure is encountered
323   */
324  protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
325    throws IOException {
326
327    // merge the del files, it is per del partition
328    for (CompactionDelPartition delPartition : request.getDelPartitions()) {
329      if (delPartition.getDelFileCount() <= 1) continue;
330      List<Path> newDelPaths = compactDelFiles(request, delPartition.listDelFiles());
331      delPartition.cleanDelFiles();
332      delPartition.addDelFileList(newDelPaths);
333    }
334
335    List<Path> paths = null;
336    int totalDelFileCount = 0;
337    try {
338      for (CompactionDelPartition delPartition : request.getDelPartitions()) {
339        for (Path newDelPath : delPartition.listDelFiles()) {
340          HStoreFile sf =
341              new HStoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE, true);
342          // pre-create reader of a del file to avoid race condition when opening the reader in each
343          // partition.
344          sf.initReader();
345          delPartition.addStoreFile(sf);
346          totalDelFileCount++;
347        }
348      }
349      LOG.info("After merging, there are {} del files. table='{}' column='{}'", totalDelFileCount,
350          tableName, column.getNameAsString());
351      // compact the mob files by partitions.
352      paths = compactMobFiles(request);
353      LOG.info("After compaction, there are {} mob files. table='{}' column='{}'", paths.size(),
354          tableName, column.getNameAsString());
355    } finally {
356      for (CompactionDelPartition delPartition : request.getDelPartitions()) {
357        closeStoreFileReaders(delPartition.getStoreFiles());
358      }
359    }
360
361    // archive the del files if all the mob files are selected.
362    if (request.type == CompactionType.ALL_FILES && !request.getDelPartitions().isEmpty()) {
363      LOG.info("After a mob compaction with all files selected, archiving the del files for " +
364          "table='{}' and column='{}'", tableName, column.getNameAsString());
365      for (CompactionDelPartition delPartition : request.getDelPartitions()) {
366        LOG.info(Objects.toString(delPartition.listDelFiles()));
367        try {
368          MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(),
369            delPartition.getStoreFiles());
370        } catch (IOException e) {
371          LOG.error("Failed to archive the del files {} for partition {} table='{}' and " +
372              "column='{}'", delPartition.getStoreFiles(), delPartition.getId(), tableName,
373              column.getNameAsString(), e);
374        }
375      }
376    }
377    return paths;
378  }
379
380  static class DelPartitionComparator implements Comparator<CompactionDelPartition> {
381    private boolean compareStartKey;
382
383    DelPartitionComparator(boolean compareStartKey) {
384      this.compareStartKey = compareStartKey;
385    }
386
387    public boolean getCompareStartKey() {
388      return this.compareStartKey;
389    }
390
391    public void setCompareStartKey(final boolean compareStartKey) {
392      this.compareStartKey = compareStartKey;
393    }
394
395    @Override
396    public int compare(CompactionDelPartition o1, CompactionDelPartition o2) {
397
398      if (compareStartKey) {
399        return Bytes.compareTo(o1.getId().getStartKey(), o2.getId().getStartKey());
400      } else {
401        return Bytes.compareTo(o1.getId().getEndKey(), o2.getId().getEndKey());
402      }
403    }
404  }
405
406  List<HStoreFile> getListOfDelFilesForPartition(final CompactionPartition partition,
407      final List<CompactionDelPartition> delPartitions) {
408    // Binary search for startKey and endKey
409
410    List<HStoreFile> result = new ArrayList<>();
411
412    DelPartitionComparator comparator = new DelPartitionComparator(false);
413    CompactionDelPartitionId id = new CompactionDelPartitionId(null, partition.getStartKey());
414    CompactionDelPartition target = new CompactionDelPartition(id);
415    int start = Collections.binarySearch(delPartitions, target, comparator);
416
417    // Get the start index for partition
418    if (start < 0) {
419      // Calculate the insert point
420      start = (start + 1) * (-1);
421      if (start == delPartitions.size()) {
422        // no overlap
423        return result;
424      } else {
425        // Check another case which has no overlap
426        if (Bytes.compareTo(partition.getEndKey(), delPartitions.get(start).getId().getStartKey()) < 0) {
427          return result;
428        }
429      }
430    }
431
432    // Search for end index for the partition
433    comparator.setCompareStartKey(true);
434    id.setStartKey(partition.getEndKey());
435    int end = Collections.binarySearch(delPartitions, target, comparator);
436
437    if (end < 0) {
438      end = (end + 1) * (-1);
439      if (end == 0) {
440        return result;
441      } else {
442        --end;
443        if (Bytes.compareTo(partition.getStartKey(), delPartitions.get(end).getId().getEndKey()) > 0) {
444          return result;
445        }
446      }
447    }
448
449    for (int i = start; i <= end; ++i) {
450        result.addAll(delPartitions.get(i).getStoreFiles());
451    }
452
453    return result;
454  }
455
456  /**
457   * Compacts the selected small mob files and all the del files.
458   * @param request The compaction request.
459   * @return The paths of new mob files after compactions.
460   * @throws IOException if IO failure is encountered
461   */
462  protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request)
463      throws IOException {
464    Collection<CompactionPartition> partitions = request.compactionPartitions;
465    if (partitions == null || partitions.isEmpty()) {
466      LOG.info("No partitions of mob files in table='{}' and column='{}'", tableName,
467          column.getNameAsString());
468      return Collections.emptyList();
469    }
470    List<Path> paths = new ArrayList<>();
471    final Connection c = ConnectionFactory.createConnection(conf);
472    final Table table = c.getTable(tableName);
473
474    try {
475      Map<CompactionPartitionId, Future<List<Path>>> results = new HashMap<>();
476      // compact the mob files by partitions in parallel.
477      for (final CompactionPartition partition : partitions) {
478
479        // How to efficiently come up a list of delFiles for one partition?
480        // Search the delPartitions and collect all the delFiles for the partition
481        // One optimization can do is that if there is no del file, we do not need to
482        // come up with startKey/endKey.
483        List<HStoreFile> delFiles = getListOfDelFilesForPartition(partition,
484            request.getDelPartitions());
485
486        results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
487          @Override
488          public List<Path> call() throws Exception {
489            LOG.info("Compacting mob files for partition {} for table='{}' and column='{}'",
490                partition.getPartitionId(), tableName, column.getNameAsString());
491            return compactMobFilePartition(request, partition, delFiles, c, table);
492          }
493        }));
494      }
495      // compact the partitions in parallel.
496      List<CompactionPartitionId> failedPartitions = new ArrayList<>();
497      for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
498        try {
499          paths.addAll(result.getValue().get());
500        } catch (Exception e) {
501          // just log the error
502          LOG.error("Failed to compact the partition {} for table='{}' and column='{}'",
503              result.getKey(), tableName, column.getNameAsString(), e);
504          failedPartitions.add(result.getKey());
505        }
506      }
507      if (!failedPartitions.isEmpty()) {
508        // if any partition fails in the compaction, directly throw an exception.
509        throw new IOException("Failed to compact the partitions " + failedPartitions +
510            " for table='" + tableName + "' column='" + column.getNameAsString() + "'");
511      }
512    } finally {
513      try {
514        table.close();
515      } catch (IOException e) {
516        LOG.error("Failed to close the Table", e);
517      }
518    }
519    return paths;
520  }
521
522  /**
523   * Compacts a partition of selected small mob files and all the del files.
524   * @param request The compaction request.
525   * @param partition A compaction partition.
526   * @param delFiles The del files.
527   * @param connection The connection to use.
528   * @param table The current table.
529   * @return The paths of new mob files after compactions.
530   * @throws IOException if IO failure is encountered
531   */
532  private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request,
533                                             CompactionPartition partition,
534                                             List<HStoreFile> delFiles,
535                                             Connection connection,
536                                             Table table) throws IOException {
537    if (MobUtils.isMobFileExpired(column, EnvironmentEdgeManager.currentTime(),
538      partition.getPartitionId().getDate())) {
539      // If the files in the partition are expired, do not compact them and directly
540      // return an empty list.
541      return Collections.emptyList();
542    }
543    List<Path> newFiles = new ArrayList<>();
544    List<FileStatus> files = partition.listFiles();
545    int offset = 0;
546    Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
547    Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
548    while (offset < files.size()) {
549      int batch = compactionBatchSize;
550      if (files.size() - offset < compactionBatchSize) {
551        batch = files.size() - offset;
552      }
553      if (batch == 1 && delFiles.isEmpty()) {
554        // only one file left and no del files, do not compact it,
555        // and directly add it to the new files.
556        newFiles.add(files.get(offset).getPath());
557        offset++;
558        continue;
559      }
560      // clean the bulkload directory to avoid loading old files.
561      fs.delete(bulkloadPathOfPartition, true);
562      // add the selected mob files and del files into filesToCompact
563      List<HStoreFile> filesToCompact = new ArrayList<>();
564      for (int i = offset; i < batch + offset; i++) {
565        HStoreFile sf = new HStoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
566            BloomType.NONE, true);
567        filesToCompact.add(sf);
568      }
569      filesToCompact.addAll(delFiles);
570      // compact the mob files in a batch.
571      compactMobFilesInBatch(request, partition, connection, table, filesToCompact, batch,
572        bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
573      // move to the next batch.
574      offset += batch;
575    }
576    LOG.info("Compaction is finished. The number of mob files is changed from {} to {} for " +
577        "partition={} for table='{}' and column='{}'", files.size(), newFiles.size(),
578        partition.getPartitionId(), tableName, column.getNameAsString());
579    return newFiles;
580  }
581
582  /**
583   * Closes the readers of store files.
584   * @param storeFiles The store files to be closed.
585   */
586  private void closeStoreFileReaders(List<HStoreFile> storeFiles) {
587    for (HStoreFile storeFile : storeFiles) {
588      try {
589        storeFile.closeStoreFile(true);
590      } catch (IOException e) {
591        LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e);
592      }
593    }
594  }
595
596  /**
597   * Compacts a partition of selected small mob files and all the del files in a batch.
598   * @param request The compaction request.
599   * @param partition A compaction partition.
600   * @param connection To use for transport
601   * @param table The current table.
602   * @param filesToCompact The files to be compacted.
603   * @param batch The number of mob files to be compacted in a batch.
604   * @param bulkloadPathOfPartition The directory where the bulkload column of the current
605   *   partition is saved.
606   * @param bulkloadColumnPath The directory where the bulkload files of current partition
607   *   are saved.
608   * @param newFiles The paths of new mob files after compactions.
609   * @throws IOException if IO failure is encountered
610   */
611  private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
612                                      CompactionPartition partition,
613                                      Connection connection, Table table,
614                                      List<HStoreFile> filesToCompact, int batch,
615                                      Path bulkloadPathOfPartition, Path bulkloadColumnPath,
616                                      List<Path> newFiles)
617      throws IOException {
618    // open scanner to the selected mob files and del files.
619    StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
620    // the mob files to be compacted, not include the del files.
621    List<HStoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
622    // Pair(maxSeqId, cellsCount)
623    Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
624    // open writers for the mob files and new ref store files.
625    StoreFileWriter writer = null;
626    StoreFileWriter refFileWriter = null;
627    Path filePath = null;
628    long mobCells = 0;
629    boolean cleanupTmpMobFile = false;
630    boolean cleanupBulkloadDirOfPartition = false;
631    boolean cleanupCommittedMobFile = false;
632    boolean closeReaders= true;
633
634    try {
635      try {
636        writer = MobUtils
637            .createWriter(conf, fs, column, partition.getPartitionId().getLatestDate(), tempPath,
638                Long.MAX_VALUE, column.getCompactionCompressionType(),
639                partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext,
640                true);
641        cleanupTmpMobFile = true;
642        filePath = writer.getPath();
643        byte[] fileName = Bytes.toBytes(filePath.getName());
644        // create a temp file and open a writer for it in the bulkloadPath
645        refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath,
646            fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext, true);
647        cleanupBulkloadDirOfPartition = true;
648        List<Cell> cells = new ArrayList<>();
649        boolean hasMore;
650        ScannerContext scannerContext =
651            ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
652        do {
653          hasMore = scanner.next(cells, scannerContext);
654          for (Cell cell : cells) {
655            // write the mob cell to the mob file.
656            writer.append(cell);
657            // write the new reference cell to the store file.
658            Cell reference = MobUtils.createMobRefCell(cell, fileName, this.refCellTags);
659            refFileWriter.append(reference);
660            mobCells++;
661          }
662          cells.clear();
663        } while (hasMore);
664      } finally {
665        // close the scanner.
666        scanner.close();
667
668        if (cleanupTmpMobFile) {
669          // append metadata to the mob file, and close the mob file writer.
670          closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
671        }
672
673        if (cleanupBulkloadDirOfPartition) {
674          // append metadata and bulkload info to the ref mob file, and close the writer.
675          closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
676        }
677      }
678
679      if (mobCells > 0) {
680        // commit mob file
681        MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
682        cleanupTmpMobFile = false;
683        cleanupCommittedMobFile = true;
684        // bulkload the ref file
685        LOG.info("start MOB ref bulkload for partition {} table='{}' column='{}'",
686            partition.getPartitionId(), tableName, column.getNameAsString());
687        bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
688        cleanupCommittedMobFile = false;
689        LOG.info("end MOB ref bulkload for partition {} table='{}' column='{}'",
690            partition.getPartitionId(), tableName, column.getNameAsString());
691        newFiles.add(new Path(mobFamilyDir, filePath.getName()));
692      }
693
694      // archive the old mob files, do not archive the del files.
695      try {
696        closeStoreFileReaders(mobFilesToCompact);
697        closeReaders = false;
698        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
699      } catch (IOException e) {
700        LOG.error("Failed to archive the files " + mobFilesToCompact, e);
701      }
702    } finally {
703      if (closeReaders) {
704        closeStoreFileReaders(mobFilesToCompact);
705      }
706
707      if (cleanupTmpMobFile) {
708        deletePath(filePath);
709      }
710
711      if (cleanupBulkloadDirOfPartition) {
712        // delete the bulkload files in bulkloadPath
713        deletePath(bulkloadPathOfPartition);
714      }
715
716      if (cleanupCommittedMobFile) {
717        LOG.error("failed MOB ref bulkload for partition {} table='{}' column='{}'",
718            partition.getPartitionId(), tableName, column.getNameAsString());
719        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(),
720            Collections.singletonList(new HStoreFile(fs, new Path(mobFamilyDir, filePath.getName()),
721            conf, compactionCacheConfig, BloomType.NONE, true)));
722      }
723    }
724  }
725
726  /**
727   * Compacts the del files in batches which avoids opening too many files.
728   * @param request The compaction request.
729   * @param delFilePaths Del file paths to compact
730   * @return The paths of new del files after merging or the original files if no merging
731   *         is necessary.
732   * @throws IOException if IO failure is encountered
733   */
734  protected List<Path> compactDelFiles(PartitionedMobCompactionRequest request,
735    List<Path> delFilePaths) throws IOException {
736    if (delFilePaths.size() <= delFileMaxCount) {
737      return delFilePaths;
738    }
739    // when there are more del files than the number that is allowed, merge it firstly.
740    int offset = 0;
741    List<Path> paths = new ArrayList<>();
742    while (offset < delFilePaths.size()) {
743      // get the batch
744      int batch = compactionBatchSize;
745      if (delFilePaths.size() - offset < compactionBatchSize) {
746        batch = delFilePaths.size() - offset;
747      }
748      List<HStoreFile> batchedDelFiles = new ArrayList<>();
749      if (batch == 1) {
750        // only one file left, do not compact it, directly add it to the new files.
751        paths.add(delFilePaths.get(offset));
752        offset++;
753        continue;
754      }
755      for (int i = offset; i < batch + offset; i++) {
756        batchedDelFiles.add(new HStoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
757          BloomType.NONE, true));
758      }
759      // compact the del files in a batch.
760      paths.add(compactDelFilesInBatch(request, batchedDelFiles));
761      // move to the next batch.
762      offset += batch;
763    }
764    return compactDelFiles(request, paths);
765  }
766
767  /**
768   * Compacts the del file in a batch.
769   * @param request The compaction request.
770   * @param delFiles The del files.
771   * @return The path of new del file after merging.
772   * @throws IOException if IO failure is encountered
773   */
774  private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request,
775    List<HStoreFile> delFiles) throws IOException {
776    // create a scanner for the del files.
777    StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
778    StoreFileWriter writer = null;
779    Path filePath = null;
780    try {
781      writer = MobUtils.createDelFileWriter(conf, fs, column,
782        MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
783        column.getCompactionCompressionType(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
784          cryptoContext);
785      filePath = writer.getPath();
786      List<Cell> cells = new ArrayList<>();
787      boolean hasMore;
788      ScannerContext scannerContext =
789              ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
790      do {
791        hasMore = scanner.next(cells, scannerContext);
792        for (Cell cell : cells) {
793          writer.append(cell);
794        }
795        cells.clear();
796      } while (hasMore);
797    } finally {
798      scanner.close();
799      if (writer != null) {
800        try {
801          writer.close();
802        } catch (IOException e) {
803          LOG.error("Failed to close the writer of the file " + filePath, e);
804        }
805      }
806    }
807    // commit the new del file
808    Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
809    // archive the old del files
810    try {
811      MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
812    } catch (IOException e) {
813      LOG.error("Failed to archive the old del files " + delFiles, e);
814    }
815    return path;
816  }
817
818  /**
819   * Creates a store scanner.
820   * @param filesToCompact The files to be compacted.
821   * @param scanType The scan type.
822   * @return The store scanner.
823   * @throws IOException if IO failure is encountered
824   */
825  private StoreScanner createScanner(List<HStoreFile> filesToCompact, ScanType scanType)
826      throws IOException {
827    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
828      false, true, false, false, HConstants.LATEST_TIMESTAMP);
829    long ttl = HStore.determineTTLFromFamily(column);
830    ScanInfo scanInfo = new ScanInfo(conf, column, ttl, 0, CellComparator.getInstance());
831    return new StoreScanner(scanInfo, scanType, scanners);
832  }
833
834  /**
835   * Bulkloads the current file.
836   *
837   * @param connection to use to get admin/RegionLocator
838   * @param table The current table.
839   * @param bulkloadDirectory The path of bulkload directory.
840   * @param fileName The current file name.
841   * @throws IOException if IO failure is encountered
842   */
843  private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory,
844      String fileName)
845      throws IOException {
846    // bulkload the ref file
847    try {
848      LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
849      bulkload.disableReplication();
850      bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
851          connection.getRegionLocator(table.getName()));
852    } catch (Exception e) {
853      throw new IOException(e);
854    }
855  }
856
857  /**
858   * Closes the mob file writer.
859   * @param writer The mob file writer.
860   * @param maxSeqId Maximum sequence id.
861   * @param mobCellsCount The number of mob cells.
862   * @throws IOException if IO failure is encountered
863   */
864  private void closeMobFileWriter(StoreFileWriter writer, long maxSeqId, long mobCellsCount)
865    throws IOException {
866    if (writer != null) {
867      writer.appendMetadata(maxSeqId, false, mobCellsCount);
868      try {
869        writer.close();
870      } catch (IOException e) {
871        LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
872      }
873    }
874  }
875
876  /**
877   * Closes the ref file writer.
878   * @param writer The ref file writer.
879   * @param maxSeqId Maximum sequence id.
880   * @param bulkloadTime The timestamp at which the bulk load file is created.
881   * @throws IOException if IO failure is encountered
882   */
883  private void closeRefFileWriter(StoreFileWriter writer, long maxSeqId, long bulkloadTime)
884    throws IOException {
885    if (writer != null) {
886      writer.appendMetadata(maxSeqId, false);
887      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
888      writer.appendFileInfo(SKIP_RESET_SEQ_ID, Bytes.toBytes(true));
889      try {
890        writer.close();
891      } catch (IOException e) {
892        LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
893      }
894    }
895  }
896
897  /**
898   * Gets the max seqId and number of cells of the store files.
899   * @param storeFiles The store files.
900   * @return The pair of the max seqId and number of cells of the store files.
901   * @throws IOException if IO failure is encountered
902   */
903  private Pair<Long, Long> getFileInfo(List<HStoreFile> storeFiles) throws IOException {
904    long maxSeqId = 0;
905    long maxKeyCount = 0;
906    for (HStoreFile sf : storeFiles) {
907      // the readers will be closed later after the merge.
908      maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
909      sf.initReader();
910      byte[] count = sf.getReader().loadFileInfo().get(MOB_CELLS_COUNT);
911      if (count != null) {
912        maxKeyCount += Bytes.toLong(count);
913      }
914    }
915    return new Pair<>(maxSeqId, maxKeyCount);
916  }
917
918  /**
919   * Deletes a file.
920   * @param path The path of the file to be deleted.
921   */
922  private void deletePath(Path path) {
923    LOG.debug("Cleanup, delete path '{}'", path);
924    try {
925      if (path != null) {
926        fs.delete(path, true);
927      }
928    } catch (IOException e) {
929      LOG.error("Failed to delete the file " + path, e);
930    }
931  }
932
933  private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
934    Path[] locations = link.getLocations();
935    FileStatus file;
936    for (Path location : locations) {
937
938      if (location != null) {
939        try {
940          file = fs.getFileStatus(location);
941          if (file != null) {
942            return file;
943          }
944        }  catch (FileNotFoundException e) {
945        }
946      }
947    }
948    LOG.warn("The file " + link + " links to can not be found");
949    return null;
950  }
951}