/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.PrivateConstants;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * A compactor is a compaction algorithm associated with a given policy. Base class also contains
 * reusable parts for implementing compactors (what is common and what isn't is evolving).
 * <p>
 * Compactions might be concurrent against a given store and the Compactor is shared among them. Do
 * not put mutable state into class fields. All Compactor class fields should be final or
 * effectively final. 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
 */
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
  protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
  protected final Configuration conf;
  protected final HStore store;
  protected final int compactionKVMax;
  protected final long compactScannerSizeLimit;
  protected final Compression.Algorithm majorCompactionCompression;
  protected final Compression.Algorithm minorCompactionCompression;

  /** specify how many days to keep MVCC values during major compaction **/
  protected int keepSeqIdPeriod;

  // Configs that drive whether we drop page cache behind compactions
  protected static final String MAJOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.majorcompaction.pagecache.drop";
  protected static final String MINOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.minorcompaction.pagecache.drop";

  protected final boolean dropCacheMajor;
  protected final boolean dropCacheMinor;

  // We track progress per request using the CompactionRequestImpl identity as key.
  // completeCompaction() cleans up this state.
  private final Set<CompactionProgress> progressSet =
    Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));

  // TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(Configuration conf, HStore store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactScannerSizeLimit = this.conf.getLong(HConstants.COMPACTION_SCANNER_SIZE_MAX,
      HConstants.COMPACTION_SCANNER_SIZE_MAX_DEFAULT);
    this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMajorCompactionCompressionType();
    this.minorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMinorCompactionCompressionType();
    this.keepSeqIdPeriod =
      Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD, HConstants.MIN_KEEP_SEQID_PERIOD),
        HConstants.MIN_KEEP_SEQID_PERIOD);
    this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
    this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
  }
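
  // Note on the constructor above: because of the Math.max clamp, keepSeqIdPeriod can never be
  // configured below HConstants.MIN_KEEP_SEQID_PERIOD; a smaller configured value for
  // HConstants.KEEP_SEQID_PERIOD is silently raised to that floor.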

  protected interface CellSinkFactory<S> {
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major,
      Consumer<Path> writerCreationTracker) throws IOException;
  }

  /** The sole reason this class exists is that Java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across the files we're compacting. */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length **/
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction **/
    public long minSeqIdToKeep = 0;
    /** Total size of the compacted files **/
    private long totalCompactedFilesSize = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles       Whether all files are included for compaction
   * @param major          If major compaction
   * @return The result.
   */
  private FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, boolean allFiles,
    boolean major) throws IOException {
    FileDetails fd = new FileDetails();
    long oldestHFileTimestampToKeepMVCC =
      EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (HStoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemStoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFileReader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries; otherwise, under-sized
      // blooms (or a user switching bloom type, e.g. from ROW to ROWCOL) can cause progress to be
      // miscalculated.
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();

      // calculate the total size of the compacted files
      fd.totalCompactedFilesSize += r.length();

      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(HFileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete marker during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one
          // assume we have very old puts
          fd.earliestPutTs = earliestPutTs = PrivateConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(TIMERANGE_KEY);
      fd.latestPutTs =
        tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
      LOG.debug(
        "Compacting {}, keycount={}, bloomtype={}, size={}, "
          + "encoding={}, compression={}, seqNum={}{}",
        (file.getPath() == null ? null : file.getPath().getName()), keyCount,
        r.getBloomFilterType().toString(), TraditionalBinaryPrefix.long2String(r.length(), "", 1),
        r.getHFileReader().getDataBlockEncoding(),
        major ? majorCompactionCompression : minorCompactionCompression, seqNum,
        (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
    }
    return fd;
  }
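
  // Illustrative arithmetic for the cut-off computed in getFileDetails(): with
  // keepSeqIdPeriod = 5 days, for example, oldestHFileTimestampToKeepMVCC is
  //   currentTime - 1000L * 60 * 60 * 24 * 5 = currentTime - 432,000,000 ms.
  // Only when allFiles is true and a file's modification time is older than this cut-off does it
  // contribute to FileDetails.minSeqIdToKeep.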

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @return Scanners.
   */
  private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
    long smallestReadPoint, boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
      smallestReadPoint);
  }

  private long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  protected interface InternalScannerFactory {

    ScanType getScanType(CompactionRequestImpl request);

    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException;
  }

  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) {
    return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
      .compression(major ? majorCompactionCompression : minorCompactionCompression)
      .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
      .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
      .totalCompactedFilesSize(fd.totalCompactedFilesSize)
      .writerCreationTracker(writerCreationTracker);
  }
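
  // Usage sketch: a single-file compactor's CellSinkFactory can simply delegate to one of the
  // createWriter overloads below, e.g.
  //   writer = createWriter(fd, shouldDropBehind, major, writerCreationTracker);
  // while multi-file compactors (stripe, date-tiered) typically wrap several such writers behind
  // one CellSink implementation.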

  /**
   * Creates a writer for a new file.
   * @param fd The file details.
   * @return Writer for a new StoreFile
   * @throws IOException if creation failed
   */
  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker));
  }

  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    String fileStoragePolicy, boolean major, Consumer<Path> writerCreationTracker)
    throws IOException {
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)
        .fileStoragePolicy(fileStoragePolicy));
  }

  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return store.getScanInfo();
    }
    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
      request, user);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request  Compaction request.
   * @param scanType Scan type.
   * @param scanner  The default scanner created for compaction.
   * @return Scanner to use (usually the default); null if compaction should not proceed.
   */
  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
      request, user);
  }

  protected final List<Path> compact(final CompactionRequestImpl request,
    InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
    ThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());

    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = getSmallestReadPoint();

    boolean dropCache;
    if (request.isMajor() || request.isAllFiles()) {
      dropCache = this.dropCacheMajor;
    } else {
      dropCache = this.dropCacheMinor;
    }

    InternalScanner scanner = null;
    boolean finished = false;
    List<StoreFileScanner> scanners =
      createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
    T writer = null;
    CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
    progressSet.add(progress);
    try {
      /* Include deletes, unless we are doing a major compaction */
      ScanType scanType = scannerFactory.getScanType(request);
      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
      scanner = postCompactScannerOpen(request, scanType,
        scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
        // For mvcc-sensitive family, we never set mvcc to 0.
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(),
        request.getWriterCreationTracker());
      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
        throughputController, request, progress);
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
          + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      // createScanner may fail while seeking the hfiles, e.g. if even a single hfile reader hits
      // java.io.IOException: Invalid HFile block magic: \x00\x00\x00\x00\x00\x00\x00\x00.
      // In that case scanner will be null, but the scanners for all the hfiles must still be
      // closed, or else we will leak ESTABLISHED sockets.
      if (scanner == null) {
        for (StoreFileScanner sfs : scanners) {
          sfs.close();
        }
      } else {
        Closeables.close(scanner, true);
      }
      if (!finished) {
        if (writer != null) {
          abortWriter(writer);
        }
      } else {
        store.updateCompactedMetrics(request.isMajor(), progress);
      }
      progressSet.remove(progress);
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }

  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
    CompactionRequestImpl request) throws IOException;

  protected abstract void abortWriter(T writer) throws IOException;
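
  // Implementation sketch (an assumption about typical subclasses, not code from this file): a
  // single-file compactor's commitWriter usually appends compaction metadata, closes the writer
  // and returns its path, roughly:
  //   writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
  //   writer.close();
  //   return Collections.singletonList(writer.getPath());
  // Its abortWriter counterpart closes the writer and deletes the half-written file.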

  /**
   * Performs the compaction.
   * @param fd                FileDetails of cell sink writer
   * @param scanner           Where to read from.
   * @param writer            Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId        When true, remove seqId(used to be mvcc) value which is <=
   *                          smallestReadPoint
   * @param request           compaction request.
   * @param progress          Progress reporter.
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
    assert writer instanceof ShipperListener;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<ExtendedCell> cells = new ArrayList<>();
    long currentTime = EnvironmentEdgeManager.currentTime();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = currentTime;
    }
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
      .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
        compactScannerSizeLimit)
      .build();

    throughputController.start(compactionName);
    Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
    long shippedCallSizeLimit =
      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
    try {
      do {
        // InternalScanner is for CPs so we do not want to leak ExtendedCell to the interface, but
        // all the server side implementations should only add ExtendedCell to the List, otherwise
        // it will trigger serious assertion failures in our code
        hasMore = scanner.next((List) cells, scannerContext);
        currentTime = EnvironmentEdgeManager.currentTime();
        if (LOG.isDebugEnabled()) {
          now = currentTime;
        }
        if (closeChecker.isTimeLimit(store, currentTime)) {
          progress.cancel();
          return false;
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (ExtendedCell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            PrivateCellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          if (closeChecker.isSizeLimit(store, len)) {
            progress.cancel();
            return false;
          }
        }
        writer.appendAll(cells);
        if (shipper != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
          if (lastCleanCell != null) {
            // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
            // ShipperListener will clone the last cells it refers to, so we need to set back the
            // sequence id before ShipperListener.beforeShipped
            PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
          }
          // Clone the cells that are in the writer so that they are freed of references,
          // if they are holding any.
          ((ShipperListener) writer).beforeShipped();
          // The SHARED block references, being read for compaction, will be kept in the prevBlocks
          // list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set of cells is
          // returned to the client, we call shipped() which can clear this list. Here we do a
          // similar thing: in between the compaction (after every N cells written with collective
          // size of 'shippedCallSizeLimit') we call shipped(), which may clear the prevBlocks
          // list.
          shipper.shipped();
          bytesWrittenProgressForShippedCall = 0;
        }
        if (lastCleanCell != null) {
          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException(
        "Interrupted while controlling throughput of compacting " + compactionName);
    } finally {
      // Clone the last cell here, in the finally block, because the writer will append it when
      // committing. If we don't clone it and the scanner gets closed, the memory backing the last
      // cell will be released. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

  /**
   * @param store             store
   * @param scanners          Store file scanners.
   * @param scanType          Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs     Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs)
    throws IOException {
    return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store              The store.
   * @param scanners           Store file scanners.
   * @param smallestReadPoint  Smallest MVCC read point.
   * @param earliestPutTs      Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow   Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
  }
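
  // The dropDeletesFromRow/dropDeletesToRow variant above is intended for compactors that
  // partition the key space (stripe-style compaction is the usual example), so delete markers are
  // only dropped inside a key range that is being fully compacted.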

  /**
   * Return the aggregate progress for all currently active compactions.
   */
  public CompactionProgress getProgress() {
    synchronized (progressSet) {
      long totalCompactingKVs = 0;
      long currentCompactedKVs = 0;
      long totalCompactedSize = 0;
      for (CompactionProgress progress : progressSet) {
        totalCompactingKVs += progress.totalCompactingKVs;
        currentCompactedKVs += progress.currentCompactedKVs;
        totalCompactedSize += progress.totalCompactedSize;
      }
      CompactionProgress result = new CompactionProgress(totalCompactingKVs);
      result.currentCompactedKVs = currentCompactedKVs;
      result.totalCompactedSize = totalCompactedSize;
      return result;
    }
  }

  public boolean isCompacting() {
    return !progressSet.isEmpty();
  }
}