001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.OptionalLong;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
027import org.apache.hadoop.hbase.regionserver.HStore;
028import org.apache.hadoop.hbase.regionserver.InternalScanner;
029import org.apache.hadoop.hbase.regionserver.StoreUtils;
030import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
031import org.apache.hadoop.hbase.security.User;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * This compactor will generate StoreFile for different time ranges.
038 */
039@InterfaceAudience.Private
040public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTieredMultiFileWriter> {
041
042  private static final Logger LOG = LoggerFactory.getLogger(DateTieredCompactor.class);
043
044  public DateTieredCompactor(Configuration conf, HStore store) {
045    super(conf, store);
046  }
047
048  private boolean needEmptyFile(CompactionRequestImpl request) {
049    // if we are going to compact the last N files, then we need to emit an empty file to retain the
050    // maxSeqId if we haven't written out anything.
051    OptionalLong maxSeqId = StoreUtils.getMaxSequenceIdInList(request.getFiles());
052    OptionalLong storeMaxSeqId = store.getMaxSequenceId();
053    return maxSeqId.isPresent() && storeMaxSeqId.isPresent() &&
054        maxSeqId.getAsLong() == storeMaxSeqId.getAsLong();
055  }
056
057  public List<Path> compact(final CompactionRequestImpl request, final List<Long> lowerBoundaries,
058      ThroughputController throughputController, User user) throws IOException {
059    if (LOG.isDebugEnabled()) {
060      LOG.debug("Executing compaction with " + lowerBoundaries.size()
061          + "windows, lower boundaries: " + lowerBoundaries);
062    }
063
064    return compact(request, defaultScannerFactory,
065      new CellSinkFactory<DateTieredMultiFileWriter>() {
066
067        @Override
068        public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
069            boolean shouldDropBehind) throws IOException {
070          DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
071              needEmptyFile(request));
072          initMultiWriter(writer, scanner, fd, shouldDropBehind);
073          return writer;
074        }
075      }, throughputController, user);
076  }
077
078  @Override
079  protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
080      CompactionRequestImpl request) throws IOException {
081    return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
082  }
083}