/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * A compactor is a compaction algorithm associated with a given policy. Base class also contains
 * reusable parts for implementing compactors (what is common and what isn't is evolving).
 * <p>
 * Compactions might be concurrent against a given store and the Compactor is shared among them. Do
 * not put mutable state into class fields. All Compactor class fields should be final or
 * effectively final. 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
 */
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
  protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
  protected final Configuration conf;
  protected final HStore store;
  protected final int compactionKVMax;
  protected final Compression.Algorithm majorCompactionCompression;
  protected final Compression.Algorithm minorCompactionCompression;

  /** specify how many days to keep MVCC values during major compaction **/
  protected int keepSeqIdPeriod;

  // Configs that drive whether we drop page cache behind compactions
  protected static final String MAJOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.majorcompaction.pagecache.drop";
  protected static final String MINOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.minorcompaction.pagecache.drop";

  protected final boolean dropCacheMajor;
  protected final boolean dropCacheMinor;

  // We track progress per request using the CompactionRequestImpl identity as key.
  // completeCompaction() cleans up this state.
  private final Set<CompactionProgress> progressSet =
    Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));

  // TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(Configuration conf, HStore store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMajorCompactionCompressionType();
    this.minorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMinorCompactionCompressionType();
    this.keepSeqIdPeriod =
      Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD, HConstants.MIN_KEEP_SEQID_PERIOD),
        HConstants.MIN_KEEP_SEQID_PERIOD);
    this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
    this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
  }

  protected interface CellSinkFactory<S> {
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major,
      Consumer<Path> writerCreationTracker) throws IOException;
  }

  /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across the files we're compacting */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length **/
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction **/
    public long minSeqIdToKeep = 0;
    /** Total size of the compacted files **/
    private long totalCompactedFilesSize = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles Whether all files are included for compaction
   * @param major If major compaction
   * @return The result.
   */
  private FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, boolean allFiles,
    boolean major) throws IOException {
    FileDetails fd = new FileDetails();
    long oldestHFileTimestampToKeepMVCC =
      EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (HStoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemStoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFileReader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries, otherwise progress
      // can be miscalculated by under-sized blooms or when the user switches bloom
      // type (e.g. from ROW to ROWCOL)
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();

      // calculate the total size of the compacted files
      fd.totalCompactedFilesSize += r.length();

      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(HFileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete markers during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one;
          // assume we have very old puts
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(TIMERANGE_KEY);
      fd.latestPutTs =
        tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
      LOG.debug(
        "Compacting {}, keycount={}, bloomtype={}, size={}, "
          + "encoding={}, compression={}, seqNum={}{}",
        (file.getPath() == null ? null : file.getPath().getName()), keyCount,
        r.getBloomFilterType().toString(), TraditionalBinaryPrefix.long2String(r.length(), "", 1),
        r.getHFileReader().getDataBlockEncoding(),
        major ? majorCompactionCompression : minorCompactionCompression, seqNum,
        (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @return Scanners.
   */
  private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
    long smallestReadPoint, boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
      smallestReadPoint);
  }

  private long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  protected interface InternalScannerFactory {

    ScanType getScanType(CompactionRequestImpl request);

    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException;
  }

  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) {
    return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
      .compression(major ? majorCompactionCompression : minorCompactionCompression)
      .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
      .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
      .totalCompactedFilesSize(fd.totalCompactedFilesSize)
      .writerCreationTracker(writerCreationTracker);
  }

  /**
   * Creates a writer for a new file.
   * @param fd The file details.
   * @return Writer for a new StoreFile
   * @throws IOException if creation failed
   */
  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
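    // The includeMVCCReadpoint(fd.maxMVCCReadpoint > 0) call in createParams() above is what
    // carries that decision through to the store engine's writer.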
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker));
  }

  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    String fileStoragePolicy, boolean major, Consumer<Path> writerCreationTracker)
    throws IOException {
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)
        .fileStoragePolicy(fileStoragePolicy));
  }

  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return store.getScanInfo();
    }
    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
      request, user);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param scanner The default scanner created for compaction.
   * @return Scanner to use (usually the default); null if compaction should not proceed.
   */
  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
      request, user);
  }

  protected final List<Path> compact(final CompactionRequestImpl request,
    InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
    ThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());

    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = getSmallestReadPoint();

    boolean dropCache;
    if (request.isMajor() || request.isAllFiles()) {
      dropCache = this.dropCacheMajor;
    } else {
      dropCache = this.dropCacheMinor;
    }

    InternalScanner scanner = null;
    boolean finished = false;
    List<StoreFileScanner> scanners =
      createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
    T writer = null;
    CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
    progressSet.add(progress);
    try {
      /* Include deletes, unless we are doing a major compaction */
      ScanType scanType = scannerFactory.getScanType(request);
      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
      scanner = postCompactScannerOpen(request, scanType,
        scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
        // For mvcc-sensitive family, we never set mvcc to 0.
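        // Capping smallestReadPoint at minSeqIdToKeep means performCompaction() below only
        // zeroes sequence ids that are old enough to fall outside the keepSeqIdPeriod window.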
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(),
        request.getWriterCreationTracker());
      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
        throughputController, request, progress);
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
          + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      // createScanner may fail when seeking the hfiles hits an Exception, e.g. when even a
      // single hfile reader encounters java.io.IOException: Invalid HFile block magic:
      // \x00\x00\x00\x00\x00\x00\x00\x00
      // In that case scanner will be null, but the scanners for all the hfiles should still be
      // closed, or else we will leak ESTABLISHED sockets.
      if (scanner == null) {
        for (StoreFileScanner sfs : scanners) {
          sfs.close();
        }
      } else {
        Closeables.close(scanner, true);
      }
      if (!finished) {
        if (writer != null) {
          abortWriter(writer);
        }
      } else {
        store.updateCompactedMetrics(request.isMajor(), progress);
      }
      progressSet.remove(progress);
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }

  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
    CompactionRequestImpl request) throws IOException;

  protected abstract void abortWriter(T writer) throws IOException;

  /**
   * Performs the compaction.
   * @param fd FileDetails of cell sink writer
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove seqId (used to be mvcc) values which are <=
   *          smallestReadPoint
   * @param request compaction request.
   * @param progress Progress reporter.
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
    assert writer instanceof ShipperListener;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<>();
    long currentTime = EnvironmentEdgeManager.currentTime();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = currentTime;
    }
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    ScannerContext scannerContext =
      ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
    long shippedCallSizeLimit =
      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        currentTime = EnvironmentEdgeManager.currentTime();
        if (LOG.isDebugEnabled()) {
          now = currentTime;
        }
        if (closeChecker.isTimeLimit(store, currentTime)) {
          progress.cancel();
          return false;
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            PrivateCellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          writer.append(c);
          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          if (closeChecker.isSizeLimit(store, len)) {
            progress.cancel();
            return false;
          }
        }
        if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
          if (lastCleanCell != null) {
            // HBASE-16931, set back the sequence id to avoid affecting scan order unexpectedly.
            // ShipperListener will clone the last cells it refers to, so we need to set back the
            // sequence id before ShipperListener.beforeShipped
            PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
          }
          // Clone the cells that are in the writer so that they are freed of references,
          // if they are holding any.
          ((ShipperListener) writer).beforeShipped();
          // The SHARED block references being read for compaction are kept in the prevBlocks
          // list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set of cells is
          // returned to the client, we call shipped(), which can clear this list. Here we do the
          // same thing: during the compaction (after every N cells written with a collective
          // size of 'shippedCallSizeLimit') we call shipped(), which may clear the prevBlocks
          // list.
          kvs.shipped();
          bytesWrittenProgressForShippedCall = 0;
        }
        if (lastCleanCell != null) {
          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException(
        "Interrupted while controlling throughput of compacting " + compactionName);
    } finally {
      // Clone the last cell here in the finally block because the writer will append the last
      // cell when committing. If we don't clone it and the scanner gets closed, the memory
      // backing the last cell will be released. (HBASE-22582)
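      // beforeShipped() is what performs that clone for any cells the writer still references.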
(HBASE-22582) 520 ((ShipperListener) writer).beforeShipped(); 521 throughputController.finish(compactionName); 522 } 523 progress.complete(); 524 return true; 525 } 526 527 /** 528 * @param store store 529 * @param scanners Store file scanners. 530 * @param scanType Scan type. 531 * @param smallestReadPoint Smallest MVCC read point. 532 * @param earliestPutTs Earliest put across all files. 533 * @return A compaction scanner. 534 */ 535 protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, 536 List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs) 537 throws IOException { 538 return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs); 539 } 540 541 /** 542 * @param store The store. 543 * @param scanners Store file scanners. 544 * @param smallestReadPoint Smallest MVCC read point. 545 * @param earliestPutTs Earliest put across all files. 546 * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null. 547 * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null. 548 * @return A compaction scanner. 549 */ 550 protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, 551 List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs, 552 byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { 553 return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs, 554 dropDeletesFromRow, dropDeletesToRow); 555 } 556 557 /** 558 * Return the aggregate progress for all currently active compactions. 559 */ 560 public CompactionProgress getProgress() { 561 synchronized (progressSet) { 562 long totalCompactingKVs = 0; 563 long currentCompactedKVs = 0; 564 long totalCompactedSize = 0; 565 for (CompactionProgress progress : progressSet) { 566 totalCompactingKVs += progress.totalCompactingKVs; 567 currentCompactedKVs += progress.currentCompactedKVs; 568 totalCompactedSize += progress.totalCompactedSize; 569 } 570 CompactionProgress result = new CompactionProgress(totalCompactingKVs); 571 result.currentCompactedKVs = currentCompactedKVs; 572 result.totalCompactedSize = totalCompactedSize; 573 return result; 574 } 575 } 576 577 public boolean isCompacting() { 578 return !progressSet.isEmpty(); 579 } 580}