/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
 * Stripe store implementation of compaction policy.
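 * <p>Stripes partition the row-key space of the store into contiguous, non-overlapping
 * ranges; flushes go either into per-stripe files or into a common level 0 (L0), which is
 * later compacted into the stripes. This policy decides which of those compactions to run.
 * <p>A minimal sketch of enabling the stripe store engine for a table (the key and class
 * names below are defined outside this file and are shown for illustration only):
 * <pre>{@code
 * TableDescriptorBuilder.newBuilder(tableName)
 *   .setValue("hbase.hstore.engine.class",
 *     "org.apache.hadoop.hbase.regionserver.StripeStoreEngine")
 *   .build();
 * }</pre>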
 */
@InterfaceAudience.Private
public class StripeCompactionPolicy extends CompactionPolicy {
  private static final Logger LOG = LoggerFactory.getLogger(StripeCompactionPolicy.class);
  // Policy used to compact individual stripes.
  private ExploringCompactionPolicy stripePolicy = null;

  private StripeStoreConfig config;

  public StripeCompactionPolicy(
      Configuration conf, StoreConfigInformation storeConfigInfo, StripeStoreConfig config) {
    super(conf, storeConfigInfo);
    this.config = config;
    stripePolicy = new ExploringCompactionPolicy(conf, storeConfigInfo);
  }

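  /**
   * Returns the files from which a coprocessor may select a compaction: all store files
   * except those already being compacted.
   */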
  public List<HStoreFile> preSelectFilesForCoprocessor(StripeInformationProvider si,
      List<HStoreFile> filesCompacting) {
    // We sincerely hope nobody is messing with us with their coprocessors.
    // If they do, they are very likely to shoot themselves in the foot.
    // We'll just exclude all the filesCompacting from the list.
    ArrayList<HStoreFile> candidateFiles = new ArrayList<>(si.getStorefiles());
    candidateFiles.removeAll(filesCompacting);
    return candidateFiles;
  }

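  /**
   * Creates a request for the given fixed set of files when no regular selection applies
   * (e.g. when a coprocessor substitutes its own file list): with stripes present, compact
   * to the existing boundaries; otherwise treat it as an L0-style compaction that splits
   * the output into new stripes.
   */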
  public StripeCompactionRequest createEmptyRequest(
      StripeInformationProvider si, CompactionRequestImpl request) {
    // Treat as an L0-ish compaction with a fixed set of files, and hope for the best.
    if (si.getStripeCount() > 0) {
      return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
    }
    Pair<Long, Integer> targetKvsAndCount = estimateTargetKvs(
        request.getFiles(), this.config.getInitialCount());
    return new SplitStripeCompactionRequest(
        request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
  }

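  /**
   * Selects the flush strategy for the store: a dumb single-file flush when L0 is used,
   * a size-based split into the initial stripe count when no stripes exist yet, and a
   * boundary-based flush into the existing stripes otherwise.
   * @param comparator Cell comparator for the store.
   * @param si Stripe information for the store.
   * @param kvCount Approximate number of KVs to be flushed.
   * @return The flush request to use.
   */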
  public StripeStoreFlusher.StripeFlushRequest selectFlush(CellComparator comparator,
      StripeInformationProvider si, int kvCount) {
    if (this.config.isUsingL0Flush()) {
      // L0 is used, return a dumb request.
      return new StripeStoreFlusher.StripeFlushRequest(comparator);
    }
    if (si.getStripeCount() == 0) {
      // No stripes - start with the requisite count, derive KVs per stripe.
      int initialCount = this.config.getInitialCount();
      return new StripeStoreFlusher.SizeStripeFlushRequest(comparator, initialCount,
          kvCount / initialCount);
    }
    // There are stripes - flush according to the existing boundaries.
    return new StripeStoreFlusher.BoundaryStripeFlushRequest(comparator, si.getStripeBoundaries());
  }

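  /**
   * Selects the compaction to run, in strict order of preference: compact-all-files if
   * there are references (post-split), L0 compaction (with or without a stripe), expired
   * stripe merging, and finally a regular single-stripe compaction.
   * <p>A minimal usage sketch; the provider, compactor and controller instances here are
   * illustrative and would be supplied by the calling store engine:
   * <pre>{@code
   * StripeCompactionRequest req = policy.selectCompaction(si, filesCompacting, false);
   * if (req != null) {
   *   req.execute(stripeCompactor, throughputController);
   * }
   * }</pre>
   * @return The compaction request, or null if nothing should be compacted.
   */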
  public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
      List<HStoreFile> filesCompacting, boolean isOffpeak) throws IOException {
    // TODO: first cut - no parallel compactions. To have more fine-grained control we
    //       probably need a structure more sophisticated than a list.
    if (!filesCompacting.isEmpty()) {
      LOG.debug("Not selecting compaction: {} files are already compacting",
        filesCompacting.size());
      return null;
    }

    // We are going to try the variations of compaction in strict order of preference.
    // A better/more advanced approach would be to use a heuristic to see which one is
    // "more necessary" at the current time.

    // References can be present due to a region split. We could skip such files later;
    // for now, preserve the compact-all-things behavior.
    Collection<HStoreFile> allFiles = si.getStorefiles();
    if (StoreUtils.hasReferences(allFiles)) {
      LOG.debug("There are references in the store; compacting all files");
      long targetKvs = estimateTargetKvs(allFiles, config.getInitialCount()).getFirst();
      SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
          allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
      request.setMajorRangeFull();
      request.getRequest().setAfterSplit(true);
      return request;
    }

    int stripeCount = si.getStripeCount();
    List<HStoreFile> l0Files = si.getLevel0Files();

    // See if we need to make new stripes.
    boolean shouldCompactL0 = (this.config.getLevel0MinFiles() <= l0Files.size());
    if (stripeCount == 0) {
      if (!shouldCompactL0) return null; // nothing to do.
      return selectNewStripesCompaction(si);
    }

    boolean canDropDeletesNoL0 = l0Files.isEmpty();
    if (shouldCompactL0) {
      if (!canDropDeletesNoL0) {
        // If we need to compact L0, see if we can add something to it, and drop deletes.
        StripeCompactionRequest result = selectSingleStripeCompaction(
            si, true, canDropDeletesNoL0, isOffpeak);
        if (result != null) return result;
      }
      LOG.debug("Selecting L0 compaction with {} files", l0Files.size());
      return new BoundaryStripeCompactionRequest(l0Files, si.getStripeBoundaries());
    }

    // Try to delete fully expired stripes.
    StripeCompactionRequest result = selectExpiredMergeCompaction(si, canDropDeletesNoL0);
    if (result != null) return result;

    // Ok, nothing special here, let's see if we need to do a common compaction.
    // This will also split the stripes that are too big if needed.
    return selectSingleStripeCompaction(si, false, canDropDeletesNoL0, isOffpeak);
  }

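  /**
   * Approximates whether the store needs any compaction, without selecting one: true if
   * nothing is currently compacting and there are references, enough L0 files, or a stripe
   * with enough files to compact.
   */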
  public boolean needsCompactions(StripeInformationProvider si, List<HStoreFile> filesCompacting) {
    return filesCompacting.isEmpty()
        && (StoreUtils.hasReferences(si.getStorefiles())
          || (si.getLevel0Files().size() >= this.config.getLevel0MinFiles())
          || needsSingleStripeCompaction(si));
  }

  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
    throws IOException {
    return false; // There is never a separate major compaction for stripe stores.
  }

  @Override
  public boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
  }

  /**
   * @param si StoreFileManager.
   * @return Whether any stripe potentially needs compaction.
   */
  protected boolean needsSingleStripeCompaction(StripeInformationProvider si) {
    int minFiles = this.config.getStripeCompactMinFiles();
    for (List<HStoreFile> stripe : si.getStripes()) {
      if (stripe.size() >= minFiles) return true;
    }
    return false;
  }

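  /**
   * Selects the candidate stripe to compact: the stripe with the largest eligible file
   * selection (ties broken by smaller total size), splitting the stripe as part of the
   * compaction if it has grown past the configured split size.
   * @param includeL0 Whether L0 files should be compacted along with the stripe.
   * @param canDropDeletesWithoutL0 Whether deletes can be dropped without including L0.
   * @return The request, or null if no stripe has a viable selection.
   */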
  protected StripeCompactionRequest selectSingleStripeCompaction(StripeInformationProvider si,
      boolean includeL0, boolean canDropDeletesWithoutL0, boolean isOffpeak) throws IOException {
    ArrayList<ImmutableList<HStoreFile>> stripes = si.getStripes();

    int bqIndex = -1;
    List<HStoreFile> bqSelection = null;
    int stripeCount = stripes.size();
    long bqTotalSize = -1;
    for (int i = 0; i < stripeCount; ++i) {
      // If we want to compact L0 to drop deletes, we only want whole-stripe compactions.
      // So, pass includeL0 as the 2nd parameter to indicate that.
      List<HStoreFile> selection = selectSimpleCompaction(stripes.get(i),
          !canDropDeletesWithoutL0 && includeL0, isOffpeak);
      if (selection.isEmpty()) continue;
      long size = 0;
      for (HStoreFile sf : selection) {
        size += sf.getReader().length();
      }
      if (bqSelection == null || selection.size() > bqSelection.size() ||
          (selection.size() == bqSelection.size() && size < bqTotalSize)) {
        bqSelection = selection;
        bqIndex = i;
        bqTotalSize = size;
      }
    }
    if (bqSelection == null) {
      LOG.debug("No good compaction is possible in any stripe");
      return null;
    }
    List<HStoreFile> filesToCompact = new ArrayList<>(bqSelection);
    // See if we can, and need to, split this stripe.
    int targetCount = 1;
    long targetKvs = Long.MAX_VALUE;
    boolean hasAllFiles = filesToCompact.size() == stripes.get(bqIndex).size();
    String splitString = "";
    if (hasAllFiles && bqTotalSize >= config.getSplitSize()) {
      if (includeL0) {
        // We want to avoid the scenario where we compact a stripe w/L0 and then split it.
        // So, if we might split, don't compact the stripe with L0.
        return null;
      }
      Pair<Long, Integer> kvsAndCount = estimateTargetKvs(filesToCompact, config.getSplitCount());
      targetKvs = kvsAndCount.getFirst();
      targetCount = kvsAndCount.getSecond();
      splitString = "; the stripe will be split into at most " + targetCount
          + " stripes with " + targetKvs + " target KVs";
    }

    LOG.debug("Found compaction in a stripe with end key [{}], with {} files of total size {}{}",
      Bytes.toString(si.getEndRow(bqIndex)), filesToCompact.size(), bqTotalSize, splitString);

    // See if we can drop deletes.
    StripeCompactionRequest req;
    if (includeL0) {
      assert hasAllFiles;
      List<HStoreFile> l0Files = si.getLevel0Files();
      LOG.debug("Adding {} files to compaction to be able to drop deletes", l0Files.size());
      ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
      sfs.addSublist(filesToCompact);
      sfs.addSublist(l0Files);
      req = new BoundaryStripeCompactionRequest(sfs, si.getStripeBoundaries());
    } else {
      req = new SplitStripeCompactionRequest(
          filesToCompact, si.getStartRow(bqIndex), si.getEndRow(bqIndex), targetCount, targetKvs);
    }
    if (hasAllFiles && (canDropDeletesWithoutL0 || includeL0)) {
      req.setMajorRange(si.getStartRow(bqIndex), si.getEndRow(bqIndex));
    }
    req.getRequest().setOffPeak(isOffpeak);
    return req;
  }

  /**
   * Selects the compaction of a single stripe using the default (exploring) policy.
   * @param sfs Files.
   * @param allFilesOnly Whether a compaction of all-or-none of the files is needed.
   * @param isOffpeak Whether the compaction would run at off-peak time.
   * @return The resulting selection.
   */
  private List<HStoreFile> selectSimpleCompaction(
      List<HStoreFile> sfs, boolean allFilesOnly, boolean isOffpeak) {
    int minFilesLocal = Math.max(
        allFilesOnly ? sfs.size() : 0, this.config.getStripeCompactMinFiles());
    int maxFilesLocal = Math.max(this.config.getStripeCompactMaxFiles(), minFilesLocal);
    return stripePolicy.applyCompactionPolicy(sfs, false, isOffpeak, minFilesLocal, maxFilesLocal);
  }

  private StripeCompactionRequest selectNewStripesCompaction(StripeInformationProvider si) {
    List<HStoreFile> l0Files = si.getLevel0Files();
    Pair<Long, Integer> kvsAndCount = estimateTargetKvs(l0Files, config.getInitialCount());
    LOG.debug("Creating {} initial stripes with {} kvs each via L0 compaction of {} files",
      kvsAndCount.getSecond(), kvsAndCount.getFirst(), l0Files.size());
    SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
        l0Files, OPEN_KEY, OPEN_KEY, kvsAndCount.getSecond(), kvsAndCount.getFirst());
    request.setMajorRangeFull(); // L0 only, so we can drop deletes.
    return request;
  }

  private StripeCompactionRequest selectExpiredMergeCompaction(
      StripeInformationProvider si, boolean canDropDeletesNoL0) {
    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
    if (cfTtl == Long.MAX_VALUE) {
      return null; // minVersions might be set; we cannot delete old files.
    }
    long timestampCutoff = EnvironmentEdgeManager.currentTime() - cfTtl;
    // Merge the longest sequence of stripes where all files have expired, if any.
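    // For example (illustrative): with per-stripe expiry [expired, expired, live, expired],
    // the longest run of fully expired stripes is stripes 0-1, so those two get merged.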
    int start = -1, bestStart = -1, length = 0, bestLength = 0;
    ArrayList<ImmutableList<HStoreFile>> stripes = si.getStripes();
    OUTER: for (int i = 0; i < stripes.size(); ++i) {
      for (HStoreFile storeFile : stripes.get(i)) {
        if (storeFile.getReader().getMaxTimestamp() < timestampCutoff) continue;
        // Found a non-expired file, so this stripe has to stay.
        if (length > bestLength) {
          bestStart = start;
          bestLength = length;
        }
        start = -1;
        length = 0;
        continue OUTER;
      }
      if (start == -1) {
        start = i;
      }
      ++length;
    }
    if (length > bestLength) {
      bestStart = start;
      bestLength = length;
    }
    if (bestLength == 0) return null;
    if (bestLength == 1) {
      // This is currently inefficient. If only one stripe has expired, we will rewrite an
      // entire neighboring stripe just to delete the expired files, because we rely on the
      // metadata in the files and it cannot simply be updated in an old file. When we either
      // determine stripes dynamically or move the metadata to a manifest, we can just drop
      // the "expired stripes".
      if (bestStart == (stripes.size() - 1)) return null;
      ++bestLength;
    }
    LOG.debug("Merging {} stripes to delete expired store files", bestLength);
    int endIndex = bestStart + bestLength - 1;
    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
    sfs.addAllSublists(stripes.subList(bestStart, endIndex + 1));
    SplitStripeCompactionRequest result = new SplitStripeCompactionRequest(sfs,
        si.getStartRow(bestStart), si.getEndRow(endIndex), 1, Long.MAX_VALUE);
    if (canDropDeletesNoL0) {
      result.setMajorRangeFull();
    }
    return result;
  }

  private static long getTotalKvCount(final Collection<HStoreFile> candidates) {
    long totalKvCount = 0;
    for (HStoreFile storeFile : candidates) {
      totalKvCount += storeFile.getReader().getEntries();
    }
    return totalKvCount;
  }

  public static long getTotalFileSize(final Collection<HStoreFile> candidates) {
    long totalSize = 0;
    for (HStoreFile storeFile : candidates) {
      totalSize += storeFile.getReader().length();
    }
    return totalSize;
  }

  private Pair<Long, Integer> estimateTargetKvs(Collection<HStoreFile> files, double splitCount) {
    // If the size is larger than what we target, we don't want to split into proportionally
    // larger parts and then have to split again very soon. So, we will increase the multiplier
    // by one until we get small enough parts. E.g. a 5Gb stripe that should have been split into
    // 2 parts when it was 3Gb will be split into 3x1.67Gb parts, rather than 2x2.5Gb parts.
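    // Worked trace of that example: totalSize=5Gb, splitCount=2, targetPartSize=2Gb gives
    // ratio=5/(2*2)=1.25; trying 3 parts gives newRatio=5/(3*2)~0.83, and 1/0.83=1.2 < 1.25,
    // so we accept splitCount=3 and get ~1.67Gb parts.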
    long totalSize = getTotalFileSize(files);
    long targetPartSize = config.getSplitPartSize();
    assert targetPartSize > 0 && splitCount > 0;
    double ratio = totalSize / (splitCount * targetPartSize); // Ratio of real to desired size.
    while (ratio > 1.0) {
      // Ratio of real to desired size if we increase the multiplier.
      double newRatio = totalSize / ((splitCount + 1.0) * targetPartSize);
      // The new ratio is < 1.0, but further from 1.0 than the previous one; stop.
      if ((1.0 / newRatio) >= ratio) break;
      ratio = newRatio;
      splitCount += 1.0;
    }
    long kvCount = (long) (getTotalKvCount(files) / splitCount);
    return new Pair<>(kvCount, (int) Math.ceil(splitCount));
  }

  /** Stripe compaction request wrapper. */
  public abstract static class StripeCompactionRequest {
    protected CompactionRequestImpl request;
    protected byte[] majorRangeFromRow = null, majorRangeToRow = null;

    public List<Path> execute(StripeCompactor compactor,
        ThroughputController throughputController) throws IOException {
      return execute(compactor, throughputController, null);
    }

    /**
     * Executes the request against the compactor (essentially, just calls the correct
     * overload of the compact method), to simulate more dynamic dispatch.
     * @param compactor Compactor.
     * @param throughputController Throughput controller for the compaction.
     * @param user The user running the compaction, or null.
     * @return The result of compact(...).
     */
    public abstract List<Path> execute(StripeCompactor compactor,
        ThroughputController throughputController, User user) throws IOException;

    public StripeCompactionRequest(CompactionRequestImpl request) {
      this.request = request;
    }

    /**
     * Sets the compaction "major range". The major range is the key range for which all
     * the files are included in the compaction, so the results can be treated like
     * major-compacted files.
     * @param startRow Left boundary, inclusive.
     * @param endRow Right boundary, exclusive.
     */
    public void setMajorRange(byte[] startRow, byte[] endRow) {
      this.majorRangeFromRow = startRow;
      this.majorRangeToRow = endRow;
    }

    public CompactionRequestImpl getRequest() {
      return this.request;
    }

    public void setRequest(CompactionRequestImpl request) {
      assert request != null;
      this.request = request;
      this.majorRangeFromRow = this.majorRangeToRow = null;
    }
  }

  /**
   * Request for the stripe compactor that will cause it to split the source files into
   * several separate files at the provided boundaries.
   */
  private static class BoundaryStripeCompactionRequest extends StripeCompactionRequest {
    private final List<byte[]> targetBoundaries;

    /**
     * @param request Original request.
     * @param targetBoundaries New files should be written with these boundaries.
     */
    public BoundaryStripeCompactionRequest(CompactionRequestImpl request,
        List<byte[]> targetBoundaries) {
      super(request);
      this.targetBoundaries = targetBoundaries;
    }

    public BoundaryStripeCompactionRequest(Collection<HStoreFile> files,
        List<byte[]> targetBoundaries) {
      this(new CompactionRequestImpl(files), targetBoundaries);
    }

    @Override
    public List<Path> execute(StripeCompactor compactor,
        ThroughputController throughputController, User user) throws IOException {
      return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
        this.majorRangeToRow, throughputController, user);
    }
  }

  /**
   * Request for the stripe compactor that will cause it to split the source files into
   * several separate files based on key-value count, as well as a file count limit.
   * Most of the files will be roughly the same size. The last file may be smaller or larger
   * depending on the interplay of the amount of data and the maximum number of files allowed.
   */
  private static class SplitStripeCompactionRequest extends StripeCompactionRequest {
    private final byte[] startRow, endRow;
    private final int targetCount;
    private final long targetKvs;

    /**
     * @param request Original request.
     * @param startRow Left boundary of the range to compact, inclusive.
     * @param endRow Right boundary of the range to compact, exclusive.
     * @param targetCount The maximum number of stripes to compact into.
     * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
     *                  the total number of KVs, all the overflow data goes into the last stripe.
     */
    public SplitStripeCompactionRequest(CompactionRequestImpl request,
        byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
      super(request);
      this.startRow = startRow;
      this.endRow = endRow;
      this.targetCount = targetCount;
      this.targetKvs = targetKvs;
    }

    public SplitStripeCompactionRequest(
        Collection<HStoreFile> files, byte[] startRow, byte[] endRow, long targetKvs) {
      this(files, startRow, endRow, Integer.MAX_VALUE, targetKvs);
    }

    public SplitStripeCompactionRequest(Collection<HStoreFile> files,
        byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
      this(new CompactionRequestImpl(files), startRow, endRow, targetCount, targetKvs);
    }

    @Override
    public List<Path> execute(StripeCompactor compactor,
        ThroughputController throughputController, User user) throws IOException {
      return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
        this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user);
    }

    /**
     * Sets the major range of the compaction to the entire compaction range.
     * See {@link #setMajorRange(byte[], byte[])}.
     */
    public void setMajorRangeFull() {
      setMajorRange(this.startRow, this.endRow);
    }
  }

  /** The information about stripes that the policy needs to make its decisions. */
  public interface StripeInformationProvider {
    Collection<HStoreFile> getStorefiles();

    /**
     * Gets the start row for a given stripe.
     * @param stripeIndex Stripe index.
     * @return Start row. May be an open key.
     */
    byte[] getStartRow(int stripeIndex);

    /**
     * Gets the end row for a given stripe.
     * @param stripeIndex Stripe index.
     * @return End row. May be an open key.
     */
    byte[] getEndRow(int stripeIndex);

    /**
     * @return Level 0 files.
     */
    List<HStoreFile> getLevel0Files();

    /**
     * @return All stripe boundaries, including the open ones on both ends.
     */
    List<byte[]> getStripeBoundaries();

    /**
     * @return The stripes.
     */
    ArrayList<ImmutableList<HStoreFile>> getStripes();

    /**
     * @return Stripe count.
     */
    int getStripeCount();
  }
}