/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Custom implementation of DateTieredCompactionPolicy that calculates compaction boundaries based
 * on the <b>hbase.hstore.compaction.date.tiered.custom.age.limit.millis</b> configuration property
 * and the CUSTOM_TIERING_TIME_RANGE (min/max) values stored in the metadata of each store file.
 * This policy produces either one or two tiers:
 * <ul>
 * <li>One tier, if all files' data are older than the configured age limit, or all files' data are
 * younger than the configured age limit.</li>
 * <li>Two tiers, if the files contain both data younger and data older than the configured age
 * limit.</li>
 * </ul>
 */
@InterfaceAudience.Private
public class CustomDateTieredCompactionPolicy extends DateTieredCompactionPolicy {

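  /**
   * Configuration key for the tiering age limit, in milliseconds. Data whose custom tiering
   * timestamp is older than (current time - this value) is considered "cold". For example, a value
   * of 31536000000 (one year) keeps roughly the last year of data in the "hot" tier.
   */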
  public static final String AGE_LIMIT_MILLIS =
    "hbase.hstore.compaction.date.tiered.custom.age.limit.millis";

  // Defaults to 10 years
  public static final long DEFAULT_AGE_LIMIT_MILLIS =
    (long) (10L * 365.25 * 24L * 60L * 60L * 1000L);

  private static final Logger LOG =
    LoggerFactory.getLogger(CustomDateTieredCompactionPolicy.class);

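  // Absolute timestamp marking the hot/cold boundary; computed once when the policy is created.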
  private long cutOffTimestamp;

  public CustomDateTieredCompactionPolicy(Configuration conf,
    StoreConfigInformation storeConfigInfo) throws IOException {
    super(conf, storeConfigInfo);
    cutOffTimestamp = EnvironmentEdgeManager.currentTime()
      - conf.getLong(AGE_LIMIT_MILLIS, DEFAULT_AGE_LIMIT_MILLIS);
  }

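  /**
   * Computes major compaction boundaries from the CUSTOM_TIERING_TIME_RANGE metadata of the
   * selected files: the list always starts at Long.MIN_VALUE, and when the oldest tracked
   * timestamp falls before the cut-off, the oldest timestamp (and, if the newest timestamp crosses
   * the cut-off, the cut-off itself) is added, yielding at most two non-empty output tiers.
   */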
  @Override
  protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact,
    long now) {
    MutableLong min = new MutableLong(Long.MAX_VALUE);
    MutableLong max = new MutableLong(0);
    filesToCompact.forEach(f -> {
      byte[] timeRangeBytes = f.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
      long minCurrent = Long.MAX_VALUE;
      long maxCurrent = 0;
      if (timeRangeBytes != null) {
        try {
          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(timeRangeBytes);
          minCurrent = timeRangeTracker.getMin();
          maxCurrent = timeRangeTracker.getMax();
        } catch (IOException e) {
          LOG.warn("Got CUSTOM_TIERING_TIME_RANGE info from file, but failed to parse it:", e);
        }
      }
      if (minCurrent < min.getValue()) {
        min.setValue(minCurrent);
      }
      if (maxCurrent > max.getValue()) {
        max.setValue(maxCurrent);
      }
    });

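    // Always start the boundary list at Long.MIN_VALUE; only add further boundaries when the
    // selection actually contains data older than the cut-off.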
    List<Long> boundaries = new ArrayList<>();
    boundaries.add(Long.MIN_VALUE);
    if (min.getValue() < cutOffTimestamp) {
      boundaries.add(min.getValue());
      if (max.getValue() > cutOffTimestamp) {
        boundaries.add(cutOffTimestamp);
      }
    }
    return boundaries;
  }

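  /**
   * Minor compactions first delegate file selection to the per-window compaction policy and then
   * build the request through {@link #selectMajorCompaction(ArrayList)}, so output files are still
   * split along the custom tier boundaries.
   */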
  @Override
  public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection,
    boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    ArrayList<HStoreFile> filteredByPolicy = this.compactionPolicyPerWindow
      .applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
    return selectMajorCompaction(filteredByPolicy);
  }

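  /**
   * Decides whether a major compaction should run. Once the periodic major compaction check
   * passes, compaction is triggered if any file contains TTL-expired data, matches the base
   * policy's per-file major/bulkload criteria, or straddles the cut-off timestamp; otherwise it is
   * triggered when either tier holds more than one file, or based on the block locality check.
   */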
  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
    throws IOException {
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
    if (isMajorCompactionTime(filesToCompact, now, lowTimestamp)) {
      long cfTTL = this.storeConfigInfo.getStoreFileTtl();
      int countLower = 0;
      int countHigher = 0;
      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
      for (HStoreFile f : filesToCompact) {
        if (checkForTtl(cfTTL, f)) {
          return true;
        }
        if (isMajorOrBulkloadResult(f, now - lowTimestamp)) {
          return true;
        }
        byte[] timeRangeBytes = f.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
        if (timeRangeBytes != null) {
          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(timeRangeBytes);
          if (timeRangeTracker.getMin() < cutOffTimestamp) {
            if (timeRangeTracker.getMax() > cutOffTimestamp) {
              // Found at least one file crossing the cutOffTimestamp
              return true;
            } else {
              countLower++;
            }
          } else {
            countHigher++;
          }
        }
        // Files without custom tiering metadata are not classified into either tier.
        hdfsBlocksDistribution.add(f.getHDFSBlockDistribution());
      }
      // If no file crosses the cutOffTimestamp, we still have to check whether either tier
      // contains more than one file and, if so, perform compaction.
      if (countLower > 1 || countHigher > 1) {
        return true;
      }
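      // Otherwise, let the block locality check decide whether a rewrite is worthwhile.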
      return checkBlockLocality(hdfsBlocksDistribution);
    }
    return false;
  }

}