
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;

import com.google.common.collect.ImmutableList;
/**
 * Stripe store implementation of compaction policy.
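 *
 * <p>An illustrative sketch of how a store engine might drive this policy; the variable
 * names below are hypothetical, but the constructor and method signatures are the ones
 * declared in this class:
 * <pre>
 * StripeCompactionPolicy policy =
 *     new StripeCompactionPolicy(conf, storeConfigInfo, stripeConfig);
 * if (policy.needsCompactions(si, filesCompacting)) {
 *   StripeCompactionPolicy.StripeCompactionRequest request =
 *       policy.selectCompaction(si, filesCompacting, isOffpeak);
 *   if (request != null) {
 *     request.execute(stripeCompactor, throughputController);
 *   }
 * }
 * </pre>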
 */
@InterfaceAudience.Private
public class StripeCompactionPolicy extends CompactionPolicy {
  private final static Log LOG = LogFactory.getLog(StripeCompactionPolicy.class);
  // Policy used to compact individual stripes.
  private ExploringCompactionPolicy stripePolicy = null;

  private StripeStoreConfig config;

  public StripeCompactionPolicy(
      Configuration conf, StoreConfigInformation storeConfigInfo, StripeStoreConfig config) {
    super(conf, storeConfigInfo);
    this.config = config;
    stripePolicy = new ExploringCompactionPolicy(conf, storeConfigInfo);
  }

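  /**
   * Selects the candidate files that a coprocessor may see during compaction selection:
   * all store files, minus the ones already being compacted.
   * @param si Stripe information provider.
   * @param filesCompacting Files already being compacted; excluded from the result.
   * @return The candidate files.
   */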
  public List<StoreFile> preSelectFilesForCoprocessor(StripeInformationProvider si,
      List<StoreFile> filesCompacting) {
    // We sincerely hope nobody is messing with us with their coprocessors.
    // If they do, they are very likely to shoot themselves in the foot.
    // We'll just exclude all the filesCompacting from the list.
    ArrayList<StoreFile> candidateFiles = new ArrayList<StoreFile>(si.getStorefiles());
    candidateFiles.removeAll(filesCompacting);
    return candidateFiles;
  }

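  /**
   * Creates a request for an externally chosen (e.g. coprocessor-overridden) set of files:
   * the files are written out along the existing stripe boundaries if there are stripes,
   * or split into the initial stripe count otherwise.
   * @param si Stripe information provider.
   * @param request The original compaction request with the files to compact.
   * @return The resulting stripe compaction request.
   */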
  public StripeCompactionRequest createEmptyRequest(
      StripeInformationProvider si, CompactionRequest request) {
    // Treat as L0-ish compaction with fixed set of files, and hope for the best.
    if (si.getStripeCount() > 0) {
      return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
    }
    Pair<Long, Integer> targetKvsAndCount = estimateTargetKvs(
        request.getFiles(), this.config.getInitialCount());
    return new SplitStripeCompactionRequest(
        request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
  }

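  /**
   * Selects the flush strategy: a plain flush into L0 if L0 flushes are configured; a flush
   * into the initial stripe count if there are no stripes yet; or a flush along the existing
   * stripe boundaries.
   * @param si Stripe information provider.
   * @param kvCount The number of KVs to flush.
   * @return The flush request.
   */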
  public StripeStoreFlusher.StripeFlushRequest selectFlush(
      StripeInformationProvider si, int kvCount) {
    if (this.config.isUsingL0Flush()) {
      return new StripeStoreFlusher.StripeFlushRequest(); // L0 is used, return dumb request.
    }
    if (si.getStripeCount() == 0) {
      // No stripes - start with the requisite count, derive KVs per stripe.
      int initialCount = this.config.getInitialCount();
      return new StripeStoreFlusher.SizeStripeFlushRequest(initialCount, kvCount / initialCount);
    }
    // There are stripes - flush according to the boundaries.
    return new StripeStoreFlusher.BoundaryStripeFlushRequest(si.getStripeBoundaries());
  }

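  /**
   * Selects the compaction to run, trying each variation in strict order of preference:
   * compact everything if there are reference files (e.g. after a region split); compact L0
   * if enough L0 files have accumulated; merge fully expired stripes; otherwise, compact a
   * single stripe, splitting it if it has grown too large.
   * @param si Stripe information provider.
   * @param filesCompacting Files already being compacted; if non-empty, nothing is selected.
   * @param isOffpeak Whether the compaction would run at off-peak time.
   * @return The compaction request, or null if no compaction should run.
   */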
  public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
      List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
    // TODO: first cut - no parallel compactions. To have more fine-grained control, we
    //       probably need a structure more sophisticated than a list.
    if (!filesCompacting.isEmpty()) {
      LOG.debug("Not selecting compaction: " + filesCompacting.size() + " files compacting");
      return null;
    }

    // We are going to do variations of compaction in strict order of preference.
    // A better/more advanced approach would be to use a heuristic to see which one is "more
    // necessary" at the current time.

    // This can happen due to a region split. We can skip it later; for now, preserve
    // compact-all-things behavior.
    Collection<StoreFile> allFiles = si.getStorefiles();
    if (StoreUtils.hasReferences(allFiles)) {
      LOG.debug("There are references in the store; compacting all files");
      long targetKvs = estimateTargetKvs(allFiles, config.getInitialCount()).getFirst();
      SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
          allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
      request.setMajorRangeFull();
      return request;
    }

    int stripeCount = si.getStripeCount();
    List<StoreFile> l0Files = si.getLevel0Files();

    // See if we need to make new stripes.
    boolean shouldCompactL0 = (this.config.getLevel0MinFiles() <= l0Files.size());
    if (stripeCount == 0) {
      if (!shouldCompactL0) return null; // nothing to do.
      return selectNewStripesCompaction(si);
    }

    boolean canDropDeletesNoL0 = l0Files.isEmpty();
    if (shouldCompactL0) {
      if (!canDropDeletesNoL0) {
        // If we need to compact L0, see if we can add something to it, and drop deletes.
        StripeCompactionRequest result = selectSingleStripeCompaction(
            si, true, canDropDeletesNoL0, isOffpeak);
        if (result != null) return result;
      }
      LOG.debug("Selecting L0 compaction with " + l0Files.size() + " files");
      return new BoundaryStripeCompactionRequest(l0Files, si.getStripeBoundaries());
    }

    // Try to delete fully expired stripes.
    StripeCompactionRequest result = selectExpiredMergeCompaction(si, canDropDeletesNoL0);
    if (result != null) return result;

    // OK, nothing special here; let's see if we need to do a common compaction.
    // This will also split the stripes that are too big, if needed.
    return selectSingleStripeCompaction(si, false, canDropDeletesNoL0, isOffpeak);
  }

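  /**
   * @param si Stripe information provider.
   * @param filesCompacting Files already being compacted.
   * @return Whether the store may need a compaction.
   */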
  public boolean needsCompactions(StripeInformationProvider si, List<StoreFile> filesCompacting) {
    // An approximation of whether we need a compaction.
    return filesCompacting.isEmpty()
        && (StoreUtils.hasReferences(si.getStorefiles())
          || (si.getLevel0Files().size() >= this.config.getLevel0MinFiles())
          || needsSingleStripeCompaction(si));
  }

  @Override
  public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
    return false; // there's never a major compaction!
  }

  @Override
  public boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
  }

  /**
   * @param si Stripe information provider.
   * @return Whether any stripe potentially needs compaction.
   */
  protected boolean needsSingleStripeCompaction(StripeInformationProvider si) {
    int minFiles = this.config.getStripeCompactMinFiles();
    for (List<StoreFile> stripe : si.getStripes()) {
      if (stripe.size() >= minFiles) return true;
    }
    return false;
  }

  protected StripeCompactionRequest selectSingleStripeCompaction(StripeInformationProvider si,
      boolean includeL0, boolean canDropDeletesWithoutL0, boolean isOffpeak) throws IOException {
    ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();

    int bqIndex = -1;
    List<StoreFile> bqSelection = null;
    int stripeCount = stripes.size();
    long bqTotalSize = -1;
    for (int i = 0; i < stripeCount; ++i) {
      // If we want to compact L0 to drop deletes, we only want whole-stripe compactions.
      // So, pass includeL0 as 2nd parameter to indicate that.
      List<StoreFile> selection = selectSimpleCompaction(stripes.get(i),
          !canDropDeletesWithoutL0 && includeL0, isOffpeak);
      if (selection.isEmpty()) continue;
      long size = 0;
      for (StoreFile sf : selection) {
        size += sf.getReader().length();
      }
      if (bqSelection == null || selection.size() > bqSelection.size() ||
          (selection.size() == bqSelection.size() && size < bqTotalSize)) {
        bqSelection = selection;
        bqIndex = i;
        bqTotalSize = size;
      }
    }
    if (bqSelection == null) {
      LOG.debug("No good compaction is possible in any stripe");
      return null;
    }
    List<StoreFile> filesToCompact = new ArrayList<StoreFile>(bqSelection);
    // See if we can, and need to, split this stripe.
    int targetCount = 1;
    long targetKvs = Long.MAX_VALUE;
    boolean hasAllFiles = filesToCompact.size() == stripes.get(bqIndex).size();
    String splitString = "";
    if (hasAllFiles && bqTotalSize >= config.getSplitSize()) {
      if (includeL0) {
        // We want to avoid the scenario where we compact a stripe w/L0 and then split it.
        // So, if we might split, don't compact the stripe with L0.
        return null;
      }
      Pair<Long, Integer> kvsAndCount = estimateTargetKvs(filesToCompact, config.getSplitCount());
      targetKvs = kvsAndCount.getFirst();
      targetCount = kvsAndCount.getSecond();
      splitString = "; the stripe will be split into at most "
          + targetCount + " stripes with " + targetKvs + " target KVs";
    }

    LOG.debug("Found compaction in a stripe with end key ["
        + Bytes.toString(si.getEndRow(bqIndex)) + "], with "
        + filesToCompact.size() + " files of total size " + bqTotalSize + splitString);

    // See if we can drop deletes.
    StripeCompactionRequest req;
    if (includeL0) {
      assert hasAllFiles;
      List<StoreFile> l0Files = si.getLevel0Files();
      LOG.debug("Adding " + l0Files.size() + " files to compaction to be able to drop deletes");
      ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
      sfs.addSublist(filesToCompact);
      sfs.addSublist(l0Files);
      req = new BoundaryStripeCompactionRequest(sfs, si.getStripeBoundaries());
    } else {
      req = new SplitStripeCompactionRequest(
          filesToCompact, si.getStartRow(bqIndex), si.getEndRow(bqIndex), targetCount, targetKvs);
    }
    if (hasAllFiles && (canDropDeletesWithoutL0 || includeL0)) {
      req.setMajorRange(si.getStartRow(bqIndex), si.getEndRow(bqIndex));
    }
    req.getRequest().setOffPeak(isOffpeak);
    return req;
  }

  /**
   * Selects the compaction of a single stripe using the default policy.
   * @param sfs Files.
   * @param allFilesOnly Whether the selection must include either all files in the stripe,
   *                     or none.
   * @param isOffpeak Whether the compaction would run at off-peak time.
   * @return The resulting selection.
   */
  private List<StoreFile> selectSimpleCompaction(
      List<StoreFile> sfs, boolean allFilesOnly, boolean isOffpeak) {
    int minFilesLocal = Math.max(
        allFilesOnly ? sfs.size() : 0, this.config.getStripeCompactMinFiles());
    int maxFilesLocal = Math.max(this.config.getStripeCompactMaxFiles(), minFilesLocal);
    return stripePolicy.applyCompactionPolicy(sfs, false, isOffpeak, minFilesLocal, maxFilesLocal);
  }

  /**
   * Selects the compaction that compacts all files (to be removed later).
   * @param si Stripe information provider.
   * @param targetStripeCount Target stripe count.
   * @param targetSize Target stripe size.
   * @return The compaction.
   */
  private StripeCompactionRequest selectCompactionOfAllFiles(StripeInformationProvider si,
      int targetStripeCount, long targetSize) {
    Collection<StoreFile> allFiles = si.getStorefiles();
    SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
        allFiles, OPEN_KEY, OPEN_KEY, targetStripeCount, targetSize);
    request.setMajorRangeFull();
    LOG.debug("Selecting a compaction that includes all " + allFiles.size() + " files");
    return request;
  }

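  /**
   * Selects a compaction of all L0 files to create the initial set of stripes.
   * @param si Stripe information provider.
   * @return The compaction request.
   */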
  private StripeCompactionRequest selectNewStripesCompaction(StripeInformationProvider si) {
    List<StoreFile> l0Files = si.getLevel0Files();
    Pair<Long, Integer> kvsAndCount = estimateTargetKvs(l0Files, config.getInitialCount());
    LOG.debug("Creating " + kvsAndCount.getSecond() + " initial stripes with "
        + kvsAndCount.getFirst() + " kvs each via L0 compaction of " + l0Files.size() + " files");
    SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
        l0Files, OPEN_KEY, OPEN_KEY, kvsAndCount.getSecond(), kvsAndCount.getFirst());
    request.setMajorRangeFull(); // L0 only, can drop deletes.
    return request;
  }

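  /**
   * Selects a compaction that merges the longest run of adjacent stripes in which every
   * file has exceeded the store TTL, so that the expired files can be dropped.
   * @param si Stripe information provider.
   * @param canDropDeletesNoL0 Whether deletes can be dropped (there are no L0 files).
   * @return The compaction request, or null if no stripes have fully expired.
   */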
  private StripeCompactionRequest selectExpiredMergeCompaction(
      StripeInformationProvider si, boolean canDropDeletesNoL0) {
    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
    if (cfTtl == Long.MAX_VALUE) {
      return null; // minversion might be set, cannot delete old files
    }
    long timestampCutoff = EnvironmentEdgeManager.currentTime() - cfTtl;
    // Merge the longest sequence of stripes where all files have expired, if any.
    int start = -1, bestStart = -1, length = 0, bestLength = 0;
    ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
    OUTER: for (int i = 0; i < stripes.size(); ++i) {
      for (StoreFile storeFile : stripes.get(i)) {
        if (storeFile.getReader().getMaxTimestamp() < timestampCutoff) continue;
        // Found a non-expired file; this stripe has to stay.
        if (length > bestLength) {
          bestStart = start;
          bestLength = length;
        }
        start = -1;
        length = 0;
        continue OUTER;
      }
      if (start == -1) {
        start = i;
      }
      ++length;
    }
    if (length > bestLength) {
      bestStart = start;
      bestLength = length;
    }
    if (bestLength == 0) return null;
    if (bestLength == 1) {
      // This is currently inefficient. If only one stripe has expired, we will rewrite an
      // entire adjacent stripe just to delete some expired files, because we rely on file
      // metadata that cannot simply be updated in an old file. When we either determine
      // stripes dynamically or move metadata to a manifest, we can just drop the "expired
      // stripes".
      if (bestStart == (stripes.size() - 1)) return null;
      ++bestLength;
    }
    LOG.debug("Merging " + bestLength + " stripes to delete expired store files");
    int endIndex = bestStart + bestLength - 1;
    ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
    sfs.addAllSublists(stripes.subList(bestStart, endIndex + 1));
    SplitStripeCompactionRequest result = new SplitStripeCompactionRequest(sfs,
        si.getStartRow(bestStart), si.getEndRow(endIndex), 1, Long.MAX_VALUE);
    if (canDropDeletesNoL0) {
      result.setMajorRangeFull();
    }
    return result;
  }

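  /** @return The total KV (entry) count of the given files, per their readers. */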
  private static long getTotalKvCount(final Collection<StoreFile> candidates) {
    long totalKvCount = 0;
    for (StoreFile storeFile : candidates) {
      totalKvCount += storeFile.getReader().getEntries();
    }
    return totalKvCount;
  }

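  /** @return The total on-disk size of the given files, in bytes. */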
  public static long getTotalFileSize(final Collection<StoreFile> candidates) {
    long totalSize = 0;
    for (StoreFile storeFile : candidates) {
      totalSize += storeFile.getReader().length();
    }
    return totalSize;
  }

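  /**
   * Estimates the target KV count per stripe, and the number of stripes, for splitting the
   * given files into parts of roughly the configured part size.
   * @param files Files to estimate for.
   * @param splitCount Desired number of parts; may be increased if the parts would be too big.
   * @return Pair of the target KV count per part and the number of parts.
   */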
  private Pair<Long, Integer> estimateTargetKvs(Collection<StoreFile> files, double splitCount) {
    // If the size is larger than what we target, we don't want to split into proportionally
    // larger parts and then have to split again very soon. So, we will increase the multiplier
    // by one until we get small enough parts. E.g. a 5Gb stripe that should have been split into
    // 2 parts when it was 3Gb will be split into 3x1.67Gb parts, rather than 2x2.5Gb parts.
    long totalSize = getTotalFileSize(files);
    long targetPartSize = config.getSplitPartSize();
    assert targetPartSize > 0 && splitCount > 0;
    double ratio = totalSize / (splitCount * targetPartSize); // ratio of real to desired size
    while (ratio > 1.0) {
      // Ratio of real to desired size if we increase the multiplier.
      double newRatio = totalSize / ((splitCount + 1.0) * targetPartSize);
      if ((1.0 / newRatio) >= ratio) break; // New ratio is < 1.0, but further than the last one.
      ratio = newRatio;
      splitCount += 1.0;
    }
    long kvCount = (long)(getTotalKvCount(files) / splitCount);
    return new Pair<Long, Integer>(kvCount, (int)Math.ceil(splitCount));
  }

  /** Stripe compaction request wrapper. */
  public abstract static class StripeCompactionRequest {
    protected CompactionRequest request;
    protected byte[] majorRangeFromRow = null, majorRangeToRow = null;

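    /**
     * Executes the request with no user context. See
     * {@link #execute(StripeCompactor, CompactionThroughputController, User)}.
     */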
    public List<Path> execute(StripeCompactor compactor,
        CompactionThroughputController throughputController) throws IOException {
      return execute(compactor, throughputController, null);
    }

    /**
     * Executes the request against the compactor (essentially, just calls the correct
     * overload of the compact method), to simulate more dynamic dispatch.
     * @param compactor Compactor.
     * @param throughputController Throughput controller for the compaction.
     * @param user The user running the compaction; may be null.
     * @return The result of compact(...).
     */
    public abstract List<Path> execute(StripeCompactor compactor,
        CompactionThroughputController throughputController, User user) throws IOException;

    public StripeCompactionRequest(CompactionRequest request) {
      this.request = request;
    }

    /**
     * Sets the compaction "major range". The major range is the key range for which all
     * the files are included, so they can be treated like major-compacted files.
     * @param startRow Left boundary, inclusive.
     * @param endRow Right boundary, exclusive.
     */
    public void setMajorRange(byte[] startRow, byte[] endRow) {
      this.majorRangeFromRow = startRow;
      this.majorRangeToRow = endRow;
    }

    public CompactionRequest getRequest() {
      return this.request;
    }

    public void setRequest(CompactionRequest request) {
      assert request != null;
      this.request = request;
      this.majorRangeFromRow = this.majorRangeToRow = null;
    }
  }

  /**
   * Request for stripe compactor that will cause it to split the source files into several
   * separate files at the provided boundaries.
   */
  private static class BoundaryStripeCompactionRequest extends StripeCompactionRequest {
    private final List<byte[]> targetBoundaries;

    /**
     * @param request Original request.
     * @param targetBoundaries New files should be written with these boundaries.
     */
    public BoundaryStripeCompactionRequest(CompactionRequest request,
        List<byte[]> targetBoundaries) {
      super(request);
      this.targetBoundaries = targetBoundaries;
    }

    public BoundaryStripeCompactionRequest(Collection<StoreFile> files,
        List<byte[]> targetBoundaries) {
      this(new CompactionRequest(files), targetBoundaries);
    }

    @Override
    public List<Path> execute(StripeCompactor compactor,
        CompactionThroughputController throughputController, User user) throws IOException {
      return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
        this.majorRangeToRow, throughputController, user);
    }
  }

  /**
   * Request for stripe compactor that will cause it to split the source files into several
   * separate files based on key-value count, as well as a file count limit.
   * Most of the files will be roughly the same size. The last file may be smaller or larger
   * depending on the interplay of the amount of data and the maximum number of files allowed.
   */
  private static class SplitStripeCompactionRequest extends StripeCompactionRequest {
    private final byte[] startRow, endRow;
    private final int targetCount;
    private final long targetKvs;

    /**
     * @param request Original request.
     * @param startRow Left boundary of the range to compact, inclusive.
     * @param endRow Right boundary of the range to compact, exclusive.
     * @param targetCount The maximum number of stripes to compact into.
     * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
     *                  the total number of KVs, all the overflow data goes into the last stripe.
     */
    public SplitStripeCompactionRequest(CompactionRequest request,
        byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
      super(request);
      this.startRow = startRow;
      this.endRow = endRow;
      this.targetCount = targetCount;
      this.targetKvs = targetKvs;
    }

    public SplitStripeCompactionRequest(
        CompactionRequest request, byte[] startRow, byte[] endRow, long targetKvs) {
      this(request, startRow, endRow, Integer.MAX_VALUE, targetKvs);
    }

    public SplitStripeCompactionRequest(
        Collection<StoreFile> files, byte[] startRow, byte[] endRow, long targetKvs) {
      this(files, startRow, endRow, Integer.MAX_VALUE, targetKvs);
    }

    public SplitStripeCompactionRequest(Collection<StoreFile> files,
        byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
      this(new CompactionRequest(files), startRow, endRow, targetCount, targetKvs);
    }

    @Override
    public List<Path> execute(StripeCompactor compactor,
        CompactionThroughputController throughputController, User user) throws IOException {
      return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
        this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user);
    }

    /**
     * Sets the major range of the compaction to the entire compaction range.
     * See {@link #setMajorRange(byte[], byte[])}.
     */
    public void setMajorRangeFull() {
      setMajorRange(this.startRow, this.endRow);
    }
  }

  /** The information about stripes that the policy needs to make its decisions. */
  public static interface StripeInformationProvider {
    public Collection<StoreFile> getStorefiles();

    /**
     * Gets the start row for a given stripe.
     * @param stripeIndex Stripe index.
     * @return Start row. May be an open key.
     */
    public byte[] getStartRow(int stripeIndex);

    /**
     * Gets the end row for a given stripe.
     * @param stripeIndex Stripe index.
     * @return End row. May be an open key.
     */
    public byte[] getEndRow(int stripeIndex);

    /**
     * @return Level 0 files.
     */
    public List<StoreFile> getLevel0Files();

    /**
     * @return All stripe boundaries, including the open ones on both ends.
     */
    public List<byte[]> getStripeBoundaries();

    /**
     * @return The stripes.
     */
    public ArrayList<ImmutableList<StoreFile>> getStripes();

    /**
     * @return Stripe count.
     */
    public int getStripeCount();
  }
}