001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
022import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
023import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;
024
025import java.io.IOException;
026import java.io.InterruptedIOException;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.List;
030import java.util.Map;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.PrivateCellUtil;
037import org.apache.hadoop.hbase.io.compress.Compression;
038import org.apache.hadoop.hbase.io.hfile.HFile;
039import org.apache.hadoop.hbase.io.hfile.HFileInfo;
040import org.apache.hadoop.hbase.regionserver.CellSink;
041import org.apache.hadoop.hbase.regionserver.HStore;
042import org.apache.hadoop.hbase.regionserver.HStoreFile;
043import org.apache.hadoop.hbase.regionserver.InternalScanner;
044import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
045import org.apache.hadoop.hbase.regionserver.ScanInfo;
046import org.apache.hadoop.hbase.regionserver.ScanType;
047import org.apache.hadoop.hbase.regionserver.ScannerContext;
048import org.apache.hadoop.hbase.regionserver.ShipperListener;
049import org.apache.hadoop.hbase.regionserver.StoreFileReader;
050import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
051import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
052import org.apache.hadoop.hbase.regionserver.StoreScanner;
053import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
054import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
055import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
056import org.apache.hadoop.hbase.security.User;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
059import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
060import org.apache.yetus.audience.InterfaceAudience;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
064
065/**
066 * A compactor is a compaction algorithm associated a given policy. Base class also contains
067 * reusable parts for implementing compactors (what is common and what isn't is evolving).
068 */
069@InterfaceAudience.Private
070public abstract class Compactor<T extends CellSink> {
071  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
072  protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
073  protected volatile CompactionProgress progress;
074  protected final Configuration conf;
075  protected final HStore store;
076
077  protected final int compactionKVMax;
078  protected final Compression.Algorithm compactionCompression;
079
080  /** specify how many days to keep MVCC values during major compaction **/
081  protected int keepSeqIdPeriod;
082
083  // Configs that drive whether we drop page cache behind compactions
084  protected static final String  MAJOR_COMPACTION_DROP_CACHE =
085      "hbase.regionserver.majorcompaction.pagecache.drop";
086  protected static final String MINOR_COMPACTION_DROP_CACHE =
087      "hbase.regionserver.minorcompaction.pagecache.drop";
088
089  private final boolean dropCacheMajor;
090  private final boolean dropCacheMinor;
091
092  //TODO: depending on Store is not good but, realistically, all compactors currently do.
093  Compactor(Configuration conf, HStore store) {
094    this.conf = conf;
095    this.store = store;
096    this.compactionKVMax =
097      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
098    this.compactionCompression = (this.store.getColumnFamilyDescriptor() == null) ?
099        Compression.Algorithm.NONE : this.store.getColumnFamilyDescriptor().getCompactionCompressionType();
100    this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
101      HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
102    this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
103    this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
104  }
105
106
107
108  protected interface CellSinkFactory<S> {
109    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
110        throws IOException;
111  }
112
113  public CompactionProgress getProgress() {
114    return this.progress;
115  }
116
117  /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
118  protected static class FileDetails {
119    /** Maximum key count after compaction (for blooms) */
120    public long maxKeyCount = 0;
121    /** Earliest put timestamp if major compaction */
122    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
123    /** Latest put timestamp */
124    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
125    /** The last key in the files we're compacting. */
126    public long maxSeqId = 0;
127    /** Latest memstore read point found in any of the involved files */
128    public long maxMVCCReadpoint = 0;
129    /** Max tags length**/
130    public int maxTagsLength = 0;
131    /** Min SeqId to keep during a major compaction **/
132    public long minSeqIdToKeep = 0;
133    /** Total size of the compacted files **/
134    private long totalCompactedFilesSize = 0;
135  }
136
137  /**
138   * Extracts some details about the files to compact that are commonly needed by compactors.
139   * @param filesToCompact Files.
140   * @param allFiles Whether all files are included for compaction
141   * @return The result.
142   */
143  private FileDetails getFileDetails(
144      Collection<HStoreFile> filesToCompact, boolean allFiles) throws IOException {
145    FileDetails fd = new FileDetails();
146    long oldestHFileTimestampToKeepMVCC = System.currentTimeMillis() -
147      (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);
148
149    for (HStoreFile file : filesToCompact) {
150      if(allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
151        // when isAllFiles is true, all files are compacted so we can calculate the smallest
152        // MVCC value to keep
153        if(fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
154          fd.minSeqIdToKeep = file.getMaxMemStoreTS();
155        }
156      }
157      long seqNum = file.getMaxSequenceId();
158      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
159      StoreFileReader r = file.getReader();
160      if (r == null) {
161        LOG.warn("Null reader for " + file.getPath());
162        continue;
163      }
164      // NOTE: use getEntries when compacting instead of getFilterEntries, otherwise under-sized
165      // blooms can cause progress to be miscalculated or if the user switches bloom
166      // type (e.g. from ROW to ROWCOL)
167      long keyCount = r.getEntries();
168      fd.maxKeyCount += keyCount;
169      // calculate the latest MVCC readpoint in any of the involved store files
170      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
171
172      // calculate the total size of the compacted files
173      fd.totalCompactedFilesSize += r.length();
174
175      byte[] tmp = null;
176      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
177      // SeqId number.
178      if (r.isBulkLoaded()) {
179        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
180      }
181      else {
182        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
183        if (tmp != null) {
184          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
185        }
186      }
187      tmp = fileInfo.get(HFileInfo.MAX_TAGS_LEN);
188      if (tmp != null) {
189        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
190      }
191      // If required, calculate the earliest put timestamp of all involved storefiles.
192      // This is used to remove family delete marker during compaction.
193      long earliestPutTs = 0;
194      if (allFiles) {
195        tmp = fileInfo.get(EARLIEST_PUT_TS);
196        if (tmp == null) {
197          // There's a file with no information, must be an old one
198          // assume we have very old puts
199          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
200        } else {
201          earliestPutTs = Bytes.toLong(tmp);
202          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
203        }
204      }
205      tmp = fileInfo.get(TIMERANGE_KEY);
206      fd.latestPutTs = tmp == null ? HConstants.LATEST_TIMESTAMP: TimeRangeTracker.parseFrom(tmp).getMax();
207      LOG.debug("Compacting {}, keycount={}, bloomtype={}, size={}, "
208              + "encoding={}, compression={}, seqNum={}{}",
209          (file.getPath() == null? null: file.getPath().getName()),
210          keyCount,
211          r.getBloomFilterType().toString(),
212          TraditionalBinaryPrefix.long2String(r.length(), "", 1),
213          r.getHFileReader().getDataBlockEncoding(),
214          compactionCompression,
215          seqNum,
216          (allFiles? ", earliestPutTs=" + earliestPutTs: ""));
217    }
218    return fd;
219  }
220
221  /**
222   * Creates file scanners for compaction.
223   * @param filesToCompact Files.
224   * @return Scanners.
225   */
226  private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
227      long smallestReadPoint, boolean useDropBehind) throws IOException {
228    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
229      smallestReadPoint);
230  }
231
232  private long getSmallestReadPoint() {
233    return store.getSmallestReadPoint();
234  }
235
236  protected interface InternalScannerFactory {
237
238    ScanType getScanType(CompactionRequestImpl request);
239
240    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners, ScanType scanType,
241        FileDetails fd, long smallestReadPoint) throws IOException;
242  }
243
244  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {
245
246    @Override
247    public ScanType getScanType(CompactionRequestImpl request) {
248      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
249    }
250
251    @Override
252    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
253        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
254      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
255        fd.earliestPutTs);
256    }
257  };
258
259  /**
260   * Creates a writer for a new file in a temporary directory.
261   * @param fd The file details.
262   * @return Writer for a new StoreFile in the tmp dir.
263   * @throws IOException if creation failed
264   */
265  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind)
266      throws IOException {
267    // When all MVCC readpoints are 0, don't write them.
268    // See HBASE-8166, HBASE-12600, and HBASE-13389.
269    return store
270      .createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0,
271        fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize,
272        HConstants.EMPTY_STRING);
273  }
274
275  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind,
276      String fileStoragePolicy) throws IOException {
277    return store
278      .createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0,
279        fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, fileStoragePolicy);
280  }
281
282  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
283      User user) throws IOException {
284    if (store.getCoprocessorHost() == null) {
285      return store.getScanInfo();
286    }
287    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
288      request, user);
289  }
290
291  /**
292   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
293   * @param request Compaction request.
294   * @param scanType Scan type.
295   * @param scanner The default scanner created for compaction.
296   * @return Scanner scanner to use (usually the default); null if compaction should not proceed.
297   */
298  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
299      InternalScanner scanner, User user) throws IOException {
300    if (store.getCoprocessorHost() == null) {
301      return scanner;
302    }
303    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
304      request, user);
305  }
306
307  protected final List<Path> compact(final CompactionRequestImpl request,
308      InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
309      ThroughputController throughputController, User user) throws IOException {
310    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
311    this.progress = new CompactionProgress(fd.maxKeyCount);
312
313    // Find the smallest read point across all the Scanners.
314    long smallestReadPoint = getSmallestReadPoint();
315
316    T writer = null;
317    boolean dropCache;
318    if (request.isMajor() || request.isAllFiles()) {
319      dropCache = this.dropCacheMajor;
320    } else {
321      dropCache = this.dropCacheMinor;
322    }
323
324    List<StoreFileScanner> scanners =
325        createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
326    InternalScanner scanner = null;
327    boolean finished = false;
328    try {
329      /* Include deletes, unless we are doing a major compaction */
330      ScanType scanType = scannerFactory.getScanType(request);
331      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
332      scanner = postCompactScannerOpen(request, scanType,
333        scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
334      boolean cleanSeqId = false;
335      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
336        // For mvcc-sensitive family, we never set mvcc to 0.
337        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
338        cleanSeqId = true;
339      }
340      writer = sinkFactory.createWriter(scanner, fd, dropCache);
341      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
342        throughputController, request.isAllFiles(), request.getFiles().size());
343      if (!finished) {
344        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
345            + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
346      }
347    } finally {
348      Closeables.close(scanner, true);
349      if (!finished && writer != null) {
350        abortWriter(writer);
351      }
352    }
353    assert finished : "We should have exited the method on all error paths";
354    assert writer != null : "Writer should be non-null if no error";
355    return commitWriter(writer, fd, request);
356  }
357
358  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
359      CompactionRequestImpl request) throws IOException;
360
361  protected abstract void abortWriter(T writer) throws IOException;
362
363  /**
364   * Performs the compaction.
365   * @param fd FileDetails of cell sink writer
366   * @param scanner Where to read from.
367   * @param writer Where to write to.
368   * @param smallestReadPoint Smallest read point.
369   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;=
370   *          smallestReadPoint
371   * @param major Is a major compaction.
372   * @param numofFilesToCompact the number of files to compact
373   * @return Whether compaction ended; false if it was interrupted for some reason.
374   */
375  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
376      long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
377      boolean major, int numofFilesToCompact) throws IOException {
378    assert writer instanceof ShipperListener;
379    long bytesWrittenProgressForLog = 0;
380    long bytesWrittenProgressForShippedCall = 0;
381    // Since scanner.next() can return 'false' but still be delivering data,
382    // we have to use a do/while loop.
383    List<Cell> cells = new ArrayList<>();
384    long currentTime = EnvironmentEdgeManager.currentTime();
385    long lastMillis = 0;
386    if (LOG.isDebugEnabled()) {
387      lastMillis = currentTime;
388    }
389    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
390    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
391    long now = 0;
392    boolean hasMore;
393    ScannerContext scannerContext =
394          ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
395
396    throughputController.start(compactionName);
397    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
398    long shippedCallSizeLimit =
399          (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
400    try {
401      do {
402        hasMore = scanner.next(cells, scannerContext);
403        currentTime = EnvironmentEdgeManager.currentTime();
404        if (LOG.isDebugEnabled()) {
405          now = currentTime;
406        }
407        if (closeChecker.isTimeLimit(store, currentTime)) {
408          progress.cancel();
409          return false;
410        }
411        // output to writer:
412        Cell lastCleanCell = null;
413        long lastCleanCellSeqId = 0;
414        for (Cell c : cells) {
415          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
416            lastCleanCell = c;
417            lastCleanCellSeqId = c.getSequenceId();
418            PrivateCellUtil.setSequenceId(c, 0);
419          } else {
420            lastCleanCell = null;
421            lastCleanCellSeqId = 0;
422          }
423          writer.append(c);
424          int len = c.getSerializedSize();
425          ++progress.currentCompactedKVs;
426          progress.totalCompactedSize += len;
427          bytesWrittenProgressForShippedCall += len;
428          if (LOG.isDebugEnabled()) {
429            bytesWrittenProgressForLog += len;
430          }
431          throughputController.control(compactionName, len);
432          if (closeChecker.isSizeLimit(store, len)) {
433            progress.cancel();
434            return false;
435          }
436          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
437            if (lastCleanCell != null) {
438              // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
439              // ShipperListener will do a clone of the last cells it refer, so need to set back
440              // sequence id before ShipperListener.beforeShipped
441              PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
442            }
443            // Clone the cells that are in the writer so that they are freed of references,
444            // if they are holding any.
445            ((ShipperListener) writer).beforeShipped();
446            // The SHARED block references, being read for compaction, will be kept in prevBlocks
447            // list(See HFileScannerImpl#prevBlocks). In case of scan flow, after each set of cells
448            // being returned to client, we will call shipped() which can clear this list. Here by
449            // we are doing the similar thing. In between the compaction (after every N cells
450            // written with collective size of 'shippedCallSizeLimit') we will call shipped which
451            // may clear prevBlocks list.
452            kvs.shipped();
453            bytesWrittenProgressForShippedCall = 0;
454          }
455        }
456        if (lastCleanCell != null) {
457          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
458          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
459        }
460        // Log the progress of long running compactions every minute if
461        // logging at DEBUG level
462        if (LOG.isDebugEnabled()) {
463          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
464            String rate = String.format("%.2f",
465              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
466            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
467              compactionName, progress, rate, throughputController);
468            lastMillis = now;
469            bytesWrittenProgressForLog = 0;
470          }
471        }
472        cells.clear();
473      } while (hasMore);
474    } catch (InterruptedException e) {
475      progress.cancel();
476      throw new InterruptedIOException(
477            "Interrupted while control throughput of compacting " + compactionName);
478    } finally {
479      // Clone last cell in the final because writer will append last cell when committing. If
480      // don't clone here and once the scanner get closed, then the memory of last cell will be
481      // released. (HBASE-22582)
482      ((ShipperListener) writer).beforeShipped();
483      throughputController.finish(compactionName);
484    }
485    progress.complete();
486    return true;
487  }
488
489  /**
490   * @param store store
491   * @param scanners Store file scanners.
492   * @param scanType Scan type.
493   * @param smallestReadPoint Smallest MVCC read point.
494   * @param earliestPutTs Earliest put across all files.
495   * @return A compaction scanner.
496   */
497  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
498      List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
499      long earliestPutTs) throws IOException {
500    return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
501  }
502
503  /**
504   * @param store The store.
505   * @param scanners Store file scanners.
506   * @param smallestReadPoint Smallest MVCC read point.
507   * @param earliestPutTs Earliest put across all files.
508   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
509   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
510   * @return A compaction scanner.
511   */
512  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
513      List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
514      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
515    return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
516        dropDeletesFromRow, dropDeletesToRow);
517  }
518}