/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.PrivateConstants;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
 * A compactor is a compaction algorithm associated with a given policy. This base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is evolving).
 * <p>
 * Compactions might be concurrent against a given store and the Compactor is shared among them. Do
 * not put mutable state into class fields. All Compactor class fields should be final or
 * effectively final. 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
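 * <p>
 * As a rough illustrative sketch only (the subclass name and the exact metadata calls below are
 * assumptions, loosely modeled on the default single-file compactor), a minimal implementation
 * wires the protected {@code compact} method to a writer factory and fills in the commit/abort
 * hooks:
 *
 * <pre>{@code
 * public class SingleFileCompactor extends Compactor<StoreFileWriter> {
 *
 *   SingleFileCompactor(Configuration conf, HStore store) {
 *     super(conf, store);
 *   }
 *
 *   public List<Path> compact(CompactionRequestImpl request, ThroughputController controller,
 *       User user) throws IOException {
 *     // Use the default scanner factory and create a single StoreFileWriter per request.
 *     return compact(request, defaultScannerFactory,
 *       (scanner, fd, shouldDropBehind, major, tracker) ->
 *         createWriter(fd, shouldDropBehind, major, tracker),
 *       controller, user);
 *   }
 *
 *   protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
 *       CompactionRequestImpl request) throws IOException {
 *     writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
 *     writer.close();
 *     return Collections.singletonList(writer.getPath());
 *   }
 *
 *   protected void abortWriter(StoreFileWriter writer) throws IOException {
 *     Path leftover = writer.getPath();
 *     writer.close();
 *     store.getFileSystem().delete(leftover, false);
 *   }
 * }
 * }</pre>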
 */
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
  protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
  protected final Configuration conf;
  protected final HStore store;
  protected final int compactionKVMax;
  protected final long compactScannerSizeLimit;
  protected final Compression.Algorithm majorCompactionCompression;
  protected final Compression.Algorithm minorCompactionCompression;

  /** specify how many days to keep MVCC values during major compaction **/
  protected int keepSeqIdPeriod;

  // Configs that drive whether we drop page cache behind compactions
  protected static final String MAJOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.majorcompaction.pagecache.drop";
  protected static final String MINOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.minorcompaction.pagecache.drop";

  protected final boolean dropCacheMajor;
  protected final boolean dropCacheMinor;

  // Track the progress of each in-flight compaction. CompactionProgress instances are keyed by
  // identity; compact() registers one per request and removes it again in its finally block.
  private final Set<CompactionProgress> progressSet =
    Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));

  // TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(Configuration conf, HStore store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactScannerSizeLimit = this.conf.getLong(HConstants.COMPACTION_SCANNER_SIZE_MAX,
      HConstants.COMPACTION_SCANNER_SIZE_MAX_DEFAULT);
    this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMajorCompactionCompressionType();
    this.minorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMinorCompactionCompressionType();
    this.keepSeqIdPeriod =
      Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD, HConstants.MIN_KEEP_SEQID_PERIOD),
        HConstants.MIN_KEEP_SEQID_PERIOD);
    this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
    this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
  }

  protected interface CellSinkFactory<S> {
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major,
      Consumer<Path> writerCreationTracker) throws IOException;
  }

  /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across the files we're compacting. */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length **/
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction **/
    public long minSeqIdToKeep = 0;
    /** Total size of the compacted files **/
    private long totalCompactedFilesSize = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles       Whether all files are included for compaction
   * @param major          Whether this is a major compaction
   * @return The extracted file details.
   */
  private FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, boolean allFiles,
    boolean major) throws IOException {
    FileDetails fd = new FileDetails();
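    // Files last modified more than keepSeqIdPeriod days ago no longer need their MVCC (seqId)
    // values preserved, so such files may raise minSeqIdToKeep below.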
    long oldestHFileTimestampToKeepMVCC =
      EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (HStoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemStoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFileReader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries instead of getFilterEntries when compacting; bloom filter entries
      // can under-count (e.g. with under-sized blooms, or if the user has switched bloom type,
      // say from ROW to ROWCOL), which would make compaction progress miscalculated.
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();

      // calculate the total size of the compacted files
      fd.totalCompactedFilesSize += r.length();

      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(HFileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete marker during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(EARLIEST_PUT_TS);
        if (tmp == null) {
          // This file carries no EARLIEST_PUT_TS info; it must be an old file, so assume it
          // contains very old puts.
          fd.earliestPutTs = earliestPutTs = PrivateConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(TIMERANGE_KEY);
      fd.latestPutTs =
        tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
      LOG.debug(
        "Compacting {}, keycount={}, bloomtype={}, size={}, "
          + "encoding={}, compression={}, seqNum={}{}",
        (file.getPath() == null ? null : file.getPath().getName()), keyCount,
        r.getBloomFilterType().toString(), TraditionalBinaryPrefix.long2String(r.length(), "", 1),
        r.getHFileReader().getDataBlockEncoding(),
        major ? majorCompactionCompression : minorCompactionCompression, seqNum,
        (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @return Scanners.
   */
  private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
    long smallestReadPoint, boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
      smallestReadPoint);
  }

  private long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  protected interface InternalScannerFactory {

    ScanType getScanType(CompactionRequestImpl request);

    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException;
  }

  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) {
    return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
      .compression(major ? majorCompactionCompression : minorCompactionCompression)
      .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
      .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
      .totalCompactedFilesSize(fd.totalCompactedFilesSize)
      .writerCreationTracker(writerCreationTracker);
  }

  /**
   * Creates a writer for a new file.
   * @param fd The file details.
   * @return Writer for a new StoreFile
   * @throws IOException if creation failed
   */
  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker));
  }

  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    String fileStoragePolicy, boolean major, Consumer<Path> writerCreationTracker)
    throws IOException {
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)
        .fileStoragePolicy(fileStoragePolicy));
  }

  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return store.getScanInfo();
    }
    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
      request, user);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request  Compaction request.
   * @param scanType Scan type.
   * @param scanner  The default scanner created for compaction.
   * @return The scanner to use (usually the default); null if compaction should not proceed.
   */
  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
      request, user);
  }

  protected final List<Path> compact(final CompactionRequestImpl request,
    InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
    ThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());

    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = getSmallestReadPoint();

    boolean dropCache;
    if (request.isMajor() || request.isAllFiles()) {
      dropCache = this.dropCacheMajor;
    } else {
      dropCache = this.dropCacheMinor;
    }

    InternalScanner scanner = null;
    boolean finished = false;
    List<StoreFileScanner> scanners =
      createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
    T writer = null;
    CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
    progressSet.add(progress);
    try {
      /* Retain deletes unless we are compacting all files, in which case they can be dropped */
      ScanType scanType = scannerFactory.getScanType(request);
      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
      scanner = postCompactScannerOpen(request, scanType,
        scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
        // For mvcc-sensitive family, we never set mvcc to 0.
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(),
        request.getWriterCreationTracker());
      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
        throughputController, request, progress);
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
          + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      // createScanner may fail if seeking the hfiles throws an exception; e.g. even a single
      // hfile reader hitting "java.io.IOException: Invalid HFile block magic:
      // \x00\x00\x00\x00\x00\x00\x00\x00"
      // leaves scanner null. The scanners for all the hfiles must still be closed in that case,
      // or else we leak ESTABLISHED sockets.
      if (scanner == null) {
        for (StoreFileScanner sfs : scanners) {
          sfs.close();
        }
      } else {
        Closeables.close(scanner, true);
      }
      if (!finished) {
        if (writer != null) {
          abortWriter(writer);
        }
      } else {
        store.updateCompactedMetrics(request.isMajor(), progress);
      }
      progressSet.remove(progress);
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }

  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
    CompactionRequestImpl request) throws IOException;

  protected abstract void abortWriter(T writer) throws IOException;

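  /**
   * Hook that lets subclasses decorate (transform or filter) the batch of cells just read from
   * the scanner before it is processed and written out. The default implementation returns the
   * batch unchanged.
   */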
  protected List<ExtendedCell> decorateCells(List<ExtendedCell> cells) {
    // no op
    return cells;
  }

  /**
   * Performs the compaction.
   * @param fd                FileDetails of cell sink writer
   * @param scanner           Where to read from.
   * @param writer            Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId        When true, remove the seqId (formerly mvcc) value if it is &lt;=
   *                          smallestReadPoint
   * @param request           compaction request.
   * @param progress          Progress reporter.
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
    assert writer instanceof ShipperListener;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<ExtendedCell> cells = new ArrayList<>();
    long currentTime = EnvironmentEdgeManager.currentTime();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = currentTime;
    }
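    // The CloseChecker periodically re-checks, by elapsed time and by bytes written, whether the
    // store is closing, so that a compaction on a closing region can be abandoned promptly.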
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
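    // Bound each scanner.next() batch both by cell count (compactionKVMax) and by accumulated
    // cell size (compactScannerSizeLimit).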
    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
      .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
        compactScannerSizeLimit)
      .build();

    throughputController.start(compactionName);
    Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
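    // Call shipped() on the scanner roughly once per "one block per input file" worth of output,
    // so the block references the scanner holds can be released periodically (see below).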
    long shippedCallSizeLimit =
      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
    try {
      do {
        // InternalScanner is for CPs, so we do not want to leak ExtendedCell to that interface;
        // however, all server-side implementations should only ever add ExtendedCells to the
        // list, otherwise it will trip assertions elsewhere in our code.
        hasMore = scanner.next((List) cells, scannerContext);
        currentTime = EnvironmentEdgeManager.currentTime();
        if (LOG.isDebugEnabled()) {
          now = currentTime;
        }
        if (closeChecker.isTimeLimit(store, currentTime)) {
          progress.cancel();
          return false;
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        cells = decorateCells(cells);
        for (ExtendedCell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            PrivateCellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          if (closeChecker.isSizeLimit(store, len)) {
            progress.cancel();
            return false;
          }
        }
        writer.appendAll(cells);
        if (shipper != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
          if (lastCleanCell != null) {
            // HBASE-16931: set the sequence id back to avoid affecting scan order unexpectedly.
            // ShipperListener will clone the last cells it refers to, so we need to restore the
            // sequence id before ShipperListener.beforeShipped
            PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
          }
          // Clone the cells that are in the writer so that they are freed of references,
          // if they are holding any.
          ((ShipperListener) writer).beforeShipped();
          // The SHARED block references being read for compaction are kept in the prevBlocks
          // list (see HFileScannerImpl#prevBlocks). In the scan flow, after each batch of cells
          // is returned to the client, we call shipped(), which can clear this list. Here we do
          // a similar thing: during the compaction, after every set of cells written with a
          // collective size of 'shippedCallSizeLimit', we call shipped(), which may clear the
          // prevBlocks list.
          shipper.shipped();
          bytesWrittenProgressForShippedCall = 0;
        }
        if (lastCleanCell != null) {
          // HBASE-16931: set the sequence id back to avoid affecting scan order unexpectedly
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException(
        "Interrupted while controlling throughput of compacting " + compactionName);
    } finally {
      // Clone the last cell in the finally block because the writer will append the last cell
      // when committing. If we don't clone here, then once the scanner is closed the memory of
      // the last cell will be released. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

  /**
   * @param store             store
   * @param scanners          Store file scanners.
   * @param scanType          Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs     Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs)
    throws IOException {
    return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store              The store.
   * @param scanners           Store file scanners.
   * @param smallestReadPoint  Smallest MVCC read point.
   * @param earliestPutTs      Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow   Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
  }

  /**
   * Return the aggregate progress for all currently active compactions.
   */
  public CompactionProgress getProgress() {
    synchronized (progressSet) {
      long totalCompactingKVs = 0;
      long currentCompactedKVs = 0;
      long totalCompactedSize = 0;
      for (CompactionProgress progress : progressSet) {
        totalCompactingKVs += progress.totalCompactingKVs;
        currentCompactedKVs += progress.currentCompactedKVs;
        totalCompactedSize += progress.totalCompactedSize;
      }
      CompactionProgress result = new CompactionProgress(totalCompactingKVs);
      result.currentCompactedKVs = currentCompactedKVs;
      result.totalCompactedSize = totalCompactedSize;
      return result;
    }
  }

  public boolean isCompacting() {
    return !progressSet.isEmpty();
  }
}