001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.compactions; 019 020import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.List; 026import org.apache.commons.lang3.mutable.MutableLong; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HDFSBlocksDistribution; 029import org.apache.hadoop.hbase.regionserver.HStoreFile; 030import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; 031import org.apache.hadoop.hbase.regionserver.StoreUtils; 032import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; 033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * Custom implementation of DateTieredCompactionPolicy that calculates compaction boundaries based 040 * on the <b>hbase.hstore.compaction.date.tiered.custom.age.limit.millis</b> configuration property 041 * and the TIERING_CELL_MIN/TIERING_CELL_MAX stored on metadata of each store file. This policy 042 * would produce either one or two tiers: - One tier if either all files data age are older than the 043 * configured age limit or all files data age are younger than the configured age limit. - Two tiers 044 * if files have both younger and older data than the configured age limit. 045 */ 046@InterfaceAudience.Private 047public class CustomDateTieredCompactionPolicy extends DateTieredCompactionPolicy { 048 049 public static final String AGE_LIMIT_MILLIS = 050 "hbase.hstore.compaction.date.tiered.custom.age.limit.millis"; 051 052 // Defaults to 10 years 053 public static final long DEFAULT_AGE_LIMIT_MILLIS = 054 (long) (10L * 365.25 * 24L * 60L * 60L * 1000L); 055 056 private static final Logger LOG = LoggerFactory.getLogger(CustomDateTieredCompactionPolicy.class); 057 058 private long cutOffTimestamp; 059 060 public CustomDateTieredCompactionPolicy(Configuration conf, 061 StoreConfigInformation storeConfigInfo) throws IOException { 062 super(conf, storeConfigInfo); 063 cutOffTimestamp = EnvironmentEdgeManager.currentTime() 064 - conf.getLong(AGE_LIMIT_MILLIS, DEFAULT_AGE_LIMIT_MILLIS); 065 066 } 067 068 @Override 069 protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact, 070 long now) { 071 MutableLong min = new MutableLong(Long.MAX_VALUE); 072 MutableLong max = new MutableLong(0); 073 filesToCompact.forEach(f -> { 074 byte[] timeRangeBytes = f.getMetadataValue(CUSTOM_TIERING_TIME_RANGE); 075 long minCurrent = Long.MAX_VALUE; 076 long maxCurrent = 0; 077 if (timeRangeBytes != null) { 078 try { 079 TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(timeRangeBytes); 080 timeRangeTracker.getMin(); 081 minCurrent = timeRangeTracker.getMin(); 082 maxCurrent = timeRangeTracker.getMax(); 083 } catch (IOException e) { 084 LOG.warn("Got TIERING_CELL_TIME_RANGE info from file, but failed to parse it:", e); 085 } 086 } 087 if (minCurrent < min.getValue()) { 088 min.setValue(minCurrent); 089 } 090 if (maxCurrent > max.getValue()) { 091 max.setValue(maxCurrent); 092 } 093 }); 094 095 List<Long> boundaries = new ArrayList<>(); 096 boundaries.add(Long.MIN_VALUE); 097 if (min.getValue() < cutOffTimestamp) { 098 boundaries.add(min.getValue()); 099 if (max.getValue() > cutOffTimestamp) { 100 boundaries.add(cutOffTimestamp); 101 } 102 } 103 return boundaries; 104 } 105 106 @Override 107 public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection, 108 boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { 109 ArrayList<HStoreFile> filteredByPolicy = this.compactionPolicyPerWindow 110 .applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck); 111 return selectMajorCompaction(filteredByPolicy); 112 } 113 114 @Override 115 public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact) 116 throws IOException { 117 long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); 118 long now = EnvironmentEdgeManager.currentTime(); 119 if (isMajorCompactionTime(filesToCompact, now, lowTimestamp)) { 120 long cfTTL = this.storeConfigInfo.getStoreFileTtl(); 121 int countLower = 0; 122 int countHigher = 0; 123 HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); 124 for (HStoreFile f : filesToCompact) { 125 if (checkForTtl(cfTTL, f)) { 126 return true; 127 } 128 if (isMajorOrBulkloadResult(f, now - lowTimestamp)) { 129 return true; 130 } 131 byte[] timeRangeBytes = f.getMetadataValue(CUSTOM_TIERING_TIME_RANGE); 132 TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(timeRangeBytes); 133 if (timeRangeTracker.getMin() < cutOffTimestamp) { 134 if (timeRangeTracker.getMax() > cutOffTimestamp) { 135 // Found at least one file crossing the cutOffTimestamp 136 return true; 137 } else { 138 countLower++; 139 } 140 } else { 141 countHigher++; 142 } 143 hdfsBlocksDistribution.add(f.getHDFSBlockDistribution()); 144 } 145 // If we haven't found any files crossing the cutOffTimestamp, we have to check 146 // if there are at least more than one file on each tier and if so, perform compaction 147 if (countLower > 1 || countHigher > 1) { 148 return true; 149 } 150 return checkBlockLocality(hdfsBlocksDistribution); 151 } 152 return false; 153 } 154 155}