001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.List;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.regionserver.HStoreFile;
026import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
027import org.apache.hadoop.hbase.regionserver.StoreUtils;
028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * FIFO compaction policy selects only files which have all cells expired. The column family MUST
035 * have non-default TTL. One of the use cases for this policy is when we need to store raw data
036 * which will be post-processed later and discarded completely after quite short period of time. Raw
037 * time-series vs. time-based roll up aggregates and compacted time-series. We collect raw
038 * time-series and store them into CF with FIFO compaction policy, periodically we run task which
039 * creates roll up aggregates and compacts time-series, the original raw data can be discarded after
040 * that.
041 */
042@InterfaceAudience.Private
043public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
044
045  private static final Logger LOG = LoggerFactory.getLogger(FIFOCompactionPolicy.class);
046
047  public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
048    super(conf, storeConfigInfo);
049  }
050
051  @Override
052  public CompactionRequestImpl selectCompaction(Collection<HStoreFile> candidateFiles,
053    List<HStoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
054    boolean forceMajor) throws IOException {
055    if (forceMajor) {
056      LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignore the flag.");
057    }
058    boolean isAfterSplit = StoreUtils.hasReferences(candidateFiles);
059    if (isAfterSplit) {
060      LOG.info("Split detected, delegate selection to the parent policy.");
061      return super.selectCompaction(candidateFiles, filesCompacting, isUserCompaction,
062        mayUseOffPeak, forceMajor);
063    }
064
065    // Nothing to compact
066    Collection<HStoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting);
067    CompactionRequestImpl result = new CompactionRequestImpl(toCompact);
068    return result;
069  }
070
071  @Override
072  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
073    throws IOException {
074    boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact);
075    if (isAfterSplit) {
076      LOG.info("Split detected, delegate to the parent policy.");
077      return super.shouldPerformMajorCompaction(filesToCompact);
078    }
079    return false;
080  }
081
082  @Override
083  public boolean needsCompaction(Collection<HStoreFile> storeFiles,
084    List<HStoreFile> filesCompacting) {
085    boolean isAfterSplit = StoreUtils.hasReferences(storeFiles);
086    if (isAfterSplit) {
087      LOG.info("Split detected, delegate to the parent policy.");
088      return super.needsCompaction(storeFiles, filesCompacting);
089    }
090    return hasExpiredStores(storeFiles);
091  }
092
093  /**
094   * The FIFOCompactionPolicy only choose the TTL expired store files as the compaction candidates.
095   * If all the store files are TTL expired, then the compaction will generate a new empty file.
096   * While its max timestamp will be Long.MAX_VALUE. If not considered separately, the store file
097   * will never be archived because its TTL will be never expired. So we'll check the empty store
098   * file separately (See HBASE-21504).
099   */
100  private boolean isEmptyStoreFile(HStoreFile sf) {
101    return sf.getReader().getEntries() == 0;
102  }
103
104  private boolean hasExpiredStores(Collection<HStoreFile> files) {
105    long currentTime = EnvironmentEdgeManager.currentTime();
106    for (HStoreFile sf : files) {
107      if (isEmptyStoreFile(sf)) {
108        return true;
109      }
110      // Check MIN_VERSIONS is in HStore removeUnneededFiles
111      long maxTs = sf.getReader().getMaxTimestamp();
112      long maxTtl = storeConfigInfo.getStoreFileTtl();
113      if (maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) {
114        continue;
115      } else {
116        return true;
117      }
118    }
119    return false;
120  }
121
122  private Collection<HStoreFile> getExpiredStores(Collection<HStoreFile> files,
123    Collection<HStoreFile> filesCompacting) {
124    long currentTime = EnvironmentEdgeManager.currentTime();
125    Collection<HStoreFile> expiredStores = new ArrayList<>();
126    for (HStoreFile sf : files) {
127      if (isEmptyStoreFile(sf) && !filesCompacting.contains(sf)) {
128        expiredStores.add(sf);
129        continue;
130      }
131      // Check MIN_VERSIONS is in HStore removeUnneededFiles
132      long maxTs = sf.getReader().getMaxTimestamp();
133      long maxTtl = storeConfigInfo.getStoreFileTtl();
134      if (maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) {
135        continue;
136      } else if (filesCompacting == null || !filesCompacting.contains(sf)) {
137        expiredStores.add(sf);
138      }
139    }
140    return expiredStores;
141  }
142}