/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
import org.apache.hbase.thirdparty.com.google.common.math.LongMath;

/**
 * HBASE-15181 This is a simple implementation of date-based tiered compaction, similar to
 * Cassandra's, with the following benefits:
 * <ol>
 * <li>Improve date-range-based scans by structuring store files in a date-based tiered layout.</li>
 * <li>Reduce compaction overhead.</li>
 * <li>Improve TTL efficiency.</li>
 * </ol>
 * It is a perfect fit for use cases that:
 * <ol>
 * <li>have mostly date-based data writes and scans, with a focus on the most recent data.</li>
 * </ol>
 * Out-of-order writes are handled gracefully. Time range overlapping among store files is
 * tolerated and the performance impact is minimized. Configuration can be set at hbase-site or
 * overridden at the per-table or per-column-family level via the hbase shell.
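 * <p>
 * As an illustrative sketch only (the configuration key, engine class, and shell syntax below are
 * taken from the general HBase date-tiered compaction documentation, not defined in this file),
 * switching a table to date-tiered compaction from the hbase shell looks roughly like:
 * <pre>
 * alter 'my_table', CONFIGURATION => {'hbase.hstore.engine.class' =>
 *     'org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine'}
 * </pre>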
 * Design spec is at
 * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class DateTieredCompactionPolicy extends SortedCompactionPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(DateTieredCompactionPolicy.class);

  private final RatioBasedCompactionPolicy compactionPolicyPerWindow;

  private final CompactionWindowFactory windowFactory;

  public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo)
      throws IOException {
    super(conf, storeConfigInfo);
    try {
      compactionPolicyPerWindow = ReflectionUtils.instantiateWithCustomCtor(
        comConf.getCompactionPolicyForDateTieredWindow(),
        new Class[] { Configuration.class, StoreConfigInformation.class },
        new Object[] { conf, storeConfigInfo });
    } catch (Exception e) {
      throw new IOException("Unable to load configured compaction policy '"
          + comConf.getCompactionPolicyForDateTieredWindow() + "'", e);
    }
    try {
      windowFactory = ReflectionUtils.instantiateWithCustomCtor(
        comConf.getDateTieredCompactionWindowFactory(),
        new Class[] { CompactionConfiguration.class }, new Object[] { comConf });
    } catch (Exception e) {
      throw new IOException("Unable to load configured window factory '"
          + comConf.getDateTieredCompactionWindowFactory() + "'", e);
    }
  }

  /**
   * Heuristics for guessing whether we need a minor compaction.
   */
  @Override
  @VisibleForTesting
  public boolean needsCompaction(Collection<HStoreFile> storeFiles,
      List<HStoreFile> filesCompacting) {
    ArrayList<HStoreFile> candidates = new ArrayList<>(storeFiles);
    try {
      return !selectMinorCompaction(candidates, false, true).getFiles().isEmpty();
    } catch (Exception e) {
      LOG.error("Can not check for compaction: ", e);
      return false;
    }
  }

  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
      throws IOException {
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || mcTime == 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("filesToCompact: " + filesToCompact + " mcTime: " + mcTime);
      }
      return false;
    }

    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
    if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("lowTimestamp: " + lowTimestamp + " now: " + now + " mcTime: " + mcTime);
      }
      return false;
    }

    long cfTTL = this.storeConfigInfo.getStoreFileTtl();
    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
    List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, now);
    boolean[] filesInWindow = new boolean[boundaries.size()];

    for (HStoreFile file: filesToCompact) {
      OptionalLong minTimestamp = file.getMinimumTimestamp();
      long oldest = minTimestamp.isPresent() ?
          now - minTimestamp.getAsLong() : Long.MIN_VALUE;
      if (cfTTL != Long.MAX_VALUE && oldest >= cfTTL) {
        LOG.debug("Major compaction triggered on store " + this
          + "; for TTL maintenance");
        return true;
      }
      if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) {
        LOG.debug("Major compaction triggered on store " + this
          + ", because there are new files and time since last major compaction "
          + (now - lowTimestamp) + "ms");
        return true;
      }

      int lowerWindowIndex =
          Collections.binarySearch(boundaries, minTimestamp.orElse(Long.MAX_VALUE));
      int upperWindowIndex =
          Collections.binarySearch(boundaries, file.getMaximumTimestamp().orElse(Long.MAX_VALUE));
      // Handle boundary conditions and negative values of binarySearch
      lowerWindowIndex = (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : lowerWindowIndex;
      upperWindowIndex = (upperWindowIndex < 0) ? Math.abs(upperWindowIndex + 2) : upperWindowIndex;
      if (lowerWindowIndex != upperWindowIndex) {
        LOG.debug("Major compaction triggered on store " + this + "; because file "
          + file.getPath() + " has data with timestamps crossing window boundaries");
        return true;
      } else if (filesInWindow[upperWindowIndex]) {
        LOG.debug("Major compaction triggered on store " + this
          + "; because there is more than one file in some windows");
        return true;
      } else {
        filesInWindow[upperWindowIndex] = true;
      }
      hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
    }

    float blockLocalityIndex = hdfsBlocksDistribution
        .getBlockLocalityIndex(RSRpcServices.getHostname(comConf.conf, false));
    if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
      LOG.debug("Major compaction triggered on store " + this
        + "; to make hdfs blocks local, current blockLocalityIndex is "
        + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
      return true;
    }

    LOG.debug("Skipping major compaction of " + this
      + ", because the files are already major compacted");
    return false;
  }

  @Override
  protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile> candidateSelection,
      boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    CompactionRequestImpl result = tryingMajor ? selectMajorCompaction(candidateSelection)
      : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Generated compaction request: " + result);
    }
    return result;
  }

  public CompactionRequestImpl selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) {
    long now = EnvironmentEdgeManager.currentTime();
    return new DateTieredCompactionRequest(candidateSelection,
      this.getCompactBoundariesForMajor(candidateSelection, now));
  }

  /**
   * We receive store files sorted in ascending order by seqId, then scan the list of files. If the
   * current file has a maxTimestamp older than the last known maximum, treat this file as if it
   * carried the last known maximum. This way both seqId and timestamp are in the same order. If
   * files carry the same maxTimestamp, they are ordered by seqId. We then reverse the list so they
   * are ordered by seqId and maxTimestamp in descending order and build the time windows. All the
   * out-of-order data goes into the same compaction windows, guaranteeing contiguous compaction
   * based on sequence id.
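   * <p>
   * Illustrative sketch (the timestamps below are made up for this comment): given files F1..F4 in
   * ascending seqId order with maxTimestamps [10, 50, 30, 60], the carried-forward maximums are
   * [10, 50, 50, 60]; the out-of-order F3 is treated as if it were as new as F2 and lands in the
   * same window, and after reversing the list the windows are filled from the newest files
   * backwards.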
   */
  public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    long now = EnvironmentEdgeManager.currentTime();
    long oldestToCompact = getOldestToCompact(comConf.getDateTieredMaxStoreFileAgeMillis(), now);

    List<Pair<HStoreFile, Long>> storefileMaxTimestampPairs =
        Lists.newArrayListWithCapacity(candidateSelection.size());
    long maxTimestampSeen = Long.MIN_VALUE;
    for (HStoreFile storeFile : candidateSelection) {
      // If there is out-of-order data,
      // we put it in the same window as the last file in increasing order
      maxTimestampSeen =
          Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp().orElse(Long.MIN_VALUE));
      storefileMaxTimestampPairs.add(new Pair<>(storeFile, maxTimestampSeen));
    }
    Collections.reverse(storefileMaxTimestampPairs);

    CompactionWindow window = getIncomingWindow(now);
    int minThreshold = comConf.getDateTieredIncomingWindowMin();
    PeekingIterator<Pair<HStoreFile, Long>> it =
        Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
    while (it.hasNext()) {
      if (window.compareToTimestamp(oldestToCompact) < 0) {
        break;
      }
      int compResult = window.compareToTimestamp(it.peek().getSecond());
      if (compResult > 0) {
        // If the file is too old for the window, switch to the next window
        window = window.nextEarlierWindow();
        minThreshold = comConf.getMinFilesToCompact();
      } else {
        // The file is within the target window
        ArrayList<HStoreFile> fileList = Lists.newArrayList();
        // Add all files in the same window. For the incoming window
        // we tolerate files with future data although it is sub-optimal
        while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) {
          fileList.add(it.next().getFirst());
        }
        if (fileList.size() >= minThreshold) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Processing files: " + fileList + " for window: " + window);
          }
          DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
            mayUseOffPeak, mayBeStuck, minThreshold);
          if (request != null) {
            return request;
          }
        }
      }
    }
    // A non-null file list is expected by HStore
    return new CompactionRequestImpl(Collections.emptyList());
  }

  private DateTieredCompactionRequest generateCompactionRequest(ArrayList<HStoreFile> storeFiles,
      CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold)
      throws IOException {
    // The files have to be in ascending order for ratio-based compaction to work right
    // and for removeExcessFile to exclude the youngest files.
    Collections.reverse(storeFiles);

    // Compact everything in the window if we have more files than comConf.maxBlockingFiles
    compactionPolicyPerWindow.setMinThreshold(minThreshold);
    ArrayList<HStoreFile> storeFileSelection = mayBeStuck ? storeFiles
      : compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false);
    if (storeFileSelection != null && !storeFileSelection.isEmpty()) {
      // If there is any file in the window excluded from compaction,
      // only one file will be output from compaction.
      boolean singleOutput = storeFiles.size() != storeFileSelection.size() ||
        comConf.useDateTieredSingleOutputForMinorCompaction();
      List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
      DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection,
        boundaries);
      return result;
    }
    return null;
  }

  /**
   * Return a list of boundaries for multiple compaction output in ascending order.
   */
  private List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact, long now) {
    long minTimestamp =
        filesToCompact.stream().mapToLong(f -> f.getMinimumTimestamp().orElse(Long.MAX_VALUE)).min()
          .orElse(Long.MAX_VALUE);

    List<Long> boundaries = new ArrayList<>();

    // Add startMillis of all windows between now and min timestamp
    for (CompactionWindow window = getIncomingWindow(now); window
        .compareToTimestamp(minTimestamp) > 0; window = window.nextEarlierWindow()) {
      boundaries.add(window.startMillis());
    }
    boundaries.add(Long.MIN_VALUE);
    Collections.reverse(boundaries);
    return boundaries;
  }

  /**
   * @return a list of boundaries for multiple compaction output from minTimestamp to maxTimestamp.
   */
  private static List<Long> getCompactionBoundariesForMinor(CompactionWindow window,
      boolean singleOutput) {
    List<Long> boundaries = new ArrayList<>();
    boundaries.add(Long.MIN_VALUE);
    if (!singleOutput) {
      boundaries.add(window.startMillis());
    }
    return boundaries;
  }

  private CompactionWindow getIncomingWindow(long now) {
    return windowFactory.newIncomingWindow(now);
  }

  private static long getOldestToCompact(long maxAgeMillis, long now) {
    try {
      return LongMath.checkedSubtract(now, maxAgeMillis);
    } catch (ArithmeticException ae) {
      LOG.warn("Value for " + CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY + ": "
        + maxAgeMillis + ". All the files will be eligible for minor compaction.");
      return Long.MIN_VALUE;
    }
  }
}