/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * A compactor is a compaction algorithm associated with a given policy. The base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is
 * evolving).
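 * <p>
 * As a rough illustration only (the {@code ExampleCompactor} name and the exact calls below are a
 * sketch, not the canonical implementation; see the concrete subclasses in this package for the
 * real thing), a minimal single-writer subclass wires a {@link CellSinkFactory} into
 * {@link #compact} and fills in the two abstract writer hooks roughly like this:
 *
 * <pre>{@code
 * public class ExampleCompactor extends Compactor<StoreFileWriter> {
 *   public ExampleCompactor(Configuration conf, HStore store) {
 *     super(conf, store);
 *   }
 *
 *   // Entry point: use the default scanner factory and a tmp-dir writer for the compaction.
 *   public List<Path> compact(CompactionRequestImpl request,
 *       ThroughputController throughputController, User user) throws IOException {
 *     return compact(request, defaultScannerFactory,
 *       (scanner, fd, shouldDropBehind) -> createTmpWriter(fd, shouldDropBehind),
 *       throughputController, user);
 *   }
 *
 *   // Called once the scan/write loop has finished successfully.
 *   protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
 *       CompactionRequestImpl request) throws IOException {
 *     writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
 *     writer.close();
 *     return Collections.singletonList(writer.getPath());
 *   }
 *
 *   // Called when the compaction did not finish; drop the partial tmp file.
 *   protected void abortWriter(StoreFileWriter writer) throws IOException {
 *     Path leftoverFile = writer.getPath();
 *     writer.close();
 *     store.getFileSystem().delete(leftoverFile, false);
 *   }
 * }
 * }</pre>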
 */
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
  protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
  protected volatile CompactionProgress progress;
  protected final Configuration conf;
  protected final HStore store;

  protected final int compactionKVMax;
  protected final Compression.Algorithm compactionCompression;

  /** specify how many days to keep MVCC values during major compaction **/
  protected int keepSeqIdPeriod;

  // Configs that drive whether we drop page cache behind compactions
  protected static final String MAJOR_COMPACTION_DROP_CACHE =
      "hbase.regionserver.majorcompaction.pagecache.drop";
  protected static final String MINOR_COMPACTION_DROP_CACHE =
      "hbase.regionserver.minorcompaction.pagecache.drop";

  private final boolean dropCacheMajor;
  private final boolean dropCacheMinor;

  // TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(Configuration conf, HStore store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
        this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactionCompression = (this.store.getColumnFamilyDescriptor() == null)
        ? Compression.Algorithm.NONE
        : this.store.getColumnFamilyDescriptor().getCompactionCompressionType();
    this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
        HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
    this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
    this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
  }

  protected interface CellSinkFactory<S> {
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
        throws IOException;
  }

  public CompactionProgress getProgress() {
    return this.progress;
  }

  /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across the files we're compacting */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length **/
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction **/
    public long minSeqIdToKeep = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles Whether all files are included for compaction
   * @return The result.
   */
  private FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, boolean allFiles)
      throws IOException {
    FileDetails fd = new FileDetails();
    long oldestHFileTimestampToKeepMVCC =
        System.currentTimeMillis() - (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (HStoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemStoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFileReader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries; otherwise under-sized
      // blooms, or a bloom type switched by the user (e.g. from ROW to ROWCOL), can cause
      // progress to be miscalculated
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete marker during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one
          // assume we have very old puts
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(TIMERANGE_KEY);
      fd.latestPutTs =
          tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
      LOG.debug("Compacting {}, keycount={}, bloomtype={}, size={}, "
          + "encoding={}, compression={}, seqNum={}{}",
        (file.getPath() == null ? null : file.getPath().getName()),
        keyCount,
        r.getBloomFilterType().toString(),
        TraditionalBinaryPrefix.long2String(r.length(), "", 1),
        r.getHFileReader().getDataBlockEncoding(),
        compactionCompression,
        seqNum,
        (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @param smallestReadPoint Smallest MVCC read point across all the scanners in this region.
   * @param useDropBehind Whether the scanners should drop pages from the OS page cache behind
   *          compaction reads.
   * @return Scanners.
   */
  private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
      long smallestReadPoint, boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
      smallestReadPoint);
  }

  private long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  protected interface InternalScannerFactory {

    ScanType getScanType(CompactionRequestImpl request);

    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException;
  }

  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  /**
   * Creates a writer for a new file in a temporary directory.
   * @param fd The file details.
   * @return Writer for a new StoreFile in the tmp dir.
   * @throws IOException if creation failed
   */
  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind)
      throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
      fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind);
  }

  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
      User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return store.getScanInfo();
    }
    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
      request, user);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param scanner The default scanner created for compaction.
   * @return Scanner to use (usually the default); null if compaction should not proceed.
   */
  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
      InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
      request, user);
  }

  protected final List<Path> compact(final CompactionRequestImpl request,
      InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
      ThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
    this.progress = new CompactionProgress(fd.maxKeyCount);

    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = getSmallestReadPoint();

    T writer = null;
    boolean dropCache;
    if (request.isMajor() || request.isAllFiles()) {
      dropCache = this.dropCacheMajor;
    } else {
      dropCache = this.dropCacheMinor;
    }

    List<StoreFileScanner> scanners =
        createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
    InternalScanner scanner = null;
    boolean finished = false;
    try {
      /* Include deletes, unless we are doing a major compaction */
      ScanType scanType = scannerFactory.getScanType(request);
      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
      scanner = postCompactScannerOpen(request, scanType,
        scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
        // For mvcc-sensitive family, we never set mvcc to 0.
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, dropCache);
      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
        throughputController, request.isAllFiles(), request.getFiles().size());
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
            + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      Closeables.close(scanner, true);
      if (!finished && writer != null) {
        abortWriter(writer);
      }
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }

  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
      CompactionRequestImpl request) throws IOException;

  protected abstract void abortWriter(T writer) throws IOException;

  /**
   * Performs the compaction.
   * @param fd FileDetails of cell sink writer
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove the seqId (formerly mvcc) value if it is <=
   *          smallestReadPoint
   * @param throughputController The compaction throughput controller.
   * @param major Is a major compaction.
   * @param numofFilesToCompact the number of files to compact
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
      long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
      boolean major, int numofFilesToCompact) throws IOException {
    assert writer instanceof ShipperListener;
    long bytesWrittenProgressForCloseCheck = 0;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
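    // Each scanner.next() call below fills 'cells' with at most compactionKVMax cells (the batch
    // limit configured on the ScannerContext); the batch is drained into the writer and cleared
    // before the next call.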
    List<Cell> cells = new ArrayList<>();
    long closeCheckSizeLimit = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    ScannerContext scannerContext =
        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
    long shippedCallSizeLimit =
        (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        if (LOG.isDebugEnabled()) {
          now = EnvironmentEdgeManager.currentTime();
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            PrivateCellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          writer.append(c);
          int len = KeyValueUtil.length(c);
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          // check periodically to see if a system stop is requested
          if (closeCheckSizeLimit > 0) {
            bytesWrittenProgressForCloseCheck += len;
            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
              bytesWrittenProgressForCloseCheck = 0;
              if (!store.areWritesEnabled()) {
                progress.cancel();
                return false;
              }
            }
          }
          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
            if (lastCleanCell != null) {
              // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
              // ShipperListener will clone the last cells it refers to, so we need to set the
              // sequence id back before ShipperListener.beforeShipped is called.
              PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
            }
            // Clone the cells that are in the writer so that they are freed of references,
            // if they are holding any.
            ((ShipperListener) writer).beforeShipped();
            // The SHARED block references being read for compaction are kept in the prevBlocks
            // list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set of cells
            // is returned to the client we call shipped(), which can clear that list. Here we do
            // a similar thing: during the compaction, after every N cells written with a
            // collective size of 'shippedCallSizeLimit', we call shipped(), which may clear the
            // prevBlocks list.
            kvs.shipped();
            bytesWrittenProgressForShippedCall = 0;
          }
        }
        if (lastCleanCell != null) {
          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException(
          "Interrupted while controlling throughput of compacting " + compactionName);
    } finally {
      // Clone the last cell in the finally block because the writer will append the last cell
      // when committing. If we don't clone it here, then once the scanner gets closed the memory
      // of the last cell will be released. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

  /**
   * @param store store
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
      List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
      long earliestPutTs) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store The store.
   * @param scanners Store file scanners.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
      List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
  }
}