/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;

/**
 * The default algorithm for selecting files for compaction.
 * Combines the compaction configuration and the provisional file selection that
 * it's given to produce the list of suitable candidates for compaction.
 */
@InterfaceAudience.Private
public class RatioBasedCompactionPolicy extends CompactionPolicy {
  private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class);

  public RatioBasedCompactionPolicy(Configuration conf,
                                    StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }

  private ArrayList<StoreFile> getCurrentEligibleFiles(
      ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
    // candidates = all storefiles not already in compaction queue
    if (!filesCompacting.isEmpty()) {
      // exclude all files older than the newest file we're currently
      // compacting. this allows us to preserve contiguity (HBASE-2856)
      StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
      int idx = candidateFiles.indexOf(last);
      Preconditions.checkArgument(idx != -1);
      candidateFiles.subList(0, idx + 1).clear();
    }
    return candidateFiles;
  }

  public List<StoreFile> preSelectCompactionForCoprocessor(
      final Collection<StoreFile> candidates, final List<StoreFile> filesCompacting) {
    return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
  }

  /**
   * @param candidateFiles candidate files, ordered from oldest to newest. All files in store.
   * @return subset copy of the candidate list that meets the compaction criteria
   * @throws java.io.IOException
   */
  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
      final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
    // Preliminary compaction subject to filters
    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
    // Stuck and not compacting enough (estimate). It is not guaranteed that we will be
    // able to compact more if stuck and compacting, because the ratio policy excludes some
    // non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
    // Count the one extra file that the already-running compaction is expected to produce.
    int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
    boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
        >= storeConfigInfo.getBlockingFileCount();
    candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
    LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
        filesCompacting.size() + " compacting, " + candidateSelection.size() +
        " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");

    // If we can't include all the files, we cannot do a major compaction anyway
    boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
    if (!(forceMajor && isAllFiles)) {
      candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);
      isAllFiles = candidateFiles.size() == candidateSelection.size();
    }

    // Try a major compaction if this is a user-requested major compaction,
    // or if a major compaction was forced (with all files present) or is due per
    // isMajorCompaction(), and we do not have too many files to compact.
    boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
        || (((forceMajor && isAllFiles) || isMajorCompaction(candidateSelection))
          && (candidateSelection.size() < comConf.getMaxFilesToCompact()));
    // Or, if there are any references among the candidates.
    boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
    if (!isTryingMajor && !isAfterSplit) {
      // We're not compacting all files, let's see what files are applicable
      candidateSelection = filterBulk(candidateSelection);
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      candidateSelection = checkMinFilesCriteria(candidateSelection);
    }
    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, isTryingMajor);
    // Now we have the final file list, so we can determine if we can do major/all files.
    isAllFiles = (candidateFiles.size() == candidateSelection.size());
    CompactionRequest result = new CompactionRequest(candidateSelection);
    result.setOffPeak(!candidateSelection.isEmpty() && !isAllFiles && mayUseOffPeak);
    result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
    return result;
  }

  /**
   * Exclude all files above maxCompactSize.
   * Also keep all references; we MUST compact them.
   * @param candidates pre-filtered candidates
   * @return filtered subset
   */
  private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
      boolean mayUseOffpeak) {
    int pos = 0;
    while (pos < candidates.size() && !candidates.get(pos).isReference()
      && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
      ++pos;
    }
    if (pos > 0) {
      LOG.debug("Some files are too large. Excluding " + pos
          + " files from compaction candidates");
      candidates.subList(0, pos).clear();
    }
    return candidates;
  }

  /**
   * Exclude all bulk load files if configured to do so.
   * @param candidates pre-filtered candidates
   * @return filtered subset
   */
  private ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
    candidates.removeAll(Collections2.filter(candidates,
        new Predicate<StoreFile>() {
          @Override
          public boolean apply(StoreFile input) {
            return input.excludeFromMinorCompaction();
          }
        }));
    return candidates;
  }

  /**
   * Take up to maxFilesToCompact from the start of the candidate list.
   * @param candidates pre-filtered candidates
   * @return filtered subset
   */
  private ArrayList<StoreFile> removeExcessFiles(ArrayList<StoreFile> candidates,
      boolean isUserCompaction, boolean isMajorCompaction) {
    int excess = candidates.size() - comConf.getMaxFilesToCompact();
    if (excess > 0) {
      if (isMajorCompaction && isUserCompaction) {
        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
            " files because of a user-requested major compaction");
      } else {
        LOG.debug("Too many admissible files. Excluding " + excess
          + " files from compaction candidates");
        candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
      }
    }
    return candidates;
  }

  /**
   * Forget the compaction selection if we don't have enough files.
   * @param candidates pre-filtered candidates
   * @return filtered subset
   */
  private ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
    int minFiles = comConf.getMinFilesToCompact();
    if (candidates.size() < minFiles) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not compacting files because we only have " + candidates.size() +
          " files ready for compaction. Need " + minFiles + " to initiate.");
      }
      candidates.clear();
    }
    return candidates;
  }

  /**
   * -- Default minor compaction selection algorithm:
   * choose CompactSelection from candidates --
   * First exclude bulk-load files if indicated in configuration.
   * Start at the oldest file and stop when you find the first file that
   * meets compaction criteria:
   * (1) a recently-flushed, small file (i.e. <= minCompactSize)
   * OR
   * (2) within the compactRatio of sum(newer_files)
   * Given normal skew, any newer files will also meet these criteria.
   * <p/>
   * Additional Note:
   * If fileSizes.size() >> maxFilesToCompact, we will recurse on
   * compact().  Consider the oldest files first to avoid a
   * situation where we always compact [end-threshold,end).  Then, the
   * last file becomes an aggregate of the previous compactions.
   *
   * normal skew:
   *
   *         older ----> newer (increasing seqID)
   *     _
   *    | |   _
   *    | |  | |   _
   *  --|-|- |-|- |-|---_-------_-------  minCompactSize
   *    | |  | |  | |  | |  _  | |
   *    | |  | |  | |  | | | | | |
   *    | |  | |  | |  | | | | | |
   * @param candidates pre-filtered candidates
   * @return filtered subset
   */
  ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (candidates.isEmpty()) {
      return candidates;
    }

    // we're doing a minor compaction, let's see what files are applicable
    int start = 0;
    double ratio = comConf.getCompactionRatio();
    if (mayUseOffPeak) {
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }

    // get store file sizes for incremental compacting selection.
    final int countOfFiles = candidates.size();
    long[] fileSizes = new long[countOfFiles];
    long[] sumSize = new long[countOfFiles];
    for (int i = countOfFiles - 1; i >= 0; --i) {
      StoreFile file = candidates.get(i);
      fileSizes[i] = file.getReader().length();
      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      sumSize[i] = fileSizes[i]
        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }

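    // Worked example of the selection rule below (illustrative only): a file at index
    // 'start' is skipped when its size exceeds both minCompactSize and
    // ratio * sumSize[start + 1], i.e. ratio times the combined size of the newer files
    // in its selection window. With the default ratio of 1.2, and assuming minCompactSize
    // is small enough not to matter, candidates of sizes 100, 50, 23, 12, 12 (oldest to
    // newest) are all selected, since 100 <= 1.2 * (50 + 23 + 12 + 12) = 116.4. For sizes
    // 100, 25, 12, 12 the oldest file is skipped (100 > 1.2 * (25 + 12 + 12) = 58.8) and
    // only 25, 12, 12 remain eligible.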
    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
      fileSizes[start] > Math.max(comConf.getMinCompactSize(),
          (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
        + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      // We may be stuck. Compact the latest files if we can.
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    candidates.subList(0, start).clear();
    return candidates;
  }

  /*
   * @param filesToCompact Files to compact. Can be null.
   * @return True if we should run a major compaction.
   */
  @Override
  public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact)
      throws IOException {
    boolean result = false;
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
      return result;
    }
    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = System.currentTimeMillis();
    if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
      // Major compaction time has elapsed.
      long cfTtl = this.storeConfigInfo.getStoreFileTtl();
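      // Note: a single file that is already the product of a major compaction is normally
      // left alone; it is only rewritten here if its data has outlived the store TTL (so
      // expired cells can be dropped) or if its HDFS block locality has fallen below
      // comConf.getMinLocalityToForceCompact().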
      if (filesToCompact.size() == 1) {
        // Single file
        StoreFile sf = filesToCompact.iterator().next();
        Long minTimestamp = sf.getMinimumTimestamp();
        long oldest = (minTimestamp == null)
            ? Long.MIN_VALUE
            : now - minTimestamp.longValue();
        if (sf.isMajorCompaction() &&
            (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
          float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
              RSRpcServices.getHostname(comConf.conf, false)
          );
          if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Major compaction triggered on only store " + this +
                  "; to make hdfs blocks local, current blockLocalityIndex is " +
                  blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
                  ")");
            }
            result = true;
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Skipping major compaction of " + this +
                  " because one (major) compacted file only, oldestTime " +
                  oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " +
                  blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
                  ")");
            }
          }
        } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
          LOG.debug("Major compaction triggered on store " + this +
            ", because keyvalues outdated; time since last major compaction " +
            (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Major compaction triggered on store " + this +
              "; time since last major compaction " + (now - lowTimestamp) + "ms");
        }
        result = true;
      }
    }
    return result;
  }

  /**
   * Used to calculate jitter for the major compaction period.
   */
  private final Random random = new Random();

  /**
   * @param filesToCompact the store files used to seed the deterministic jitter
   * @return the period (in ms) to use when deciding whether a major compaction is due;
   *   0 means no major compaction
   */
  public long getNextMajorCompactTime(final Collection<StoreFile> filesToCompact) {
    // default = 24hrs
    long ret = comConf.getMajorCompactionPeriod();
    if (ret > 0) {
      // default = 20% = +/- 4.8 hrs
      double jitterPct = comConf.getMajorCompactionJitter();
      if (jitterPct > 0) {
        long jitter = Math.round(ret * jitterPct);
        // deterministic jitter avoids a major compaction storm on restart
        Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
        if (seed != null) {
          // Synchronized to ensure one user of the random instance at a time.
          double rnd = -1;
          synchronized (this) {
            this.random.setSeed(seed);
            rnd = this.random.nextDouble();
          }
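          // rnd is in [0, 1), so the adjusted period falls in (ret - jitter, ret + jitter]:
          // e.g. with the 24 hr default period and 20% jitter, roughly 19.2 to 28.8 hours.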
          ret += jitter - Math.round(2L * jitter * rnd);
        } else {
          ret = 0; // If seed is null, then no storefiles == no major compaction
        }
      }
    }
    return ret;
  }

  /**
   * @param compactionSize Total size of some compaction
   * @return whether this should be a large or small compaction
   */
  @Override
  public boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
  }

  public boolean needsCompaction(final Collection<StoreFile> storeFiles,
      final List<StoreFile> filesCompacting) {
    int numCandidates = storeFiles.size() - filesCompacting.size();
    return numCandidates >= comConf.getMinFilesToCompact();
  }
}
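
// A minimal usage sketch, kept as a comment so the file stays compilable. The caller-side
// variables 'conf', 'storeConfigInfo', 'storeFiles' and 'filesCompacting' are assumed to be
// supplied by the store that owns this policy; only the calls themselves come from this class:
//
//   RatioBasedCompactionPolicy policy = new RatioBasedCompactionPolicy(conf, storeConfigInfo);
//   if (policy.needsCompaction(storeFiles, filesCompacting)) {
//     CompactionRequest request = policy.selectCompaction(storeFiles, filesCompacting,
//         false /* isUserCompaction */, false /* mayUseOffPeak */, false /* forceMajor */);
//     // hand the request to the store's compactor; selectCompaction may throw IOException
//   }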