/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
import org.apache.hbase.thirdparty.com.google.common.math.LongMath;

/**
 * HBASE-15181 This is a simple implementation of date-based tiered compaction, similar to
 * Cassandra's, with the following benefits:
 * <ol>
 * <li>Improves date-range-based scans by structuring store files in a date-based tiered
 * layout.</li>
 * <li>Reduces compaction overhead.</li>
 * <li>Improves TTL efficiency.</li>
 * </ol>
 * It is a perfect fit for use cases that:
 * <ol>
 * <li>have mostly date-based writes and scans, with a focus on the most recent data.</li>
 * </ol>
 * Out-of-order writes are handled gracefully. Time range overlapping among store files is tolerated
 * and the performance impact is minimized. Configuration can be set in hbase-site.xml or overridden
 * at the per-table or per-column-family level via the HBase shell. The design spec is at
 * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/
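 * <p>
 * For example, date-tiered compaction can typically be enabled for a table from the HBase shell
 * along these lines (an illustrative sketch; confirm the key names and values against the HBase
 * reference guide for your release):
 * <pre>
 * alter 'my_table', CONFIGURATION => {
 *   'hbase.hstore.engine.class' => 'org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine',
 *   'hbase.hstore.compaction.date.tiered.max.storefile.age.millis' => '604800000',
 *   'hbase.hstore.compaction.date.tiered.base.window.millis' => '21600000'
 * }
 * </pre>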
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class DateTieredCompactionPolicy extends SortedCompactionPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(DateTieredCompactionPolicy.class);

  private final RatioBasedCompactionPolicy compactionPolicyPerWindow;

  private final CompactionWindowFactory windowFactory;

  public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo)
      throws IOException {
    super(conf, storeConfigInfo);
    try {
      compactionPolicyPerWindow = ReflectionUtils.instantiateWithCustomCtor(
        comConf.getCompactionPolicyForDateTieredWindow(),
        new Class[] { Configuration.class, StoreConfigInformation.class },
        new Object[] { conf, storeConfigInfo });
    } catch (Exception e) {
      throw new IOException("Unable to load configured compaction policy '"
          + comConf.getCompactionPolicyForDateTieredWindow() + "'", e);
    }
    try {
      windowFactory = ReflectionUtils.instantiateWithCustomCtor(
        comConf.getDateTieredCompactionWindowFactory(),
        new Class[] { CompactionConfiguration.class }, new Object[] { comConf });
    } catch (Exception e) {
      throw new IOException("Unable to load configured window factory '"
          + comConf.getDateTieredCompactionWindowFactory() + "'", e);
    }
  }

  /**
   * Heuristics for guessing whether we need minor compaction.
   */
  @InterfaceAudience.Private
  @Override
  public boolean needsCompaction(Collection<HStoreFile> storeFiles,
      List<HStoreFile> filesCompacting) {
    ArrayList<HStoreFile> candidates = new ArrayList<>(storeFiles);
    try {
      return !selectMinorCompaction(candidates, false, true).getFiles().isEmpty();
    } catch (Exception e) {
      LOG.error("Can not check for compaction: ", e);
      return false;
    }
  }

  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
      throws IOException {
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || mcTime == 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("filesToCompact: " + filesToCompact + " mcTime: " + mcTime);
      }
      return false;
    }

    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
    if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("lowTimestamp: " + lowTimestamp + " now: " + now + " mcTime: " + mcTime);
      }
      return false;
    }

    long cfTTL = this.storeConfigInfo.getStoreFileTtl();
    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
    List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, now);
    boolean[] filesInWindow = new boolean[boundaries.size()];

    for (HStoreFile file: filesToCompact) {
      OptionalLong minTimestamp = file.getMinimumTimestamp();
      long oldest = minTimestamp.isPresent() ? now - minTimestamp.getAsLong() : Long.MIN_VALUE;
      if (cfTTL != Long.MAX_VALUE && oldest >= cfTTL) {
        LOG.debug("Major compaction triggered on store " + this
          + "; for TTL maintenance");
        return true;
      }
      if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) {
        LOG.debug("Major compaction triggered on store " + this
          + ", because there are new files and time since last major compaction "
          + (now - lowTimestamp) + "ms");
        return true;
      }

      int lowerWindowIndex =
          Collections.binarySearch(boundaries, minTimestamp.orElse(Long.MAX_VALUE));
      int upperWindowIndex =
          Collections.binarySearch(boundaries, file.getMaximumTimestamp().orElse(Long.MAX_VALUE));
      // Handle boundary conditions and negative values of binarySearch
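      // Collections.binarySearch returns (-(insertion point) - 1) when the timestamp is not an
      // exact boundary; since the boundaries list always starts with Long.MIN_VALUE,
      // Math.abs(index + 2) recovers the index of the window containing the timestamp.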
      lowerWindowIndex = (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : lowerWindowIndex;
      upperWindowIndex = (upperWindowIndex < 0) ? Math.abs(upperWindowIndex + 2) : upperWindowIndex;
      if (lowerWindowIndex != upperWindowIndex) {
        LOG.debug("Major compaction triggered on store " + this + "; because file "
          + file.getPath() + " has data with timestamps crossing window boundaries");
        return true;
      } else if (filesInWindow[upperWindowIndex]) {
        LOG.debug("Major compaction triggered on store " + this +
          "; because there is more than one file in some window");
        return true;
      } else {
        filesInWindow[upperWindowIndex] = true;
      }
      hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
    }

    float blockLocalityIndex = hdfsBlocksDistribution
        .getBlockLocalityIndex(DNS.getHostname(comConf.conf, DNS.ServerType.REGIONSERVER));
    if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
      LOG.debug("Major compaction triggered on store " + this
        + "; to make hdfs blocks local, current blockLocalityIndex is "
        + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
      return true;
    }

    LOG.debug("Skipping major compaction of " + this +
      ", because the files are already major compacted");
    return false;
  }

  @Override
  protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile> candidateSelection,
    boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    CompactionRequestImpl result = tryingMajor ? selectMajorCompaction(candidateSelection)
      : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Generated compaction request: " + result);
    }
    return result;
  }

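  /**
   * Selects all candidate files for a major compaction, with an output boundary at each window
   * start between now and the oldest data, plus a per-boundary storage policy when date-tiered
   * storage policy is enabled.
   */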
  public CompactionRequestImpl selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) {
    long now = EnvironmentEdgeManager.currentTime();
    List<Long> boundaries = getCompactBoundariesForMajor(candidateSelection, now);
    Map<Long, String> boundariesPolicies = getBoundariesStoragePolicyForMajor(boundaries, now);
    return new DateTieredCompactionRequest(candidateSelection,
      boundaries, boundariesPolicies);
  }

  /**
   * We receive store files sorted in ascending order by seqId, then scan the list of files. If the
   * current file has a maxTimestamp older than the last known maximum, treat this file as if it
   * carried the last known maximum. This way both seqId and timestamp are in the same order. If
   * files carry the same maxTimestamp, they are ordered by seqId. We then reverse the list so it is
   * ordered by seqId and maxTimestamp in descending order, and build the time windows. This puts
   * all out-of-order data into the same compaction windows, guaranteeing contiguous compaction
   * based on sequence id.
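   * <p>
   * For example (illustrative numbers only): files in ascending seqId order with maxTimestamps
   * [8, 12, 5, 20] are treated as carrying [8, 12, 12, 20], so the out-of-order file (5) stays in
   * the same window as the file before it and any selection remains contiguous in seqId.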
   */
  public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    long now = EnvironmentEdgeManager.currentTime();
    long oldestToCompact = getOldestToCompact(comConf.getDateTieredMaxStoreFileAgeMillis(), now);

    List<Pair<HStoreFile, Long>> storefileMaxTimestampPairs =
        Lists.newArrayListWithCapacity(candidateSelection.size());
    long maxTimestampSeen = Long.MIN_VALUE;
    for (HStoreFile storeFile : candidateSelection) {
      // If there is out-of-order data, we put it in the same window as the newest file seen so
      // far, so that selection stays contiguous in seqId
      maxTimestampSeen =
          Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp().orElse(Long.MIN_VALUE));
      storefileMaxTimestampPairs.add(new Pair<>(storeFile, maxTimestampSeen));
    }
    Collections.reverse(storefileMaxTimestampPairs);

    CompactionWindow window = getIncomingWindow(now);
    int minThreshold = comConf.getDateTieredIncomingWindowMin();
    PeekingIterator<Pair<HStoreFile, Long>> it =
        Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
    while (it.hasNext()) {
      if (window.compareToTimestamp(oldestToCompact) < 0) {
        break;
      }
      int compResult = window.compareToTimestamp(it.peek().getSecond());
      if (compResult > 0) {
        // If the file is too old for the window, switch to the next window
        window = window.nextEarlierWindow();
        minThreshold = comConf.getMinFilesToCompact();
      } else {
        // The file is within the target window
        ArrayList<HStoreFile> fileList = Lists.newArrayList();
        // Add all files in the same window. For incoming window
        // we tolerate files with future data although it is sub-optimal
        while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) {
          fileList.add(it.next().getFirst());
        }
        if (fileList.size() >= minThreshold) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Processing files: " + fileList + " for window: " + window);
          }
          DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
            mayUseOffPeak, mayBeStuck, minThreshold, now);
          if (request != null) {
            return request;
          }
        }
      }
    }
    // A non-null file list is expected by HStore
    return new CompactionRequestImpl(Collections.emptyList());
  }

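  /**
   * Runs the per-window compaction policy over the files of a single window (or takes them all
   * when the store may be stuck) and, if a selection survives, wraps it in a
   * {@link DateTieredCompactionRequest} with the matching output boundaries and storage policies.
   * Returns null when nothing in the window is worth compacting.
   */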
  private DateTieredCompactionRequest generateCompactionRequest(ArrayList<HStoreFile> storeFiles,
      CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold,
      long now) throws IOException {
    // The files have to be in ascending order for ratio-based compaction to work right
    // and for removeExcessFile to exclude the youngest files.
    Collections.reverse(storeFiles);

    // Compact everything in the window if we have more files than comConf.maxBlockingFiles
    compactionPolicyPerWindow.setMinThreshold(minThreshold);
    ArrayList<HStoreFile> storeFileSelection = mayBeStuck ? storeFiles
      : compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false);
    if (storeFileSelection != null && !storeFileSelection.isEmpty()) {
      // If there is any file in the window excluded from compaction,
      // only one file will be output from compaction.
      boolean singleOutput = storeFiles.size() != storeFileSelection.size() ||
        comConf.useDateTieredSingleOutputForMinorCompaction();
      List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
      // Map each minor-compaction output boundary to its storage policy
      Map<Long, String> boundaryPolicyMap =
        getBoundariesStoragePolicyForMinor(singleOutput, window, now);
      DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection,
        boundaries, boundaryPolicyMap);
      return result;
    }
    return null;
  }

  /**
   * Return a list of boundaries for multiple compaction output in ascending order.
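   * <p>
   * For example (illustrative), if the incoming window starts at t0 and the oldest cell falls two
   * windows earlier, the result is [Long.MIN_VALUE, start(t-1), start(t0)]: the lowest output
   * covers everything older than start(t-1).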
   */
  private List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact, long now) {
    long minTimestamp =
        filesToCompact.stream().mapToLong(f -> f.getMinimumTimestamp().orElse(Long.MAX_VALUE)).min()
            .orElse(Long.MAX_VALUE);

    List<Long> boundaries = new ArrayList<>();

    // Add startMillis of all windows between now and min timestamp
    for (CompactionWindow window = getIncomingWindow(now); window
        .compareToTimestamp(minTimestamp) > 0; window = window.nextEarlierWindow()) {
      boundaries.add(window.startMillis());
    }
    boundaries.add(Long.MIN_VALUE);
    Collections.reverse(boundaries);
    return boundaries;
  }

  /**
   * @return a list of boundaries for the minor compaction output: just Long.MIN_VALUE when a
   *         single output file is wanted, otherwise Long.MIN_VALUE followed by the window start.
   */
  private static List<Long> getCompactionBoundariesForMinor(CompactionWindow window,
      boolean singleOutput) {
    List<Long> boundaries = new ArrayList<>();
    boundaries.add(Long.MIN_VALUE);
    if (!singleOutput) {
      boundaries.add(window.startMillis());
    }
    return boundaries;
  }

  private CompactionWindow getIncomingWindow(long now) {
    return windowFactory.newIncomingWindow(now);
  }

  private static long getOldestToCompact(long maxAgeMillis, long now) {
    try {
      return LongMath.checkedSubtract(now, maxAgeMillis);
    } catch (ArithmeticException ae) {
      LOG.warn("Value for " + CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY + ": "
          + maxAgeMillis + " overflows when subtracted from 'now'. All the files will be eligible"
          + " for minor compaction.");
      return Long.MIN_VALUE;
    }
  }

  private Map<Long, String> getBoundariesStoragePolicyForMinor(boolean singleOutput,
      CompactionWindow window, long now) {
    Map<Long, String> boundariesPolicy = new HashMap<>();
    if (!comConf.isDateTieredStoragePolicyEnable()) {
      return boundariesPolicy;
    }
    String windowStoragePolicy = getWindowStoragePolicy(now, window.startMillis());
    if (singleOutput) {
      boundariesPolicy.put(Long.MIN_VALUE, windowStoragePolicy);
    } else {
      boundariesPolicy.put(window.startMillis(), windowStoragePolicy);
    }
    return boundariesPolicy;
  }

  private Map<Long, String> getBoundariesStoragePolicyForMajor(List<Long> boundaries, long now) {
    Map<Long, String> boundariesPolicy = new HashMap<>();
    if (!comConf.isDateTieredStoragePolicyEnable()) {
      return boundariesPolicy;
    }
    for (Long startTs : boundaries) {
      boundariesPolicy.put(startTs, getWindowStoragePolicy(now, startTs));
    }
    return boundariesPolicy;
  }

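  /**
   * Chooses a storage policy for an output window by age: windows that start within the hot
   * window age get the hot policy, windows within the warm window age get the warm policy, and
   * everything older gets the cold policy.
   */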
  private String getWindowStoragePolicy(long now, long windowStartMillis) {
    if (windowStartMillis >= (now - comConf.getHotWindowAgeMillis())) {
      return comConf.getHotWindowStoragePolicy();
    } else if (windowStartMillis >= (now - comConf.getWarmWindowAgeMillis())) {
      return comConf.getWarmWindowStoragePolicy();
    }
    return comConf.getColdWindowStoragePolicy();
  }
}