/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
import org.apache.hbase.thirdparty.com.google.common.math.LongMath;

/**
 * HBASE-15181 This is a simple implementation of date-based tiered compaction, similar to
 * Cassandra's, with the following benefits:
 * <ol>
 * <li>Improve date-range-based scans by structuring store files in a date-based tiered
 * layout.</li>
 * <li>Reduce compaction overhead.</li>
 * <li>Improve TTL efficiency.</li>
 * </ol>
 * A perfect fit for use cases that:
 * <ol>
 * <li>have mostly date-based writes and scans, with a focus on the most recent data.</li>
 * </ol>
 * Out-of-order writes are handled gracefully. Time range overlap among store files is tolerated
 * and the performance impact is minimized. Configuration can be set in hbase-site.xml or
 * overridden at the per-table or per-column-family level via the hbase shell.
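 * <p>
 * For illustration only (the property keys and engine class below follow the date-tiered
 * compaction settings documented in the HBase reference guide, but exact key names, values and
 * shell syntax can differ between versions, and the table name is arbitrary), a per-table
 * override might look like:
 *
 * <pre>
 * alter 'my_table', CONFIGURATION => {
 *   'hbase.hstore.engine.class' =>
 *     'org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine',
 *   'hbase.hstore.compaction.date.tiered.base.window.millis' => '21600000',
 *   'hbase.hstore.compaction.date.tiered.windows.per.tier' => '4'
 * }
 * </pre>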
 * <p>
 * The design spec is at
 * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class DateTieredCompactionPolicy extends SortedCompactionPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(DateTieredCompactionPolicy.class);

  protected final RatioBasedCompactionPolicy compactionPolicyPerWindow;

  private final CompactionWindowFactory windowFactory;

  public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo)
    throws IOException {
    super(conf, storeConfigInfo);
    try {
      compactionPolicyPerWindow =
        ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForDateTieredWindow(),
          new Class[] { Configuration.class, StoreConfigInformation.class },
          new Object[] { conf, storeConfigInfo });
    } catch (Exception e) {
      throw new IOException("Unable to load configured compaction policy '"
        + comConf.getCompactionPolicyForDateTieredWindow() + "'", e);
    }
    try {
      windowFactory =
        ReflectionUtils.instantiateWithCustomCtor(comConf.getDateTieredCompactionWindowFactory(),
          new Class[] { CompactionConfiguration.class }, new Object[] { comConf });
    } catch (Exception e) {
      throw new IOException("Unable to load configured window factory '"
        + comConf.getDateTieredCompactionWindowFactory() + "'", e);
    }
  }

  /**
   * Heuristics for guessing whether we need a minor compaction.
   */
  @Override
  @InterfaceAudience.Private
  public boolean needsCompaction(Collection<HStoreFile> storeFiles,
    List<HStoreFile> filesCompacting) {
    ArrayList<HStoreFile> candidates = new ArrayList<>(storeFiles);
    try {
      return !selectMinorCompaction(candidates, false, true).getFiles().isEmpty();
    } catch (Exception e) {
      LOG.error("Cannot check for compaction: ", e);
      return false;
    }
  }

  /**
   * Returns true if a major compaction is due, i.e. the oldest file is older than the (jittered)
   * major compaction period.
   */
  protected boolean isMajorCompactionTime(Collection<HStoreFile> filesToCompact, long now,
    long lowestModificationTime) throws IOException {
    if (filesToCompact == null) {
      LOG.debug("filesToCompact is null, not checking for major compaction");
      return false;
    }
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (mcTime == 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("filesToCompact: " + filesToCompact + " mcTime: " + mcTime);
      }
      return false;
    }
    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    if (lowestModificationTime <= 0L || lowestModificationTime >= (now - mcTime)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("lowestModificationTime: " + lowestModificationTime + " now: " + now
          + " mcTime: " + mcTime);
      }
      return false;
    }
    return true;
  }

  /**
   * Returns true if the file holds cells that have reached the store's TTL, meaning a major
   * compaction can reclaim space by dropping expired data.
   */
  protected boolean checkForTtl(long ttl, HStoreFile file) {
    OptionalLong minTimestamp = file.getMinimumTimestamp();
    long oldest = minTimestamp.isPresent()
      ? EnvironmentEdgeManager.currentTime() - minTimestamp.getAsLong()
      : Long.MIN_VALUE;
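    // "oldest" is the age of the file's oldest cell; Long.MIN_VALUE (file has no timestamp
    // metadata) means the TTL check below can never fire for this file.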
    if (ttl != Long.MAX_VALUE && oldest >= ttl) {
      LOG.debug("Major compaction triggered on store " + this + "; for TTL maintenance");
      return true;
    }
    return false;
  }

  /**
   * Returns true if the file is not the result of a major compaction, or is a bulk-loaded file;
   * either way it has not yet been through a full rewrite, so a major compaction is triggered.
   */
  protected boolean isMajorOrBulkloadResult(HStoreFile file, long timeDiff) {
    if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) {
      LOG.debug("Major compaction triggered on store " + this
        + ", because there are new files and time since last major compaction is " + timeDiff
        + "ms");
      return true;
    }
    return false;
  }

  /**
   * Returns true if the HDFS block locality of the given distribution is below the configured
   * minimum, so a major compaction should be used to bring the blocks local again.
   */
  protected boolean checkBlockLocality(HDFSBlocksDistribution hdfsBlocksDistribution)
    throws UnknownHostException {
    float blockLocalityIndex = hdfsBlocksDistribution
      .getBlockLocalityIndex(DNS.getHostname(comConf.conf, DNS.ServerType.REGIONSERVER));
    if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
      LOG.debug("Major compaction triggered on store " + this
        + "; to make hdfs blocks local, current blockLocalityIndex is " + blockLocalityIndex
        + " (min " + comConf.getMinLocalityToForceCompact() + ")");
      return true;
    }
    return false;
  }

  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
    throws IOException {
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
    if (isMajorCompactionTime(filesToCompact, now, lowTimestamp)) {
      long cfTTL = this.storeConfigInfo.getStoreFileTtl();
      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
      List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, now);
      boolean[] filesInWindow = new boolean[boundaries.size()];
      for (HStoreFile file : filesToCompact) {
        OptionalLong minTimestamp = file.getMinimumTimestamp();
        if (checkForTtl(cfTTL, file)) {
          return true;
        }
        if (isMajorOrBulkloadResult(file, now - lowTimestamp)) {
          return true;
        }
        int lowerWindowIndex =
          Collections.binarySearch(boundaries, minTimestamp.orElse(Long.MAX_VALUE));
        int upperWindowIndex =
          Collections.binarySearch(boundaries, file.getMaximumTimestamp().orElse(Long.MAX_VALUE));
        // Handle boundary conditions and negative values of binarySearch
        lowerWindowIndex =
          (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : lowerWindowIndex;
        upperWindowIndex =
          (upperWindowIndex < 0) ? Math.abs(upperWindowIndex + 2) : upperWindowIndex;
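        // For a key that is not an exact boundary, binarySearch returns -(insertionPoint) - 1, so
        // Math.abs(index + 2) equals insertionPoint - 1: the latest boundary <= the timestamp.
        // Both indices now identify the window containing the respective timestamp.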
        if (lowerWindowIndex != upperWindowIndex) {
          LOG.debug("Major compaction triggered on store " + this + "; because file "
            + file.getPath() + " has data with timestamps crossing window boundaries");
          return true;
        } else if (filesInWindow[upperWindowIndex]) {
          LOG.debug("Major compaction triggered on store " + this
            + "; because there is more than one file in some window");
          return true;
        } else {
          filesInWindow[upperWindowIndex] = true;
        }
        hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
      }
      if (checkBlockLocality(hdfsBlocksDistribution)) {
        return true;
      }
      LOG.debug("Skipping major compaction of " + this
        + ", because the files are already major compacted");
    }
    return false;
  }

  @Override
  protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile> candidateSelection,
    boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    CompactionRequestImpl result = tryingMajor
      ? selectMajorCompaction(candidateSelection)
      : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Generated compaction request: " + result);
    }
    return result;
  }

  /**
   * Select all candidate files for a major compaction, with output boundaries at every window
   * between now and the oldest data.
   */
  public CompactionRequestImpl selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) {
    long now = EnvironmentEdgeManager.currentTime();
    List<Long> boundaries = getCompactBoundariesForMajor(candidateSelection, now);
    Map<Long, String> boundariesPolicies = getBoundariesStoragePolicyForMajor(boundaries, now);
    return new DateTieredCompactionRequest(candidateSelection, boundaries, boundariesPolicies);
  }

  /**
   * We receive store files sorted in ascending order by seqId, then scan the list. If the current
   * file has a maxTimestamp older than the last known maximum, treat it as if it carried the last
   * known maximum, so that seqId and timestamp stay in the same order. Files carrying the same
   * maxTimestamp remain ordered by seqId. We then reverse the list so it is ordered by seqId and
   * maxTimestamp in descending order and build the time windows; all out-of-order data falls into
   * the same compaction window, guaranteeing contiguous compaction based on sequence id.
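   * <p>
   * For example (values are illustrative only), four files in ascending seqId order with max
   * timestamps [10, 5, 8, 12] are treated as carrying max timestamps [10, 10, 10, 12]: the two
   * out-of-order files are never assigned to an earlier window than the file that precedes them
   * by seqId.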
   */
  public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection,
    boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    long now = EnvironmentEdgeManager.currentTime();
    long oldestToCompact = getOldestToCompact(comConf.getDateTieredMaxStoreFileAgeMillis(), now);

    List<Pair<HStoreFile, Long>> storefileMaxTimestampPairs =
      Lists.newArrayListWithCapacity(candidateSelection.size());
    long maxTimestampSeen = Long.MIN_VALUE;
    for (HStoreFile storeFile : candidateSelection) {
      // If there is out-of-order data, we put it in the same window as the latest file seen so
      // far, by carrying the running maximum of maxTimestamp forward.
      maxTimestampSeen =
        Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp().orElse(Long.MIN_VALUE));
      storefileMaxTimestampPairs.add(new Pair<>(storeFile, maxTimestampSeen));
    }
    Collections.reverse(storefileMaxTimestampPairs);

    CompactionWindow window = getIncomingWindow(now);
    int minThreshold = comConf.getDateTieredIncomingWindowMin();
    PeekingIterator<Pair<HStoreFile, Long>> it =
      Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
    while (it.hasNext()) {
      if (window.compareToTimestamp(oldestToCompact) < 0) {
        break;
      }
      int compResult = window.compareToTimestamp(it.peek().getSecond());
      if (compResult > 0) {
        // The file is too old for the current window; move to the next earlier window
        window = window.nextEarlierWindow();
        minThreshold = comConf.getMinFilesToCompact();
      } else {
        // The file is within the target window
        ArrayList<HStoreFile> fileList = Lists.newArrayList();
        // Add all files that fall in the same window. For the incoming window we tolerate files
        // with future timestamps, although it is sub-optimal.
        while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) {
          fileList.add(it.next().getFirst());
        }
        if (fileList.size() >= minThreshold) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Processing files: " + fileList + " for window: " + window);
          }
          DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
            mayUseOffPeak, mayBeStuck, minThreshold, now);
          if (request != null) {
            return request;
          }
        }
      }
    }
    // A non-null file list is expected by HStore
    return new CompactionRequestImpl(Collections.emptyList());
  }

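  /**
   * Build a {@link DateTieredCompactionRequest} for the files of a single window. The final file
   * selection is delegated to the per-window compaction policy (unless we may be stuck, in which
   * case everything in the window is compacted); returns null when that selection is empty.
   */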
  private DateTieredCompactionRequest generateCompactionRequest(ArrayList<HStoreFile> storeFiles,
    CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold, long now)
    throws IOException {
    // The files have to be in ascending order for ratio-based compaction to work right
    // and for removeExcessFile to exclude the youngest files.
    Collections.reverse(storeFiles);

    // Compact everything in the window if we have more files than comConf.maxBlockingFiles
    compactionPolicyPerWindow.setMinThreshold(minThreshold);
    ArrayList<HStoreFile> storeFileSelection = mayBeStuck
      ? storeFiles
      : compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false);
    if (storeFileSelection != null && !storeFileSelection.isEmpty()) {
      // If any file in the window is excluded from compaction,
      // only one file will be output from compaction.
      boolean singleOutput = storeFiles.size() != storeFileSelection.size()
        || comConf.useDateTieredSingleOutputForMinorCompaction();
      List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
      // Map the output boundaries to storage policies for this minor compaction
      Map<Long, String> boundaryPolicyMap =
        getBoundariesStoragePolicyForMinor(singleOutput, window, now);
      return new DateTieredCompactionRequest(storeFileSelection, boundaries, boundaryPolicyMap);
    }
    return null;
  }

  /**
   * Return a list of boundaries for multiple compaction outputs in ascending order.
   */
  protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact,
    long now) {
    long minTimestamp = filesToCompact.stream()
      .mapToLong(f -> f.getMinimumTimestamp().orElse(Long.MAX_VALUE)).min().orElse(Long.MAX_VALUE);

    List<Long> boundaries = new ArrayList<>();

    // Add startMillis of all windows between now and the min timestamp
    for (CompactionWindow window = getIncomingWindow(now);
      window.compareToTimestamp(minTimestamp) > 0; window = window.nextEarlierWindow()) {
      boundaries.add(window.startMillis());
    }
    boundaries.add(Long.MIN_VALUE);
    Collections.reverse(boundaries);
    return boundaries;
  }

  /**
   * Returns the boundaries for minor compaction output in ascending order: always Long.MIN_VALUE,
   * plus the window start when multiple outputs are requested.
   */
  private static List<Long> getCompactionBoundariesForMinor(CompactionWindow window,
    boolean singleOutput) {
    List<Long> boundaries = new ArrayList<>();
    boundaries.add(Long.MIN_VALUE);
    if (!singleOutput) {
      boundaries.add(window.startMillis());
    }
    return boundaries;
  }

  private CompactionWindow getIncomingWindow(long now) {
    return windowFactory.newIncomingWindow(now);
  }

  private static long getOldestToCompact(long maxAgeMillis, long now) {
    try {
      return LongMath.checkedSubtract(now, maxAgeMillis);
    } catch (ArithmeticException ae) {
      LOG.warn("Overflow computing the oldest timestamp to compact, "
        + CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY + ": " + maxAgeMillis
        + ". All the files will be eligible for minor compaction.");
      return Long.MIN_VALUE;
    }
  }

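  /**
   * Map the single minor-compaction output boundary to a storage policy. Returns an empty map when
   * date-tiered storage policies are disabled; otherwise the policy (hot, warm or cold, chosen by
   * getWindowStoragePolicy from the window's age) is keyed by Long.MIN_VALUE for single-output
   * compactions or by the window start otherwise.
   */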
  private Map<Long, String> getBoundariesStoragePolicyForMinor(boolean singleOutput,
    CompactionWindow window, long now) {
    Map<Long, String> boundariesPolicy = new HashMap<>();
    if (!comConf.isDateTieredStoragePolicyEnable()) {
      return boundariesPolicy;
    }
    String windowStoragePolicy = getWindowStoragePolicy(now, window.startMillis());
    if (singleOutput) {
      boundariesPolicy.put(Long.MIN_VALUE, windowStoragePolicy);
    } else {
      boundariesPolicy.put(window.startMillis(), windowStoragePolicy);
    }
    return boundariesPolicy;
  }

  private Map<Long, String> getBoundariesStoragePolicyForMajor(List<Long> boundaries, long now) {
    Map<Long, String> boundariesPolicy = new HashMap<>();
    if (!comConf.isDateTieredStoragePolicyEnable()) {
      return boundariesPolicy;
    }
    for (Long startTs : boundaries) {
      boundariesPolicy.put(startTs, getWindowStoragePolicy(now, startTs));
    }
    return boundariesPolicy;
  }

  private String getWindowStoragePolicy(long now, long windowStartMillis) {
    if (windowStartMillis >= (now - comConf.getHotWindowAgeMillis())) {
      return comConf.getHotWindowStoragePolicy();
    } else if (windowStartMillis >= (now - comConf.getWarmWindowAgeMillis())) {
      return comConf.getWarmWindowStoragePolicy();
    }
    return comConf.getColdWindowStoragePolicy();
  }
}