/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.PrivateConstants;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

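/*
 * How this class is typically used (a minimal sketch, not a copy of any concrete compactor): a
 * subclass such as DefaultCompactor picks the sink type T, supplies a CellSinkFactory<T>, and
 * delegates to the protected compact(...) template method below, roughly:
 *
 *   public List<Path> compact(CompactionRequestImpl request,
 *       ThroughputController throughputController, User user) throws IOException {
 *     return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
 *   }
 *
 * together with implementations of commitWriter(...) and abortWriter(...). Names in the snippet
 * (e.g. writerFactory) are illustrative only.
 */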
/**
 * A compactor is a compaction algorithm associated with a given policy. The base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is evolving).
 * <p>
 * Compactions might be concurrent against a given store and the Compactor is shared among them. Do
 * not put mutable state into class fields. All Compactor class fields should be final or
 * effectively final. 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
 */
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
  protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
  protected final Configuration conf;
  protected final HStore store;
  protected final int compactionKVMax;
  protected final Compression.Algorithm majorCompactionCompression;
  protected final Compression.Algorithm minorCompactionCompression;

  /** How many days to keep MVCC values during a major compaction **/
  protected int keepSeqIdPeriod;

  // Configs that drive whether we drop page cache behind compactions
  protected static final String MAJOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.majorcompaction.pagecache.drop";
  protected static final String MINOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.minorcompaction.pagecache.drop";

  protected final boolean dropCacheMajor;
  protected final boolean dropCacheMinor;

  // One identity-keyed CompactionProgress entry per running compaction against this store.
  // compact() removes the entry in its finally block.
  private final Set<CompactionProgress> progressSet =
    Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));

  // TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(Configuration conf, HStore store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMajorCompactionCompressionType();
    this.minorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMinorCompactionCompressionType();
    this.keepSeqIdPeriod =
      Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD, HConstants.MIN_KEEP_SEQID_PERIOD),
        HConstants.MIN_KEEP_SEQID_PERIOD);
    this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
    this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
  }

  protected interface CellSinkFactory<S> {
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major,
      Consumer<Path> writerCreationTracker) throws IOException;
  }

  /** The sole reason this class exists is that Java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across the files we're compacting */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length **/
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction **/
    public long minSeqIdToKeep = 0;
    /** Total size of the compacted files **/
    private long totalCompactedFilesSize = 0;
  }

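  /*
   * A note on MVCC retention (a worked example; the numbers are illustrative): with
   * keepSeqIdPeriod = 5 days, getFileDetails() only raises minSeqIdToKeep from store files whose
   * modification time is more than 5 days old, and only when every file of the store is being
   * compacted (allFiles == true). performCompaction() then zeroes out sequence ids at or below the
   * resulting bound, so cells written within the retention window keep their mvcc values.
   */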
  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles Whether all files are included for compaction
   * @param major If major compaction
   * @return The details of the files to compact.
   */
  private FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, boolean allFiles,
    boolean major) throws IOException {
    FileDetails fd = new FileDetails();
    long oldestHFileTimestampToKeepMVCC =
      EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (HStoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
        // when allFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemStoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFileReader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries, otherwise under-sized
      // blooms (or a bloom type switch by the user, e.g. from ROW to ROWCOL) can cause progress
      // to be miscalculated
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();

      // calculate the total size of the compacted files
      fd.totalCompactedFilesSize += r.length();

      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(HFileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete marker during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no timestamp information; it must be an old one, so
          // assume we have very old puts
          fd.earliestPutTs = earliestPutTs = PrivateConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(TIMERANGE_KEY);
      fd.latestPutTs =
        tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
      LOG.debug(
        "Compacting {}, keycount={}, bloomtype={}, size={}, "
          + "encoding={}, compression={}, seqNum={}{}",
        (file.getPath() == null ? null : file.getPath().getName()), keyCount,
        r.getBloomFilterType().toString(), TraditionalBinaryPrefix.long2String(r.length(), "", 1),
        r.getHFileReader().getDataBlockEncoding(),
        major ? majorCompactionCompression : minorCompactionCompression, seqNum,
        (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
    }
    return fd;
  }

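  // Note: the useDropBehind flag below asks the underlying filesystem (where supported) to drop
  // its cache behind compaction reads, the reader-side counterpart of the *.pagecache.drop
  // configs above, so a large compaction does not evict hot data from the OS page cache.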
  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @return Scanners.
   */
  private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
    long smallestReadPoint, boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
      smallestReadPoint);
  }

  private long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  protected interface InternalScannerFactory {

    ScanType getScanType(CompactionRequestImpl request);

    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException;
  }

  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) {
    return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
      .compression(major ? majorCompactionCompression : minorCompactionCompression)
      .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
      .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
      .totalCompactedFilesSize(fd.totalCompactedFilesSize)
      .writerCreationTracker(writerCreationTracker);
  }

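  // Both createWriter(...) overloads below build their writer from createParams(), so MVCC
  // readpoints and tags are only persisted when the file details show they are actually present
  // (fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0).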
  /**
   * Creates a writer for a new file.
   * @param fd The file details.
   * @return Writer for a new StoreFile
   * @throws IOException if creation failed
   */
  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker));
  }

  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    String fileStoragePolicy, boolean major, Consumer<Path> writerCreationTracker)
    throws IOException {
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)
        .fileStoragePolicy(fileStoragePolicy));
  }

  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return store.getScanInfo();
    }
    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
      request, user);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param scanner The default scanner created for compaction.
   * @return Scanner to use (usually the default); null if compaction should not proceed.
   */
  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
      request, user);
  }

  protected final List<Path> compact(final CompactionRequestImpl request,
    InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
    ThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());

    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = getSmallestReadPoint();

    boolean dropCache;
    if (request.isMajor() || request.isAllFiles()) {
      dropCache = this.dropCacheMajor;
    } else {
      dropCache = this.dropCacheMinor;
    }

    InternalScanner scanner = null;
    boolean finished = false;
    List<StoreFileScanner> scanners =
      createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
    T writer = null;
    CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
    progressSet.add(progress);
    try {
      /* Include deletes, unless we are doing a major (all files) compaction */
      ScanType scanType = scannerFactory.getScanType(request);
      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
      scanner = postCompactScannerOpen(request, scanType,
        scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
        // For mvcc-sensitive family, we never set mvcc to 0.
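        // Lowering smallestReadPoint to minSeqIdToKeep means performCompaction() below only
        // zeroes out sequence ids at or below that bound; cells newer than the keepSeqIdPeriod
        // window keep their mvcc values.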
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(),
        request.getWriterCreationTracker());
      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
        throughputController, request, progress);
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
          + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      // createScanner may fail when seeking the hfiles hits an exception, e.g. even a single
      // hfile reader encountering java.io.IOException: Invalid HFile block magic:
      // \x00\x00\x00\x00\x00\x00\x00\x00
      // In that case scanner will be null, but the scanners for all the hfiles still have to be
      // closed, or else we leak ESTABLISHED sockets.
      if (scanner == null) {
        for (StoreFileScanner sfs : scanners) {
          sfs.close();
        }
      } else {
        Closeables.close(scanner, true);
      }
      if (!finished) {
        if (writer != null) {
          abortWriter(writer);
        }
      } else {
        store.updateCompactedMetrics(request.isMajor(), progress);
      }
      progressSet.remove(progress);
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }

  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
    CompactionRequestImpl request) throws IOException;

  protected abstract void abortWriter(T writer) throws IOException;

  /**
   * Performs the compaction.
   * @param fd FileDetails of cell sink writer
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, zero out the sequence id (formerly the mvcc value) of cells
   *          whose sequence id is <= smallestReadPoint
   * @param request compaction request.
   * @param progress Progress reporter.
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
    assert writer instanceof ShipperListener;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<>();
    long currentTime = EnvironmentEdgeManager.currentTime();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = currentTime;
    }
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    ScannerContext scannerContext =
      ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    throughputController.start(compactionName);
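    // The scanner is a KeyValueScanner (in practice a StoreScanner) unless a coprocessor swapped
    // in something else; keep a typed reference so we can periodically call shipped() below and
    // let it release the SHARED block references it holds. shippedCallSizeLimit caps how many
    // bytes we write between shipped() calls: number of files * the family's block size.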
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
    long shippedCallSizeLimit =
      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        currentTime = EnvironmentEdgeManager.currentTime();
        if (LOG.isDebugEnabled()) {
          now = currentTime;
        }
        if (closeChecker.isTimeLimit(store, currentTime)) {
          progress.cancel();
          return false;
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            PrivateCellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          writer.append(c);
          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          if (closeChecker.isSizeLimit(store, len)) {
            progress.cancel();
            return false;
          }
        }
        if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
          if (lastCleanCell != null) {
            // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
            // ShipperListener will clone the last cells it refers to, so we need to set the
            // sequence id back before calling ShipperListener.beforeShipped
            PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
          }
          // Clone the cells that are in the writer so that they are freed of references,
          // if they are holding any.
          ((ShipperListener) writer).beforeShipped();
          // The SHARED block references, being read for compaction, will be kept in the
          // prevBlocks list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set
          // of cells is returned to the client, we call shipped(), which can clear this list.
          // Here we do a similar thing: in between the compaction (after every N cells written
          // with a collective size of 'shippedCallSizeLimit') we call shipped, which may clear
          // the prevBlocks list.
          kvs.shipped();
          bytesWrittenProgressForShippedCall = 0;
        }
        if (lastCleanCell != null) {
          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long-running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException(
        "Interrupted while controlling throughput of compacting " + compactionName);
    } finally {
      // Clone the last cell in this finally block because the writer still refers to it when
      // committing. If we don't clone here, the memory backing the last cell may be released
      // once the scanner is closed. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

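  // The two createScanner(...) overloads below are the hook points subclasses use to customize
  // the read side: the second one bounds delete-marker dropping to a row range, which is what
  // range-partitioned compactors (e.g. stripe compaction) rely on.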
  /**
   * @param store store
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs)
    throws IOException {
    return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store The store.
   * @param scanners Store file scanners.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
  }

  /**
   * Return the aggregate progress for all currently active compactions.
   */
  public CompactionProgress getProgress() {
    synchronized (progressSet) {
      long totalCompactingKVs = 0;
      long currentCompactedKVs = 0;
      long totalCompactedSize = 0;
      for (CompactionProgress progress : progressSet) {
        totalCompactingKVs += progress.totalCompactingKVs;
        currentCompactedKVs += progress.currentCompactedKVs;
        totalCompactedSize += progress.totalCompactedSize;
      }
      CompactionProgress result = new CompactionProgress(totalCompactingKVs);
      result.currentCompactedKVs = currentCompactedKVs;
      result.totalCompactedSize = totalCompactedSize;
      return result;
    }
  }

  public boolean isCompacting() {
    return !progressSet.isEmpty();
  }
}