/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * A compactor is a compaction algorithm associated with a given policy. The base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is evolving).
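 * <p>
 * Concrete compactors implement {@link #commitWriter} and {@link #abortWriter} and typically drive
 * the shared {@link #compact} template with the {@link #defaultScannerFactory} and a
 * {@link CellSinkFactory}. A minimal sketch of such a subclass (illustrative only; the class name
 * and method bodies are not the actual default implementation) might look like:
 *
 * <pre>{@code
 * public class SketchCompactor extends Compactor<StoreFileWriter> {
 *   SketchCompactor(Configuration conf, HStore store) {
 *     super(conf, store);
 *   }
 *
 *   public List<Path> compact(CompactionRequestImpl request,
 *       ThroughputController throughputController, User user) throws IOException {
 *     // Reuse the base class template: default scanner factory plus a tmp-dir writer sink.
 *     return compact(request, defaultScannerFactory,
 *       (scanner, fd, shouldDropBehind) -> createTmpWriter(fd, shouldDropBehind),
 *       throughputController, user);
 *   }
 *
 *   protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
 *       CompactionRequestImpl request) throws IOException {
 *     writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
 *     writer.close();
 *     return Collections.singletonList(writer.getPath());
 *   }
 *
 *   protected void abortWriter(StoreFileWriter writer) throws IOException {
 *     Path leftoverFile = writer.getPath();
 *     writer.close();
 *     store.getFileSystem().delete(leftoverFile, false);
 *   }
 * }
 * }</pre>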
 */
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
  protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
  protected volatile CompactionProgress progress;
  protected final Configuration conf;
  protected final HStore store;

  protected final int compactionKVMax;
  protected final Compression.Algorithm compactionCompression;

  /** Specifies how many days to keep MVCC values during major compaction. */
  protected int keepSeqIdPeriod;

  // Configs that drive whether we drop page cache behind compactions
  protected static final String MAJOR_COMPACTION_DROP_CACHE =
      "hbase.regionserver.majorcompaction.pagecache.drop";
  protected static final String MINOR_COMPACTION_DROP_CACHE =
      "hbase.regionserver.minorcompaction.pagecache.drop";

  private final boolean dropCacheMajor;
  private final boolean dropCacheMinor;

  // TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(Configuration conf, HStore store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
        this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactionCompression = (this.store.getColumnFamilyDescriptor() == null) ?
        Compression.Algorithm.NONE :
        this.store.getColumnFamilyDescriptor().getCompactionCompressionType();
    this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
        HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
    this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
    this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
  }

  protected interface CellSinkFactory<S> {
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
        throws IOException;
  }

  public CompactionProgress getProgress() {
    return this.progress;
  }

  /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Max sequence id across the files we're compacting. */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length */
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction */
    public long minSeqIdToKeep = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles Whether all files are included for compaction
   * @return The result.
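   * @throws IOException if a store file's reader or file info cannot be read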
   */
  private FileDetails getFileDetails(
      Collection<HStoreFile> filesToCompact, boolean allFiles) throws IOException {
    FileDetails fd = new FileDetails();
    long oldestHFileTimestampToKeepMVCC = System.currentTimeMillis() -
        (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (HStoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemStoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFileReader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries, otherwise under-sized
      // blooms (or a bloom type switch by the user, e.g. from ROW to ROWCOL) can cause progress
      // to be miscalculated
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete marker during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one
          // assume we have very old puts
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(TIMERANGE_KEY);
      fd.latestPutTs =
          tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
      LOG.debug("Compacting {}, keycount={}, bloomtype={}, size={}, "
              + "encoding={}, compression={}, seqNum={}{}",
          (file.getPath() == null ? null : file.getPath().getName()),
          keyCount,
          r.getBloomFilterType().toString(),
          TraditionalBinaryPrefix.long2String(r.length(), "", 1),
          r.getHFileReader().getDataBlockEncoding(),
          compactionCompression,
          seqNum,
          (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @return Scanners.
   */
  private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
      long smallestReadPoint, boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
      smallestReadPoint);
  }

  private long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  protected interface InternalScannerFactory {

    ScanType getScanType(CompactionRequestImpl request);

    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException;
  }

  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  /**
   * Creates a writer for a new file in a temporary directory.
   * @param fd The file details.
   * @return Writer for a new StoreFile in the tmp dir.
   * @throws IOException if creation failed
   */
  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind)
      throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
      fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind);
  }

  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
      User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return store.getScanInfo();
    }
    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
      request, user);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param scanner The default scanner created for compaction.
   * @return The scanner to use (usually the default); null if compaction should not proceed.
   */
  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
      InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
      request, user);
  }

  protected final List<Path> compact(final CompactionRequestImpl request,
      InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
      ThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
    this.progress = new CompactionProgress(fd.maxKeyCount);

    // Find the smallest read point across all the Scanners.
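    // Cells with a sequenceId at or below this read point are no longer visible to any
    // concurrent scanner, so performCompaction can zero out their sequenceId when cleanSeqId
    // is set below.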
    long smallestReadPoint = getSmallestReadPoint();

    T writer = null;
    boolean dropCache;
    if (request.isMajor() || request.isAllFiles()) {
      dropCache = this.dropCacheMajor;
    } else {
      dropCache = this.dropCacheMinor;
    }

    List<StoreFileScanner> scanners =
        createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
    InternalScanner scanner = null;
    boolean finished = false;
    try {
      /* Include deletes, unless we are doing a major compaction */
      ScanType scanType = scannerFactory.getScanType(request);
      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
      scanner = postCompactScannerOpen(request, scanType,
        scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
        // For mvcc-sensitive family, we never set mvcc to 0.
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, dropCache);
      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
        throughputController, request.isAllFiles(), request.getFiles().size());
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
            + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      Closeables.close(scanner, true);
      if (!finished && writer != null) {
        abortWriter(writer);
      }
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }

  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
      CompactionRequestImpl request) throws IOException;

  protected abstract void abortWriter(T writer) throws IOException;

  /**
   * Performs the compaction.
   * @param fd FileDetails of cell sink writer
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove seqId (formerly mvcc) values which are <=
   *          smallestReadPoint
   * @param major Is a major compaction.
   * @param numofFilesToCompact the number of files to compact
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
      long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
      boolean major, int numofFilesToCompact) throws IOException {
    assert writer instanceof ShipperListener;
    long bytesWrittenProgressForCloseCheck = 0;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
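    // Reusable batch buffer: each scanner.next() call fills 'cells' with at most compactionKVMax
    // cells (enforced via scannerContext) and the list is cleared at the end of every iteration.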
    List<Cell> cells = new ArrayList<>();
    long closeCheckSizeLimit = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    ScannerContext scannerContext =
        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
    long shippedCallSizeLimit =
        (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        if (LOG.isDebugEnabled()) {
          now = EnvironmentEdgeManager.currentTime();
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            PrivateCellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          writer.append(c);
          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          // check periodically to see if a system stop is requested
          if (closeCheckSizeLimit > 0) {
            bytesWrittenProgressForCloseCheck += len;
            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
              bytesWrittenProgressForCloseCheck = 0;
              if (!store.areWritesEnabled()) {
                progress.cancel();
                return false;
              }
            }
          }
          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
            if (lastCleanCell != null) {
              // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
              // ShipperListener will clone the last cells it refers to, so we need to set the
              // sequence id back before ShipperListener.beforeShipped is called.
              PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
            }
            // Clone the cells that are in the writer so that they are freed of references,
            // if they are holding any.
            ((ShipperListener) writer).beforeShipped();
            // The SHARED block references being read for compaction are kept in the prevBlocks
            // list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set of cells
            // is returned to the client, shipped() is called, which can clear this list. Here we
            // do the same thing: during the compaction, after every batch of cells written with a
            // collective size of 'shippedCallSizeLimit', we call shipped(), which may clear the
            // prevBlocks list.
            kvs.shipped();
            bytesWrittenProgressForShippedCall = 0;
          }
        }
        if (lastCleanCell != null) {
          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException(
          "Interrupted while controlling throughput of compacting " + compactionName);
    } finally {
      // Clone the last cell in the finally block because the writer will append the last cell
      // when committing. If we don't clone here, then once the scanner is closed the memory of
      // the last cell will be released. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

  /**
   * @param store store
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
      List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
      long earliestPutTs) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store The store.
   * @param scanners Store file scanners.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
      List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
        dropDeletesFromRow, dropDeletesToRow);
  }
}