/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;

import com.google.common.collect.ImmutableList;

/**
 * Stripe store implementation of compaction policy.
 */
@InterfaceAudience.Private
public class StripeCompactionPolicy extends CompactionPolicy {
  private final static Log LOG = LogFactory.getLog(StripeCompactionPolicy.class);
  // Policy used to compact individual stripes.
  private ExploringCompactionPolicy stripePolicy = null;

  private StripeStoreConfig config;

  public StripeCompactionPolicy(
      Configuration conf, StoreConfigInformation storeConfigInfo, StripeStoreConfig config) {
    super(conf, storeConfigInfo);
    this.config = config;
    stripePolicy = new ExploringCompactionPolicy(conf, storeConfigInfo);
  }

  public List<StoreFile> preSelectFilesForCoprocessor(StripeInformationProvider si,
      List<StoreFile> filesCompacting) {
    // We sincerely hope nobody is messing with us with their coprocessors.
    // If they do, they are very likely to shoot themselves in the foot.
    // We'll just exclude all the filesCompacting from the list.
    ArrayList<StoreFile> candidateFiles = new ArrayList<StoreFile>(si.getStorefiles());
    candidateFiles.removeAll(filesCompacting);
    return candidateFiles;
  }

  public StripeCompactionRequest createEmptyRequest(
      StripeInformationProvider si, CompactionRequest request) {
    // Treat as L0-ish compaction with fixed set of files, and hope for the best.
    if (si.getStripeCount() > 0) {
      return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
    }
    Pair<Long, Integer> targetKvsAndCount = estimateTargetKvs(
        request.getFiles(), this.config.getInitialCount());
    return new SplitStripeCompactionRequest(
        request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
  }

  public StripeStoreFlusher.StripeFlushRequest selectFlush(
      StripeInformationProvider si, int kvCount) {
    if (this.config.isUsingL0Flush()) {
      return new StripeStoreFlusher.StripeFlushRequest(); // L0 is used, return dumb request.
    }
    if (si.getStripeCount() == 0) {
      // No stripes - start with the requisite count, derive KVs per stripe.
      int initialCount = this.config.getInitialCount();
      return new StripeStoreFlusher.SizeStripeFlushRequest(initialCount, kvCount / initialCount);
    }
    // There are stripes - do according to the boundaries.
    return new StripeStoreFlusher.BoundaryStripeFlushRequest(si.getStripeBoundaries());
  }
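
  // Illustrative sketch, not part of the original class: the flush sizing above in plain
  // numbers. If L0 flush is disabled and there are no stripes yet, a memstore with
  // kvCount = 10000 and an initial count of 4 is flushed as 4 stripes of 2500 KVs each
  // (integer division); once stripes exist, their current boundaries are reused instead.
  // The helper below is hypothetical and only restates that division.
  private static int sketchInitialFlushStripeSize(int kvCount, int initialCount) {
    return kvCount / initialCount; // e.g. 10000 / 4 = 2500 KVs per stripe
  }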

  public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
      List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
    // TODO: first cut - no parallel compactions. To have more fine-grained control we
    //       probably need a structure more sophisticated than a list.
    if (!filesCompacting.isEmpty()) {
      LOG.debug("Not selecting compaction: " + filesCompacting.size() + " files compacting");
      return null;
    }

    // We are going to do variations of compaction in strict order of preference.
    // A better/more advanced approach is to use a heuristic to see which one is "more
    // necessary" at current time.

    // This can happen due to region split. We can skip it later; for now preserve
    // compact-all-things behavior.
    Collection<StoreFile> allFiles = si.getStorefiles();
    if (StoreUtils.hasReferences(allFiles)) {
      LOG.debug("There are references in the store; compacting all files");
      long targetKvs = estimateTargetKvs(allFiles, config.getInitialCount()).getFirst();
      SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
          allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
      request.setMajorRangeFull();
      return request;
    }

    int stripeCount = si.getStripeCount();
    List<StoreFile> l0Files = si.getLevel0Files();

    // See if we need to make new stripes.
    boolean shouldCompactL0 = (this.config.getLevel0MinFiles() <= l0Files.size());
    if (stripeCount == 0) {
      if (!shouldCompactL0) return null; // nothing to do.
      return selectNewStripesCompaction(si);
    }

    boolean canDropDeletesNoL0 = l0Files.size() == 0;
    if (shouldCompactL0) {
      if (!canDropDeletesNoL0) {
        // If we need to compact L0, see if we can add something to it, and drop deletes.
        StripeCompactionRequest result = selectSingleStripeCompaction(
            si, true, canDropDeletesNoL0, isOffpeak);
        if (result != null) return result;
      }
      LOG.debug("Selecting L0 compaction with " + l0Files.size() + " files");
      return new BoundaryStripeCompactionRequest(l0Files, si.getStripeBoundaries());
    }

    // Try to delete fully expired stripes
    StripeCompactionRequest result = selectExpiredMergeCompaction(si, canDropDeletesNoL0);
    if (result != null) return result;

    // Ok, nothing special here, let's see if we need to do a common compaction.
    // This will also split the stripes that are too big if needed.
    return selectSingleStripeCompaction(si, false, canDropDeletesNoL0, isOffpeak);
  }

  public boolean needsCompactions(StripeInformationProvider si, List<StoreFile> filesCompacting) {
    // Approximation on whether we need compaction.
    return filesCompacting.isEmpty()
        && (StoreUtils.hasReferences(si.getStorefiles())
          || (si.getLevel0Files().size() >= this.config.getLevel0MinFiles())
          || needsSingleStripeCompaction(si));
  }

  @Override
  public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
    return false; // there's never a major compaction!
  }

  @Override
  public boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
  }

  /**
   * @param si StoreFileManager.
   * @return Whether any stripe potentially needs compaction.
   */
  protected boolean needsSingleStripeCompaction(StripeInformationProvider si) {
    int minFiles = this.config.getStripeCompactMinFiles();
    for (List<StoreFile> stripe : si.getStripes()) {
      if (stripe.size() >= minFiles) return true;
    }
    return false;
  }

  protected StripeCompactionRequest selectSingleStripeCompaction(StripeInformationProvider si,
      boolean includeL0, boolean canDropDeletesWithoutL0, boolean isOffpeak) throws IOException {
    ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();

    int bqIndex = -1;
    List<StoreFile> bqSelection = null;
    int stripeCount = stripes.size();
    long bqTotalSize = -1;
    for (int i = 0; i < stripeCount; ++i) {
      // If we want to compact L0 to drop deletes, we only want whole-stripe compactions.
      // So, pass includeL0 as 2nd parameter to indicate that.
      List<StoreFile> selection = selectSimpleCompaction(stripes.get(i),
          !canDropDeletesWithoutL0 && includeL0, isOffpeak);
      if (selection.isEmpty()) continue;
      long size = 0;
      for (StoreFile sf : selection) {
        size += sf.getReader().length();
      }
      if (bqSelection == null || selection.size() > bqSelection.size() ||
          (selection.size() == bqSelection.size() && size < bqTotalSize)) {
        bqSelection = selection;
        bqIndex = i;
        bqTotalSize = size;
      }
    }
    if (bqSelection == null) {
      LOG.debug("No good compaction is possible in any stripe");
      return null;
    }
    List<StoreFile> filesToCompact = new ArrayList<StoreFile>(bqSelection);
    // See if we can, and need to, split this stripe.
    int targetCount = 1;
    long targetKvs = Long.MAX_VALUE;
    boolean hasAllFiles = filesToCompact.size() == stripes.get(bqIndex).size();
    String splitString = "";
    if (hasAllFiles && bqTotalSize >= config.getSplitSize()) {
      if (includeL0) {
        // We want to avoid the scenario where we compact a stripe w/L0 and then split it.
        // So, if we might split, don't compact the stripe with L0.
        return null;
      }
      Pair<Long, Integer> kvsAndCount = estimateTargetKvs(filesToCompact, config.getSplitCount());
      targetKvs = kvsAndCount.getFirst();
      targetCount = kvsAndCount.getSecond();
      splitString = "; the stripe will be split into at most "
          + targetCount + " stripes with " + targetKvs + " target KVs";
    }

    LOG.debug("Found compaction in a stripe with end key ["
        + Bytes.toString(si.getEndRow(bqIndex)) + "], with "
        + filesToCompact.size() + " files of total size " + bqTotalSize + splitString);

    // See if we can drop deletes.
    StripeCompactionRequest req;
    if (includeL0) {
      assert hasAllFiles;
      List<StoreFile> l0Files = si.getLevel0Files();
      LOG.debug("Adding " + l0Files.size() + " files to compaction to be able to drop deletes");
      ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
      sfs.addSublist(filesToCompact);
      sfs.addSublist(l0Files);
      req = new BoundaryStripeCompactionRequest(sfs, si.getStripeBoundaries());
    } else {
      req = new SplitStripeCompactionRequest(
          filesToCompact, si.getStartRow(bqIndex), si.getEndRow(bqIndex), targetCount, targetKvs);
    }
    if (canDropDeletesWithoutL0 || includeL0) {
      req.setMajorRange(si.getStartRow(bqIndex), si.getEndRow(bqIndex));
    }
    req.getRequest().setOffPeak(isOffpeak);
    return req;
  }
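
  // Illustrative sketch, not part of the original class: the per-stripe preference used in
  // selectSingleStripeCompaction above. A candidate selection is "better" if it contains
  // more files, and among selections with the same file count, the one with the smaller
  // total size wins. The helper below is hypothetical and works on plain counts/sizes.
  private static boolean sketchIsBetterSelection(
      int candidateFiles, long candidateSize, int bestFiles, long bestSize) {
    if (bestFiles < 0) return true; // nothing chosen yet
    if (candidateFiles != bestFiles) return candidateFiles > bestFiles;
    return candidateSize < bestSize;
  }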

  /**
   * Selects the compaction of a single stripe using default policy.
   * @param sfs Files.
   * @param allFilesOnly Whether a compaction of all-or-none files is needed.
   * @return The resulting selection.
   */
  private List<StoreFile> selectSimpleCompaction(
      List<StoreFile> sfs, boolean allFilesOnly, boolean isOffpeak) {
    int minFilesLocal = Math.max(
        allFilesOnly ? sfs.size() : 0, this.config.getStripeCompactMinFiles());
    int maxFilesLocal = Math.max(this.config.getStripeCompactMaxFiles(), minFilesLocal);
    return stripePolicy.applyCompactionPolicy(sfs, isOffpeak, false, minFilesLocal, maxFilesLocal);
  }
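
  // Illustrative sketch, not part of the original class: how the min/max file counts above
  // are clamped before delegating to ExploringCompactionPolicy. For a stripe with 7 files,
  // a configured stripe-compaction min of 4 and max of 10, an "all files only" request
  // raises the minimum to 7 (all-or-nothing), while a normal request keeps min=4, max=10.
  // The helper below is hypothetical and only restates that clamping arithmetic.
  private static int[] sketchMinMaxFiles(int stripeFileCount, boolean allFilesOnly,
      int configuredMin, int configuredMax) {
    int min = Math.max(allFilesOnly ? stripeFileCount : 0, configuredMin);
    int max = Math.max(configuredMax, min);
    return new int[] { min, max }; // e.g. {7, 10} for the all-files case above
  }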

  /**
   * Selects the compaction that compacts all files (to be removed later).
   * @param si StoreFileManager.
   * @param targetStripeCount Target stripe count.
   * @param targetSize Target stripe size.
   * @return The compaction.
   */
  private StripeCompactionRequest selectCompactionOfAllFiles(StripeInformationProvider si,
      int targetStripeCount, long targetSize) {
    Collection<StoreFile> allFiles = si.getStorefiles();
    SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
        allFiles, OPEN_KEY, OPEN_KEY, targetStripeCount, targetSize);
    request.setMajorRangeFull();
    LOG.debug("Selecting a compaction that includes all " + allFiles.size() + " files");
    return request;
  }

  private StripeCompactionRequest selectNewStripesCompaction(StripeInformationProvider si) {
    List<StoreFile> l0Files = si.getLevel0Files();
    Pair<Long, Integer> kvsAndCount = estimateTargetKvs(l0Files, config.getInitialCount());
    LOG.debug("Creating " + kvsAndCount.getSecond() + " initial stripes with "
        + kvsAndCount.getFirst() + " kvs each via L0 compaction of " + l0Files.size() + " files");
    SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
        si.getLevel0Files(), OPEN_KEY, OPEN_KEY, kvsAndCount.getSecond(), kvsAndCount.getFirst());
    request.setMajorRangeFull(); // L0 only, can drop deletes.
    return request;
  }

  private StripeCompactionRequest selectExpiredMergeCompaction(
      StripeInformationProvider si, boolean canDropDeletesNoL0) {
    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
    if (cfTtl == Long.MAX_VALUE) {
      return null; // minversion might be set, cannot delete old files
    }
    long timestampCutoff = EnvironmentEdgeManager.currentTimeMillis() - cfTtl;
    // Merge the longest sequence of stripes where all files have expired, if any.
    int start = -1, bestStart = -1, length = 0, bestLength = 0;
    ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
    OUTER: for (int i = 0; i < stripes.size(); ++i) {
      for (StoreFile storeFile : stripes.get(i)) {
        if (storeFile.getReader().getMaxTimestamp() < timestampCutoff) continue;
        // Found non-expired file, this stripe has to stay.
        if (length > bestLength) {
          bestStart = start;
          bestLength = length;
        }
        start = -1;
        length = 0;
        continue OUTER;
      }
      if (start == -1) {
        start = i;
      }
      ++length;
    }
    if (length > bestLength) {
      bestStart = start;
      bestLength = length;
    }
    if (bestLength == 0) return null;
    if (bestLength == 1) {
      // This is currently inefficient. If only one stripe has expired, we will rewrite an
      // entire neighboring stripe just to delete some expired files, because we rely on
      // metadata that cannot simply be updated in an old file. When we either determine
      // stripes dynamically or move metadata to the manifest, we can just drop the
      // "expired stripes".
      if (bestStart == (stripes.size() - 1)) return null;
      ++bestLength;
    }
    LOG.debug("Merging " + bestLength + " stripes to delete expired store files");
    int endIndex = bestStart + bestLength - 1;
    ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
    sfs.addAllSublists(stripes.subList(bestStart, endIndex + 1));
    SplitStripeCompactionRequest result = new SplitStripeCompactionRequest(sfs,
        si.getStartRow(bestStart), si.getEndRow(endIndex), 1, Long.MAX_VALUE);
    if (canDropDeletesNoL0) {
      result.setMajorRangeFull();
    }
    return result;
  }
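
  // Illustrative sketch, not part of the original class: the scan above reduced to a boolean
  // array, where expired[i] is true if every file in stripe i is past the TTL cutoff. It finds
  // the longest contiguous run of fully expired stripes, which is what the merge compaction
  // targets. For {false, true, true, false, true} it returns a run of length 2 at index 1.
  // The method and its return convention are hypothetical.
  private static int[] sketchLongestExpiredRun(boolean[] expired) {
    int start = -1, bestStart = -1, length = 0, bestLength = 0;
    for (int i = 0; i < expired.length; ++i) {
      if (!expired[i]) {
        // A non-expired stripe ends the current run.
        if (length > bestLength) {
          bestStart = start;
          bestLength = length;
        }
        start = -1;
        length = 0;
        continue;
      }
      if (start == -1) start = i;
      ++length;
    }
    if (length > bestLength) {
      bestStart = start;
      bestLength = length;
    }
    return new int[] { bestStart, bestLength }; // {start index, run length}
  }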

  private static long getTotalKvCount(final Collection<StoreFile> candidates) {
    long totalSize = 0;
    for (StoreFile storeFile : candidates) {
      totalSize += storeFile.getReader().getEntries();
    }
    return totalSize;
  }

  public static long getTotalFileSize(final Collection<StoreFile> candidates) {
    long totalSize = 0;
    for (StoreFile storeFile : candidates) {
      totalSize += storeFile.getReader().length();
    }
    return totalSize;
  }

  private Pair<Long, Integer> estimateTargetKvs(Collection<StoreFile> files, double splitCount) {
    // If the size is larger than what we target, we don't want to split into proportionally
    // larger parts and then have to split again very soon. So, we will increase the multiplier
    // by one until we get small enough parts. E.g. 5Gb stripe that should have been split into
    // 2 parts when it was 3Gb will be split into 3x1.67Gb parts, rather than 2x2.5Gb parts.
    long totalSize = getTotalFileSize(files);
    long targetPartSize = config.getSplitPartSize();
    assert targetPartSize > 0 && splitCount > 0;
    double ratio = totalSize / (splitCount * targetPartSize); // ratio of real to desired size
    while (ratio > 1.0) {
      // Ratio of real to desired size if we increase the multiplier.
      double newRatio = totalSize / ((splitCount + 1.0) * targetPartSize);
      if ((1.0 / newRatio) >= ratio) break; // New ratio is < 1.0, but further than the last one.
      ratio = newRatio;
      splitCount += 1.0;
    }
    long kvCount = (long)(getTotalKvCount(files) / splitCount);
    return new Pair<Long, Integer>(kvCount, (int)Math.ceil(splitCount));
  }
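
  // Illustrative sketch, not part of the original class: the split-count search in
  // estimateTargetKvs above, using plain numbers. With a 5GB total, a 2GB target part size
  // and an initial split count of 2, the ratio is 5/4 = 1.25; bumping the count to 3 gives
  // parts of ~1.67GB (ratio ~0.83, and 1/0.83 < 1.25), so 3 parts are preferred over 2x2.5GB.
  // The helper below is hypothetical and mirrors only the arithmetic.
  private static double sketchSplitCountSearch(long totalSize, long targetPartSize,
      double splitCount) {
    double ratio = totalSize / (splitCount * targetPartSize);
    while (ratio > 1.0) {
      double newRatio = totalSize / ((splitCount + 1.0) * targetPartSize);
      if ((1.0 / newRatio) >= ratio) break; // would undershoot further than we now overshoot
      ratio = newRatio;
      splitCount += 1.0;
    }
    return splitCount; // e.g. 3.0 for the 5GB example above
  }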

  /** Stripe compaction request wrapper. */
  public abstract static class StripeCompactionRequest {
    protected CompactionRequest request;
    protected byte[] majorRangeFromRow = null, majorRangeToRow = null;

    /**
     * Executes the request against the compactor (essentially, just calls the correct overload
     * of the compact method), to simulate more dynamic dispatch.
     * @param compactor Compactor.
     * @return result of compact(...)
     */
    public abstract List<Path> execute(StripeCompactor compactor) throws IOException;

    public StripeCompactionRequest(CompactionRequest request) {
      this.request = request;
    }

    /**
     * Sets compaction "major range". Major range is the key range for which all
     * the files are included, so they can be treated like major-compacted files.
     * @param startRow Left boundary, inclusive.
     * @param endRow Right boundary, exclusive.
     */
    public void setMajorRange(byte[] startRow, byte[] endRow) {
      this.majorRangeFromRow = startRow;
      this.majorRangeToRow = endRow;
    }

    public CompactionRequest getRequest() {
      return this.request;
    }

    public void setRequest(CompactionRequest request) {
      assert request != null;
      this.request = request;
      this.majorRangeFromRow = this.majorRangeToRow = null;
    }
  }

  /**
   * Request for stripe compactor that will cause it to split the source files into several
   * separate files at the provided boundaries.
   */
  private static class BoundaryStripeCompactionRequest extends StripeCompactionRequest {
    private final List<byte[]> targetBoundaries;

    /**
     * @param request Original request.
     * @param targetBoundaries New files should be written with these boundaries.
     */
    public BoundaryStripeCompactionRequest(CompactionRequest request,
        List<byte[]> targetBoundaries) {
      super(request);
      this.targetBoundaries = targetBoundaries;
    }

    public BoundaryStripeCompactionRequest(Collection<StoreFile> files,
        List<byte[]> targetBoundaries) {
      this(new CompactionRequest(files), targetBoundaries);
    }

    @Override
    public List<Path> execute(StripeCompactor compactor) throws IOException {
      return compactor.compact(
          this.request, this.targetBoundaries, this.majorRangeFromRow, this.majorRangeToRow);
    }
  }

  /**
   * Request for stripe compactor that will cause it to split the source files into several
   * separate files based on key-value count, as well as a file count limit.
   * Most of the files will be roughly the same size. The last file may be smaller or larger
   * depending on the interplay of the amount of data and maximum number of files allowed.
   */
  private static class SplitStripeCompactionRequest extends StripeCompactionRequest {
    private final byte[] startRow, endRow;
    private final int targetCount;
    private final long targetKvs;

    /**
     * @param request Original request.
     * @param startRow Left boundary of the range to compact, inclusive.
     * @param endRow Right boundary of the range to compact, exclusive.
     * @param targetCount The maximum number of stripes to compact into.
     * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
     *                  the total number of KVs, all the overflow data goes into the last stripe.
     */
    public SplitStripeCompactionRequest(CompactionRequest request,
        byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
      super(request);
      this.startRow = startRow;
      this.endRow = endRow;
      this.targetCount = targetCount;
      this.targetKvs = targetKvs;
    }

    public SplitStripeCompactionRequest(
        CompactionRequest request, byte[] startRow, byte[] endRow, long targetKvs) {
      this(request, startRow, endRow, Integer.MAX_VALUE, targetKvs);
    }

    public SplitStripeCompactionRequest(
        Collection<StoreFile> files, byte[] startRow, byte[] endRow, long targetKvs) {
      this(files, startRow, endRow, Integer.MAX_VALUE, targetKvs);
    }

    public SplitStripeCompactionRequest(Collection<StoreFile> files,
        byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
      this(new CompactionRequest(files), startRow, endRow, targetCount, targetKvs);
    }

    @Override
    public List<Path> execute(StripeCompactor compactor) throws IOException {
      return compactor.compact(this.request, this.targetCount, this.targetKvs,
          this.startRow, this.endRow, this.majorRangeFromRow, this.majorRangeToRow);
    }

    /** Set major range of the compaction to the entire compaction range.
     * See {@link #setMajorRange(byte[], byte[])}. */
    public void setMajorRangeFull() {
      setMajorRange(this.startRow, this.endRow);
    }
  }
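
  // Illustrative sketch, not part of the original class: how targetCount and targetKvs in
  // SplitStripeCompactionRequest above bound the output, per the class javadoc. With 3500 KVs
  // total, targetKvs = 1000 and targetCount = 3, the compactor is expected to write files of
  // roughly 1000, 1000 and 1500 KVs, since overflow beyond targetKvs * targetCount goes into
  // the last file. The helper below is hypothetical and only restates that expectation.
  private static long sketchLastFileKvs(long totalKvs, long targetKvs, int targetCount) {
    long fullFiles = Math.min(totalKvs / targetKvs, targetCount - 1);
    return totalKvs - fullFiles * targetKvs; // e.g. 3500 - 2 * 1000 = 1500
  }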

  /** The information about stripes that the policy needs to do its stuff */
  public static interface StripeInformationProvider {
    public Collection<StoreFile> getStorefiles();

    /**
     * Gets the start row for a given stripe.
     * @param stripeIndex Stripe index.
     * @return Start row. May be an open key.
     */
    public byte[] getStartRow(int stripeIndex);

    /**
     * Gets the end row for a given stripe.
     * @param stripeIndex Stripe index.
     * @return End row. May be an open key.
     */
    public byte[] getEndRow(int stripeIndex);

    /**
     * @return Level 0 files.
     */
    public List<StoreFile> getLevel0Files();

    /**
     * @return All stripe boundaries; including the open ones on both ends.
     */
    public List<byte[]> getStripeBoundaries();

    /**
     * @return The stripes.
     */
    public ArrayList<ImmutableList<StoreFile>> getStripes();

    /**
     * @return Stripe count.
     */
    public int getStripeCount();
  }
}