/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This compactor generates store files for different time ranges: one output file per time
 * window, as defined by the lower boundaries passed to {@link #compact}.
 */
@InterfaceAudience.Private
public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTieredMultiFileWriter> {

  private static final Logger LOG = LoggerFactory.getLogger(DateTieredCompactor.class);

  public DateTieredCompactor(Configuration conf, HStore store) {
    super(conf, store);
  }

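  /**
   * Whether an empty marker file is needed to retain the store's max sequence id.
   */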
  private boolean needEmptyFile(CompactionRequestImpl request) {
    // if we are going to compact the last N files, then we need to emit an empty file to retain
    // the maxSeqId if we haven't written out anything.
    OptionalLong maxSeqId = StoreUtils.getMaxSequenceIdInList(request.getFiles());
    OptionalLong storeMaxSeqId = store.getMaxSequenceId();
    return maxSeqId.isPresent() && storeMaxSeqId.isPresent()
      && maxSeqId.getAsLong() == storeMaxSeqId.getAsLong();
  }

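  /**
   * Executes the compaction, producing one output file per time window.
   * @param request the files to compact
   * @param lowerBoundaries the lower boundaries of the time windows, used to partition the output
   * @param lowerBoundariesPolicies storage policy for each window's output file, keyed by the
   *          window's lower boundary
   * @param throughputController used to throttle the compaction's I/O
   * @param user the user on whose behalf the compaction runs
   * @return paths of the newly written store files
   * @throws IOException if the compaction fails
   */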
  public List<Path> compact(final CompactionRequestImpl request, final List<Long> lowerBoundaries,
    final Map<Long, String> lowerBoundariesPolicies, ThroughputController throughputController,
    User user) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Executing compaction with {} windows, lower boundaries: {}",
        lowerBoundaries.size(), lowerBoundaries);
    }

    return compact(request, defaultScannerFactory,
      new CellSinkFactory<DateTieredMultiFileWriter>() {

        @Override
        public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
          boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
          throws IOException {
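          // The multi-file writer routes each cell to the writer for its time window;
          // needEmptyFile tells it to keep an empty file when nothing gets written, so the
          // store's max sequence id is not lost.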
          DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
            lowerBoundariesPolicies, needEmptyFile(request));
          initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
          return writer;
        }
      }, throughputController, user);
  }

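  /**
   * Closes the per-window writers and returns the paths of the committed store files.
   */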
  @Override
  protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
    CompactionRequestImpl request) throws IOException {
    return writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
  }

}