/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * FIFO compaction policy selects as compaction candidates only those files whose cells have all
 * expired. The column family MUST have a non-default TTL. A typical use case is storing raw data
 * that will be post-processed later and discarded completely after a fairly short period of time:
 * raw time-series data is collected into a column family with this policy, a periodic task builds
 * roll-up aggregates and compacted time-series, and the original raw data can be discarded
 * afterwards.
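 * <p>
 * A minimal configuration sketch using the HBase client API; the family name "raw" and the
 * TTL value below are illustrative only:
 * <pre>
 * ColumnFamilyDescriptor cf = ColumnFamilyDescriptorBuilder
 *     .newBuilder(Bytes.toBytes("raw"))
 *     .setTimeToLive(24 * 60 * 60) // a non-default TTL is required, e.g. one day
 *     .setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
 *       FIFOCompactionPolicy.class.getName())
 *     .build();
 * </pre>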
 */
@InterfaceAudience.Private
public class FIFOCompactionPolicy extends ExploringCompactionPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(FIFOCompactionPolicy.class);

  public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }

  @Override
  public CompactionRequestImpl selectCompaction(Collection<HStoreFile> candidateFiles,
      List<HStoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
      boolean forceMajor) throws IOException {
    if (forceMajor) {
      LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignoring the flag.");
    }
    boolean isAfterSplit = StoreUtils.hasReferences(candidateFiles);
    if (isAfterSplit) {
      LOG.info("Split detected, delegating selection to the parent policy.");
      return super.selectCompaction(candidateFiles, filesCompacting, isUserCompaction,
        mayUseOffPeak, forceMajor);
    }

    // Select only the files whose cells have all expired; the request may be empty.
    Collection<HStoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting);
    return new CompactionRequestImpl(toCompact);
  }

  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
    throws IOException {
    boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact);
    if (isAfterSplit) {
      LOG.info("Split detected, delegating to the parent policy.");
      return super.shouldPerformMajorCompaction(filesToCompact);
    }
    return false;
  }

  @Override
  public boolean needsCompaction(Collection<HStoreFile> storeFiles,
      List<HStoreFile> filesCompacting) {
    boolean isAfterSplit = StoreUtils.hasReferences(storeFiles);
    if (isAfterSplit) {
      LOG.info("Split detected, delegating to the parent policy.");
      return super.needsCompaction(storeFiles, filesCompacting);
    }
    return hasExpiredStores(storeFiles);
  }

  /**
   * FIFOCompactionPolicy chooses only TTL-expired HFiles as compaction candidates, so when all
   * HFiles are TTL-expired the compaction produces a new, empty HFile whose max timestamp is
   * Long.MAX_VALUE. If not handled separately, that file would never be archived, because its
   * TTL would never expire. Hence empty store files are checked separately (see HBASE-21504).
   */
  private boolean isEmptyStoreFile(HStoreFile sf) {
    return sf.getReader().getEntries() == 0;
  }

  private boolean hasExpiredStores(Collection<HStoreFile> files) {
    long currentTime = EnvironmentEdgeManager.currentTime();
    for (HStoreFile sf : files) {
      if (isEmptyStoreFile(sf)) {
        return true;
      }
      // The MIN_VERSIONS check is done in HStore#removeUnneededFiles.
      // A file is fully expired when its newest cell (max timestamp) is older than now - TTL.
      long maxTs = sf.getReader().getMaxTimestamp();
      long maxTtl = storeConfigInfo.getStoreFileTtl();
      if (maxTtl != Long.MAX_VALUE && currentTime - maxTtl >= maxTs) {
        return true;
      }
    }
    return false;
  }

  private Collection<HStoreFile> getExpiredStores(Collection<HStoreFile> files,
      Collection<HStoreFile> filesCompacting) {
    long currentTime = EnvironmentEdgeManager.currentTime();
    Collection<HStoreFile> expiredStores = new ArrayList<>();
    for (HStoreFile sf : files) {
      if (isEmptyStoreFile(sf)) {
        expiredStores.add(sf);
        continue;
      }
      // The MIN_VERSIONS check is done in HStore#removeUnneededFiles.
      long maxTs = sf.getReader().getMaxTimestamp();
      long maxTtl = storeConfigInfo.getStoreFileTtl();
      boolean expired = maxTtl != Long.MAX_VALUE && currentTime - maxTtl >= maxTs;
      // Skip files that are already being compacted.
      if (expired && (filesCompacting == null || !filesCompacting.contains(sf))) {
        expiredStores.add(sf);
      }
    }
    return expiredStores;
  }
}