/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * A compactor is a compaction algorithm associated with a given policy. This base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is still
 * evolving).
 * <p>
 * Compactions might run concurrently against a given store, and the Compactor is shared among
 * them, so do not put mutable state into class fields. All Compactor class fields should be final
 * or effectively final; 'keepSeqIdPeriod' is an exception to this rule because unit tests may set
 * it.
 */
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
  protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
  protected final Configuration conf;
  protected final HStore store;
  protected final int compactionKVMax;
  protected final Compression.Algorithm majorCompactionCompression;
  protected final Compression.Algorithm minorCompactionCompression;

  /** specify how many days to keep MVCC values during major compaction **/
  protected int keepSeqIdPeriod;

  // Configs that drive whether we drop page cache behind compactions
  protected static final String MAJOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.majorcompaction.pagecache.drop";
  protected static final String MINOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.minorcompaction.pagecache.drop";

  protected final boolean dropCacheMajor;
  protected final boolean dropCacheMinor;
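
  // Example (illustrative): both flags are plain boolean settings read from the Configuration,
  // so they can be overridden in hbase-site.xml or programmatically, e.g. in a unit test:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(MAJOR_COMPACTION_DROP_CACHE, true);  // drop page cache behind major compactions
  //   conf.setBoolean(MINOR_COMPACTION_DROP_CACHE, false); // keep page cache for minor compactions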

  // Track the progress of each in-flight compaction separately. Each compact() call registers
  // its own CompactionProgress here (keyed by object identity) and removes it again in the
  // finally block once the compaction finishes or aborts.
  private final Set<CompactionProgress> progressSet =
    Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));

  // TODO: depending on Store is not good but, realistically, all compactors currently do depend
  // on it.
  Compactor(Configuration conf, HStore store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMajorCompactionCompressionType();
    this.minorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMinorCompactionCompressionType();
    this.keepSeqIdPeriod =
      Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD, HConstants.MIN_KEEP_SEQID_PERIOD),
        HConstants.MIN_KEEP_SEQID_PERIOD);
    this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
    this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
  }

  protected interface CellSinkFactory<S> {
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major,
      Consumer<Path> writerCreationTracker) throws IOException;
  }
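
  // Illustrative sketch: for a compactor whose sink is a single StoreFileWriter (T =
  // StoreFileWriter), the factory can simply delegate to the createWriter(...) helper defined
  // further below. Sketch only; concrete compactors may produce multi-file sinks instead.
  //
  //   CellSinkFactory<StoreFileWriter> writerFactory =
  //     (scanner, fd, shouldDropBehind, major, writerCreationTracker) ->
  //       createWriter(fd, shouldDropBehind, major, writerCreationTracker);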

  /** The sole reason this class exists is that Java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Highest sequence id found among the files we're compacting. */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length **/
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction **/
    public long minSeqIdToKeep = 0;
    /** Total size of the compacted files **/
    private long totalCompactedFilesSize = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles       Whether all files are included for compaction
   * @param major          Whether this is a major compaction
   * @return The result.
   */
  private FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, boolean allFiles,
    boolean major) throws IOException {
    FileDetails fd = new FileDetails();
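    // keepSeqIdPeriod is configured in days; convert it to milliseconds (1000 ms * 60 s * 60 min
    // * 24 h) and subtract from the current time to get the cutoff timestamp.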
    long oldestHFileTimestampToKeepMVCC =
      EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (HStoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemStoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFileReader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries; otherwise under-sized
      // blooms, or a bloom type switched by the user (e.g. from ROW to ROWCOL), can cause
      // progress to be miscalculated.
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();

      // calculate the total size of the compacted files
      fd.totalCompactedFilesSize += r.length();

      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(HFileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp across all involved storefiles.
      // This is used to remove family delete markers during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one
          // assume we have very old puts
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(TIMERANGE_KEY);
      fd.latestPutTs =
        tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
      LOG.debug(
        "Compacting {}, keycount={}, bloomtype={}, size={}, "
          + "encoding={}, compression={}, seqNum={}{}",
        (file.getPath() == null ? null : file.getPath().getName()), keyCount,
        r.getBloomFilterType().toString(), TraditionalBinaryPrefix.long2String(r.length(), "", 1),
        r.getHFileReader().getDataBlockEncoding(),
        major ? majorCompactionCompression : minorCompactionCompression, seqNum,
        (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @return Scanners.
   */
  private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
    long smallestReadPoint, boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
      smallestReadPoint);
  }

  private long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  protected interface InternalScannerFactory {

    ScanType getScanType(CompactionRequestImpl request);

    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException;
  }

  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) {
    return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
      .compression(major ? majorCompactionCompression : minorCompactionCompression)
      .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
      .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
      .totalCompactedFilesSize(fd.totalCompactedFilesSize)
      .writerCreationTracker(writerCreationTracker);
  }

  /**
   * Creates a writer for a new file.
   * @param fd The file details.
   * @return Writer for a new StoreFile
   * @throws IOException if creation failed
   */
  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker));
  }

  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    String fileStoragePolicy, boolean major, Consumer<Path> writerCreationTracker)
    throws IOException {
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)
        .fileStoragePolicy(fileStoragePolicy));
  }

  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return store.getScanInfo();
    }
    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
      request, user);
  }

  /**
   * Calls the coprocessor, if any, to create a scanner - after normal scanner creation.
   * @param request  Compaction request.
   * @param scanType Scan type.
   * @param scanner  The default scanner created for compaction.
   * @return The scanner to use (usually the default); null if compaction should not proceed.
   */
  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
      request, user);
  }

  protected final List<Path> compact(final CompactionRequestImpl request,
    InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
    ThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());

    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = getSmallestReadPoint();

    boolean dropCache;
    if (request.isMajor() || request.isAllFiles()) {
      dropCache = this.dropCacheMajor;
    } else {
      dropCache = this.dropCacheMinor;
    }

    InternalScanner scanner = null;
    boolean finished = false;
    List<StoreFileScanner> scanners =
      createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
    T writer = null;
    CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
    progressSet.add(progress);
    try {
      /* Include deletes, unless we are compacting all files (e.g. a major compaction) */
      ScanType scanType = scannerFactory.getScanType(request);
      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
      scanner = postCompactScannerOpen(request, scanType,
        scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
        // For mvcc-sensitive family, we never set mvcc to 0.
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(),
        request.getWriterCreationTracker());
      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
        throughputController, request, progress);
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
          + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      // createScanner may fail while seeking the hfiles, e.g. if even a single hfile reader hits
      // java.io.IOException: Invalid HFile block magic: \x00\x00\x00\x00\x00\x00\x00\x00.
      // In that case scanner is still null, but the scanners for all the hfiles must be closed
      // anyway, or else we will leak ESTABLISHED sockets.
      if (scanner == null) {
        for (StoreFileScanner sfs : scanners) {
          sfs.close();
        }
      } else {
        Closeables.close(scanner, true);
      }
      if (!finished) {
        if (writer != null) {
          abortWriter(writer);
        }
      } else {
        store.updateCompactedMetrics(request.isMajor(), progress);
      }
      progressSet.remove(progress);
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }

  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
    CompactionRequestImpl request) throws IOException;

  protected abstract void abortWriter(T writer) throws IOException;
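
  // Illustrative sketch (assumptions noted inline): a minimal single-file compactor with
  // T = StoreFileWriter might implement these hooks roughly as follows. Real implementations
  // typically also append compaction metadata to the writer before closing it.
  //
  //   protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
  //       CompactionRequestImpl request) throws IOException {
  //     writer.close();
  //     return Collections.singletonList(writer.getPath());
  //   }
  //
  //   protected void abortWriter(StoreFileWriter writer) throws IOException {
  //     Path leftover = writer.getPath();
  //     writer.close();
  //     store.getFileSystem().delete(leftover, false); // best-effort cleanup of the partial file
  //   }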

  /**
   * Performs the compaction.
   * @param fd                FileDetails of the cell sink writer
   * @param scanner           Where to read from.
   * @param writer            Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId        When true, remove the seqId (formerly mvcc) value from cells whose
   *                          seqId is &lt;= smallestReadPoint
   * @param request           Compaction request.
   * @param progress          Progress reporter.
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
    assert writer instanceof ShipperListener;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<>();
    long currentTime = EnvironmentEdgeManager.currentTime();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = currentTime;
    }
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
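    // compactionKVMax (HConstants.COMPACTION_KV_MAX) caps how many cells a single scanner.next()
    // call may return, bounding the memory held per iteration of the loop below.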
    ScannerContext scannerContext =
      ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
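    // Heuristic: let roughly one block's worth of data per input file accumulate between
    // shipped() calls before asking the scanner to release its block references.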
    long shippedCallSizeLimit =
      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        currentTime = EnvironmentEdgeManager.currentTime();
        if (LOG.isDebugEnabled()) {
          now = currentTime;
        }
        if (closeChecker.isTimeLimit(store, currentTime)) {
          progress.cancel();
          return false;
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            PrivateCellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          writer.append(c);
          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          if (closeChecker.isSizeLimit(store, len)) {
            progress.cancel();
            return false;
          }
        }
        if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
          if (lastCleanCell != null) {
            // HBASE-16931, set back the sequence id to avoid affecting scan order unexpectedly.
            // ShipperListener will clone the last cells it refers to, so the sequence id must be
            // restored before ShipperListener.beforeShipped is called.
            PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
          }
          // Clone the cells that are in the writer so that they are freed of references,
          // if they are holding any.
          ((ShipperListener) writer).beforeShipped();
          // The SHARED block references being read for compaction are kept in the prevBlocks
          // list (see HFileScannerImpl#prevBlocks). In the scan flow, after each batch of cells
          // is returned to the client, we call shipped(), which can clear this list. Here we do
          // the same thing: in the middle of the compaction (after every N cells written with a
          // collective size of 'shippedCallSizeLimit') we call shipped(), which may clear the
          // prevBlocks list.
          kvs.shipped();
          bytesWrittenProgressForShippedCall = 0;
        }
        if (lastCleanCell != null) {
          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException(
        "Interrupted while controlling throughput of compaction " + compactionName);
    } finally {
      // Clone the last cell here in the finally block because the writer will append it when
      // committing. If we don't clone it here, then once the scanner is closed the memory backing
      // the last cell will be released. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

  /**
   * @param store             The store.
   * @param scanInfo          Scan info for the store.
   * @param scanners          Store file scanners.
   * @param scanType          Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs     Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs)
    throws IOException {
    return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store              The store.
   * @param scanInfo           Scan info for the store.
   * @param scanners           Store file scanners.
   * @param smallestReadPoint  Smallest MVCC read point.
   * @param earliestPutTs      Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow   Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
  }

  /**
   * Return the aggregate progress for all currently active compactions.
   */
  public CompactionProgress getProgress() {
    synchronized (progressSet) {
      long totalCompactingKVs = 0;
      long currentCompactedKVs = 0;
      long totalCompactedSize = 0;
      for (CompactionProgress progress : progressSet) {
        totalCompactingKVs += progress.totalCompactingKVs;
        currentCompactedKVs += progress.currentCompactedKVs;
        totalCompactedSize += progress.totalCompactedSize;
      }
      CompactionProgress result = new CompactionProgress(totalCompactingKVs);
      result.currentCompactedKVs = currentCompactedKVs;
      result.totalCompactedSize = totalCompactedSize;
      return result;
    }
  }

  public boolean isCompacting() {
    return !progressSet.isEmpty();
  }
}