/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.PrivateConstants;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
 * A compactor is a compaction algorithm associated with a given policy. The base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is evolving).
 * <p>
 * Compactions might be concurrent against a given store and the Compactor is shared among them. Do
 * not put mutable state into class fields. All Compactor class fields should be final or
 * effectively final. 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
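 * <p>
 * For illustration only, a minimal subclass might wire the template methods together roughly as
 * below. This is a hypothetical sketch (the name {@code SimpleCompactor} and the simplified
 * metadata handling are illustrative), not the actual {@code DefaultCompactor} implementation:
 *
 * <pre>{@code
 * public class SimpleCompactor extends Compactor<StoreFileWriter> {
 *   SimpleCompactor(Configuration conf, HStore store) {
 *     super(conf, store);
 *   }
 *
 *   public List<Path> compact(CompactionRequestImpl request,
 *       ThroughputController throughputController, User user) throws IOException {
 *     // Delegate to the template method with the default scanner factory and a sink factory
 *     // that simply creates a single StoreFileWriter.
 *     return compact(request, defaultScannerFactory,
 *       (scanner, fd, shouldDropBehind, major, writerCreationTracker) ->
 *         createWriter(fd, shouldDropBehind, major, writerCreationTracker),
 *       throughputController, user);
 *   }
 *
 *   protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
 *       CompactionRequestImpl request) throws IOException {
 *     // Real implementations also append compaction metadata before closing.
 *     writer.close();
 *     return Collections.singletonList(writer.getPath());
 *   }
 *
 *   protected void abortWriter(StoreFileWriter writer) throws IOException {
 *     Path leftover = writer.getPath();
 *     writer.close();
 *     // Clean up the partial output file of the aborted compaction.
 *     store.getFileSystem().delete(leftover, false);
 *   }
 * }
 * }</pre>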
 */
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
  protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
  protected final Configuration conf;
  protected final HStore store;
  protected final int compactionKVMax;
  protected final long compactScannerSizeLimit;
  protected final Compression.Algorithm majorCompactionCompression;
  protected final Compression.Algorithm minorCompactionCompression;

  /** specify how many days to keep MVCC values during major compaction **/
  protected int keepSeqIdPeriod;

  // Configs that drive whether we drop page cache behind compactions
  protected static final String MAJOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.majorcompaction.pagecache.drop";
  protected static final String MINOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.minorcompaction.pagecache.drop";

  protected final boolean dropCacheMajor;
  protected final boolean dropCacheMinor;

  // We track progress per request using the CompactionRequestImpl identity as key.
  // completeCompaction() cleans up this state.
  private final Set<CompactionProgress> progressSet =
    Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));

  // TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(Configuration conf, HStore store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactScannerSizeLimit = this.conf.getLong(HConstants.COMPACTION_SCANNER_SIZE_MAX,
      HConstants.COMPACTION_SCANNER_SIZE_MAX_DEFAULT);
    this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMajorCompactionCompressionType();
    this.minorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMinorCompactionCompressionType();
    this.keepSeqIdPeriod =
      Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD, HConstants.MIN_KEEP_SEQID_PERIOD),
        HConstants.MIN_KEEP_SEQID_PERIOD);
    this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
    this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
  }

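  /**
   * Factory that creates the {@link CellSink} (the output writer) a compaction writes into.
   * Implementations typically delegate to one of the {@code createWriter} helpers in this class.
   */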
  protected interface CellSinkFactory<S> {
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major,
      Consumer<Path> writerCreationTracker) throws IOException;
  }

  /** The sole reason this class exists is that Java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across the files we're compacting. */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length **/
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction **/
    public long minSeqIdToKeep = 0;
    /** Total size of the compacted files **/
    private long totalCompactedFilesSize = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles       Whether all files are included for compaction
   * @param major          Whether this is a major compaction
   * @return The result.
   */
  private FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, boolean allFiles,
    boolean major) throws IOException {
    FileDetails fd = new FileDetails();
    long oldestHFileTimestampToKeepMVCC =
      EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (HStoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemStoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFileReader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries; otherwise under-sized
      // blooms, or a bloom type switched by the user (e.g. from ROW to ROWCOL), can cause
      // progress to be miscalculated.
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();

      // calculate the total size of the compacted files
      fd.totalCompactedFilesSize += r.length();

      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(HFileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete marker during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information; it must be an old one,
          // so assume we have very old puts
          fd.earliestPutTs = earliestPutTs = PrivateConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(TIMERANGE_KEY);
      fd.latestPutTs =
        tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
      LOG.debug(
        "Compacting {}, keycount={}, bloomtype={}, size={}, "
          + "encoding={}, compression={}, seqNum={}{}",
        (file.getPath() == null ? null : file.getPath().getName()), keyCount,
        r.getBloomFilterType().toString(), TraditionalBinaryPrefix.long2String(r.length(), "", 1),
        r.getHFileReader().getDataBlockEncoding(),
        major ? majorCompactionCompression : minorCompactionCompression, seqNum,
        (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @return Scanners.
   */
  private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
    long smallestReadPoint, boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
      smallestReadPoint);
  }

  private long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

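  /**
   * Factory that creates the {@link InternalScanner} used to read the store files being
   * compacted; {@link #defaultScannerFactory} below builds a regular compaction
   * {@link StoreScanner} and drops deletes only when all files are included in the compaction.
   */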
  protected interface InternalScannerFactory {

    ScanType getScanType(CompactionRequestImpl request);

    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException;
  }

  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

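  /**
   * Builds the {@link CreateStoreFileWriterParams} shared by the {@code createWriter} overloads:
   * the key count and compression come from the {@link FileDetails}, MVCC readpoints and tags are
   * only included when present in the input files, and the page-cache drop-behind flag is passed
   * through to the writer.
   */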
  protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) {
    return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
      .compression(major ? majorCompactionCompression : minorCompactionCompression)
      .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
      .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
      .totalCompactedFilesSize(fd.totalCompactedFilesSize)
      .writerCreationTracker(writerCreationTracker);
  }

  /**
   * Creates a writer for a new file.
   * @param fd The file details.
   * @return Writer for a new StoreFile
   * @throws IOException if creation failed
   */
  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker));
  }

  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    String fileStoragePolicy, boolean major, Consumer<Path> writerCreationTracker)
    throws IOException {
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)
        .fileStoragePolicy(fileStoragePolicy));
  }

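  /**
   * Asks the coprocessor host, if one is installed, for the {@link ScanInfo} to use when opening
   * the compaction scanner; otherwise falls back to the store's default scan info.
   */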
  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return store.getScanInfo();
    }
    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
      request, user);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request  Compaction request.
   * @param scanType Scan type.
   * @param scanner  The default scanner created for compaction.
   * @return The scanner to use (usually the default); null if compaction should not proceed.
   */
  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
      request, user);
  }

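  /**
   * Template method that runs a compaction: collects {@link FileDetails} for the input files,
   * opens the file scanners (honoring the page-cache drop settings), lets the coprocessor host
   * wrap the scanner, creates the output writer via the supplied {@link CellSinkFactory}, runs
   * {@link #performCompaction}, and finally commits the writer on success or aborts it on
   * failure.
   */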
  protected final List<Path> compact(final CompactionRequestImpl request,
    InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
    ThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());

    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = getSmallestReadPoint();

    boolean dropCache;
    if (request.isMajor() || request.isAllFiles()) {
      dropCache = this.dropCacheMajor;
    } else {
      dropCache = this.dropCacheMinor;
    }

    InternalScanner scanner = null;
    boolean finished = false;
    List<StoreFileScanner> scanners =
      createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
    T writer = null;
    CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
    progressSet.add(progress);
    try {
      /* Include deletes, unless we are doing a major compaction */
      ScanType scanType = scannerFactory.getScanType(request);
      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
      scanner = postCompactScannerOpen(request, scanType,
        scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
        // For mvcc-sensitive family, we never set mvcc to 0.
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(),
        request.getWriterCreationTracker());
      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
        throughputController, request, progress);
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
          + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      // createScanner may fail when seeking the hfiles hits an exception, e.g. if even one hfile
      // reader encounters java.io.IOException: Invalid HFile block magic:
      // \x00\x00\x00\x00\x00\x00\x00\x00
      // then scanner will be null, but the scanners for all the hfiles should still be closed,
      // or else we will leak ESTABLISHED sockets.
      if (scanner == null) {
        for (StoreFileScanner sfs : scanners) {
          sfs.close();
        }
      } else {
        Closeables.close(scanner, true);
      }
      if (!finished) {
        if (writer != null) {
          abortWriter(writer);
        }
      } else {
        store.updateCompactedMetrics(request.isMajor(), progress);
      }
      progressSet.remove(progress);
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }

  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
    CompactionRequestImpl request) throws IOException;

  protected abstract void abortWriter(T writer) throws IOException;

  /**
   * Performs the compaction.
   * @param fd                FileDetails of cell sink writer
   * @param scanner           Where to read from.
   * @param writer            Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId        When true, remove seqId(used to be mvcc) value which is &lt;=
   *                          smallestReadPoint
   * @param request           compaction request.
   * @param progress          Progress reporter.
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
    assert writer instanceof ShipperListener;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<>();
    long currentTime = EnvironmentEdgeManager.currentTime();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = currentTime;
    }
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
      .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
        compactScannerSizeLimit)
      .build();

    throughputController.start(compactionName);
    Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
    long shippedCallSizeLimit =
      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        currentTime = EnvironmentEdgeManager.currentTime();
        if (LOG.isDebugEnabled()) {
          now = currentTime;
        }
        if (closeChecker.isTimeLimit(store, currentTime)) {
          progress.cancel();
          return false;
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            PrivateCellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          writer.append(c);
          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          if (closeChecker.isSizeLimit(store, len)) {
            progress.cancel();
            return false;
          }
        }
        if (shipper != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
          if (lastCleanCell != null) {
            // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
            // ShipperListener will clone the last cells it refers to, so we need to set back the
            // sequence id before ShipperListener.beforeShipped
            PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
          }
          // Clone the cells that are in the writer so that they are freed of references,
          // if they are holding any.
          ((ShipperListener) writer).beforeShipped();
          // The SHARED block references being read for compaction will be kept in the prevBlocks
          // list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set of cells is
          // returned to the client, we call shipped(), which can clear this list. Here we do a
          // similar thing: in between the compaction (after every N cells written, with a
          // collective size of 'shippedCallSizeLimit') we call shipped(), which may clear the
          // prevBlocks list.
          shipper.shipped();
          bytesWrittenProgressForShippedCall = 0;
        }
        if (lastCleanCell != null) {
          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException(
        "Interrupted while controlling throughput of compacting " + compactionName);
    } finally {
      // Clone the last cell in the finally block because the writer will append the last cell
      // when committing. If we don't clone here, once the scanner is closed the memory backing
      // that last cell may be released. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

  /**
   * @param store             The store.
   * @param scanners          Store file scanners.
   * @param scanType          Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs     Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs)
    throws IOException {
    return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store              The store.
   * @param scanners           Store file scanners.
   * @param smallestReadPoint  Smallest MVCC read point.
   * @param earliestPutTs      Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow   Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
  }

  /**
   * Return the aggregate progress for all currently active compactions.
   */
  public CompactionProgress getProgress() {
    synchronized (progressSet) {
      long totalCompactingKVs = 0;
      long currentCompactedKVs = 0;
      long totalCompactedSize = 0;
      for (CompactionProgress progress : progressSet) {
        totalCompactingKVs += progress.totalCompactingKVs;
        currentCompactedKVs += progress.currentCompactedKVs;
        totalCompactedSize += progress.totalCompactedSize;
      }
      CompactionProgress result = new CompactionProgress(totalCompactingKVs);
      result.currentCompactedKVs = currentCompactedKVs;
      result.totalCompactedSize = totalCompactedSize;
      return result;
    }
  }

  public boolean isCompacting() {
    return !progressSet.isEmpty();
  }
}