/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
import org.apache.hbase.thirdparty.com.google.common.math.LongMath;

/**
 * HBASE-15181 This is a simple implementation of date-based tiered compaction, similar to
 * Cassandra's, with the following benefits:
 * <ol>
 * <li>Improve date-range-based scans by structuring store files in a date-based tiered
 * layout.</li>
 * <li>Reduce compaction overhead.</li>
 * <li>Improve TTL efficiency.</li>
 * </ol>
 * It is a perfect fit for use cases that:
 * <ol>
 * <li>write and scan data mostly by date, with a focus on the most recent data.</li>
 * </ol>
 * Out-of-order writes are handled gracefully. Overlapping time ranges among store files are
 * tolerated and their performance impact is minimized. Configuration can be set in hbase-site.xml
 * or overridden at the per-table or per-column-family level via the HBase shell.
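 * <p>
 * As a minimal illustrative sketch (the table and family names below are hypothetical; the
 * {@code hbase.hstore.engine.class} key and the {@code DateTieredStoreEngine} value are the
 * real switches), the engine can be enabled per table from the Java client:
 *
 * <pre>
 * // Create a table whose stores compact with the date-tiered policy (hypothetical names).
 * TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("events"))
 *     .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
 *     .setValue("hbase.hstore.engine.class",
 *       "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine")
 *     .build();
 * admin.createTable(htd); // assumes an org.apache.hadoop.hbase.client.Admin instance
 * </pre>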
 * <p>
 * The design spec is at
 * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class DateTieredCompactionPolicy extends SortedCompactionPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(DateTieredCompactionPolicy.class);

  private final RatioBasedCompactionPolicy compactionPolicyPerWindow;

  private final CompactionWindowFactory windowFactory;

  public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo)
      throws IOException {
    super(conf, storeConfigInfo);
    try {
      compactionPolicyPerWindow = ReflectionUtils.instantiateWithCustomCtor(
        comConf.getCompactionPolicyForDateTieredWindow(),
        new Class[] { Configuration.class, StoreConfigInformation.class },
        new Object[] { conf, storeConfigInfo });
    } catch (Exception e) {
      throw new IOException("Unable to load configured compaction policy '"
          + comConf.getCompactionPolicyForDateTieredWindow() + "'", e);
    }
    try {
      windowFactory = ReflectionUtils.instantiateWithCustomCtor(
        comConf.getDateTieredCompactionWindowFactory(),
        new Class[] { CompactionConfiguration.class }, new Object[] { comConf });
    } catch (Exception e) {
      throw new IOException("Unable to load configured window factory '"
          + comConf.getDateTieredCompactionWindowFactory() + "'", e);
    }
  }

  /**
   * Heuristic for guessing whether we need a minor compaction.
   */
  @InterfaceAudience.Private
  @Override
  public boolean needsCompaction(Collection<HStoreFile> storeFiles,
      List<HStoreFile> filesCompacting) {
    ArrayList<HStoreFile> candidates = new ArrayList<>(storeFiles);
    try {
      return !selectMinorCompaction(candidates, false, true).getFiles().isEmpty();
    } catch (Exception e) {
      LOG.error("Can not check for compaction: ", e);
      return false;
    }
  }

  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
      throws IOException {
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || mcTime == 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("filesToCompact: " + filesToCompact + " mcTime: " + mcTime);
      }
      return false;
    }

    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
    if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("lowTimestamp: " + lowTimestamp + " now: " + now + " mcTime: " + mcTime);
      }
      return false;
    }

    long cfTTL = this.storeConfigInfo.getStoreFileTtl();
    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
    List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, now);
    boolean[] filesInWindow = new boolean[boundaries.size()];

    for (HStoreFile file : filesToCompact) {
      OptionalLong minTimestamp = file.getMinimumTimestamp();
      long oldest = minTimestamp.isPresent() ? now - minTimestamp.getAsLong() : Long.MIN_VALUE;
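      // If the oldest data in the file has outlived the column family TTL, force a major
      // compaction so the expired cells actually get purged (TTL maintenance).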
      if (cfTTL != Long.MAX_VALUE && oldest >= cfTTL) {
        LOG.debug("Major compaction triggered on store " + this + "; for TTL maintenance");
        return true;
      }
      if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) {
        LOG.debug("Major compaction triggered on store " + this
            + ", because there are new files and time since last major compaction "
            + (now - lowTimestamp) + "ms");
        return true;
      }

      int lowerWindowIndex =
          Collections.binarySearch(boundaries, minTimestamp.orElse(Long.MAX_VALUE));
      int upperWindowIndex =
          Collections.binarySearch(boundaries, file.getMaximumTimestamp().orElse(Long.MAX_VALUE));
      // Handle boundary conditions and negative values of binarySearch
      lowerWindowIndex = (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : lowerWindowIndex;
      upperWindowIndex = (upperWindowIndex < 0) ? Math.abs(upperWindowIndex + 2) : upperWindowIndex;
      if (lowerWindowIndex != upperWindowIndex) {
        LOG.debug("Major compaction triggered on store " + this + "; because file "
            + file.getPath() + " has data with timestamps crossing window boundaries");
        return true;
      } else if (filesInWindow[upperWindowIndex]) {
        LOG.debug("Major compaction triggered on store " + this
            + "; because there is more than one file in some windows");
        return true;
      } else {
        filesInWindow[upperWindowIndex] = true;
      }
      hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
    }

    float blockLocalityIndex = hdfsBlocksDistribution
        .getBlockLocalityIndex(DNS.getHostname(comConf.conf, DNS.ServerType.REGIONSERVER));
    if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
      LOG.debug("Major compaction triggered on store " + this
          + "; to make hdfs blocks local, current blockLocalityIndex is "
          + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
      return true;
    }

    LOG.debug("Skipping major compaction of " + this
        + ", because the files are already major compacted");
    return false;
  }

  @Override
  protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile> candidateSelection,
      boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    CompactionRequestImpl result = tryingMajor ? selectMajorCompaction(candidateSelection)
        : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Generated compaction request: " + result);
    }
    return result;
  }

  public CompactionRequestImpl selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) {
    long now = EnvironmentEdgeManager.currentTime();
    List<Long> boundaries = getCompactBoundariesForMajor(candidateSelection, now);
    Map<Long, String> boundariesPolicies = getBoundariesStoragePolicyForMajor(boundaries, now);
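    // Illustrative shape of the request (actual window sizes come from the configured window
    // factory): boundaries ascend from Long.MIN_VALUE through each window's start, so the
    // compaction writes one output file per window that holds data; boundaries are mapped to
    // storage policies only when date-tiered storage policy is enabled.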
    return new DateTieredCompactionRequest(candidateSelection, boundaries, boundariesPolicies);
  }

  /**
   * We receive store files sorted in ascending order by seqId, then scan the list of files. If
   * the current file has a maxTimestamp older than the last known maximum, treat this file as if
   * it carried the last known maximum. This way both seqId and timestamp are in the same order.
   * If files carry the same maxTimestamp, they are ordered by seqId. We then reverse the list so
   * the files are ordered by seqId and maxTimestamp in descending order, and build the time
   * windows; all the out-of-order data goes into the same compaction windows, guaranteeing
   * contiguous compaction based on sequence id.
   */
  public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    long now = EnvironmentEdgeManager.currentTime();
    long oldestToCompact = getOldestToCompact(comConf.getDateTieredMaxStoreFileAgeMillis(), now);

    List<Pair<HStoreFile, Long>> storefileMaxTimestampPairs =
        Lists.newArrayListWithCapacity(candidateSelection.size());
    long maxTimestampSeen = Long.MIN_VALUE;
    for (HStoreFile storeFile : candidateSelection) {
      // If there is out-of-order data,
      // we put it in the same window as the last file in increasing order
      maxTimestampSeen =
          Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp().orElse(Long.MIN_VALUE));
      storefileMaxTimestampPairs.add(new Pair<>(storeFile, maxTimestampSeen));
    }
    Collections.reverse(storefileMaxTimestampPairs);
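    // Worked example (hypothetical timestamps): max timestamps [5, 12, 9, 13] in ascending
    // seqId order become [5, 12, 12, 13] under the running maximum above, then [13, 12, 12, 5]
    // after the reverse, so the out-of-order file (9) stays windowed with its neighbor.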
    CompactionWindow window = getIncomingWindow(now);
    int minThreshold = comConf.getDateTieredIncomingWindowMin();
    PeekingIterator<Pair<HStoreFile, Long>> it =
        Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
    while (it.hasNext()) {
      if (window.compareToTimestamp(oldestToCompact) < 0) {
        break;
      }
      int compResult = window.compareToTimestamp(it.peek().getSecond());
      if (compResult > 0) {
        // If the file is too old for the window, switch to the next window
        window = window.nextEarlierWindow();
        minThreshold = comConf.getMinFilesToCompact();
      } else {
        // The file is within the target window
        ArrayList<HStoreFile> fileList = Lists.newArrayList();
        // Add all files in the same window. For the incoming window
        // we tolerate files with future data, although it is sub-optimal
        while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) {
          fileList.add(it.next().getFirst());
        }
        if (fileList.size() >= minThreshold) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Processing files: " + fileList + " for window: " + window);
          }
          DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
            mayUseOffPeak, mayBeStuck, minThreshold, now);
          if (request != null) {
            return request;
          }
        }
      }
    }
    // A non-null file list is expected by HStore
    return new CompactionRequestImpl(Collections.emptyList());
  }

  private DateTieredCompactionRequest generateCompactionRequest(ArrayList<HStoreFile> storeFiles,
      CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold,
      long now) throws IOException {
    // The files have to be in ascending order for ratio-based compaction to work right
    // and for removeExcessFiles to exclude the youngest files.
    Collections.reverse(storeFiles);

    // Compact everything in the window if we have more files than comConf.maxBlockingFiles
    compactionPolicyPerWindow.setMinThreshold(minThreshold);
    ArrayList<HStoreFile> storeFileSelection = mayBeStuck ? storeFiles
        : compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false);
    if (storeFileSelection != null && !storeFileSelection.isEmpty()) {
      // If any file in the window is excluded from compaction,
      // only one file will be output from compaction.
      boolean singleOutput = storeFiles.size() != storeFileSelection.size()
          || comConf.useDateTieredSingleOutputForMinorCompaction();
      List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
      // Map the output boundaries to storage policies for this minor compaction
      Map<Long, String> boundaryPolicyMap =
          getBoundariesStoragePolicyForMinor(singleOutput, window, now);
      return new DateTieredCompactionRequest(storeFileSelection, boundaries, boundaryPolicyMap);
    }
    return null;
  }

  /**
   * Return a list of boundaries for multiple compaction output in ascending order.
   */
  private List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact, long now) {
    long minTimestamp =
        filesToCompact.stream().mapToLong(f -> f.getMinimumTimestamp().orElse(Long.MAX_VALUE)).min()
            .orElse(Long.MAX_VALUE);

    List<Long> boundaries = new ArrayList<>();

    // Add startMillis of all windows between now and min timestamp
    for (CompactionWindow window = getIncomingWindow(now); window
        .compareToTimestamp(minTimestamp) > 0; window = window.nextEarlierWindow()) {
      boundaries.add(window.startMillis());
    }
    boundaries.add(Long.MIN_VALUE);
    Collections.reverse(boundaries);
    return boundaries;
  }

  /**
   * @return a list of boundaries for multiple compaction output from minTimestamp to maxTimestamp.
   */
  private static List<Long> getCompactionBoundariesForMinor(CompactionWindow window,
      boolean singleOutput) {
    List<Long> boundaries = new ArrayList<>();
    boundaries.add(Long.MIN_VALUE);
    if (!singleOutput) {
      boundaries.add(window.startMillis());
    }
    return boundaries;
  }

  private CompactionWindow getIncomingWindow(long now) {
    return windowFactory.newIncomingWindow(now);
  }

  private static long getOldestToCompact(long maxAgeMillis, long now) {
    try {
      return LongMath.checkedSubtract(now, maxAgeMillis);
    } catch (ArithmeticException ae) {
      LOG.warn("Value for " + CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY + ": "
          + maxAgeMillis + " overflows the oldest-to-compact calculation; all files will be "
          + "eligible for minor compaction.");
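      // now - maxAgeMillis overflowed; fall back to "no age limit" so no window is excluded.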
      return Long.MIN_VALUE;
    }
  }

  private Map<Long, String> getBoundariesStoragePolicyForMinor(boolean singleOutput,
      CompactionWindow window, long now) {
    Map<Long, String> boundariesPolicy = new HashMap<>();
    if (!comConf.isDateTieredStoragePolicyEnable()) {
      return boundariesPolicy;
    }
    String windowStoragePolicy = getWindowStoragePolicy(now, window.startMillis());
    if (singleOutput) {
      boundariesPolicy.put(Long.MIN_VALUE, windowStoragePolicy);
    } else {
      boundariesPolicy.put(window.startMillis(), windowStoragePolicy);
    }
    return boundariesPolicy;
  }

  private Map<Long, String> getBoundariesStoragePolicyForMajor(List<Long> boundaries, long now) {
    Map<Long, String> boundariesPolicy = new HashMap<>();
    if (!comConf.isDateTieredStoragePolicyEnable()) {
      return boundariesPolicy;
    }
    for (Long startTs : boundaries) {
      boundariesPolicy.put(startTs, getWindowStoragePolicy(now, startTs));
    }
    return boundariesPolicy;
  }

  private String getWindowStoragePolicy(long now, long windowStartMillis) {
    if (windowStartMillis >= (now - comConf.getHotWindowAgeMillis())) {
      return comConf.getHotWindowStoragePolicy();
    } else if (windowStartMillis >= (now - comConf.getWarmWindowAgeMillis())) {
      return comConf.getWarmWindowStoragePolicy();
    }
    return comConf.getColdWindowStoragePolicy();
  }
}