/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
 * Stripe store implementation of compaction policy.
 */
@InterfaceAudience.Private
public class StripeCompactionPolicy extends CompactionPolicy {
  private static final Logger LOG = LoggerFactory.getLogger(StripeCompactionPolicy.class);
  // Policy used to compact individual stripes.
  private final ExploringCompactionPolicy stripePolicy;

  private final StripeStoreConfig config;

  /**
   * Creates the policy.
   * @param conf Configuration, passed to the parent policy and to the per-stripe policy.
   * @param storeConfigInfo Store configuration, passed to the parent policy and to the
   *   per-stripe policy.
   * @param config Stripe-specific configuration read by the selection methods.
   */
  public StripeCompactionPolicy(
      Configuration conf, StoreConfigInformation storeConfigInfo, StripeStoreConfig config) {
    super(conf, storeConfigInfo);
    this.config = config;
    stripePolicy = new ExploringCompactionPolicy(conf, storeConfigInfo);
  }

  /**
   * Builds the candidate file list that is handed to coprocessors before selection.
   * @param si Stripe information provider.
   * @param filesCompacting Files that are already being compacted.
   * @return A new list with all store files except those in filesCompacting.
   */
  public List<HStoreFile> preSelectFilesForCoprocessor(StripeInformationProvider si,
      List<HStoreFile> filesCompacting) {
    // We sincerely hope nobody is messing with us with their coprocessors.
    // If they do, they are very likely to shoot themselves in the foot.
    // We'll just exclude all the filesCompacting from the list.
    List<HStoreFile> candidateFiles = new ArrayList<>(si.getStorefiles());
    candidateFiles.removeAll(filesCompacting);
    return candidateFiles;
  }

  /**
   * Wraps an externally supplied request into a stripe compaction request.
   * @param si Stripe information provider.
   * @param request The request with a fixed set of files.
   * @return A boundary-based request using the current stripe boundaries if any stripes
   *   exist; otherwise a split request over the whole (open-ended) key range.
   */
  public StripeCompactionRequest createEmptyRequest(
      StripeInformationProvider si, CompactionRequestImpl request) {
    // Treat as L0-ish compaction with fixed set of files, and hope for the best.
    if (si.getStripeCount() > 0) {
      return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
    }
    Pair<Long, Integer> targetKvsAndCount = estimateTargetKvs(
        request.getFiles(), this.config.getInitialCount());
    return new SplitStripeCompactionRequest(
        request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
  }

  /**
   * Selects the kind of flush request to use.
   * @param comparator Cell comparator passed to the created request.
   * @param si Stripe information provider.
   * @param kvCount KV count of the flush; used to derive the per-stripe KV count when
   *   there are no stripes yet.
   * @return A plain request if L0 flush is configured; a size-based request if there are
   *   no stripes; otherwise a request using the existing stripe boundaries.
   */
  public StripeStoreFlusher.StripeFlushRequest selectFlush(CellComparator comparator,
      StripeInformationProvider si, int kvCount) {
    if (this.config.isUsingL0Flush()) {
      // L0 is used, return dumb request.
      return new StripeStoreFlusher.StripeFlushRequest(comparator);
    }
    if (si.getStripeCount() == 0) {
      // No stripes - start with the requisite count, derive KVs per stripe.
      int initialCount = this.config.getInitialCount();
      return new StripeStoreFlusher.SizeStripeFlushRequest(comparator, initialCount,
          kvCount / initialCount);
    }
    // There are stripes - do according to the boundaries.
    return new StripeStoreFlusher.BoundaryStripeFlushRequest(comparator, si.getStripeBoundaries());
  }

  /**
   * Selects a compaction, trying the variations in strict order of preference:
   * compact-everything when references are present, creation of initial stripes,
   * L0 compaction (optionally together with one stripe), merge of expired stripes,
   * and finally a regular single-stripe compaction.
   * @param si Stripe information provider.
   * @param filesCompacting Files that are already being compacted.
   * @param isOffpeak Whether off-peak settings should be used.
   * @return The selected request, or null if nothing was selected.
   */
  public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
      List<HStoreFile> filesCompacting, boolean isOffpeak) throws IOException {
    // TODO: first cut - no parallel compactions. To have more fine grained control we
    //       probably need structure more sophisticated than a list.
    if (!filesCompacting.isEmpty()) {
      LOG.debug("Not selecting compaction: {} files compacting", filesCompacting.size());
      return null;
    }

    // We are going to do variations of compaction in strict order of preference.
    // A better/more advanced approach is to use a heuristic to see which one is "more
    // necessary" at current time.

    // This can happen due to region split. We can skip it later; for now preserve
    // compact-all-things behavior.
    Collection<HStoreFile> allFiles = si.getStorefiles();
    if (StoreUtils.hasReferences(allFiles)) {
      LOG.debug("There are references in the store; compacting all files");
      long targetKvs = estimateTargetKvs(allFiles, config.getInitialCount()).getFirst();
      SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
          allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
      request.setMajorRangeFull();
      request.getRequest().setAfterSplit(true);
      return request;
    }

    int stripeCount = si.getStripeCount();
    List<HStoreFile> l0Files = si.getLevel0Files();

    // See if we need to make new stripes.
    boolean shouldCompactL0 = (this.config.getLevel0MinFiles() <= l0Files.size());
    if (stripeCount == 0) {
      if (!shouldCompactL0) {
        return null; // nothing to do.
      }
      return selectNewStripesCompaction(si);
    }

    boolean canDropDeletesNoL0 = l0Files.isEmpty();
    if (shouldCompactL0) {
      if (!canDropDeletesNoL0) {
        // If we need to compact L0, see if we can add something to it, and drop deletes.
        StripeCompactionRequest result = selectSingleStripeCompaction(
            si, true, canDropDeletesNoL0, isOffpeak);
        if (result != null) {
          return result;
        }
      }
      LOG.debug("Selecting L0 compaction with {} files", l0Files.size());
      return new BoundaryStripeCompactionRequest(l0Files, si.getStripeBoundaries());
    }

    // Try to delete fully expired stripes
    StripeCompactionRequest result = selectExpiredMergeCompaction(si, canDropDeletesNoL0);
    if (result != null) {
      return result;
    }

    // Ok, nothing special here, let's see if we need to do a common compaction.
    // This will also split the stripes that are too big if needed.
    return selectSingleStripeCompaction(si, false, canDropDeletesNoL0, isOffpeak);
  }

  /**
   * Approximates whether a compaction is needed.
   * @param si Stripe information provider.
   * @param filesCompacting Files that are already being compacted.
   * @return True if nothing is compacting and either the store has references, L0 has at
   *   least the configured minimum number of files, or some stripe has enough files.
   */
  public boolean needsCompactions(StripeInformationProvider si, List<HStoreFile> filesCompacting) {
    // Approximation on whether we need compaction.
    return filesCompacting.isEmpty()
        && (StoreUtils.hasReferences(si.getStorefiles())
          || (si.getLevel0Files().size() >= this.config.getLevel0MinFiles())
          || needsSingleStripeCompaction(si));
  }

  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
    throws IOException {
    return false; // there's never a major compaction!
  }

  @Override
  public boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
  }

  /**
   * @param si StoreFileManager.
   * @return Whether any stripe potentially needs compaction.
   */
  protected boolean needsSingleStripeCompaction(StripeInformationProvider si) {
    int minFiles = this.config.getStripeCompactMinFiles();
    for (List<HStoreFile> stripe : si.getStripes()) {
      if (stripe.size() >= minFiles) {
        return true;
      }
    }
    return false;
  }

  /**
   * Picks the best single-stripe selection across all stripes. A selection with more files
   * wins; with equal file counts the one with the smaller total size wins.
   * @param si Stripe information provider.
   * @param includeL0 Whether L0 files are to be added to the stripe compaction.
   * @param canDropDeletesWithoutL0 Whether deletes can be dropped without including L0.
   * @param isOffpeak Whether off-peak settings should be used.
   * @return The request, or null if no stripe yields a selection, or if L0 is to be
   *   included and the chosen stripe is large enough that it might be split.
   */
  protected StripeCompactionRequest selectSingleStripeCompaction(StripeInformationProvider si,
      boolean includeL0, boolean canDropDeletesWithoutL0, boolean isOffpeak) throws IOException {
    List<ImmutableList<HStoreFile>> stripes = si.getStripes();

    int bqIndex = -1;
    List<HStoreFile> bqSelection = null;
    int stripeCount = stripes.size();
    long bqTotalSize = -1;
    for (int i = 0; i < stripeCount; ++i) {
      // If we want to compact L0 to drop deletes, we only want whole-stripe compactions.
      // So, pass includeL0 as 2nd parameter to indicate that.
      List<HStoreFile> selection = selectSimpleCompaction(stripes.get(i),
          !canDropDeletesWithoutL0 && includeL0, isOffpeak);
      if (selection.isEmpty()) {
        continue;
      }
      long size = 0;
      for (HStoreFile sf : selection) {
        size += sf.getReader().length();
      }
      if (bqSelection == null || selection.size() > bqSelection.size() ||
          (selection.size() == bqSelection.size() && size < bqTotalSize)) {
        bqSelection = selection;
        bqIndex = i;
        bqTotalSize = size;
      }
    }
    if (bqSelection == null) {
      LOG.debug("No good compaction is possible in any stripe");
      return null;
    }
    List<HStoreFile> filesToCompact = new ArrayList<>(bqSelection);
    // See if we can, and need to, split this stripe.
    int targetCount = 1;
    long targetKvs = Long.MAX_VALUE;
    boolean hasAllFiles = filesToCompact.size() == stripes.get(bqIndex).size();
    String splitString = "";
    if (hasAllFiles && bqTotalSize >= config.getSplitSize()) {
      if (includeL0) {
        // We want to avoid the scenario where we compact a stripe w/L0 and then split it.
        // So, if we might split, don't compact the stripe with L0.
        return null;
      }
      Pair<Long, Integer> kvsAndCount = estimateTargetKvs(filesToCompact, config.getSplitCount());
      targetKvs = kvsAndCount.getFirst();
      targetCount = kvsAndCount.getSecond();
      splitString = "; the stripe will be split into at most "
          + targetCount + " stripes with " + targetKvs + " target KVs";
    }

    LOG.debug("Found compaction in a stripe with end key [{}], with {} files of total size {}{}",
        Bytes.toString(si.getEndRow(bqIndex)), filesToCompact.size(), bqTotalSize, splitString);

    // See if we can drop deletes.
    StripeCompactionRequest req;
    if (includeL0) {
      assert hasAllFiles;
      List<HStoreFile> l0Files = si.getLevel0Files();
      LOG.debug("Adding {} files to compaction to be able to drop deletes", l0Files.size());
      ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
      sfs.addSublist(filesToCompact);
      sfs.addSublist(l0Files);
      req = new BoundaryStripeCompactionRequest(sfs, si.getStripeBoundaries());
    } else {
      req = new SplitStripeCompactionRequest(
          filesToCompact, si.getStartRow(bqIndex), si.getEndRow(bqIndex), targetCount, targetKvs);
    }
    if (hasAllFiles && (canDropDeletesWithoutL0 || includeL0)) {
      req.setMajorRange(si.getStartRow(bqIndex), si.getEndRow(bqIndex));
    }
    req.getRequest().setOffPeak(isOffpeak);
    return req;
  }

  /**
   * Selects the compaction of a single stripe using default policy.
   * @param sfs Files.
   * @param allFilesOnly Whether a compaction of all-or-none files is needed.
   * @param isOffpeak Whether off-peak settings should be used.
   * @return The resulting selection.
   */
  private List<HStoreFile> selectSimpleCompaction(
      List<HStoreFile> sfs, boolean allFilesOnly, boolean isOffpeak) {
    // For all-or-none, raising the minimum to the stripe size forces a whole-stripe selection.
    int minFilesLocal = Math.max(
        allFilesOnly ? sfs.size() : 0, this.config.getStripeCompactMinFiles());
    int maxFilesLocal = Math.max(this.config.getStripeCompactMaxFiles(), minFilesLocal);
    return stripePolicy.applyCompactionPolicy(sfs, false, isOffpeak, minFilesLocal, maxFilesLocal);
  }

  /**
   * Creates the request that compacts all L0 files into the initial set of stripes.
   * @param si Stripe information provider.
   * @return The request, covering the whole (open-ended) key range.
   */
  private StripeCompactionRequest selectNewStripesCompaction(StripeInformationProvider si) {
    List<HStoreFile> l0Files = si.getLevel0Files();
    Pair<Long, Integer> kvsAndCount = estimateTargetKvs(l0Files, config.getInitialCount());
    LOG.debug("Creating {} initial stripes with {} kvs each via L0 compaction of {} files",
        kvsAndCount.getSecond(), kvsAndCount.getFirst(), l0Files.size());
    SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
        l0Files, OPEN_KEY, OPEN_KEY, kvsAndCount.getSecond(), kvsAndCount.getFirst());
    request.setMajorRangeFull(); // L0 only, can drop deletes.
    return request;
  }

  /**
   * Looks for the longest run of adjacent stripes in which every file's max timestamp is
   * older than the store file TTL cutoff, and creates a request merging them into one stripe.
   * @param si Stripe information provider.
   * @param canDropDeletesNoL0 Whether deletes can be dropped without including L0.
   * @return The merge request, or null if no TTL is set, no stripe is fully expired, or the
   *   only expired stripe is the last one.
   */
  private StripeCompactionRequest selectExpiredMergeCompaction(
      StripeInformationProvider si, boolean canDropDeletesNoL0) {
    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
    if (cfTtl == Long.MAX_VALUE) {
      return null; // minversion might be set, cannot delete old files
    }
    long timestampCutoff = EnvironmentEdgeManager.currentTime() - cfTtl;
    // Merge the longest sequence of stripes where all files have expired, if any.
    int start = -1, bestStart = -1, length = 0, bestLength = 0;
    List<ImmutableList<HStoreFile>> stripes = si.getStripes();
    OUTER: for (int i = 0; i < stripes.size(); ++i) {
      for (HStoreFile storeFile : stripes.get(i)) {
        if (storeFile.getReader().getMaxTimestamp() < timestampCutoff) {
          continue;
        }
        // Found non-expired file, this stripe has to stay.
        if (length > bestLength) {
          bestStart = start;
          bestLength = length;
        }
        start = -1;
        length = 0;
        continue OUTER;
      }
      if (start == -1) {
        start = i;
      }
      ++length;
    }
    // Account for a run that extends to the last stripe.
    if (length > bestLength) {
      bestStart = start;
      bestLength = length;
    }
    if (bestLength == 0) {
      return null;
    }
    if (bestLength == 1) {
      // This is currently inefficient. If only one stripe expired, we will rewrite some
      // entire stripe just to delete some expired files because we rely on metadata and it
      // cannot simply be updated in an old file. When we either determine stripe dynamically
      // or move metadata to manifest, we can just drop the "expired stripes".
      if (bestStart == (stripes.size() - 1)) {
        return null;
      }
      ++bestLength;
    }
    LOG.debug("Merging {} stripes to delete expired store files", bestLength);
    int endIndex = bestStart + bestLength - 1;
    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
    sfs.addAllSublists(stripes.subList(bestStart, endIndex + 1));
    SplitStripeCompactionRequest result = new SplitStripeCompactionRequest(sfs,
        si.getStartRow(bestStart), si.getEndRow(endIndex), 1, Long.MAX_VALUE);
    if (canDropDeletesNoL0) {
      result.setMajorRangeFull();
    }
    return result;
  }

  /**
   * @param candidates Files to sum over.
   * @return Sum of the entry counts reported by the readers of the given files.
   */
  private static long getTotalKvCount(final Collection<HStoreFile> candidates) {
    long totalKvs = 0;
    for (HStoreFile storeFile : candidates) {
      totalKvs += storeFile.getReader().getEntries();
    }
    return totalKvs;
  }

  /**
   * @param candidates Files to sum over.
   * @return Sum of the lengths reported by the readers of the given files.
   */
  public static long getTotalFileSize(final Collection<HStoreFile> candidates) {
    long totalSize = 0;
    for (HStoreFile storeFile : candidates) {
      totalSize += storeFile.getReader().length();
    }
    return totalSize;
  }

  /**
   * Estimates how many parts the files should be split into and how many KVs each gets.
   * @param files Files to be compacted.
   * @param splitCount Initial (desired) number of parts; increased if parts would be too big.
   * @return Pair of (target KV count per part, number of parts).
   */
  private Pair<Long, Integer> estimateTargetKvs(Collection<HStoreFile> files, double splitCount) {
    // If the size is larger than what we target, we don't want to split into proportionally
    // larger parts and then have to split again very soon. So, we will increase the multiplier
    // by one until we get small enough parts. E.g. 5Gb stripe that should have been split into
    // 2 parts when it was 3Gb will be split into 3x1.67Gb parts, rather than 2x2.5Gb parts.
    long totalSize = getTotalFileSize(files);
    long targetPartSize = config.getSplitPartSize();
    assert targetPartSize > 0 && splitCount > 0;
    double ratio = totalSize / (splitCount * targetPartSize); // ratio of real to desired size
    while (ratio > 1.0) {
      // Ratio of real to desired size if we increase the multiplier.
      double newRatio = totalSize / ((splitCount + 1.0) * targetPartSize);
      if ((1.0 / newRatio) >= ratio) {
        break; // New ratio is < 1.0, but further than the last one.
      }
      ratio = newRatio;
      splitCount += 1.0;
    }
    long kvCount = (long)(getTotalKvCount(files) / splitCount);
    return new Pair<>(kvCount, (int)Math.ceil(splitCount));
  }

  /** Stripe compaction request wrapper. */
  public abstract static class StripeCompactionRequest {
    protected CompactionRequestImpl request;
    protected byte[] majorRangeFromRow = null, majorRangeToRow = null;

    /**
     * Executes the request without a user; delegates to the three-argument overload
     * with a null user.
     * @param compactor Compactor.
     * @param throughputController Throughput controller passed to the compactor.
     * @return result of compact(...)
     */
    public List<Path> execute(StripeCompactor compactor,
      ThroughputController throughputController) throws IOException {
      return execute(compactor, throughputController, null);
    }

    /**
     * Executes the request against compactor (essentially, just calls correct overload of
     * compact method), to simulate more dynamic dispatch.
     * @param compactor Compactor.
     * @param throughputController Throughput controller passed to the compactor.
     * @param user User passed to the compactor; may be null.
     * @return result of compact(...)
     */
    public abstract List<Path> execute(StripeCompactor compactor,
        ThroughputController throughputController, User user) throws IOException;

    public StripeCompactionRequest(CompactionRequestImpl request) {
      this.request = request;
    }

    /**
     * Sets compaction "major range". Major range is the key range for which all
     * the files are included, so they can be treated like major-compacted files.
     * @param startRow Left boundary, inclusive.
     * @param endRow Right boundary, exclusive.
     */
    public void setMajorRange(byte[] startRow, byte[] endRow) {
      this.majorRangeFromRow = startRow;
      this.majorRangeToRow = endRow;
    }

    public CompactionRequestImpl getRequest() {
      return this.request;
    }

    /**
     * Replaces the wrapped request and clears any previously set major range.
     * @param request The new request; must not be null.
     */
    public void setRequest(CompactionRequestImpl request) {
      assert request != null;
      this.request = request;
      this.majorRangeFromRow = this.majorRangeToRow = null;
    }
  }

  /**
   * Request for stripe compactor that will cause it to split the source files into several
   * separate files at the provided boundaries.
   */
  private static class BoundaryStripeCompactionRequest extends StripeCompactionRequest {
    private final List<byte[]> targetBoundaries;

    /**
     * @param request Original request.
     * @param targetBoundaries New files should be written with these boundaries.
     */
    public BoundaryStripeCompactionRequest(CompactionRequestImpl request,
        List<byte[]> targetBoundaries) {
      super(request);
      this.targetBoundaries = targetBoundaries;
    }

    /**
     * @param files Files to compact; wrapped into a new request.
     * @param targetBoundaries New files should be written with these boundaries.
     */
    public BoundaryStripeCompactionRequest(Collection<HStoreFile> files,
        List<byte[]> targetBoundaries) {
      this(new CompactionRequestImpl(files), targetBoundaries);
    }

    @Override
    public List<Path> execute(StripeCompactor compactor,
        ThroughputController throughputController, User user) throws IOException {
      return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
        this.majorRangeToRow, throughputController, user);
    }
  }

  /**
   * Request for stripe compactor that will cause it to split the source files into several
   * separate files based on key-value count, as well as file count limit.
   * Most of the files will be roughly the same size. The last file may be smaller or larger
   * depending on the interplay of the amount of data and maximum number of files allowed.
   */
  private static class SplitStripeCompactionRequest extends StripeCompactionRequest {
    private final byte[] startRow, endRow;
    private final int targetCount;
    private final long targetKvs;

    /**
     * @param request Original request.
     * @param startRow Left boundary of the range to compact, inclusive.
     * @param endRow Right boundary of the range to compact, exclusive.
     * @param targetCount The maximum number of stripes to compact into.
     * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
     *                  total number of kvs, all the overflow data goes into the last stripe.
     */
    public SplitStripeCompactionRequest(CompactionRequestImpl request,
        byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
      super(request);
      this.startRow = startRow;
      this.endRow = endRow;
      this.targetCount = targetCount;
      this.targetKvs = targetKvs;
    }

    /**
     * Same as the five-argument constructor, with the stripe count limit set to
     * Integer.MAX_VALUE.
     */
    public SplitStripeCompactionRequest(
        Collection<HStoreFile> files, byte[] startRow, byte[] endRow, long targetKvs) {
      this(files, startRow, endRow, Integer.MAX_VALUE, targetKvs);
    }

    /**
     * Same as the request-based constructor, wrapping the files into a new request.
     */
    public SplitStripeCompactionRequest(Collection<HStoreFile> files,
        byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
      this(new CompactionRequestImpl(files), startRow, endRow, targetCount, targetKvs);
    }

    @Override
    public List<Path> execute(StripeCompactor compactor,
        ThroughputController throughputController, User user) throws IOException {
      return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
        this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user);
    }

    /** Set major range of the compaction to the entire compaction range.
     * See {@link #setMajorRange(byte[], byte[])}. */
    public void setMajorRangeFull() {
      setMajorRange(this.startRow, this.endRow);
    }
  }

  /** The information about stripes that the policy needs to do its stuff */
  public static interface StripeInformationProvider {
    /**
     * @return All store files; this policy treats the result as including both L0 and
     *   stripe files.
     */
    public Collection<HStoreFile> getStorefiles();

    /**
     * Gets the start row for a given stripe.
     * @param stripeIndex Stripe index.
     * @return Start row. May be an open key.
     */
    public byte[] getStartRow(int stripeIndex);

    /**
     * Gets the end row for a given stripe.
     * @param stripeIndex Stripe index.
     * @return End row. May be an open key.
     */
    public byte[] getEndRow(int stripeIndex);

    /**
     * @return Level 0 files.
     */
    public List<HStoreFile> getLevel0Files();

    /**
     * @return All stripe boundaries; including the open ones on both ends.
     */
    public List<byte[]> getStripeBoundaries();

    /**
     * @return The stripes.
     */
    public ArrayList<ImmutableList<HStoreFile>> getStripes();

    /**
     * @return Stripe count.
     */
    public int getStripeCount();
  }
}