001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.Map;
023import java.util.OptionalLong;
024import java.util.function.Consumer;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
028import org.apache.hadoop.hbase.regionserver.HStore;
029import org.apache.hadoop.hbase.regionserver.InternalScanner;
030import org.apache.hadoop.hbase.regionserver.StoreUtils;
031import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
032import org.apache.hadoop.hbase.security.User;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * This compactor will generate StoreFile for different time ranges.
039 */
040@InterfaceAudience.Private
041public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTieredMultiFileWriter> {
042
043  private static final Logger LOG = LoggerFactory.getLogger(DateTieredCompactor.class);
044
045  public DateTieredCompactor(Configuration conf, HStore store) {
046    super(conf, store);
047  }
048
049  protected boolean needEmptyFile(CompactionRequestImpl request) {
050    // if we are going to compact the last N files, then we need to emit an empty file to retain the
051    // maxSeqId if we haven't written out anything.
052    OptionalLong maxSeqId = StoreUtils.getMaxSequenceIdInList(request.getFiles());
053    OptionalLong storeMaxSeqId = store.getMaxSequenceId();
054    return maxSeqId.isPresent() && storeMaxSeqId.isPresent()
055      && maxSeqId.getAsLong() == storeMaxSeqId.getAsLong();
056  }
057
058  public List<Path> compact(final CompactionRequestImpl request, final List<Long> lowerBoundaries,
059    final Map<Long, String> lowerBoundariesPolicies, ThroughputController throughputController,
060    User user) throws IOException {
061    if (LOG.isDebugEnabled()) {
062      LOG.debug("Executing compaction with " + lowerBoundaries.size()
063        + "windows, lower boundaries: " + lowerBoundaries);
064    }
065
066    return compact(request, defaultScannerFactory,
067      new CellSinkFactory<DateTieredMultiFileWriter>() {
068
069        @Override
070        public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
071          boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
072          throws IOException {
073          DateTieredMultiFileWriter writer =
074            createMultiWriter(request, lowerBoundaries, lowerBoundariesPolicies);
075          initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
076          return writer;
077        }
078      }, throughputController, user);
079  }
080
081  protected DateTieredMultiFileWriter createMultiWriter(final CompactionRequestImpl request,
082    final List<Long> lowerBoundaries, final Map<Long, String> lowerBoundariesPolicies) {
083    return new DateTieredMultiFileWriter(lowerBoundaries, lowerBoundariesPolicies,
084      needEmptyFile(request), c -> c.getTimestamp());
085  }
086
087  @Override
088  protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
089    CompactionRequestImpl request) throws IOException {
090    List<Path> pathList =
091      writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
092    return pathList;
093  }
094
095}