/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
import org.apache.hbase.thirdparty.com.google.common.math.LongMath;

/**
 * HBASE-15181 This is a simple implementation of date-based tiered compaction, similar to
 * Cassandra's, with the following benefits:
 * <ol>
 * <li>Improve date-range-based scans by structuring store files in a date-based tiered layout.</li>
 * <li>Reduce compaction overhead.</li>
 * <li>Improve TTL efficiency.</li>
 * </ol>
 * It is a perfect fit for use cases that:
 * <ol>
 * <li>have mostly date-based writes and scans, with a focus on the most recent data.</li>
 * </ol>
 * Out-of-order writes are handled gracefully. Time range overlapping among store files is
 * tolerated and the performance impact is minimized. Configuration can be set at the hbase-site
 * level or overridden at the per-table or per-column-family level via the hbase shell. The design
 * spec is at https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/
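 * <p>
 * For illustration only (a minimal sketch; the configuration key names below are taken from the
 * HBase reference guide and should be verified against the CompactionConfiguration constants of
 * your release), enabling this policy cluster-wide might look like:
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * conf.set("hbase.hstore.engine.class",
 *   "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine");
 * // Six-hour base windows, four windows per tier, tier files for up to roughly a year
 * conf.setLong("hbase.hstore.compaction.date.tiered.base.window.millis", 6L * 60 * 60 * 1000);
 * conf.setInt("hbase.hstore.compaction.date.tiered.windows.per.tier", 4);
 * conf.setLong("hbase.hstore.compaction.date.tiered.max.storefile.age.millis",
 *   365L * 24 * 60 * 60 * 1000);
 * </pre>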
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class DateTieredCompactionPolicy extends SortedCompactionPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(DateTieredCompactionPolicy.class);

  private final RatioBasedCompactionPolicy compactionPolicyPerWindow;

  private final CompactionWindowFactory windowFactory;

  public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo)
    throws IOException {
    super(conf, storeConfigInfo);
    try {
      compactionPolicyPerWindow =
        ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForDateTieredWindow(),
          new Class[] { Configuration.class, StoreConfigInformation.class },
          new Object[] { conf, storeConfigInfo });
    } catch (Exception e) {
      throw new IOException("Unable to load configured compaction policy '"
        + comConf.getCompactionPolicyForDateTieredWindow() + "'", e);
    }
    try {
      windowFactory =
        ReflectionUtils.instantiateWithCustomCtor(comConf.getDateTieredCompactionWindowFactory(),
          new Class[] { CompactionConfiguration.class }, new Object[] { comConf });
    } catch (Exception e) {
      throw new IOException("Unable to load configured window factory '"
        + comConf.getDateTieredCompactionWindowFactory() + "'", e);
    }
  }

  /**
   * Heuristics for guessing whether we need minor compaction.
   */
  @Override
  @InterfaceAudience.Private
  public boolean needsCompaction(Collection<HStoreFile> storeFiles,
    List<HStoreFile> filesCompacting) {
    ArrayList<HStoreFile> candidates = new ArrayList<>(storeFiles);
    try {
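      // Dry-run a minor selection: mayUseOffPeak is false and mayBeStuck is true, so each window
      // only needs to reach its minimum file count; a non-empty selection means we should compact.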
      return !selectMinorCompaction(candidates, false, true).getFiles().isEmpty();
    } catch (Exception e) {
      LOG.error("Cannot check for compaction: ", e);
      return false;
    }
  }

  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
    throws IOException {
    if (filesToCompact == null) {
      return false;
    }
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (mcTime == 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("filesToCompact: " + filesToCompact + " mcTime: " + mcTime);
      }
      return false;
    }

    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
    if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("lowTimestamp: " + lowTimestamp + " now: " + now + " mcTime: " + mcTime);
      }
      return false;
    }

    long cfTTL = this.storeConfigInfo.getStoreFileTtl();
    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
    List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, now);
    boolean[] filesInWindow = new boolean[boundaries.size()];

    for (HStoreFile file : filesToCompact) {
      OptionalLong minTimestamp = file.getMinimumTimestamp();
      long oldest = minTimestamp.isPresent() ? now - minTimestamp.getAsLong() : Long.MIN_VALUE;
      if (cfTTL != Long.MAX_VALUE && oldest >= cfTTL) {
        LOG.debug("Major compaction triggered on store " + this + "; for TTL maintenance");
        return true;
      }
      if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) {
        LOG.debug("Major compaction triggered on store " + this
          + ", because there are new files and the time since the last major compaction is "
          + (now - lowTimestamp) + "ms");
        return true;
      }

      int lowerWindowIndex =
        Collections.binarySearch(boundaries, minTimestamp.orElse(Long.MAX_VALUE));
      int upperWindowIndex =
        Collections.binarySearch(boundaries, file.getMaximumTimestamp().orElse(Long.MAX_VALUE));
      // Handle boundary conditions and negative values of binarySearch
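      // (Collections.binarySearch returns -(insertion point) - 1 when the timestamp is not an
      // exact boundary, so Math.abs(result + 2) == -(result) - 2 recovers the index of the window
      // whose start boundary lies just below the timestamp.)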
      lowerWindowIndex = (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : lowerWindowIndex;
      upperWindowIndex = (upperWindowIndex < 0) ? Math.abs(upperWindowIndex + 2) : upperWindowIndex;
      if (lowerWindowIndex != upperWindowIndex) {
        LOG.debug("Major compaction triggered on store " + this + "; because file " + file.getPath()
          + " has data with timestamps crossing window boundaries");
        return true;
      } else if (filesInWindow[upperWindowIndex]) {
        LOG.debug("Major compaction triggered on store " + this
          + "; because some window has more than one file");
        return true;
      } else {
        filesInWindow[upperWindowIndex] = true;
      }
      hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
    }

    float blockLocalityIndex = hdfsBlocksDistribution
      .getBlockLocalityIndex(DNS.getHostname(comConf.conf, DNS.ServerType.REGIONSERVER));
    if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
      LOG.debug("Major compaction triggered on store " + this
        + "; to make hdfs blocks local, current blockLocalityIndex is " + blockLocalityIndex
        + " (min " + comConf.getMinLocalityToForceCompact() + ")");
      return true;
    }

    LOG.debug(
      "Skipping major compaction of " + this + ", because the files are already major compacted");
    return false;
  }

  @Override
  protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile> candidateSelection,
    boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    CompactionRequestImpl result = tryingMajor
      ? selectMajorCompaction(candidateSelection)
      : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Generated compaction request: " + result);
    }
    return result;
  }

  public CompactionRequestImpl selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) {
    long now = EnvironmentEdgeManager.currentTime();
    List<Long> boundaries = getCompactBoundariesForMajor(candidateSelection, now);
    Map<Long, String> boundariesPolicies = getBoundariesStoragePolicyForMajor(boundaries, now);
    return new DateTieredCompactionRequest(candidateSelection, boundaries, boundariesPolicies);
  }

  /**
   * We receive store files sorted in ascending order by seqId, then scan the list. If the current
   * file has a maxTimestamp older than the last known maximum, treat this file as if it carried
   * the last known maximum, so that both seqId and timestamp stay in the same order. Files that
   * carry the same maxTimestamp remain ordered by seqId. We then reverse the list so it is ordered
   * by seqId and maxTimestamp in descending order and build the time windows. All out-of-order
   * data goes into the same compaction windows, guaranteeing contiguous compaction based on
   * sequence id.
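   * <p>
   * A small illustration with hypothetical files: given F1(seqId=1, maxTs=100),
   * F2(seqId=2, maxTs=50) and F3(seqId=3, maxTs=200), the carried maximums become 100, 100 and
   * 200; after reversing we look at F3 first, and F2 falls into the same window as F1, so its
   * out-of-order data is compacted together with F1 and the selection stays contiguous by seqId.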
   */
  public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection,
    boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    long now = EnvironmentEdgeManager.currentTime();
    long oldestToCompact = getOldestToCompact(comConf.getDateTieredMaxStoreFileAgeMillis(), now);

    List<Pair<HStoreFile, Long>> storefileMaxTimestampPairs =
      Lists.newArrayListWithCapacity(candidateSelection.size());
    long maxTimestampSeen = Long.MIN_VALUE;
    for (HStoreFile storeFile : candidateSelection) {
      // if there is out-of-order data,
      // we put it in the same window as the last file in increasing order
      maxTimestampSeen =
        Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp().orElse(Long.MIN_VALUE));
      storefileMaxTimestampPairs.add(new Pair<>(storeFile, maxTimestampSeen));
    }
    Collections.reverse(storefileMaxTimestampPairs);

    CompactionWindow window = getIncomingWindow(now);
    int minThreshold = comConf.getDateTieredIncomingWindowMin();
    PeekingIterator<Pair<HStoreFile, Long>> it =
      Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
    while (it.hasNext()) {
      if (window.compareToTimestamp(oldestToCompact) < 0) {
        break;
      }
      int compResult = window.compareToTimestamp(it.peek().getSecond());
      if (compResult > 0) {
        // If the file is too old for the window, switch to the next earlier window
        window = window.nextEarlierWindow();
        minThreshold = comConf.getMinFilesToCompact();
      } else {
        // The file is within the target window
        ArrayList<HStoreFile> fileList = Lists.newArrayList();
        // Add all files in the same window. For the incoming window
        // we tolerate files with future data although it is sub-optimal
        while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) {
          fileList.add(it.next().getFirst());
        }
        if (fileList.size() >= minThreshold) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Processing files: " + fileList + " for window: " + window);
          }
          DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
            mayUseOffPeak, mayBeStuck, minThreshold, now);
          if (request != null) {
            return request;
          }
        }
      }
    }
    // A non-null file list is expected by HStore
    return new CompactionRequestImpl(Collections.emptyList());
  }

  private DateTieredCompactionRequest generateCompactionRequest(ArrayList<HStoreFile> storeFiles,
    CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold, long now)
    throws IOException {
    // The files have to be in ascending order for ratio-based compaction to work right
    // and for removeExcessFile to exclude the youngest files.
    Collections.reverse(storeFiles);

    // Compact everything in the window if we have more files than comConf.maxBlockingFiles
    compactionPolicyPerWindow.setMinThreshold(minThreshold);
    ArrayList<HStoreFile> storeFileSelection = mayBeStuck
      ? storeFiles
      : compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false);
    if (storeFileSelection != null && !storeFileSelection.isEmpty()) {
      // If any file in the window is excluded from compaction,
      // only one file will be output from compaction.
      boolean singleOutput = storeFiles.size() != storeFileSelection.size()
        || comConf.useDateTieredSingleOutputForMinorCompaction();
      List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
      // Generate a storage policy for each output boundary of the minor compaction
      Map<Long, String> boundaryPolicyMap =
        getBoundariesStoragePolicyForMinor(singleOutput, window, now);
      DateTieredCompactionRequest result =
        new DateTieredCompactionRequest(storeFileSelection, boundaries, boundaryPolicyMap);
      return result;
    }
    return null;
  }

  /**
   * Returns a list of boundaries for multiple compaction outputs in ascending order.
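   * <p>
   * The result has the shape {@code [Long.MIN_VALUE, startMillis of the earliest covered window,
   * ..., startMillis of the incoming window]}.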
   */
  private List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact, long now) {
    long minTimestamp = filesToCompact.stream()
      .mapToLong(f -> f.getMinimumTimestamp().orElse(Long.MAX_VALUE)).min().orElse(Long.MAX_VALUE);

    List<Long> boundaries = new ArrayList<>();

    // Add startMillis of all windows between now and min timestamp
    for (CompactionWindow window = getIncomingWindow(now); window.compareToTimestamp(minTimestamp)
        > 0; window = window.nextEarlierWindow()) {
      boundaries.add(window.startMillis());
    }
    boundaries.add(Long.MIN_VALUE);
    Collections.reverse(boundaries);
    return boundaries;
  }

  /**
   * Returns the list of output boundaries for a minor compaction of the given window.
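   * <p>
   * That is, either {@code [Long.MIN_VALUE]} for a single output file, or
   * {@code [Long.MIN_VALUE, window.startMillis()]} to split the output at the window start.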
   */
  private static List<Long> getCompactionBoundariesForMinor(CompactionWindow window,
    boolean singleOutput) {
    List<Long> boundaries = new ArrayList<>();
    boundaries.add(Long.MIN_VALUE);
    if (!singleOutput) {
      boundaries.add(window.startMillis());
    }
    return boundaries;
  }

  private CompactionWindow getIncomingWindow(long now) {
    return windowFactory.newIncomingWindow(now);
  }

  private static long getOldestToCompact(long maxAgeMillis, long now) {
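    // maxAgeMillis may be as large as Long.MAX_VALUE (effectively no age limit), in which case the
    // subtraction below underflows; checkedSubtract raises ArithmeticException and we fall back to
    // Long.MIN_VALUE, i.e. no lower bound on what may be compacted.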
    try {
      return LongMath.checkedSubtract(now, maxAgeMillis);
    } catch (ArithmeticException ae) {
      LOG.warn("Overflow when computing the oldest timestamp to compact; value for "
        + CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY + ": " + maxAgeMillis
        + ". All the files will be eligible for minor compaction.");
      return Long.MIN_VALUE;
    }
  }

  private Map<Long, String> getBoundariesStoragePolicyForMinor(boolean singleOutput,
    CompactionWindow window, long now) {
    Map<Long, String> boundariesPolicy = new HashMap<>();
    if (!comConf.isDateTieredStoragePolicyEnable()) {
      return boundariesPolicy;
    }
    String windowStoragePolicy = getWindowStoragePolicy(now, window.startMillis());
    if (singleOutput) {
      boundariesPolicy.put(Long.MIN_VALUE, windowStoragePolicy);
    } else {
      boundariesPolicy.put(window.startMillis(), windowStoragePolicy);
    }
    return boundariesPolicy;
  }

  private Map<Long, String> getBoundariesStoragePolicyForMajor(List<Long> boundaries, long now) {
    Map<Long, String> boundariesPolicy = new HashMap<>();
    if (!comConf.isDateTieredStoragePolicyEnable()) {
      return boundariesPolicy;
    }
    for (Long startTs : boundaries) {
      boundariesPolicy.put(startTs, getWindowStoragePolicy(now, startTs));
    }
    return boundariesPolicy;
  }

  private String getWindowStoragePolicy(long now, long windowStartMillis) {
    if (windowStartMillis >= (now - comConf.getHotWindowAgeMillis())) {
      return comConf.getHotWindowStoragePolicy();
    } else if (windowStartMillis >= (now - comConf.getWarmWindowAgeMillis())) {
      return comConf.getWarmWindowStoragePolicy();
    }
    return comConf.getColdWindowStoragePolicy();
  }
}