/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * A compactor is a compaction algorithm associated with a given policy. The base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is evolving).
 */
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
  protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
  protected volatile CompactionProgress progress;
  protected final Configuration conf;
  protected final HStore store;

  protected final int compactionKVMax;
  protected final Compression.Algorithm compactionCompression;

  /** Specifies how many days to keep MVCC values during major compaction. */
  protected int keepSeqIdPeriod;

  // Configs that drive whether we drop page cache behind compactions
  protected static final String MAJOR_COMPACTION_DROP_CACHE =
      "hbase.regionserver.majorcompaction.pagecache.drop";
  protected static final String MINOR_COMPACTION_DROP_CACHE =
      "hbase.regionserver.minorcompaction.pagecache.drop";

  private final boolean dropCacheMajor;
  private final boolean dropCacheMinor;

  // TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(Configuration conf, HStore store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactionCompression = (this.store.getColumnFamilyDescriptor() == null) ?
        Compression.Algorithm.NONE :
        this.store.getColumnFamilyDescriptor().getCompactionCompressionType();
    this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
      HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
    this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
    this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
  }
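
  // A hedged, minimal sketch (operator-side usage, not part of this class) of tuning the two
  // page-cache configs read above, e.g. keeping the OS page cache warm after minor compactions
  // while still dropping it behind majors (both keys default to true):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean("hbase.regionserver.majorcompaction.pagecache.drop", true);
  //   conf.setBoolean("hbase.regionserver.minorcompaction.pagecache.drop", false);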
  protected interface CellSinkFactory<S> {
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
        throws IOException;
  }
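
  // A minimal sketch (an assumption, loosely modeled on what a single-output compactor needs)
  // of a CellSinkFactory: ignore the scanner and open one writer in the tmp dir, sized from
  // the FileDetails gathered up front. The field name is hypothetical.
  //
  //   private final CellSinkFactory<StoreFileWriter> sketchWriterFactory =
  //       new CellSinkFactory<StoreFileWriter>() {
  //         @Override
  //         public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd,
  //             boolean shouldDropBehind) throws IOException {
  //           return createTmpWriter(fd, shouldDropBehind);
  //         }
  //       };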

  public CompactionProgress getProgress() {
    return this.progress;
  }

  /** The sole reason this class exists is that Java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across the files we're compacting */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length */
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction */
    public long minSeqIdToKeep = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles Whether all files are included for compaction
   * @return The result.
   */
  private FileDetails getFileDetails(
      Collection<HStoreFile> filesToCompact, boolean allFiles) throws IOException {
    FileDetails fd = new FileDetails();
    long oldestHFileTimestampToKeepMVCC = System.currentTimeMillis() -
      (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (HStoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemStoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFileReader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries, otherwise under-sized
      // blooms can cause progress to be miscalculated, as can a switch of bloom type
      // (e.g. from ROW to ROWCOL)
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete marker during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no timestamp info; it must be an old one,
          // so assume we have very old puts
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(TIMERANGE_KEY);
      fd.latestPutTs =
          tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
      LOG.debug("Compacting {}, keycount={}, bloomtype={}, size={}, "
              + "encoding={}, compression={}, seqNum={}{}",
          (file.getPath() == null ? null : file.getPath().getName()),
          keyCount,
          r.getBloomFilterType().toString(),
          TraditionalBinaryPrefix.long2String(r.length(), "", 1),
          r.getHFileReader().getDataBlockEncoding(),
          compactionCompression,
          seqNum,
          (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @return Scanners.
   */
  private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
      long smallestReadPoint, boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
      smallestReadPoint);
  }

  private long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  protected interface InternalScannerFactory {

    ScanType getScanType(CompactionRequestImpl request);

    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException;
  }

  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };
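
  // A hedged sketch (hypothetical, not used by this class) of an alternative scanner factory
  // that retains deletes globally but drops them inside a row range, delegating to the
  // row-bounded createScanner overload at the bottom of this file; stripe-style compactors
  // wire something similar. The factory name and row bounds are assumptions.
  //
  //   final byte[] dropDeletesFromRow = Bytes.toBytes("rowA"); // hypothetical stripe bounds
  //   final byte[] dropDeletesToRow = Bytes.toBytes("rowZ");
  //   InternalScannerFactory rangeDropFactory = new InternalScannerFactory() {
  //     @Override
  //     public ScanType getScanType(CompactionRequestImpl request) {
  //       return COMPACT_RETAIN_DELETES; // deletes handled by the range-aware scanner below
  //     }
  //
  //     @Override
  //     public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
  //         ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
  //       return Compactor.this.createScanner(store, scanInfo, scanners, smallestReadPoint,
  //         fd.earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  //     }
  //   };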

  /**
   * Creates a writer for a new file in a temporary directory.
   * @param fd The file details.
   * @return Writer for a new StoreFile in the tmp dir.
   * @throws IOException if creation failed
   */
  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind)
      throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
      fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind);
  }

  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
      User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return store.getScanInfo();
    }
    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
      request, user);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param scanner The default scanner created for compaction.
   * @return The scanner to use (usually the default); null if compaction should not proceed.
   */
  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
      InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
      request, user);
  }

  protected final List<Path> compact(final CompactionRequestImpl request,
      InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
      ThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
    this.progress = new CompactionProgress(fd.maxKeyCount);

    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = getSmallestReadPoint();

    T writer = null;
    boolean dropCache;
    if (request.isMajor() || request.isAllFiles()) {
      dropCache = this.dropCacheMajor;
    } else {
      dropCache = this.dropCacheMinor;
    }

    List<StoreFileScanner> scanners =
        createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
    InternalScanner scanner = null;
    boolean finished = false;
    try {
      /* Include deletes, unless we are doing a major compaction */
      ScanType scanType = scannerFactory.getScanType(request);
      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
      scanner = postCompactScannerOpen(request, scanType,
        scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
        // For mvcc-sensitive family, we never set mvcc to 0.
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, dropCache);
      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
        throughputController, request.isAllFiles(), request.getFiles().size());
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
            + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      Closeables.close(scanner, true);
      if (!finished && writer != null) {
        abortWriter(writer);
      }
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }

  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
      CompactionRequestImpl request) throws IOException;

  protected abstract void abortWriter(T writer) throws IOException;
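
  // A minimal sketch (an assumption, loosely modeled on the default single-file compactor;
  // class and method names below are hypothetical, and it assumes the same package since the
  // Compactor constructor is package-private) of how a subclass completes the template above:
  // compact(...) drives the scan, while commitWriter/abortWriter finish or discard the writer.
  //
  //   public class SketchCompactor extends Compactor<StoreFileWriter> {
  //     SketchCompactor(Configuration conf, HStore store) {
  //       super(conf, store);
  //     }
  //
  //     public List<Path> compact(CompactionRequestImpl request,
  //         ThroughputController throughputController, User user) throws IOException {
  //       return compact(request, defaultScannerFactory,
  //         (scanner, fd, shouldDropBehind) -> createTmpWriter(fd, shouldDropBehind),
  //         throughputController, user);
  //     }
  //
  //     @Override
  //     protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
  //         CompactionRequestImpl request) throws IOException {
  //       writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
  //       writer.close();
  //       return java.util.Collections.singletonList(writer.getPath());
  //     }
  //
  //     @Override
  //     protected void abortWriter(StoreFileWriter writer) throws IOException {
  //       Path leftover = writer.getPath();
  //       writer.close();
  //       store.getFileSystem().delete(leftover, false);
  //     }
  //   }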

  /**
   * Performs the compaction.
   * @param fd FileDetails of cell sink writer
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove the seqId (formerly mvcc) values which are &lt;=
   *          smallestReadPoint
   * @param major Is a major compaction.
   * @param numofFilesToCompact the number of files to compact
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
      long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
      boolean major, int numofFilesToCompact) throws IOException {
    assert writer instanceof ShipperListener;
    long bytesWrittenProgressForCloseCheck = 0;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<>();
    long closeCheckSizeLimit = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    ScannerContext scannerContext =
        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
    long shippedCallSizeLimit =
        (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        if (LOG.isDebugEnabled()) {
          now = EnvironmentEdgeManager.currentTime();
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            PrivateCellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          writer.append(c);
          int len = KeyValueUtil.length(c);
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          // check periodically to see if a system stop is requested
          if (closeCheckSizeLimit > 0) {
            bytesWrittenProgressForCloseCheck += len;
            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
              bytesWrittenProgressForCloseCheck = 0;
              if (!store.areWritesEnabled()) {
                progress.cancel();
                return false;
              }
            }
          }
          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
            if (lastCleanCell != null) {
              // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
              // ShipperListener will clone the last cells it refers to, so we need to set the
              // sequence id back before ShipperListener.beforeShipped is called.
              PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
            }
            // Clone the cells that are in the writer so that they are freed of references,
            // if they are holding any.
            ((ShipperListener) writer).beforeShipped();
            // The SHARED block references, being read for compaction, will be kept in the
            // prevBlocks list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set
            // of cells is returned to the client, we call shipped(), which can clear this list.
            // Here we do a similar thing: in between the compaction (after every N cells written
            // with a collective size of 'shippedCallSizeLimit') we call shipped(), which may
            // clear the prevBlocks list.
            kvs.shipped();
            bytesWrittenProgressForShippedCall = 0;
          }
        }
        if (lastCleanCell != null) {
          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException(
          "Interrupted while controlling throughput of compacting " + compactionName);
    } finally {
      // Clone the last cell in the finally block because the writer will append the last cell
      // when committing. If we don't clone it here, then once the scanner gets closed the
      // memory of the last cell will be released. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

  /**
   * @param store The store.
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
      List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
      long earliestPutTs) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store The store.
   * @param scanners Store file scanners.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
      List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
        dropDeletesFromRow, dropDeletesToRow);
  }
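
  // A hedged usage sketch: a range-aware compactor (stripe-style) would pick between the two
  // createScanner overloads above, falling back to the ScanType-driven one when it has no row
  // bounds. The row-bound variables here are hypothetical:
  //
  //   InternalScanner s = (dropDeletesFromRow == null)
  //     ? createScanner(store, scanInfo, scanners, scanType, smallestReadPoint, fd.earliestPutTs)
  //     : createScanner(store, scanInfo, scanners, smallestReadPoint, fd.earliestPutTs,
  //         dropDeletesFromRow, dropDeletesToRow);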
}