/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
 * FIFO compaction policy selects only those files in which every cell has expired.
 * The column family MUST have a non-default TTL. A typical use case is storing raw
 * data that will be post-processed and then discarded after a fairly short period
 * of time: raw time-series vs. time-based roll-up aggregates and compacted
 * time-series. We collect raw time-series and store them in a column family with
 * the FIFO compaction policy; a periodic task then creates roll-up aggregates and
 * compacts the time-series, after which the original raw data can be discarded.
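 * <p>
 * A minimal configuration sketch; the family name and TTL below are illustrative,
 * and the DefaultStoreEngine compaction-policy key should be verified against
 * your HBase version:
 * <pre>
 * HColumnDescriptor desc = new HColumnDescriptor("raw"); // illustrative family name
 * // A finite TTL is required; files are eligible only once all cells have expired.
 * desc.setTimeToLive(60 * 60 * 24); // one day, in seconds (illustrative value)
 * desc.setConfiguration("hbase.hstore.defaultengine.compactionpolicy.class",
 *     FIFOCompactionPolicy.class.getName());
 * </pre>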
 *
 */
@InterfaceAudience.Private
public class FIFOCompactionPolicy extends ExploringCompactionPolicy {

  private static final Log LOG = LogFactory.getLog(FIFOCompactionPolicy.class);

  public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }

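  /**
   * Selects all fully expired store files, and only those, for compaction. The
   * {@code forceMajor} flag is ignored, since major compaction is not supported;
   * while post-split reference files are present, selection is delegated to the
   * parent {@link ExploringCompactionPolicy}.
   */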
  @Override
  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
      List<StoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
      boolean forceMajor) throws IOException {
    if (forceMajor) {
      LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignoring the flag.");
    }
    boolean isAfterSplit = StoreUtils.hasReferences(candidateFiles);
    if (isAfterSplit) {
      LOG.info("Split detected, delegating selection to the parent policy.");
      return super.selectCompaction(candidateFiles, filesCompacting, isUserCompaction,
        mayUseOffPeak, forceMajor);
    }
    // Select only fully expired files; the request may be empty if there is
    // nothing to compact.
    Collection<StoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting);
    return new CompactionRequest(toCompact);
  }

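  /**
   * FIFO compaction never promotes a selection to a major compaction; after a
   * split the decision is delegated to the parent policy.
   */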
  @Override
  public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
    boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact);
    if (isAfterSplit) {
      LOG.info("Split detected, delegating to the parent policy.");
      return super.isMajorCompaction(filesToCompact);
    }
    return false;
  }

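  /**
   * A FIFO store needs compaction only when it contains at least one fully
   * expired store file (post-split references are again delegated to the parent).
   */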
  @Override
  public boolean needsCompaction(Collection<StoreFile> storeFiles,
      List<StoreFile> filesCompacting) {
    boolean isAfterSplit = StoreUtils.hasReferences(storeFiles);
    if (isAfterSplit) {
      LOG.info("Split detected, delegating to the parent policy.");
      return super.needsCompaction(storeFiles, filesCompacting);
    }
    return hasExpiredStores(storeFiles);
  }

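  /*
   * A file is fully expired when its newest cell is older than the TTL window,
   * i.e. maxTimestamp <= currentTime - ttl. For example, with a TTL of one hour
   * (3,600,000 ms), a file whose max timestamp is more than an hour in the past
   * contains only expired cells. The checks below skip files with no timestamp
   * info and stores whose TTL is effectively infinite (Long.MAX_VALUE).
   */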
  private boolean hasExpiredStores(Collection<StoreFile> files) {
    long currentTime = EnvironmentEdgeManager.currentTime();
    for (StoreFile sf : files) {
      // The check for MIN_VERSIONS is done in HStore.removeUnneededFiles().
      Long maxTs = sf.getReader().getMaxTimestamp();
      long maxTtl = storeConfigInfo.getStoreFileTtl();
      if (maxTs == null
          || maxTtl == Long.MAX_VALUE
          || (currentTime - maxTtl < maxTs)) {
        continue;
      }
      return true;
    }
    return false;
  }

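  // Same expiration test as hasExpiredStores(), but collects the expired files,
  // excluding any that are already being compacted.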
  private Collection<StoreFile> getExpiredStores(Collection<StoreFile> files,
      Collection<StoreFile> filesCompacting) {
    long currentTime = EnvironmentEdgeManager.currentTime();
    Collection<StoreFile> expiredStores = new ArrayList<StoreFile>();
    for (StoreFile sf : files) {
      // The check for MIN_VERSIONS is done in HStore.removeUnneededFiles().
      Long maxTs = sf.getReader().getMaxTimestamp();
      long maxTtl = storeConfigInfo.getStoreFileTtl();
      if (maxTs == null
          || maxTtl == Long.MAX_VALUE
          || (currentTime - maxTtl < maxTs)) {
        continue;
      } else if (filesCompacting == null || !filesCompacting.contains(sf)) {
        expiredStores.add(sf);
      }
    }
    return expiredStores;
  }
}