/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * A compactor is a compaction algorithm associated with a given policy. The base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is evolving).
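 * <p>
 * Concrete compactors supply the sink via a {@link CellSinkFactory} and implement
 * {@code commitWriter} and {@code abortWriter}, while the shared {@code compact(...)} flow below
 * handles file details, scanner setup, throttling and cleanup. The following is a minimal sketch
 * of a subclass, not code from this file: the class name is illustrative, it is assumed to live
 * in this package (the constructor is package-private), and imports are omitted. Real
 * implementations in HBase include DefaultCompactor.
 *
 * <pre>{@code
 * class SketchCompactor extends Compactor<StoreFileWriter> {
 *   SketchCompactor(Configuration conf, HStore store) {
 *     super(conf, store);
 *   }
 *
 *   public List<Path> compact(CompactionRequestImpl request,
 *       ThroughputController controller, User user) throws IOException {
 *     // Reuse the shared flow with the default scanner factory and a single tmp-file writer.
 *     return compact(request, defaultScannerFactory,
 *       (scanner, fd, shouldDropBehind) -> createTmpWriter(fd, shouldDropBehind),
 *       controller, user);
 *   }
 *
 *   protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
 *       CompactionRequestImpl request) throws IOException {
 *     // Record the max sequence id and major-compaction flag, then close the new file.
 *     writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
 *     writer.close();
 *     return java.util.Collections.singletonList(writer.getPath());
 *   }
 *
 *   protected void abortWriter(StoreFileWriter writer) throws IOException {
 *     // Close and delete the half-written tmp file.
 *     Path leftover = writer.getPath();
 *     writer.close();
 *     store.getFileSystem().delete(leftover, false);
 *   }
 * }
 * }</pre>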
 */
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
  protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
  protected volatile CompactionProgress progress;
  protected final Configuration conf;
  protected final HStore store;

  protected final int compactionKVMax;
  protected final Compression.Algorithm compactionCompression;

  /** Specifies how many days to keep MVCC values during major compaction. **/
  protected int keepSeqIdPeriod;

  // Configs that drive whether we drop page cache behind compactions
  protected static final String MAJOR_COMPACTION_DROP_CACHE =
      "hbase.regionserver.majorcompaction.pagecache.drop";
  protected static final String MINOR_COMPACTION_DROP_CACHE =
      "hbase.regionserver.minorcompaction.pagecache.drop";

  private final boolean dropCacheMajor;
  private final boolean dropCacheMinor;

  // TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(Configuration conf, HStore store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
        this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactionCompression = (this.store.getColumnFamilyDescriptor() == null)
        ? Compression.Algorithm.NONE
        : this.store.getColumnFamilyDescriptor().getCompactionCompressionType();
    // Never keep MVCC values for fewer than MIN_KEEP_SEQID_PERIOD days, whatever is configured.
    this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
      HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
    this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
    this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
  }

  protected interface CellSinkFactory<S> {
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
        throws IOException;
  }

  public CompactionProgress getProgress() {
    return this.progress;
  }

  /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across the files we're compacting. */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length **/
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction **/
    public long minSeqIdToKeep = 0;
    /** Total size of the compacted files **/
    private long totalCompactedFilesSize = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles Whether all files are included for compaction
   * @return The result.
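   * @throws IOException if the file info of one of the store files cannot be loaded.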
   */
  private FileDetails getFileDetails(
      Collection<HStoreFile> filesToCompact, boolean allFiles) throws IOException {
    FileDetails fd = new FileDetails();
    long oldestHFileTimestampToKeepMVCC = System.currentTimeMillis() -
        (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (HStoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemStoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFileReader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries; otherwise under-sized
      // blooms (e.g. after the user switches bloom type from ROW to ROWCOL) can cause progress
      // to be miscalculated.
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();

      // calculate the total size of the compacted files
      fd.totalCompactedFilesSize += r.length();

      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(HFileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete markers during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one; assume we have very old puts.
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(TIMERANGE_KEY);
      fd.latestPutTs =
          tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
      LOG.debug("Compacting {}, keycount={}, bloomtype={}, size={}, "
          + "encoding={}, compression={}, seqNum={}{}",
        (file.getPath() == null ? null : file.getPath().getName()), keyCount,
        r.getBloomFilterType().toString(), TraditionalBinaryPrefix.long2String(r.length(), "", 1),
        r.getHFileReader().getDataBlockEncoding(), compactionCompression, seqNum,
        (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @param smallestReadPoint Smallest MVCC read point across all the scanners.
   * @param useDropBehind Whether to ask the filesystem to drop pages from cache behind reads.
   * @return Scanners.
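   * @throws IOException if a scanner cannot be opened on one of the store files.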
   */
  private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
      long smallestReadPoint, boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
      smallestReadPoint);
  }

  private long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  protected interface InternalScannerFactory {

    ScanType getScanType(CompactionRequestImpl request);

    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException;
  }

  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  /**
   * Creates a writer for a new file in a temporary directory.
   * @param fd The file details.
   * @param shouldDropBehind Whether the writer should drop pages from cache behind writes.
   * @return Writer for a new StoreFile in the tmp dir.
   * @throws IOException if creation failed
   */
  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind)
      throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
      fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind,
      fd.totalCompactedFilesSize, HConstants.EMPTY_STRING);
  }

  /**
   * Creates a writer for a new file in a temporary directory, using the given file storage
   * policy.
   */
  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind,
      String fileStoragePolicy) throws IOException {
    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
      fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind,
      fd.totalCompactedFilesSize, fileStoragePolicy);
  }

  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
      User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return store.getScanInfo();
    }
    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
      request, user);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param scanner The default scanner created for compaction.
   * @return Scanner to use (usually the default); null if compaction should not proceed.
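   * @see org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preCompact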
   */
  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
      InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
      request, user);
  }

  protected final List<Path> compact(final CompactionRequestImpl request,
      InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
      ThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
    this.progress = new CompactionProgress(fd.maxKeyCount);

    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = getSmallestReadPoint();

    T writer = null;
    boolean dropCache;
    if (request.isMajor() || request.isAllFiles()) {
      dropCache = this.dropCacheMajor;
    } else {
      dropCache = this.dropCacheMinor;
    }

    List<StoreFileScanner> scanners =
        createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
    InternalScanner scanner = null;
    boolean finished = false;
    try {
      /* Include deletes, unless we are doing a major compaction */
      ScanType scanType = scannerFactory.getScanType(request);
      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
      scanner = postCompactScannerOpen(request, scanType,
        scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
        // For mvcc-sensitive family, we never set mvcc to 0.
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, dropCache);
      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
        throughputController, request.isAllFiles(), request.getFiles().size());
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
            + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      Closeables.close(scanner, true);
      if (!finished && writer != null) {
        abortWriter(writer);
      }
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }

  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
      CompactionRequestImpl request) throws IOException;

  protected abstract void abortWriter(T writer) throws IOException;

  /**
   * Performs the compaction.
   * @param fd FileDetails of cell sink writer
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove the seqId (formerly mvcc) from cells whose seqId is <=
   *          smallestReadPoint
   * @param throughputController The controller used to throttle write throughput.
   * @param major Is a major compaction.
   * @param numofFilesToCompact the number of files to compact
   * @return Whether compaction ended; false if it was interrupted for some reason.
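   * @throws IOException if scanning or writing fails; an InterruptedException while throttling is
   *           rethrown as an InterruptedIOException.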
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
      long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
      boolean major, int numofFilesToCompact) throws IOException {
    assert writer instanceof ShipperListener;
    long bytesWrittenProgressForCloseCheck = 0;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<>();
    long closeCheckSizeLimit = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    ScannerContext scannerContext =
        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
    // Call shipped() on the source scanner after writing roughly one block per input file.
    long shippedCallSizeLimit =
        (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        if (LOG.isDebugEnabled()) {
          now = EnvironmentEdgeManager.currentTime();
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            PrivateCellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          writer.append(c);
          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          // check periodically to see if a system stop is requested
          if (closeCheckSizeLimit > 0) {
            bytesWrittenProgressForCloseCheck += len;
            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
              bytesWrittenProgressForCloseCheck = 0;
              if (!store.areWritesEnabled()) {
                progress.cancel();
                return false;
              }
            }
          }
          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
            if (lastCleanCell != null) {
              // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
              // ShipperListener will clone the last cells it refers to, so we need to restore
              // the sequence id before ShipperListener.beforeShipped is called.
              PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
            }
            // Clone the cells that are in the writer so that they are freed of references,
            // if they are holding any.
            ((ShipperListener) writer).beforeShipped();
            // The SHARED block references being read for compaction are kept in the prevBlocks
            // list (see HFileScannerImpl#prevBlocks). In the scan flow, after each batch of cells
            // is returned to the client, shipped() is called, which can clear that list. Here we
            // do a similar thing: during the compaction, after every run of cells with a
            // collective size of 'shippedCallSizeLimit' is written, we call shipped(), which may
            // clear the prevBlocks list.
            kvs.shipped();
            bytesWrittenProgressForShippedCall = 0;
          }
        }
        if (lastCleanCell != null) {
          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException(
          "Interrupted while controlling throughput of compacting " + compactionName);
    } finally {
      // Clone the last cell in the finally block because the writer will append it when
      // committing. If we don't clone it here, the memory backing that cell may be released
      // once the scanner is closed. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

  /**
   * @param store The store.
   * @param scanInfo Scan configuration for the store.
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
      List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
      long earliestPutTs) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store The store.
   * @param scanInfo Scan configuration for the store.
   * @param scanners Store file scanners.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
      List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
  }
}