/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.PrivateConstants;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * A compactor is a compaction algorithm associated with a given policy. The base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is evolving).
 * <p>
 * Compactions might be concurrent against a given store and the Compactor is shared among them. Do
 * not put mutable state into class fields. All Compactor class fields should be final or
 * effectively final. 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
 */
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
  protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
  protected final Configuration conf;
  protected final HStore store;
  protected final int compactionKVMax;
  protected final long compactScannerSizeLimit;
  protected final Compression.Algorithm majorCompactionCompression;
  protected final Compression.Algorithm minorCompactionCompression;

  /** Specify how many days to keep MVCC values during major compaction. **/
  protected int keepSeqIdPeriod;

  // Configs that drive whether we drop page cache behind compactions
  protected static final String MAJOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.majorcompaction.pagecache.drop";
  protected static final String MINOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.minorcompaction.pagecache.drop";

  protected final boolean dropCacheMajor;
  protected final boolean dropCacheMinor;

  // We track the progress of each running compaction, keyed by the identity of its
  // CompactionProgress object. compact() removes entries from this set in its finally block.
  private final Set<CompactionProgress> progressSet =
    Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));

  // TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(Configuration conf, HStore store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactScannerSizeLimit = this.conf.getLong(HConstants.COMPACTION_SCANNER_SIZE_MAX,
      HConstants.COMPACTION_SCANNER_SIZE_MAX_DEFAULT);
    this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMajorCompactionCompressionType();
    this.minorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMinorCompactionCompressionType();
    this.keepSeqIdPeriod =
      Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD, HConstants.MIN_KEEP_SEQID_PERIOD),
        HConstants.MIN_KEEP_SEQID_PERIOD);
    this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
    this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
  }
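
  // Illustrative sketch (not part of this class): how the knobs read in the constructor above
  // might be tuned, e.g. in a test. The values shown are assumptions, not recommendations.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(HConstants.COMPACTION_KV_MAX, 10); // cells fetched per scanner.next() batch
  //   conf.setBoolean(MAJOR_COMPACTION_DROP_CACHE, true); // drop page cache behind majors
  //   conf.setBoolean(MINOR_COMPACTION_DROP_CACHE, false); // but keep it warm for minors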

  protected interface CellSinkFactory<S> {
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major,
      Consumer<Path> writerCreationTracker) throws IOException;
  }

  /** The sole reason this class exists is that Java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across the files we're compacting. */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length **/
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction **/
    public long minSeqIdToKeep = 0;
    /** Total size of the compacted files **/
    private long totalCompactedFilesSize = 0;
  }
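
  // Worked example for the MVCC retention window computed in getFileDetails() below, assuming
  // the default keepSeqIdPeriod of HConstants.MIN_KEEP_SEQID_PERIOD (5 days):
  //
  //   oldestHFileTimestampToKeepMVCC = now - 1000L * 60 * 60 * 24 * 5
  //                                  = now - 432,000,000 ms
  //
  // Only files last modified before that instant (and only when all files are being compacted)
  // can raise FileDetails.minSeqIdToKeep, which in turn allows performCompaction() to zero out
  // old sequence ids.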

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles       Whether all files are included for compaction
   * @param major          If major compaction
   * @return The result.
   */
  private FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, boolean allFiles,
    boolean major) throws IOException {
    FileDetails fd = new FileDetails();
    long oldestHFileTimestampToKeepMVCC =
      EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (HStoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemStoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFileReader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries, otherwise under-sized
      // blooms, or a user switching bloom type (e.g. from ROW to ROWCOL), can cause progress to
      // be miscalculated
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();

      // calculate the total size of the compacted files
      fd.totalCompactedFilesSize += r.length();

      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(HFileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete markers during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one
          // assume we have very old puts
          fd.earliestPutTs = earliestPutTs = PrivateConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(TIMERANGE_KEY);
      fd.latestPutTs =
        tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
      LOG.debug(
        "Compacting {}, keycount={}, bloomtype={}, size={}, "
          + "encoding={}, compression={}, seqNum={}{}",
        (file.getPath() == null ? null : file.getPath().getName()), keyCount,
        r.getBloomFilterType().toString(), TraditionalBinaryPrefix.long2String(r.length(), "", 1),
        r.getHFileReader().getDataBlockEncoding(),
        major ? majorCompactionCompression : minorCompactionCompression, seqNum,
        (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
    }
    return fd;
  }
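
  // Illustrative example (hypothetical numbers): compacting three files with entry counts 1000,
  // 2000 and 4000 yields fd.maxKeyCount = 7000. fd.maxSeqId and fd.maxMVCCReadpoint are maxima
  // across the files, fd.totalCompactedFilesSize is the sum of the file lengths, and
  // fd.earliestPutTs is the minimum EARLIEST_PUT_TS seen (tracked only when allFiles is true).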

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @return Scanners.
   */
  private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
    long smallestReadPoint, boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
      smallestReadPoint);
  }

  private long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  protected interface InternalScannerFactory {

    ScanType getScanType(CompactionRequestImpl request);

    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException;
  }

  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) {
    return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
      .compression(major ? majorCompactionCompression : minorCompactionCompression)
      .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
      .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
      .totalCompactedFilesSize(fd.totalCompactedFilesSize)
      .writerCreationTracker(writerCreationTracker);
  }
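
  // Minimal usage sketch (hypothetical caller code; the variable names are assumptions) for
  // the writer helpers below:
  //
  //   StoreFileWriter writer = createWriter(fd, /* shouldDropBehind */ dropCache,
  //     request.isMajor(), request.getWriterCreationTracker());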

  /**
   * Creates a writer for a new file.
   * @param fd The file details.
   * @return Writer for a new StoreFile
   * @throws IOException if creation failed
   */
  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker));
  }

  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    String fileStoragePolicy, boolean major, Consumer<Path> writerCreationTracker)
    throws IOException {
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)
        .fileStoragePolicy(fileStoragePolicy));
  }

  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return store.getScanInfo();
    }
    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
      request, user);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request  Compaction request.
   * @param scanType Scan type.
   * @param scanner  The default scanner created for compaction.
   * @return The scanner to use (usually the default); null if compaction should not proceed.
   */
  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
      request, user);
  }
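
  // Sketch of how a concrete compactor typically drives the template method below. This is
  // modeled loosely on the default single-output compactor, so treat the exact shape (and the
  // writerFactory field, the subclass's CellSinkFactory) as assumptions:
  //
  //   public List<Path> compact(CompactionRequestImpl request,
  //       ThroughputController throughputController, User user) throws IOException {
  //     return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
  //   }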

  protected final List<Path> compact(final CompactionRequestImpl request,
    InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
    ThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());

    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = getSmallestReadPoint();

    boolean dropCache;
    if (request.isMajor() || request.isAllFiles()) {
      dropCache = this.dropCacheMajor;
    } else {
      dropCache = this.dropCacheMinor;
    }

    InternalScanner scanner = null;
    boolean finished = false;
    List<StoreFileScanner> scanners =
      createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
    T writer = null;
    CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
    progressSet.add(progress);
    try {
      /* Retain deletes, unless we are compacting all files and can drop them */
      ScanType scanType = scannerFactory.getScanType(request);
      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
      scanner = postCompactScannerOpen(request, scanType,
        scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
        // For mvcc-sensitive family, we never set mvcc to 0.
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(),
        request.getWriterCreationTracker());
      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
        throughputController, request, progress);
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
          + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      // createScanner may fail when seeking hfiles hits an exception, e.g. even a single hfile
      // reader encountering java.io.IOException: Invalid HFile block magic:
      // \x00\x00\x00\x00\x00\x00\x00\x00
      // will leave scanner null, but the scanners for all the hfiles should still be closed,
      // or else we will leak ESTABLISHED sockets.
      if (scanner == null) {
        for (StoreFileScanner sfs : scanners) {
          sfs.close();
        }
      } else {
        Closeables.close(scanner, true);
      }
      if (!finished) {
        if (writer != null) {
          abortWriter(writer);
        }
      } else {
        store.updateCompactedMetrics(request.isMajor(), progress);
      }
      progressSet.remove(progress);
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }

  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
    CompactionRequestImpl request) throws IOException;

  protected abstract void abortWriter(T writer) throws IOException;
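
  // Sketch of the commit/abort contract (modeled loosely on the default compactor; the exact
  // appendMetadata signature is an assumption). Every writer handed out by the sink factory
  // must be committed or aborted exactly once:
  //
  //   @Override
  //   protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
  //       CompactionRequestImpl request) throws IOException {
  //     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
  //     writer.close();
  //     return Collections.singletonList(writer.getPath());
  //   }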

  /**
   * Performs the compaction.
   * @param fd                FileDetails of cell sink writer
   * @param scanner           Where to read from.
   * @param writer            Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId        When true, remove the seqId (formerly mvcc) value if it is <=
   *                          smallestReadPoint
   * @param request           compaction request.
   * @param progress          Progress reporter.
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
    assert writer instanceof ShipperListener;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<>();
    long currentTime = EnvironmentEdgeManager.currentTime();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = currentTime;
    }
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
      .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
        compactScannerSizeLimit)
      .build();

    throughputController.start(compactionName);
    Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
    long shippedCallSizeLimit =
      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        currentTime = EnvironmentEdgeManager.currentTime();
        if (LOG.isDebugEnabled()) {
          now = currentTime;
        }
        if (closeChecker.isTimeLimit(store, currentTime)) {
          progress.cancel();
          return false;
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            PrivateCellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          writer.append(c);
          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          if (closeChecker.isSizeLimit(store, len)) {
            progress.cancel();
            return false;
          }
        }
        if (shipper != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
          if (lastCleanCell != null) {
            // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
            // ShipperListener will do a clone of the last cells it refers to, so we need to set
            // back the sequence id before ShipperListener.beforeShipped
            PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
          }
          // Clone the cells that are in the writer so that they are freed of references,
          // if they are holding any.
          ((ShipperListener) writer).beforeShipped();
          // The SHARED block references, being read for compaction, will be kept in the
          // prevBlocks list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set
          // of cells is returned to the client we call shipped(), which can clear this list.
          // Here we do the same thing: during the compaction (after every N cells written, with
          // a collective size of 'shippedCallSizeLimit') we call shipped(), which may clear the
          // prevBlocks list.
          shipper.shipped();
          bytesWrittenProgressForShippedCall = 0;
        }
        if (lastCleanCell != null) {
          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException(
        "Interrupted while controlling throughput of compacting " + compactionName);
    } finally {
      // Clone the last cell in the finally block because the writer will append the last cell
      // when committing. If we don't clone here, then once the scanner is closed the memory of
      // the last cell will be released. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }
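
  // Worked example for shippedCallSizeLimit above (illustrative numbers): compacting 10 files
  // in a family with the default 64 KB block size gives 10 * 65536 = 655,360 bytes, so
  // shipper.shipped() runs roughly every 640 KB written, releasing the SHARED block references
  // accumulated in the scanner's prevBlocks list.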

  /**
   * @param store             store
   * @param scanners          Store file scanners.
   * @param scanType          Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs     Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs)
    throws IOException {
    return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store              The store.
   * @param scanners           Store file scanners.
   * @param smallestReadPoint  Smallest MVCC read point.
   * @param earliestPutTs      Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow   Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
  }

  /**
   * Return the aggregate progress for all currently active compactions.
   */
  public CompactionProgress getProgress() {
    synchronized (progressSet) {
      long totalCompactingKVs = 0;
      long currentCompactedKVs = 0;
      long totalCompactedSize = 0;
      for (CompactionProgress progress : progressSet) {
        totalCompactingKVs += progress.totalCompactingKVs;
        currentCompactedKVs += progress.currentCompactedKVs;
        totalCompactedSize += progress.totalCompactedSize;
      }
      CompactionProgress result = new CompactionProgress(totalCompactingKVs);
      result.currentCompactedKVs = currentCompactedKVs;
      result.totalCompactedSize = totalCompactedSize;
      return result;
    }
  }

  public boolean isCompacting() {
    return !progressSet.isEmpty();
  }
}