001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.compactions; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.OptionalLong; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseInterfaceAudience; 030import org.apache.hadoop.hbase.HDFSBlocksDistribution; 031import org.apache.hadoop.hbase.regionserver.HStoreFile; 032import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; 033import org.apache.hadoop.hbase.regionserver.StoreUtils; 034import org.apache.hadoop.hbase.util.DNS; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.hadoop.hbase.util.Pair; 037import org.apache.hadoop.hbase.util.ReflectionUtils; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 043import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 044import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator; 045import org.apache.hbase.thirdparty.com.google.common.math.LongMath; 046 047/** 048 * HBASE-15181 This is a simple implementation of date-based tiered compaction similar to 049 * Cassandra's for the following benefits: 050 * <ol> 051 * <li>Improve date-range-based scan by structuring store files in date-based tiered layout.</li> 052 * <li>Reduce compaction overhead.</li> 053 * <li>Improve TTL efficiency.</li> 054 * </ol> 055 * Perfect fit for the use cases that: 056 * <ol> 057 * <li>has mostly date-based data write and scan and a focus on the most recent data.</li> 058 * </ol> 059 * Out-of-order writes are handled gracefully. Time range overlapping among store files is tolerated 060 * and the performance impact is minimized. Configuration can be set at hbase-site or overridden at 061 * per-table or per-column-family level by hbase shell. Design spec is at 062 * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/ 063 */ 064@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 065public class DateTieredCompactionPolicy extends SortedCompactionPolicy { 066 067 private static final Logger LOG = LoggerFactory.getLogger(DateTieredCompactionPolicy.class); 068 069 private final RatioBasedCompactionPolicy compactionPolicyPerWindow; 070 071 private final CompactionWindowFactory windowFactory; 072 073 public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) 074 throws IOException { 075 super(conf, storeConfigInfo); 076 try { 077 compactionPolicyPerWindow = 078 ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForDateTieredWindow(), 079 new Class[] { Configuration.class, StoreConfigInformation.class }, 080 new Object[] { conf, storeConfigInfo }); 081 } catch (Exception e) { 082 throw new IOException("Unable to load configured compaction policy '" 083 + comConf.getCompactionPolicyForDateTieredWindow() + "'", e); 084 } 085 try { 086 windowFactory = 087 ReflectionUtils.instantiateWithCustomCtor(comConf.getDateTieredCompactionWindowFactory(), 088 new Class[] { CompactionConfiguration.class }, new Object[] { comConf }); 089 } catch (Exception e) { 090 throw new IOException("Unable to load configured window factory '" 091 + comConf.getDateTieredCompactionWindowFactory() + "'", e); 092 } 093 } 094 095 /** 096 * Heuristics for guessing whether we need minor compaction. 097 */ 098 @InterfaceAudience.Private 099 @Override 100 public boolean needsCompaction(Collection<HStoreFile> storeFiles, 101 List<HStoreFile> filesCompacting) { 102 ArrayList<HStoreFile> candidates = new ArrayList<>(storeFiles); 103 try { 104 return !selectMinorCompaction(candidates, false, true).getFiles().isEmpty(); 105 } catch (Exception e) { 106 LOG.error("Can not check for compaction: ", e); 107 return false; 108 } 109 } 110 111 @Override 112 public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact) 113 throws IOException { 114 long mcTime = getNextMajorCompactTime(filesToCompact); 115 if (filesToCompact == null || mcTime == 0) { 116 if (LOG.isDebugEnabled()) { 117 LOG.debug("filesToCompact: " + filesToCompact + " mcTime: " + mcTime); 118 } 119 return false; 120 } 121 122 // TODO: Use better method for determining stamp of last major (HBASE-2990) 123 long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); 124 long now = EnvironmentEdgeManager.currentTime(); 125 if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) { 126 if (LOG.isDebugEnabled()) { 127 LOG.debug("lowTimestamp: " + lowTimestamp + " lowTimestamp: " + lowTimestamp + " now: " 128 + now + " mcTime: " + mcTime); 129 } 130 return false; 131 } 132 133 long cfTTL = this.storeConfigInfo.getStoreFileTtl(); 134 HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); 135 List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, now); 136 boolean[] filesInWindow = new boolean[boundaries.size()]; 137 138 for (HStoreFile file : filesToCompact) { 139 OptionalLong minTimestamp = file.getMinimumTimestamp(); 140 long oldest = minTimestamp.isPresent() ? now - minTimestamp.getAsLong() : Long.MIN_VALUE; 141 if (cfTTL != Long.MAX_VALUE && oldest >= cfTTL) { 142 LOG.debug("Major compaction triggered on store " + this + "; for TTL maintenance"); 143 return true; 144 } 145 if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) { 146 LOG.debug("Major compaction triggered on store " + this 147 + ", because there are new files and time since last major compaction " 148 + (now - lowTimestamp) + "ms"); 149 return true; 150 } 151 152 int lowerWindowIndex = 153 Collections.binarySearch(boundaries, minTimestamp.orElse(Long.MAX_VALUE)); 154 int upperWindowIndex = 155 Collections.binarySearch(boundaries, file.getMaximumTimestamp().orElse(Long.MAX_VALUE)); 156 // Handle boundary conditions and negative values of binarySearch 157 lowerWindowIndex = (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : lowerWindowIndex; 158 upperWindowIndex = (upperWindowIndex < 0) ? Math.abs(upperWindowIndex + 2) : upperWindowIndex; 159 if (lowerWindowIndex != upperWindowIndex) { 160 LOG.debug("Major compaction triggered on store " + this + "; because file " + file.getPath() 161 + " has data with timestamps cross window boundaries"); 162 return true; 163 } else if (filesInWindow[upperWindowIndex]) { 164 LOG.debug("Major compaction triggered on store " + this 165 + "; because there are more than one file in some windows"); 166 return true; 167 } else { 168 filesInWindow[upperWindowIndex] = true; 169 } 170 hdfsBlocksDistribution.add(file.getHDFSBlockDistribution()); 171 } 172 173 float blockLocalityIndex = hdfsBlocksDistribution 174 .getBlockLocalityIndex(DNS.getHostname(comConf.conf, DNS.ServerType.REGIONSERVER)); 175 if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) { 176 LOG.debug("Major compaction triggered on store " + this 177 + "; to make hdfs blocks local, current blockLocalityIndex is " + blockLocalityIndex 178 + " (min " + comConf.getMinLocalityToForceCompact() + ")"); 179 return true; 180 } 181 182 LOG.debug( 183 "Skipping major compaction of " + this + ", because the files are already major compacted"); 184 return false; 185 } 186 187 @Override 188 protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile> candidateSelection, 189 boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { 190 CompactionRequestImpl result = tryingMajor 191 ? selectMajorCompaction(candidateSelection) 192 : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck); 193 if (LOG.isDebugEnabled()) { 194 LOG.debug("Generated compaction request: " + result); 195 } 196 return result; 197 } 198 199 public CompactionRequestImpl selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) { 200 long now = EnvironmentEdgeManager.currentTime(); 201 List<Long> boundaries = getCompactBoundariesForMajor(candidateSelection, now); 202 Map<Long, String> boundariesPolicies = getBoundariesStoragePolicyForMajor(boundaries, now); 203 return new DateTieredCompactionRequest(candidateSelection, boundaries, boundariesPolicies); 204 } 205 206 /** 207 * We receive store files sorted in ascending order by seqId then scan the list of files. If the 208 * current file has a maxTimestamp older than last known maximum, treat this file as it carries 209 * the last known maximum. This way both seqId and timestamp are in the same order. If files carry 210 * the same maxTimestamps, they are ordered by seqId. We then reverse the list so they are ordered 211 * by seqId and maxTimestamp in descending order and build the time windows. All the out-of-order 212 * data into the same compaction windows, guaranteeing contiguous compaction based on sequence id. 213 */ 214 public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection, 215 boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { 216 long now = EnvironmentEdgeManager.currentTime(); 217 long oldestToCompact = getOldestToCompact(comConf.getDateTieredMaxStoreFileAgeMillis(), now); 218 219 List<Pair<HStoreFile, Long>> storefileMaxTimestampPairs = 220 Lists.newArrayListWithCapacity(candidateSelection.size()); 221 long maxTimestampSeen = Long.MIN_VALUE; 222 for (HStoreFile storeFile : candidateSelection) { 223 // if there is out-of-order data, 224 // we put them in the same window as the last file in increasing order 225 maxTimestampSeen = 226 Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp().orElse(Long.MIN_VALUE)); 227 storefileMaxTimestampPairs.add(new Pair<>(storeFile, maxTimestampSeen)); 228 } 229 Collections.reverse(storefileMaxTimestampPairs); 230 231 CompactionWindow window = getIncomingWindow(now); 232 int minThreshold = comConf.getDateTieredIncomingWindowMin(); 233 PeekingIterator<Pair<HStoreFile, Long>> it = 234 Iterators.peekingIterator(storefileMaxTimestampPairs.iterator()); 235 while (it.hasNext()) { 236 if (window.compareToTimestamp(oldestToCompact) < 0) { 237 break; 238 } 239 int compResult = window.compareToTimestamp(it.peek().getSecond()); 240 if (compResult > 0) { 241 // If the file is too old for the window, switch to the next window 242 window = window.nextEarlierWindow(); 243 minThreshold = comConf.getMinFilesToCompact(); 244 } else { 245 // The file is within the target window 246 ArrayList<HStoreFile> fileList = Lists.newArrayList(); 247 // Add all files in the same window. For incoming window 248 // we tolerate files with future data although it is sub-optimal 249 while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) { 250 fileList.add(it.next().getFirst()); 251 } 252 if (fileList.size() >= minThreshold) { 253 if (LOG.isDebugEnabled()) { 254 LOG.debug("Processing files: " + fileList + " for window: " + window); 255 } 256 DateTieredCompactionRequest request = generateCompactionRequest(fileList, window, 257 mayUseOffPeak, mayBeStuck, minThreshold, now); 258 if (request != null) { 259 return request; 260 } 261 } 262 } 263 } 264 // A non-null file list is expected by HStore 265 return new CompactionRequestImpl(Collections.emptyList()); 266 } 267 268 private DateTieredCompactionRequest generateCompactionRequest(ArrayList<HStoreFile> storeFiles, 269 CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold, long now) 270 throws IOException { 271 // The files has to be in ascending order for ratio-based compaction to work right 272 // and removeExcessFile to exclude youngest files. 273 Collections.reverse(storeFiles); 274 275 // Compact everything in the window if have more files than comConf.maxBlockingFiles 276 compactionPolicyPerWindow.setMinThreshold(minThreshold); 277 ArrayList<HStoreFile> storeFileSelection = mayBeStuck 278 ? storeFiles 279 : compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false); 280 if (storeFileSelection != null && !storeFileSelection.isEmpty()) { 281 // If there is any file in the window excluded from compaction, 282 // only one file will be output from compaction. 283 boolean singleOutput = storeFiles.size() != storeFileSelection.size() 284 || comConf.useDateTieredSingleOutputForMinorCompaction(); 285 List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput); 286 // we want to generate policy to boundaries for minor compaction 287 Map<Long, String> boundaryPolicyMap = 288 getBoundariesStoragePolicyForMinor(singleOutput, window, now); 289 DateTieredCompactionRequest result = 290 new DateTieredCompactionRequest(storeFileSelection, boundaries, boundaryPolicyMap); 291 return result; 292 } 293 return null; 294 } 295 296 /** 297 * Return a list of boundaries for multiple compaction output in ascending order. 298 */ 299 private List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact, long now) { 300 long minTimestamp = filesToCompact.stream() 301 .mapToLong(f -> f.getMinimumTimestamp().orElse(Long.MAX_VALUE)).min().orElse(Long.MAX_VALUE); 302 303 List<Long> boundaries = new ArrayList<>(); 304 305 // Add startMillis of all windows between now and min timestamp 306 for (CompactionWindow window = getIncomingWindow(now); window.compareToTimestamp(minTimestamp) 307 > 0; window = window.nextEarlierWindow()) { 308 boundaries.add(window.startMillis()); 309 } 310 boundaries.add(Long.MIN_VALUE); 311 Collections.reverse(boundaries); 312 return boundaries; 313 } 314 315 /** 316 * Returns a list of boundaries for multiple compaction output from minTimestamp to maxTimestamp. 317 */ 318 private static List<Long> getCompactionBoundariesForMinor(CompactionWindow window, 319 boolean singleOutput) { 320 List<Long> boundaries = new ArrayList<>(); 321 boundaries.add(Long.MIN_VALUE); 322 if (!singleOutput) { 323 boundaries.add(window.startMillis()); 324 } 325 return boundaries; 326 } 327 328 private CompactionWindow getIncomingWindow(long now) { 329 return windowFactory.newIncomingWindow(now); 330 } 331 332 private static long getOldestToCompact(long maxAgeMillis, long now) { 333 try { 334 return LongMath.checkedSubtract(now, maxAgeMillis); 335 } catch (ArithmeticException ae) { 336 LOG.warn("Value for " + CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY + ": " 337 + maxAgeMillis + ". All the files will be eligible for minor compaction."); 338 return Long.MIN_VALUE; 339 } 340 } 341 342 private Map<Long, String> getBoundariesStoragePolicyForMinor(boolean singleOutput, 343 CompactionWindow window, long now) { 344 Map<Long, String> boundariesPolicy = new HashMap<>(); 345 if (!comConf.isDateTieredStoragePolicyEnable()) { 346 return boundariesPolicy; 347 } 348 String windowStoragePolicy = getWindowStoragePolicy(now, window.startMillis()); 349 if (singleOutput) { 350 boundariesPolicy.put(Long.MIN_VALUE, windowStoragePolicy); 351 } else { 352 boundariesPolicy.put(window.startMillis(), windowStoragePolicy); 353 } 354 return boundariesPolicy; 355 } 356 357 private Map<Long, String> getBoundariesStoragePolicyForMajor(List<Long> boundaries, long now) { 358 Map<Long, String> boundariesPolicy = new HashMap<>(); 359 if (!comConf.isDateTieredStoragePolicyEnable()) { 360 return boundariesPolicy; 361 } 362 for (Long startTs : boundaries) { 363 boundariesPolicy.put(startTs, getWindowStoragePolicy(now, startTs)); 364 } 365 return boundariesPolicy; 366 } 367 368 private String getWindowStoragePolicy(long now, long windowStartMillis) { 369 if (windowStartMillis >= (now - comConf.getHotWindowAgeMillis())) { 370 return comConf.getHotWindowStoragePolicy(); 371 } else if (windowStartMillis >= (now - comConf.getWarmWindowAgeMillis())) { 372 return comConf.getWarmWindowStoragePolicy(); 373 } 374 return comConf.getColdWindowStoragePolicy(); 375 } 376}