View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.compactions;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
31  import org.apache.hadoop.hbase.regionserver.StoreFile;
32  import org.apache.hadoop.hbase.regionserver.StoreUtils;
33  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
34  
35  /**
36   * 
37   * FIFO compaction policy selects only files which have all cells expired. 
38   * The column family MUST have non-default TTL. One of the use cases for this 
39   * policy is when we need to store raw data which will be post-processed later 
40   * and discarded completely after quite short period of time. Raw time-series vs. 
41   * time-based roll up aggregates and compacted time-series. We collect raw time-series
42   * and store them into CF with FIFO compaction policy, periodically we run task 
43   * which creates roll up aggregates and compacts time-series, the original raw data 
44   * can be discarded after that.
45   * 
46   */
47  @InterfaceAudience.Private
48  public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
49    
50    private static final Log LOG = LogFactory.getLog(FIFOCompactionPolicy.class);
51  
52  
53    public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
54      super(conf, storeConfigInfo);
55    }
56  
57    @Override
58    public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
59        List<StoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
60        boolean forceMajor) throws IOException {
61      
62      if(forceMajor){
63        LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignore the flag.");
64      }
65      boolean isAfterSplit = StoreUtils.hasReferences(candidateFiles);
66      if(isAfterSplit){
67        LOG.info("Split detected, delegate selection to the parent policy.");
68        return super.selectCompaction(candidateFiles, filesCompacting, isUserCompaction, 
69          mayUseOffPeak, forceMajor);
70      }
71      
72      // Nothing to compact
73      Collection<StoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting);
74      CompactionRequest result = new CompactionRequest(toCompact);
75      return result;
76    }
77  
78    @Override
79    public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
80      boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact);
81      if(isAfterSplit){
82        LOG.info("Split detected, delegate to the parent policy.");
83        return super.isMajorCompaction(filesToCompact);
84      }
85      return false;
86    }
87  
88    @Override
89    public boolean needsCompaction(Collection<StoreFile> storeFiles, 
90        List<StoreFile> filesCompacting) {  
91      boolean isAfterSplit = StoreUtils.hasReferences(storeFiles);
92      if(isAfterSplit){
93        LOG.info("Split detected, delegate to the parent policy.");
94        return super.needsCompaction(storeFiles, filesCompacting);
95      }
96      return hasExpiredStores(storeFiles);
97    }
98  
99    /**
100    * The FIFOCompactionPolicy only choose those TTL expired HFiles as the compaction candidates. So
101    * if all HFiles are TTL expired, then the compaction will generate a new empty HFile. While its
102    * max timestamp will be Long.MAX_VALUE. If not considered separately, the HFile will never be
103    * archived because its TTL will be never expired. So we'll check the empty store file separately.
104    * (See HBASE-21504)
105    */
106   private boolean isEmptyStoreFile(StoreFile sf) {
107     return sf.getReader().getEntries() == 0;
108   }
109 
110   private boolean hasExpiredStores(Collection<StoreFile> files) {
111     long currentTime = EnvironmentEdgeManager.currentTime();
112     for (StoreFile sf : files) {
113       if (isEmptyStoreFile(sf)) {
114         return true;
115       }
116       // Check MIN_VERSIONS is in HStore removeUnneededFiles
117       Long maxTs = sf.getReader().getMaxTimestamp();
118       long maxTtl = storeConfigInfo.getStoreFileTtl();
119       if (maxTs == null || maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) {
120         continue;
121       } else {
122         return true;
123       }
124     }
125     return false;
126   }
127 
128   private  Collection<StoreFile> getExpiredStores(Collection<StoreFile> files,
129       Collection<StoreFile> filesCompacting) {
130     long currentTime = EnvironmentEdgeManager.currentTime();
131     Collection<StoreFile> expiredStores = new ArrayList<StoreFile>();
132     for (StoreFile sf : files) {
133       if (isEmptyStoreFile(sf)) {
134         expiredStores.add(sf);
135         continue;
136       }
137       // Check MIN_VERSIONS is in HStore removeUnneededFiles
138       Long maxTs = sf.getReader().getMaxTimestamp();
139       long maxTtl = storeConfigInfo.getStoreFileTtl();
140       if (maxTs == null || maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) {
141         continue;
142       } else if (filesCompacting == null || filesCompacting.contains(sf) == false) {
143         expiredStores.add(sf);
144       }
145     }
146     return expiredStores;
147   }
148 }