/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
import org.apache.hbase.thirdparty.com.google.common.math.LongMath;

/**
 * HBASE-15181 This is a simple implementation of date-based tiered compaction, similar to
 * Cassandra's, with the following benefits:
 * <ol>
 * <li>Improves date-range-based scans by structuring store files in a date-based tiered
 * layout.</li>
 * <li>Reduces compaction overhead.</li>
 * <li>Improves TTL efficiency.</li>
 * </ol>
 * A perfect fit for use cases that:
 * <ol>
 * <li>have mostly date-based writes and scans, with a focus on the most recent data.</li>
 * </ol>
 * Out-of-order writes are handled gracefully. Time range overlapping among store files is tolerated
 * and the performance impact is minimized. Configuration can be set in hbase-site or overridden at
 * the per-table or per-column-family level via the hbase shell. The design spec is at
 * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/
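 * <p>
 * For illustration, the policy is typically enabled per table from the hbase shell along the
 * lines of the hypothetical snippet below (verify key names and values against your HBase
 * version):
 *
 * <pre>
 * alter 'my_table', CONFIGURATION => {
 *   'hbase.hstore.engine.class' =>
 *     'org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine',
 *   'hbase.hstore.compaction.date.tiered.max.storefile.age.millis' => '604800000'
 * }
 * </pre>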
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class DateTieredCompactionPolicy extends SortedCompactionPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(DateTieredCompactionPolicy.class);

  protected final RatioBasedCompactionPolicy compactionPolicyPerWindow;

  private final CompactionWindowFactory windowFactory;

  public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo)
    throws IOException {
    super(conf, storeConfigInfo);
    try {
      compactionPolicyPerWindow =
        ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForDateTieredWindow(),
          new Class[] { Configuration.class, StoreConfigInformation.class },
          new Object[] { conf, storeConfigInfo });
    } catch (Exception e) {
      throw new IOException("Unable to load configured compaction policy '"
        + comConf.getCompactionPolicyForDateTieredWindow() + "'", e);
    }
    try {
      windowFactory =
        ReflectionUtils.instantiateWithCustomCtor(comConf.getDateTieredCompactionWindowFactory(),
          new Class[] { CompactionConfiguration.class }, new Object[] { comConf });
    } catch (Exception e) {
      throw new IOException("Unable to load configured window factory '"
        + comConf.getDateTieredCompactionWindowFactory() + "'", e);
    }
  }

  /**
   * Heuristic for guessing whether we need a minor compaction: run a dry-run minor selection and
   * report whether it found any files.
   */
  @Override
  @InterfaceAudience.Private
  public boolean needsCompaction(Collection<HStoreFile> storeFiles,
    List<HStoreFile> filesCompacting) {
    ArrayList<HStoreFile> candidates = new ArrayList<>(storeFiles);
    try {
      return !selectMinorCompaction(candidates, false, true).getFiles().isEmpty();
    } catch (Exception e) {
      LOG.error("Can not check for compaction: ", e);
      return false;
    }
  }

  protected boolean isMajorCompactionTime(Collection<HStoreFile> filesToCompact, long now,
    long lowestModificationTime) throws IOException {
    // Check for null before computing the next major compaction time, which iterates the files
    if (filesToCompact == null) {
      LOG.debug("filesToCompact is null");
      return false;
    }
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (mcTime == 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("filesToCompact: " + filesToCompact + " mcTime: " + mcTime);
      }
      return false;
    }
    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    if (lowestModificationTime <= 0L || lowestModificationTime >= (now - mcTime)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("lowestModificationTime: " + lowestModificationTime + " now: " + now
          + " mcTime: " + mcTime);
      }
      return false;
    }
    return true;
  }

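  /**
   * Returns true when the oldest cell in the file has outlived the column family TTL, in which
   * case a major compaction is needed to purge the expired data.
   */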
  protected boolean checkForTtl(long ttl, HStoreFile file) {
    OptionalLong minTimestamp = file.getMinimumTimestamp();
    long oldest = minTimestamp.isPresent()
      ? EnvironmentEdgeManager.currentTime() - minTimestamp.getAsLong()
      : Long.MIN_VALUE;
    if (ttl != Long.MAX_VALUE && oldest >= ttl) {
      LOG.debug("Major compaction triggered on store " + this + "; for TTL maintenance");
      return true;
    }
    return false;
  }

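  /**
   * Returns true when the file is not the result of a major compaction, or is the result of a
   * bulk load; either way it has not yet been folded into the tiered layout, so a major
   * compaction is triggered.
   */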
  protected boolean isMajorOrBulkloadResult(HStoreFile file, long timeDiff) {
    if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) {
      LOG.debug("Major compaction triggered on store " + this
        + ", because there are new files and time since last major compaction is " + timeDiff
        + "ms");
      return true;
    }
    return false;
  }

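  /**
   * Returns true when the store's HDFS block locality index is below the configured minimum, so
   * that a major compaction is triggered to re-localize the blocks.
   */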
  protected boolean checkBlockLocality(HDFSBlocksDistribution hdfsBlocksDistribution)
    throws UnknownHostException {
    float blockLocalityIndex = hdfsBlocksDistribution
      .getBlockLocalityIndex(DNS.getHostname(comConf.conf, DNS.ServerType.REGIONSERVER));
    if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
      LOG.debug("Major compaction triggered on store " + this
        + "; to make hdfs blocks local, current blockLocalityIndex is " + blockLocalityIndex
        + " (min " + comConf.getMinLocalityToForceCompact() + ")");
      return true;
    }
    return false;
  }

  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
    throws IOException {
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
    if (isMajorCompactionTime(filesToCompact, now, lowTimestamp)) {
      long cfTTL = this.storeConfigInfo.getStoreFileTtl();
      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
      List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, now);
      boolean[] filesInWindow = new boolean[boundaries.size()];
      for (HStoreFile file : filesToCompact) {
        OptionalLong minTimestamp = file.getMinimumTimestamp();
        if (checkForTtl(cfTTL, file)) {
          return true;
        }
        if (isMajorOrBulkloadResult(file, now - lowTimestamp)) {
          return true;
        }
        int lowerWindowIndex =
          Collections.binarySearch(boundaries, minTimestamp.orElse(Long.MAX_VALUE));
        int upperWindowIndex =
          Collections.binarySearch(boundaries, file.getMaximumTimestamp().orElse(Long.MAX_VALUE));
        // Handle boundary conditions and negative values of binarySearch: for a missing key,
        // binarySearch returns -(insertionPoint) - 1, so abs(result + 2) recovers the index of
        // the window containing the timestamp.
        lowerWindowIndex =
          (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : lowerWindowIndex;
        upperWindowIndex =
          (upperWindowIndex < 0) ? Math.abs(upperWindowIndex + 2) : upperWindowIndex;
        if (lowerWindowIndex != upperWindowIndex) {
          LOG.debug("Major compaction triggered on store " + this + "; because file "
            + file.getPath() + " has data with timestamps crossing window boundaries");
          return true;
        } else if (filesInWindow[upperWindowIndex]) {
          LOG.debug("Major compaction triggered on store " + this
            + "; because there is more than one file in some window");
          return true;
        } else {
          filesInWindow[upperWindowIndex] = true;
        }
        hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
      }
      if (checkBlockLocality(hdfsBlocksDistribution)) {
        return true;
      }
      LOG.debug(
        "Skipping major compaction of " + this + ", because the files are already major compacted");
    }
    return false;
  }

  @Override
  protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile> candidateSelection,
    boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    CompactionRequestImpl result = tryingMajor
      ? selectMajorCompaction(candidateSelection)
      : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Generated compaction request: " + result);
    }
    return result;
  }

  public CompactionRequestImpl selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) {
    long now = EnvironmentEdgeManager.currentTime();
    List<Long> boundaries = getCompactBoundariesForMajor(candidateSelection, now);
    Map<Long, String> boundariesPolicies = getBoundariesStoragePolicyForMajor(boundaries, now);
    return new DateTieredCompactionRequest(candidateSelection, boundaries, boundariesPolicies);
  }

  /**
   * We receive store files sorted in ascending order by seqId and then scan the list. If the
   * current file has a maxTimestamp older than the last known maximum, treat the file as if it
   * carries the last known maximum, so that both seqId and timestamp are in the same order. Files
   * carrying the same maxTimestamp are ordered by seqId. We then reverse the list so it is ordered
   * by seqId and maxTimestamp in descending order, and build the time windows. All out-of-order
   * data goes into the same compaction windows, guaranteeing contiguous compaction based on
   * sequence id.
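   * <p>
   * For example (hypothetical values): files in seqId order with maxTimestamps [10, 5, 8, 12]
   * become [10, 10, 10, 12] after taking the running maximum, and [12, 10, 10, 10] after the
   * reversal, so window assignment never splits a seqId-contiguous run of files.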
   */
  public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection,
    boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    long now = EnvironmentEdgeManager.currentTime();
    long oldestToCompact = getOldestToCompact(comConf.getDateTieredMaxStoreFileAgeMillis(), now);

    List<Pair<HStoreFile, Long>> storefileMaxTimestampPairs =
      Lists.newArrayListWithCapacity(candidateSelection.size());
    long maxTimestampSeen = Long.MIN_VALUE;
    for (HStoreFile storeFile : candidateSelection) {
      // If there is out-of-order data, we put it in the same window as the last file in
      // increasing order
      maxTimestampSeen =
        Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp().orElse(Long.MIN_VALUE));
      storefileMaxTimestampPairs.add(new Pair<>(storeFile, maxTimestampSeen));
    }
    Collections.reverse(storefileMaxTimestampPairs);

    CompactionWindow window = getIncomingWindow(now);
    int minThreshold = comConf.getDateTieredIncomingWindowMin();
    PeekingIterator<Pair<HStoreFile, Long>> it =
      Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
    while (it.hasNext()) {
      if (window.compareToTimestamp(oldestToCompact) < 0) {
        break;
      }
      int compResult = window.compareToTimestamp(it.peek().getSecond());
      if (compResult > 0) {
        // If the file is too old for the window, switch to the next window
        window = window.nextEarlierWindow();
        minThreshold = comConf.getMinFilesToCompact();
      } else {
        // The file is within the target window
        ArrayList<HStoreFile> fileList = Lists.newArrayList();
        // Add all files in the same window. For the incoming window
        // we tolerate files with future data although it is sub-optimal
        while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) {
          fileList.add(it.next().getFirst());
        }
        if (fileList.size() >= minThreshold) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Processing files: " + fileList + " for window: " + window);
          }
          DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
            mayUseOffPeak, mayBeStuck, minThreshold, now);
          if (request != null) {
            return request;
          }
        }
      }
    }
    // A non-null file list is expected by HStore
    return new CompactionRequestImpl(Collections.emptyList());
  }

  private DateTieredCompactionRequest generateCompactionRequest(ArrayList<HStoreFile> storeFiles,
    CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold, long now)
    throws IOException {
    // The files have to be in ascending order for ratio-based compaction to work right
    // and for removeExcessFile to exclude the youngest files.
    Collections.reverse(storeFiles);

    // Compact everything in the window if we have more files than comConf.maxBlockingFiles
    compactionPolicyPerWindow.setMinThreshold(minThreshold);
    ArrayList<HStoreFile> storeFileSelection = mayBeStuck
      ? storeFiles
      : compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false);
    if (storeFileSelection != null && !storeFileSelection.isEmpty()) {
      // If any file in the window is excluded from compaction,
      // only one file will be output from compaction.
      boolean singleOutput = storeFiles.size() != storeFileSelection.size()
        || comConf.useDateTieredSingleOutputForMinorCompaction();
      List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
      // We want to generate a storage policy for each boundary of the minor compaction output
      Map<Long, String> boundaryPolicyMap =
        getBoundariesStoragePolicyForMinor(singleOutput, window, now);
      return new DateTieredCompactionRequest(storeFileSelection, boundaries, boundaryPolicyMap);
    }
    return null;
  }

  /**
   * Return a list of boundaries for multiple compaction output in ascending order.
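   * <p>
   * For illustration (hypothetical window boundaries): with earlier windows starting at t1 < t2
   * < t3 (t3 being the incoming window) and the oldest file's minimum timestamp falling inside
   * the window that starts at t1, the loop collects t3 and t2 and stops at the window containing
   * the timestamp, yielding [Long.MIN_VALUE, t2, t3]; data older than t2 lands in the catch-all
   * first output.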
   */
  protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact,
    long now) {
    long minTimestamp = filesToCompact.stream()
      .mapToLong(f -> f.getMinimumTimestamp().orElse(Long.MAX_VALUE)).min().orElse(Long.MAX_VALUE);

    List<Long> boundaries = new ArrayList<>();

    // Add startMillis of all windows between now and the min timestamp
    for (CompactionWindow window = getIncomingWindow(now);
      window.compareToTimestamp(minTimestamp) > 0; window = window.nextEarlierWindow()) {
      boundaries.add(window.startMillis());
    }
    boundaries.add(Long.MIN_VALUE);
    Collections.reverse(boundaries);
    return boundaries;
  }

  /**
   * Returns a list of boundaries for minor compaction output: a single catch-all output when
   * singleOutput is set, otherwise two outputs split at the window start.
   */
  private static List<Long> getCompactionBoundariesForMinor(CompactionWindow window,
    boolean singleOutput) {
    List<Long> boundaries = new ArrayList<>();
    boundaries.add(Long.MIN_VALUE);
    if (!singleOutput) {
      boundaries.add(window.startMillis());
    }
    return boundaries;
  }

  private CompactionWindow getIncomingWindow(long now) {
    return windowFactory.newIncomingWindow(now);
  }

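  /**
   * Computes now - maxAgeMillis, clamping to Long.MIN_VALUE on overflow so that all files remain
   * eligible for minor compaction when the configured max age is effectively unbounded.
   */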
  private static long getOldestToCompact(long maxAgeMillis, long now) {
    try {
      return LongMath.checkedSubtract(now, maxAgeMillis);
    } catch (ArithmeticException ae) {
      LOG.warn("Overflowed when subtracting the value for "
        + CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY + ": " + maxAgeMillis
        + " from the current time. All files will be eligible for minor compaction.");
      return Long.MIN_VALUE;
    }
  }

  private Map<Long, String> getBoundariesStoragePolicyForMinor(boolean singleOutput,
    CompactionWindow window, long now) {
    Map<Long, String> boundariesPolicy = new HashMap<>();
    if (!comConf.isDateTieredStoragePolicyEnable()) {
      return boundariesPolicy;
    }
    String windowStoragePolicy = getWindowStoragePolicy(now, window.startMillis());
    if (singleOutput) {
      boundariesPolicy.put(Long.MIN_VALUE, windowStoragePolicy);
    } else {
      boundariesPolicy.put(window.startMillis(), windowStoragePolicy);
    }
    return boundariesPolicy;
  }

  private Map<Long, String> getBoundariesStoragePolicyForMajor(List<Long> boundaries, long now) {
    Map<Long, String> boundariesPolicy = new HashMap<>();
    if (!comConf.isDateTieredStoragePolicyEnable()) {
      return boundariesPolicy;
    }
    for (Long startTs : boundaries) {
      boundariesPolicy.put(startTs, getWindowStoragePolicy(now, startTs));
    }
    return boundariesPolicy;
  }

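  /**
   * Picks a storage policy by window age: windows newer than the hot window age get the hot
   * policy, those newer than the warm window age get the warm policy, and everything else gets
   * the cold policy.
   */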
  private String getWindowStoragePolicy(long now, long windowStartMillis) {
    if (windowStartMillis >= (now - comConf.getHotWindowAgeMillis())) {
      return comConf.getHotWindowStoragePolicy();
    } else if (windowStartMillis >= (now - comConf.getWarmWindowAgeMillis())) {
      return comConf.getWarmWindowStoragePolicy();
    }
    return comConf.getColdWindowStoragePolicy();
  }
}