/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * A compactor is a compaction algorithm associated with a given policy. The base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is evolving).
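 * Concrete implementations supply the scanner and writer behaviour: they pass an
 * {@link InternalScannerFactory} and a {@link CellSinkFactory} into {@link #compact} and
 * implement {@link #commitWriter} and {@link #abortWriter} to finish or discard the output.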
 */
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
  protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
  protected volatile CompactionProgress progress;
  protected final Configuration conf;
  protected final HStore store;

  protected final int compactionKVMax;
  protected final Compression.Algorithm compactionCompression;

  /** Specify how many days to keep MVCC values during major compaction. */
  protected int keepSeqIdPeriod;

  // Configs that drive whether we drop page cache behind compactions
  protected static final String MAJOR_COMPACTION_DROP_CACHE =
      "hbase.regionserver.majorcompaction.pagecache.drop";
  protected static final String MINOR_COMPACTION_DROP_CACHE =
      "hbase.regionserver.minorcompaction.pagecache.drop";

  private final boolean dropCacheMajor;
  private final boolean dropCacheMinor;

  // TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(Configuration conf, HStore store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
        this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactionCompression = (this.store.getColumnFamilyDescriptor() == null)
        ? Compression.Algorithm.NONE
        : this.store.getColumnFamilyDescriptor().getCompactionCompressionType();
    this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
        HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
    this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
    this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
  }

  protected interface CellSinkFactory<S> {
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
        throws IOException;
  }

  public CompactionProgress getProgress() {
    return this.progress;
  }

  /** The sole reason this class exists is that Java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across the files we're compacting. */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length */
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction */
    public long minSeqIdToKeep = 0;
    /** Total size of the compacted files */
    private long totalCompactedFilesSize = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles Whether all files are included for compaction
   * @return The result.
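   * @throws IOException if details cannot be read from one of the store file readers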
   */
  private FileDetails getFileDetails(
      Collection<HStoreFile> filesToCompact, boolean allFiles) throws IOException {
    FileDetails fd = new FileDetails();
    long oldestHFileTimestampToKeepMVCC = System.currentTimeMillis() -
        (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (HStoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemStoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFileReader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries, otherwise under-sized
      // blooms can cause progress to be miscalculated, as can a bloom type switch by the user
      // (e.g. from ROW to ROWCOL)
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();

      // calculate the total size of the compacted files
      fd.totalCompactedFilesSize += r.length();

      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(HFileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete marker during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one
          // assume we have very old puts
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(TIMERANGE_KEY);
      fd.latestPutTs =
          tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
      LOG.debug("Compacting {}, keycount={}, bloomtype={}, size={}, "
              + "encoding={}, compression={}, seqNum={}{}",
          (file.getPath() == null ? null : file.getPath().getName()),
          keyCount,
          r.getBloomFilterType().toString(),
          TraditionalBinaryPrefix.long2String(r.length(), "", 1),
          r.getHFileReader().getDataBlockEncoding(),
          compactionCompression,
          seqNum,
          (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @return Scanners.
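   * @param smallestReadPoint the smallest MVCC read point across all the involved scanners
   * @param useDropBehind whether the scanners should drop OS page cache behind the reads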
   */
  private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
      long smallestReadPoint, boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
        smallestReadPoint);
  }

  private long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  protected interface InternalScannerFactory {

    ScanType getScanType(CompactionRequestImpl request);

    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException;
  }

  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
          fd.earliestPutTs);
    }
  };

  /**
   * Creates a writer for a new file in a temporary directory.
   * @param fd The file details.
   * @return Writer for a new StoreFile in the tmp dir.
   * @throws IOException if creation failed
   */
  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind)
      throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
        fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind,
        fd.totalCompactedFilesSize);
  }

  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
      User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return store.getScanInfo();
    }
    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
        request, user);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param scanner The default scanner created for compaction.
   * @return The scanner to use (usually the default); null if compaction should not proceed.
   */
  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
      InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
        request, user);
  }

  protected final List<Path> compact(final CompactionRequestImpl request,
      InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
      ThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
    this.progress = new CompactionProgress(fd.maxKeyCount);

    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = getSmallestReadPoint();

    T writer = null;
    boolean dropCache;
    if (request.isMajor() || request.isAllFiles()) {
      dropCache = this.dropCacheMajor;
    } else {
      dropCache = this.dropCacheMinor;
    }

    List<StoreFileScanner> scanners =
        createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
    InternalScanner scanner = null;
    boolean finished = false;
    try {
      /* Include deletes, unless we are doing a major compaction */
      ScanType scanType = scannerFactory.getScanType(request);
      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
      scanner = postCompactScannerOpen(request, scanType,
          scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
        // For mvcc-sensitive family, we never set mvcc to 0.
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, dropCache);
      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
          throughputController, request.isAllFiles(), request.getFiles().size());
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
            + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      Closeables.close(scanner, true);
      if (!finished && writer != null) {
        abortWriter(writer);
      }
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }

  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
      CompactionRequestImpl request) throws IOException;

  protected abstract void abortWriter(T writer) throws IOException;

  /**
   * Performs the compaction.
   * @param fd FileDetails of cell sink writer
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove the seqId (formerly mvcc) value if it is <=
   *          smallestReadPoint
   * @param throughputController The controller used to throttle this compaction.
   * @param major Is a major compaction.
   * @param numofFilesToCompact the number of files to compact
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
      long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
      boolean major, int numofFilesToCompact) throws IOException {
    assert writer instanceof ShipperListener;
    long bytesWrittenProgressForCloseCheck = 0;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
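    // Reusable batch buffer: scanner.next() refills it on each iteration and it is cleared again
    // once the batch has been written to the sink.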
    List<Cell> cells = new ArrayList<>();
    long closeCheckSizeLimit = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    ScannerContext scannerContext =
        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
    long shippedCallSizeLimit =
        (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        if (LOG.isDebugEnabled()) {
          now = EnvironmentEdgeManager.currentTime();
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            PrivateCellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          writer.append(c);
          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          // check periodically to see if a system stop is requested
          if (closeCheckSizeLimit > 0) {
            bytesWrittenProgressForCloseCheck += len;
            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
              bytesWrittenProgressForCloseCheck = 0;
              if (!store.areWritesEnabled()) {
                progress.cancel();
                return false;
              }
            }
          }
          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
            if (lastCleanCell != null) {
              // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
              // ShipperListener will do a clone of the last cells it refers to, so we need to set
              // the sequence id back before ShipperListener.beforeShipped
              PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
            }
            // Clone the cells that are in the writer so that they are freed of references,
            // if they are holding any.
            ((ShipperListener) writer).beforeShipped();
            // The SHARED block references, being read for compaction, will be kept in the
            // prevBlocks list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set
            // of cells is returned to the client, we call shipped(), which can clear this list.
            // Here we do a similar thing: in between the compaction (after every N cells written
            // with a collective size of 'shippedCallSizeLimit') we call shipped(), which may
            // clear the prevBlocks list.
            kvs.shipped();
            bytesWrittenProgressForShippedCall = 0;
          }
        }
        if (lastCleanCell != null) {
          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
                (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
                compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException(
          "Interrupted while controlling throughput of compacting " + compactionName);
    } finally {
      // Clone the last cell in the finally block because the writer will append the last cell
      // when committing. If we don't clone here, then once the scanner is closed the memory of
      // the last cell will be released. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

  /**
   * @param store store
   * @param scanInfo Scan info.
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
      List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
      long earliestPutTs) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store The store.
   * @param scanInfo Scan info.
   * @param scanners Store file scanners.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
      List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
        dropDeletesFromRow, dropDeletesToRow);
  }
}