/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
 * Stripe store implementation of compaction policy.
 */
@InterfaceAudience.Private
public class StripeCompactionPolicy extends CompactionPolicy {
  private static final Logger LOG = LoggerFactory.getLogger(StripeCompactionPolicy.class);
  // Policy used to compact individual stripes.
  private ExploringCompactionPolicy stripePolicy = null;

  private StripeStoreConfig config;

  public StripeCompactionPolicy(
      Configuration conf, StoreConfigInformation storeConfigInfo, StripeStoreConfig config) {
    super(conf, storeConfigInfo);
    this.config = config;
    stripePolicy = new ExploringCompactionPolicy(conf, storeConfigInfo);
  }

  public List<HStoreFile> preSelectFilesForCoprocessor(StripeInformationProvider si,
      List<HStoreFile> filesCompacting) {
    // We sincerely hope nobody is messing with us with their coprocessors.
    // If they do, they are very likely to shoot themselves in the foot.
    // We'll just exclude all the filesCompacting from the list.
    ArrayList<HStoreFile> candidateFiles = new ArrayList<>(si.getStorefiles());
    candidateFiles.removeAll(filesCompacting);
    return candidateFiles;
  }

  public StripeCompactionRequest createEmptyRequest(
      StripeInformationProvider si, CompactionRequestImpl request) {
    // Treat as L0-ish compaction with fixed set of files, and hope for the best.
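    // With existing stripes, write the output along the current boundaries; with no
    // stripes, create the initial ones, sized from the estimated KV count of the files.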
    if (si.getStripeCount() > 0) {
      return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
    }
    Pair<Long, Integer> targetKvsAndCount = estimateTargetKvs(
        request.getFiles(), this.config.getInitialCount());
    return new SplitStripeCompactionRequest(
        request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
  }

  public StripeStoreFlusher.StripeFlushRequest selectFlush(CellComparator comparator,
      StripeInformationProvider si, int kvCount) {
    if (this.config.isUsingL0Flush()) {
      // L0 is used; return a dumb request.
      return new StripeStoreFlusher.StripeFlushRequest(comparator);
    }
    if (si.getStripeCount() == 0) {
      // No stripes - start with the requisite count, derive KVs per stripe.
      int initialCount = this.config.getInitialCount();
      return new StripeStoreFlusher.SizeStripeFlushRequest(comparator, initialCount,
          kvCount / initialCount);
    }
    // There are stripes - flush according to the existing boundaries.
    return new StripeStoreFlusher.BoundaryStripeFlushRequest(comparator, si.getStripeBoundaries());
  }

  public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
      List<HStoreFile> filesCompacting, boolean isOffpeak) throws IOException {
    // TODO: first cut - no parallel compactions. To have more fine-grained control we
    //       probably need a structure more sophisticated than a list.
    if (!filesCompacting.isEmpty()) {
      LOG.debug("Not selecting compaction: " + filesCompacting.size() + " files compacting");
      return null;
    }

    // We are going to do variations of compaction in strict order of preference.
    // A better/more advanced approach is to use a heuristic to see which one is "more
    // necessary" at the current time.

    // References can be present due to region split. We could handle them more cleverly
    // later; for now, preserve the compact-all-things behavior.
    Collection<HStoreFile> allFiles = si.getStorefiles();
    if (StoreUtils.hasReferences(allFiles)) {
      LOG.debug("There are references in the store; compacting all files");
      long targetKvs = estimateTargetKvs(allFiles, config.getInitialCount()).getFirst();
      SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
          allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
      request.setMajorRangeFull();
      return request;
    }

    int stripeCount = si.getStripeCount();
    List<HStoreFile> l0Files = si.getLevel0Files();

    // See if we need to make new stripes.
    boolean shouldCompactL0 = (this.config.getLevel0MinFiles() <= l0Files.size());
    if (stripeCount == 0) {
      if (!shouldCompactL0) return null; // nothing to do.
      return selectNewStripesCompaction(si);
    }

    boolean canDropDeletesNoL0 = l0Files.isEmpty();
    if (shouldCompactL0) {
      if (!canDropDeletesNoL0) {
        // If we need to compact L0, see if we can add something to it, and drop deletes.
        StripeCompactionRequest result = selectSingleStripeCompaction(
            si, true, canDropDeletesNoL0, isOffpeak);
        if (result != null) return result;
      }
      LOG.debug("Selecting L0 compaction with " + l0Files.size() + " files");
      return new BoundaryStripeCompactionRequest(l0Files, si.getStripeBoundaries());
    }

    // Try to delete fully expired stripes.
    StripeCompactionRequest result = selectExpiredMergeCompaction(si, canDropDeletesNoL0);
    if (result != null) return result;

    // Ok, nothing special here; let's see if we need to do a common compaction.
    // This will also split the stripes that are too big, if needed.
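    // The single-stripe selection below applies the exploring policy within each stripe,
    // picking the stripe whose selection has the most files (ties broken by smaller size).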
    return selectSingleStripeCompaction(si, false, canDropDeletesNoL0, isOffpeak);
  }

  public boolean needsCompactions(StripeInformationProvider si, List<HStoreFile> filesCompacting) {
    // Approximation on whether we need compaction.
    return filesCompacting.isEmpty()
        && (StoreUtils.hasReferences(si.getStorefiles())
            || (si.getLevel0Files().size() >= this.config.getLevel0MinFiles())
            || needsSingleStripeCompaction(si));
  }

  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
      throws IOException {
    return false; // there's never a major compaction!
  }

  @Override
  public boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
  }

  /**
   * @param si StoreFileManager.
   * @return Whether any stripe potentially needs compaction.
   */
  protected boolean needsSingleStripeCompaction(StripeInformationProvider si) {
    int minFiles = this.config.getStripeCompactMinFiles();
    for (List<HStoreFile> stripe : si.getStripes()) {
      if (stripe.size() >= minFiles) return true;
    }
    return false;
  }

  protected StripeCompactionRequest selectSingleStripeCompaction(StripeInformationProvider si,
      boolean includeL0, boolean canDropDeletesWithoutL0, boolean isOffpeak) throws IOException {
    ArrayList<ImmutableList<HStoreFile>> stripes = si.getStripes();

    // Track the best selection seen so far ("bq"): prefer the selection with more files;
    // break ties by smaller total size.
    int bqIndex = -1;
    List<HStoreFile> bqSelection = null;
    int stripeCount = stripes.size();
    long bqTotalSize = -1;
    for (int i = 0; i < stripeCount; ++i) {
      // If we want to compact L0 to drop deletes, we only want whole-stripe compactions.
      // So, pass includeL0 as the 2nd parameter to indicate that.
      List<HStoreFile> selection = selectSimpleCompaction(stripes.get(i),
          !canDropDeletesWithoutL0 && includeL0, isOffpeak);
      if (selection.isEmpty()) continue;
      long size = 0;
      for (HStoreFile sf : selection) {
        size += sf.getReader().length();
      }
      if (bqSelection == null || selection.size() > bqSelection.size() ||
          (selection.size() == bqSelection.size() && size < bqTotalSize)) {
        bqSelection = selection;
        bqIndex = i;
        bqTotalSize = size;
      }
    }
    if (bqSelection == null) {
      LOG.debug("No good compaction is possible in any stripe");
      return null;
    }
    List<HStoreFile> filesToCompact = new ArrayList<>(bqSelection);
    // See if we can, and need to, split this stripe.
    int targetCount = 1;
    long targetKvs = Long.MAX_VALUE;
    boolean hasAllFiles = filesToCompact.size() == stripes.get(bqIndex).size();
    String splitString = "";
    if (hasAllFiles && bqTotalSize >= config.getSplitSize()) {
      if (includeL0) {
        // We want to avoid the scenario where we compact a stripe w/L0 and then split it.
        // So, if we might split, don't compact the stripe with L0.
        return null;
      }
      Pair<Long, Integer> kvsAndCount = estimateTargetKvs(filesToCompact, config.getSplitCount());
      targetKvs = kvsAndCount.getFirst();
      targetCount = kvsAndCount.getSecond();
      splitString = "; the stripe will be split into at most "
          + targetCount + " stripes with " + targetKvs + " target KVs";
    }

    LOG.debug("Found compaction in a stripe with end key ["
        + Bytes.toString(si.getEndRow(bqIndex)) + "], with "
        + filesToCompact.size() + " files of total size " + bqTotalSize + splitString);

    // See if we can drop deletes.
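    // Deletes can be dropped only when the selection covers the entire stripe and either
    // L0 is empty or L0 is being compacted in as well (the hasAllFiles check below).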
    StripeCompactionRequest req;
    if (includeL0) {
      assert hasAllFiles;
      List<HStoreFile> l0Files = si.getLevel0Files();
      LOG.debug("Adding " + l0Files.size() + " files to compaction to be able to drop deletes");
      ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
      sfs.addSublist(filesToCompact);
      sfs.addSublist(l0Files);
      req = new BoundaryStripeCompactionRequest(sfs, si.getStripeBoundaries());
    } else {
      req = new SplitStripeCompactionRequest(
          filesToCompact, si.getStartRow(bqIndex), si.getEndRow(bqIndex), targetCount, targetKvs);
    }
    if (hasAllFiles && (canDropDeletesWithoutL0 || includeL0)) {
      req.setMajorRange(si.getStartRow(bqIndex), si.getEndRow(bqIndex));
    }
    req.getRequest().setOffPeak(isOffpeak);
    return req;
  }

  /**
   * Selects the compaction of a single stripe using the default policy.
   * @param sfs Files.
   * @param allFilesOnly Whether a compaction of all-or-none of the files is needed.
   * @return The resulting selection.
   */
  private List<HStoreFile> selectSimpleCompaction(
      List<HStoreFile> sfs, boolean allFilesOnly, boolean isOffpeak) {
    int minFilesLocal = Math.max(
        allFilesOnly ? sfs.size() : 0, this.config.getStripeCompactMinFiles());
    int maxFilesLocal = Math.max(this.config.getStripeCompactMaxFiles(), minFilesLocal);
    return stripePolicy.applyCompactionPolicy(sfs, false, isOffpeak, minFilesLocal, maxFilesLocal);
  }

  private StripeCompactionRequest selectNewStripesCompaction(StripeInformationProvider si) {
    List<HStoreFile> l0Files = si.getLevel0Files();
    Pair<Long, Integer> kvsAndCount = estimateTargetKvs(l0Files, config.getInitialCount());
    LOG.debug("Creating " + kvsAndCount.getSecond() + " initial stripes with "
        + kvsAndCount.getFirst() + " kvs each via L0 compaction of " + l0Files.size() + " files");
    SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
        l0Files, OPEN_KEY, OPEN_KEY, kvsAndCount.getSecond(), kvsAndCount.getFirst());
    request.setMajorRangeFull(); // L0 only, can drop deletes.
    return request;
  }

  private StripeCompactionRequest selectExpiredMergeCompaction(
      StripeInformationProvider si, boolean canDropDeletesNoL0) {
    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
    if (cfTtl == Long.MAX_VALUE) {
      return null; // minversion might be set, cannot delete old files
    }
    long timestampCutoff = EnvironmentEdgeManager.currentTime() - cfTtl;
    // Merge the longest sequence of stripes where all files have expired, if any.
    int start = -1, bestStart = -1, length = 0, bestLength = 0;
    ArrayList<ImmutableList<HStoreFile>> stripes = si.getStripes();
    OUTER: for (int i = 0; i < stripes.size(); ++i) {
      for (HStoreFile storeFile : stripes.get(i)) {
        if (storeFile.getReader().getMaxTimestamp() < timestampCutoff) continue;
        // Found a non-expired file; this stripe has to stay.
        if (length > bestLength) {
          bestStart = start;
          bestLength = length;
        }
        start = -1;
        length = 0;
        continue OUTER;
      }
      if (start == -1) {
        start = i;
      }
      ++length;
    }
    if (length > bestLength) {
      bestStart = start;
      bestLength = length;
    }
    if (bestLength == 0) return null;
    if (bestLength == 1) {
      // This is currently inefficient. If only one stripe has expired, we will rewrite an
      // entire neighboring stripe just to delete some expired files, because we rely on
      // metadata that cannot simply be updated in an old file. Once we either determine
      // stripes dynamically or move the metadata to a manifest, we can just drop the
      // "expired stripes".
      if (bestStart == (stripes.size() - 1)) return null;
      ++bestLength;
    }
    LOG.debug("Merging " + bestLength + " stripes to delete expired store files");
    int endIndex = bestStart + bestLength - 1;
    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
    sfs.addAllSublists(stripes.subList(bestStart, endIndex + 1));
    SplitStripeCompactionRequest result = new SplitStripeCompactionRequest(sfs,
        si.getStartRow(bestStart), si.getEndRow(endIndex), 1, Long.MAX_VALUE);
    if (canDropDeletesNoL0) {
      result.setMajorRangeFull();
    }
    return result;
  }
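
  /** @return The total KV (entry) count across the given files, per their reader metadata. */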
  private static long getTotalKvCount(final Collection<HStoreFile> candidates) {
    long totalCount = 0;
    for (HStoreFile storeFile : candidates) {
      totalCount += storeFile.getReader().getEntries();
    }
    return totalCount;
  }

  public static long getTotalFileSize(final Collection<HStoreFile> candidates) {
    long totalSize = 0;
    for (HStoreFile storeFile : candidates) {
      totalSize += storeFile.getReader().length();
    }
    return totalSize;
  }

  private Pair<Long, Integer> estimateTargetKvs(Collection<HStoreFile> files, double splitCount) {
    // If the size is larger than what we target, we don't want to split into proportionally
    // larger parts and then have to split again very soon. So, we will increase the multiplier
    // by one until we get small enough parts. E.g. a 5GB stripe that should have been split into
    // 2 parts when it was 3GB will be split into 3x1.67GB parts, rather than 2x2.5GB parts.
    long totalSize = getTotalFileSize(files);
    long targetPartSize = config.getSplitPartSize();
    assert targetPartSize > 0 && splitCount > 0;
    double ratio = totalSize / (splitCount * targetPartSize); // ratio of real to desired size
    while (ratio > 1.0) {
      // Ratio of real to desired size if we increase the multiplier.
      double newRatio = totalSize / ((splitCount + 1.0) * targetPartSize);
      // New ratio is < 1.0, but further from 1.0 than the last one; stop.
      if ((1.0 / newRatio) >= ratio) break;
      ratio = newRatio;
      splitCount += 1.0;
    }
    long kvCount = (long) (getTotalKvCount(files) / splitCount);
    return new Pair<>(kvCount, (int) Math.ceil(splitCount));
  }
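
  // Worked example for estimateTargetKvs above, assuming a hypothetical 2GB split part size:
  // a 5GB stripe with splitCount = 2 gives ratio = 5 / (2 * 2) = 1.25 > 1, so try 3 parts:
  // newRatio = 5 / (3 * 2) ~= 0.83, and 1 / 0.83 = 1.2 < 1.25, so the larger count wins;
  // the loop then exits (0.83 <= 1.0), yielding 3 parts of ~1.67GB instead of 2 of 2.5GB.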

  /** Stripe compaction request wrapper. */
  public abstract static class StripeCompactionRequest {
    protected CompactionRequestImpl request;
    protected byte[] majorRangeFromRow = null, majorRangeToRow = null;

    public List<Path> execute(StripeCompactor compactor,
        ThroughputController throughputController) throws IOException {
      return execute(compactor, throughputController, null);
    }

    /**
     * Executes the request against the compactor (essentially, just calls the correct
     * overload of the compact method), to simulate more dynamic dispatch.
     * @param compactor Compactor.
     * @return result of compact(...)
     */
    public abstract List<Path> execute(StripeCompactor compactor,
        ThroughputController throughputController, User user) throws IOException;

    public StripeCompactionRequest(CompactionRequestImpl request) {
      this.request = request;
    }

    /**
     * Sets the compaction "major range". The major range is the key range for which all
     * the files are included, so they can be treated like major-compacted files.
     * @param startRow Left boundary, inclusive.
     * @param endRow Right boundary, exclusive.
     */
    public void setMajorRange(byte[] startRow, byte[] endRow) {
      this.majorRangeFromRow = startRow;
      this.majorRangeToRow = endRow;
    }

    public CompactionRequestImpl getRequest() {
      return this.request;
    }

    public void setRequest(CompactionRequestImpl request) {
      assert request != null;
      this.request = request;
      this.majorRangeFromRow = this.majorRangeToRow = null;
    }
  }

  /**
   * Request for stripe compactor that will cause it to split the source files into several
   * separate files at the provided boundaries.
   */
  private static class BoundaryStripeCompactionRequest extends StripeCompactionRequest {
    private final List<byte[]> targetBoundaries;

    /**
     * @param request Original request.
     * @param targetBoundaries New files should be written with these boundaries.
     */
    public BoundaryStripeCompactionRequest(CompactionRequestImpl request,
        List<byte[]> targetBoundaries) {
      super(request);
      this.targetBoundaries = targetBoundaries;
    }

    public BoundaryStripeCompactionRequest(Collection<HStoreFile> files,
        List<byte[]> targetBoundaries) {
      this(new CompactionRequestImpl(files), targetBoundaries);
    }

    @Override
    public List<Path> execute(StripeCompactor compactor,
        ThroughputController throughputController, User user) throws IOException {
      return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
          this.majorRangeToRow, throughputController, user);
    }
  }

  /**
   * Request for stripe compactor that will cause it to split the source files into several
   * separate files, based on the key-value count as well as a file count limit.
   * Most of the files will be roughly the same size. The last file may be smaller or larger
   * depending on the interplay of the amount of data and the maximum number of files allowed.
   */
  private static class SplitStripeCompactionRequest extends StripeCompactionRequest {
    private final byte[] startRow, endRow;
    private final int targetCount;
    private final long targetKvs;

    /**
     * @param request Original request.
     * @param startRow Left boundary of the range to compact, inclusive.
     * @param endRow Right boundary of the range to compact, exclusive.
     * @param targetCount The maximum number of stripes to compact into.
     * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
     *                  the total number of kvs, all the overflow data goes into the last stripe.
     */
    public SplitStripeCompactionRequest(CompactionRequestImpl request,
        byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
      super(request);
      this.startRow = startRow;
      this.endRow = endRow;
      this.targetCount = targetCount;
      this.targetKvs = targetKvs;
    }
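
    /**
     * Convenience overload with no cap on the number of target stripes; the split is driven
     * purely by targetKvs (targetCount defaults to Integer.MAX_VALUE).
     */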
    public SplitStripeCompactionRequest(
        Collection<HStoreFile> files, byte[] startRow, byte[] endRow, long targetKvs) {
      this(files, startRow, endRow, Integer.MAX_VALUE, targetKvs);
    }

    public SplitStripeCompactionRequest(Collection<HStoreFile> files,
        byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
      this(new CompactionRequestImpl(files), startRow, endRow, targetCount, targetKvs);
    }

    @Override
    public List<Path> execute(StripeCompactor compactor,
        ThroughputController throughputController, User user) throws IOException {
      return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
          this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user);
    }

    /**
     * Sets the major range of the compaction to the entire compaction range.
     * See {@link #setMajorRange(byte[], byte[])}.
     */
    public void setMajorRangeFull() {
      setMajorRange(this.startRow, this.endRow);
    }
  }

  /** The information about stripes that the policy needs to do its stuff. */
  public static interface StripeInformationProvider {
    public Collection<HStoreFile> getStorefiles();

    /**
     * Gets the start row for a given stripe.
     * @param stripeIndex Stripe index.
     * @return Start row. May be an open key.
     */
    public byte[] getStartRow(int stripeIndex);

    /**
     * Gets the end row for a given stripe.
     * @param stripeIndex Stripe index.
     * @return End row. May be an open key.
     */
    public byte[] getEndRow(int stripeIndex);

    /**
     * @return Level 0 files.
     */
    public List<HStoreFile> getLevel0Files();

    /**
     * @return All stripe boundaries, including the open ones on both ends.
     */
    public List<byte[]> getStripeBoundaries();

    /**
     * @return The stripes.
     */
    public ArrayList<ImmutableList<HStoreFile>> getStripes();

    /**
     * @return Stripe count.
     */
    public int getStripeCount();
  }
}