/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

/**
 * Compact passed set of files in the mob-enabled column family.
 * <p>
 * Per-compaction state (the set of referenced MOB files, the user-request flag, the "I/O mode
 * disabled" flag and the MOB file length map) is kept in static thread-local variables: it is
 * populated in {@link #compact} / {@link #performCompaction} and consumed and cleared in
 * {@link #commitWriter}.
 */
@InterfaceAudience.Private
public class DefaultMobStoreCompactor extends DefaultCompactor {

  private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreCompactor.class);
  /** Cells whose value length is greater than this are written to MOB files. */
  protected long mobSizeThreshold;
  /** The store being compacted, viewed as a {@link HMobStore}. */
  protected HMobStore mobStore;
  /** True when the configured MOB compaction type is the "optimized" one. */
  protected boolean ioOptimizedMode = false;

  /*
   * MOB file reference set thread local variable. It contains the set of MOB file names which the
   * newly compacted store file has references to. This variable is populated during compaction
   * and its content is written into the meta section of the newly created store file at the final
   * step of the compaction process.
   */
  static ThreadLocal<SetMultimap<TableName, String>> mobRefSet =
    ThreadLocal.withInitial(HashMultimap::create);

  /*
   * Is it a user- or system-originated request.
   */
  static ThreadLocal<Boolean> userRequest = ThreadLocal.withInitial(() -> Boolean.FALSE);

  /*
   * Disable IO mode. IO mode can be forcefully disabled if the compactor finds an old MOB file
   * (pre-distributed compaction). This means that migration has not been completed yet. During
   * the data migration (upgrade) process only general compaction is allowed.
   */
  static ThreadLocal<Boolean> disableIO = ThreadLocal.withInitial(() -> Boolean.FALSE);

  /*
   * Map: MOB file name -> file length. Can be expensive for a large number of MOB files.
   */
  static ThreadLocal<HashMap<String, Long>> mobLengthMap = ThreadLocal.withInitial(HashMap::new);

  /** Creates the compaction scanner; deletes are dropped only when all files are compacted. */
  private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  /** Creates the store file writer for the compaction output. */
  private final CellSinkFactory<StoreFileWriter> writerFactory =
    new CellSinkFactory<StoreFileWriter>() {
      @Override
      public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd,
        boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
        throws IOException {
        // make this writer with tags always because of possible new cells with tags.
        return store.getStoreEngine()
          .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)
            .includeMVCCReadpoint(true).includesTag(true));
      }
    };

  /**
   * @param conf  configuration; also read for the MOB compaction type
   * @param store the store to compact, must be a {@link HMobStore}
   * @throws IllegalArgumentException if {@code store} is not a {@link HMobStore}
   */
  public DefaultMobStoreCompactor(Configuration conf, HStore store) {
    super(conf, store);
    // The mob cells reside in the mob-enabled column family which is held by HMobStore.
    // During the compaction, the compactor reads the cells from the mob files and
    // probably creates new mob files. All of these operations are included in HMobStore,
    // so we need to cast the Store to HMobStore.
    if (!(store instanceof HMobStore)) {
      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
    }
    this.mobStore = (HMobStore) store;
    this.mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
    this.ioOptimizedMode =
      conf.get(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.DEFAULT_MOB_COMPACTION_TYPE)
        .equals(MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
  }

  /**
   * Records whether this is a user request and, for a user-requested major compaction in I/O
   * optimized mode, collects the MOB files referenced by the input store files and their
   * lengths, then delegates to the generic compaction.
   * @throws IOException if the MOB references of an input file cannot be read, or a referenced
   *                     MOB file cannot be found in any of its expected locations
   */
  @Override
  public List<Path> compact(CompactionRequestImpl request,
    ThroughputController throughputController, User user) throws IOException {
    String tableName = store.getTableName().toString();
    String regionName = store.getRegionInfo().getRegionNameAsString();
    String familyName = store.getColumnFamilyName();
    LOG.info(
      "MOB compaction: major={} isAll={} priority={} throughput controller={}"
        + " table={} cf={} region={}",
      request.isMajor(), request.isAllFiles(), request.getPriority(), throughputController,
      tableName, familyName, regionName);
    userRequest.set(request.getPriority() == HStore.PRIORITY_USER);
    LOG.debug("MOB compaction table={} cf={} region={} files: {}", tableName, familyName,
      regionName, request.getFiles());
    // Check if I/O optimized MOB compaction
    if (
      ioOptimizedMode && request.isMajor() && request.getPriority() == HStore.PRIORITY_USER
    ) {
      try {
        final SetMultimap<TableName, String> mobRefs =
          request.getFiles().stream().map(DefaultMobStoreCompactor::mobFileRefsOf)
            .reduce((a, b) -> a.putAll(b.build())).orElseGet(ImmutableSetMultimap::builder)
            .build();
        // reset disableIO
        disableIO.set(Boolean.FALSE);
        if (!mobRefs.isEmpty()) {
          calculateMobLengthMap(mobRefs);
        }
        LOG.info(
          "Table={} cf={} region={}. I/O optimized MOB compaction. "
            + "Total referenced MOB files: {}",
          tableName, familyName, regionName, mobRefs.size());
      } catch (RuntimeException exception) {
        throw new IOException("Failed to get list of referenced hfiles for request " + request,
          exception);
      }
    }

    return compact(request, scannerFactory, writerFactory, throughputController, user);
  }

  /**
   * Reads the MOB file references stored in the metadata of a store file.
   * @param file store file to inspect
   * @return builder holding the references, empty when the file has no such metadata
   * @throws RuntimeException if the metadata value cannot be deserialized
   */
  private static ImmutableSetMultimap.Builder<TableName, String> mobFileRefsOf(HStoreFile file) {
    byte[] value = file.getMetadataValue(HStoreFile.MOB_FILE_REFS);
    if (value == null) {
      return ImmutableSetMultimap.builder();
    }
    try {
      return MobUtils.deserializeMobFileRefs(value);
    } catch (RuntimeException exception) {
      throw new RuntimeException("failure getting mob references for hfile " + file, exception);
    }
  }

  /**
   * Rebuilds the thread-local MOB file length map for the given references and disables I/O
   * optimized mode for this thread if any referenced file has an old-style MOB file name.
   * @param mobRefs multimap of original table name to MOB hfile name
   * @throws FileNotFoundException if a referenced MOB file is in none of its expected locations
   */
  private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throws IOException {
    FileSystem fs = store.getFileSystem();
    HashMap<String, Long> map = mobLengthMap.get();
    map.clear();
    for (Map.Entry<TableName, String> reference : mobRefs.entries()) {
      final TableName table = reference.getKey();
      final String mobfile = reference.getValue();
      if (MobFileName.isOldMobFileName(mobfile)) {
        disableIO.set(Boolean.TRUE);
      }
      List<Path> locations = mobStore.getLocations(table);
      for (Path p : locations) {
        try {
          FileStatus st = fs.getFileStatus(new Path(p, mobfile));
          long size = st.getLen();
          LOG.debug("Referenced MOB file={} size={}", mobfile, size);
          map.put(mobfile, size);
          // First location that has the file wins.
          break;
        } catch (FileNotFoundException exception) {
          LOG.debug("Mob file {} was not in location {}. May have other locations to try.", mobfile,
            p);
        }
      }
      if (!map.containsKey(mobfile)) {
        throw new FileNotFoundException("Could not find mob file " + mobfile + " in the list of "
          + "expected locations: " + locations);
      }
    }
  }

  /**
   * Performs compaction on a column family with the mob flag enabled. MOB data itself is
   * compacted only when all files are being compacted and the request was made by a user
   * ("MOB compaction" below). There are two modes of a MOB compaction:
   * <ul>
   * <li>Full mode - all MOB data for a region is compacted into a single MOB file.</li>
   * <li>I/O optimized mode - for use cases with no or infrequent updates/deletes of MOB data.
   * The main idea behind I/O optimized compaction is to limit the maximum size of a MOB file
   * produced during compaction and to limit I/O write/read amplification.</li>
   * </ul>
   * During a MOB compaction, for every cell:
   * <ol>
   * <li>If the cell is a MOB reference, the referenced MOB cell is resolved. If its value is
   * larger than the threshold it is rewritten into the new MOB file and a new reference is
   * written to the store file; in I/O optimized mode this is done only when the originating MOB
   * file is smaller than the maximum MOB file size, otherwise the original reference is copied
   * as is. If its value is not larger than the threshold, the resolved cell itself is written
   * to the store file.</li>
   * <li>If the cell is not a MOB reference and its value is larger than the threshold, it is
   * written to the MOB file and a reference is written to the store file; otherwise it is
   * written directly to the store file.</li>
   * </ol>
   * During any other compaction, for every cell:
   * <ol>
   * <li>Non-Put cells are written directly to the store file.</li>
   * <li>Valid MOB references are copied as is (the size threshold is not checked); a corrupted
   * reference fails the compaction.</li>
   * <li>Other Put cells are written to the store file if their value is not larger than the
   * threshold, otherwise to the MOB file with a reference written to the store file.</li>
   * </ol>
   * @param fd                   File details
   * @param scanner              Where to read from.
   * @param writer               Where to write to.
   * @param smallestReadPoint    Smallest read point (not used by this implementation).
   * @param cleanSeqId           When true, remove seqId(used to be mvcc) value which is &lt;=
   *                             smallestReadPoint (not used by this implementation).
   * @param throughputController The compaction throughput controller.
   * @param request              compaction request.
   * @param progress             Progress reporter.
   * @return Whether compaction ended; false if it was interrupted for any reason.
   * @throws InterruptedIOException if interrupted while throttling
   * @throws IOException            wrapping any I/O failure during the compaction
   */
  @Override
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Clear old mob references
    mobRefSet.get().clear();
    boolean isUserRequest = userRequest.get();
    boolean major = request.isAllFiles();
    boolean compactMOBs = major && isUserRequest;
    boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
      MobConstants.DEFAULT_MOB_DISCARD_MISS);
    if (discardMobMiss) {
      LOG.warn("{}=true. This is unsafe setting recommended only when first upgrading to a version"
        + " with the distributed mob compaction feature on a cluster that has experienced MOB data "
        + "corruption.", MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY);
    }
    long maxMobFileSize = conf.getLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY,
      MobConstants.DEFAULT_MOB_COMPACTION_MAX_FILE_SIZE);
    // Intentionally shadows the field: the effective mode for this run also depends on the
    // thread-local disableIO flag.
    boolean ioOptimizedMode = this.ioOptimizedMode && !disableIO.get();
    LOG.info(
      "Compact MOB={} optimized configured={} optimized enabled={} maximum MOB file size={}"
        + " major={} store={}",
      compactMOBs, this.ioOptimizedMode, ioOptimizedMode, maxMobFileSize, major, getStoreInfo());
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<>();
    long currentTime = EnvironmentEdgeManager.currentTime();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = currentTime;
    }
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    byte[] fileName = null;
    StoreFileWriter mobFileWriter = null;
    /*
     * mobCells are used only to decide if we need to commit or abort current MOB output file.
     */
    long mobCells = 0;
    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
    boolean finished = false;

    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
    ScannerContext scannerContext =
      ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
    long shippedCallSizeLimit =
      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();

    Cell mobCell = null;
    try {

      mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
      fileName = Bytes.toBytes(mobFileWriter.getPath().getName());

      do {
        hasMore = scanner.next(cells, scannerContext);
        now = EnvironmentEdgeManager.currentTime();
        for (Cell c : cells) {
          if (compactMOBs) {
            if (MobUtils.isMobReferenceCell(c)) {
              String fName = MobUtils.getMobFileName(c);
              // Added to support migration
              try {
                mobCell = mobStore.resolve(c, true, false).getCell();
              } catch (FileNotFoundException fnfe) {
                if (discardMobMiss) {
                  LOG.error("Missing MOB cell: file={} not found cell={}", fName, c);
                  continue;
                } else {
                  throw fnfe;
                }
              }

              if (discardMobMiss && mobCell.getValueLength() == 0) {
                LOG.error("Missing MOB cell value: file={} mob cell={} cell={}", fName, mobCell, c);
                continue;
              } else if (mobCell.getValueLength() == 0) {
                String errMsg =
                  String.format("Found 0 length MOB cell in a file=%s mob cell=%s " + " cell=%s",
                    fName, mobCell, c);
                throw new IOException(errMsg);
              }

              if (mobCell.getValueLength() > mobSizeThreshold) {
                // put the mob data back to the MOB store file
                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
                if (!ioOptimizedMode) {
                  mobFileWriter.append(mobCell);
                  mobCells++;
                  writer.append(
                    MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
                } else {
                  // I/O optimized mode
                  // Check if MOB cell origin file size is
                  // greater than threshold
                  Long size = mobLengthMap.get().get(fName);
                  if (size == null) {
                    // FATAL error (we should never get here though), abort compaction
                    // This error means that meta section of store file does not contain
                    // MOB file, which has references in at least one cell from this store file
                    String msg = String.format(
                      "Found an unexpected MOB file during compaction %s, aborting compaction %s",
                      fName, getStoreInfo());
                    throw new IOException(msg);
                  }
                  // Can not be null
                  if (size < maxMobFileSize) {
                    // If MOB cell origin file is below threshold
                    // it is get compacted
                    mobFileWriter.append(mobCell);
                    // Update number of mobCells in a current mob writer
                    mobCells++;
                    writer.append(
                      MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
                    // Update total size of the output (we do not take into account
                    // file compression yet)
                    long len = mobFileWriter.getPos();
                    if (len > maxMobFileSize) {
                      LOG.debug("Closing output MOB File, length={} file={}, store={}", len,
                        mobFileWriter.getPath().getName(), getStoreInfo());
                      commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
                      mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
                      fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                      mobCells = 0;
                    }
                  } else {
                    // We leave large MOB file as is (is not compacted),
                    // then we update set of MOB file references
                    // and append mob cell directly to the store's writer
                    appendMobReference(writer, c);
                  }
                }
              } else {
                // If MOB value is less than threshold, append it directly to a store file
                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
                writer.append(mobCell);
                cellsCountCompactedFromMob++;
                cellsSizeCompactedFromMob += mobCell.getValueLength();
              }
            } else {
              // Not a MOB reference cell
              int size = c.getValueLength();
              if (size > mobSizeThreshold) {
                // This MOB cell comes from a regular store file
                // therefore we store it into original mob output
                mobFileWriter.append(c);
                writer
                  .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()));
                mobCells++;
                cellsCountCompactedToMob++;
                cellsSizeCompactedToMob += c.getValueLength();
                if (ioOptimizedMode) {
                  // Update total size of the output (we do not take into account
                  // file compression yet)
                  long len = mobFileWriter.getPos();
                  if (len > maxMobFileSize) {
                    commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
                    mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
                    fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                    mobCells = 0;
                  }
                }
              } else {
                // Not a MOB cell, write it directly to a store file
                writer.append(c);
              }
            }
          } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) {
            // Not a major compaction or major with MOB disabled
            // If the kv type is not put, directly write the cell
            // to the store file.
            writer.append(c);
          } else if (MobUtils.isMobReferenceCell(c)) {
            // Not a major MOB compaction, Put MOB reference
            if (MobUtils.hasValidMobRefCellValue(c)) {
              // We do not check mobSizeThreshold during normal compaction,
              // leaving it to a MOB compaction run
              appendMobReference(writer, c);
            } else {
              String errMsg = String.format("Corrupted MOB reference: %s", c.toString());
              throw new IOException(errMsg);
            }
          } else if (c.getValueLength() <= mobSizeThreshold) {
            // If the value size of a cell is not larger than the threshold, directly write it to
            // the store file.
            writer.append(c);
          } else {
            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
            // write this cell to a mob file, and write the path to the store file.
            mobCells++;
            // append the original keyValue in the mob file.
            mobFileWriter.append(c);
            Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
            // write the cell whose value is the path of a mob file to the store file.
            writer.append(reference);
            cellsCountCompactedToMob++;
            cellsSizeCompactedToMob += c.getValueLength();
            if (ioOptimizedMode) {
              long len = mobFileWriter.getPos();
              if (len > maxMobFileSize) {
                commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
                mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
                fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                mobCells = 0;
              }
            }
          }

          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          if (closeChecker.isSizeLimit(store, len)) {
            // 'finished' is still false here, so the finally block aborts the MOB writer.
            progress.cancel();
            return false;
          }
          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
            ((ShipperListener) writer).beforeShipped();
            kvs.shipped();
            bytesWrittenProgressForShippedCall = 0;
          }
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
      finished = true;
    } catch (InterruptedException e) {
      progress.cancel();
      InterruptedIOException iioe = new InterruptedIOException(
        "Interrupted while control throughput of compacting " + compactionName);
      // InterruptedIOException has no cause-taking constructor; keep the original for diagnosis.
      iioe.initCause(e);
      throw iioe;
    } catch (IOException t) {
      String msg = "Mob compaction failed for region: " + store.getRegionInfo().getEncodedName();
      throw new IOException(msg, t);
    } finally {
      // Clone last cell in the final because writer will append last cell when committing. If
      // don't clone here and once the scanner get closed, then the memory of last cell will be
      // released. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
      if (!finished && mobFileWriter != null) {
        // Remove all MOB references because compaction failed
        clearThreadLocals();
        // Abort writer
        LOG.debug("Aborting writer for {} because of a compaction failure, Store {}",
          mobFileWriter.getPath(), getStoreInfo());
        abortWriter(mobFileWriter);
      }
    }

    // Commit last MOB writer
    commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
    progress.complete();
    return true;
  }

  /**
   * Copies an existing MOB reference cell to the store file unchanged and records the MOB file
   * it points to in the thread-local reference set.
   * @param writer  store file writer
   * @param refCell MOB reference cell to copy
   * @throws IOException if the cell carries no table name tag, or the append fails
   */
  private void appendMobReference(CellSink writer, Cell refCell) throws IOException {
    Optional<TableName> refTable = MobUtils.getTableName(refCell);
    if (!refTable.isPresent()) {
      throw new IOException(String.format("MOB cell did not contain a tablename "
        + "tag. should not be possible. see ref guide on mob troubleshooting. "
        + "store=%s cell=%s", getStoreInfo(), refCell));
    }
    mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(refCell));
    writer.append(refCell);
  }

  /** Returns a "[table=... family=... region=...]" description of the store for log messages. */
  protected String getStoreInfo() {
    return String.format("[table=%s family=%s region=%s]", store.getTableName().getNameAsString(),
      store.getColumnFamilyName(), store.getRegionInfo().getEncodedName());
  }

  /** Empties this thread's MOB reference set and MOB file length map. */
  private void clearThreadLocals() {
    mobRefSet.get().clear();
    mobLengthMap.get().clear();
  }

  /**
   * Creates a new MOB file writer (in the tmp directory when the store engine requires it) and
   * registers the new file name in the thread-local reference set.
   * @throws IOException wrapping any failure to create the writer
   */
  private StoreFileWriter newMobWriter(FileDetails fd, boolean major,
    Consumer<Path> writerCreationTracker) throws IOException {
    try {
      StoreFileWriter mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
        ? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
          major ? majorCompactionCompression : minorCompactionCompression,
          store.getRegionInfo().getStartKey(), true)
        : mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount,
          major ? majorCompactionCompression : minorCompactionCompression,
          store.getRegionInfo().getStartKey(), true, writerCreationTracker);
      LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(),
        getStoreInfo());
      // Add reference we get for compact MOB
      mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
      return mobFileWriter;
    } catch (IOException e) {
      // Bailing out
      throw new IOException(String.format("Failed to create mob writer, store=%s", getStoreInfo()),
        e);
    }
  }

  /**
   * Commits the MOB writer if it received at least one cell; otherwise aborts it and removes its
   * file name from the thread-local reference set. Does nothing but log when the writer is null.
   * @param mobFileWriter MOB writer to finish, may be null
   * @param maxSeqId      max sequence id to record in the MOB file metadata
   * @param mobCells      number of cells written to this writer
   * @param major         whether this is a major compaction
   */
  private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId, long mobCells,
    boolean major) throws IOException {
    // Commit or abort major mob writer
    // If IOException happens during below operation, some
    // MOB files can be committed partially, but corresponding
    // store file won't be committed, therefore these MOB files
    // become orphans and will be deleted during next MOB cleaning chore cycle

    if (mobFileWriter != null) {
      LOG.debug("Commit or abort size={} mobCells={} major={} file={}, store={}",
        mobFileWriter.getPos(), mobCells, major, mobFileWriter.getPath().getName(), getStoreInfo());
      Path path =
        MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
      if (mobCells > 0) {
        // If the mob file is not empty, commit it.
        mobFileWriter.appendMetadata(maxSeqId, major, mobCells);
        mobFileWriter.close();
        mobStore.commitFile(mobFileWriter.getPath(), path);
      } else {
        // If the mob file is empty, delete it instead of committing.
        LOG.debug("Aborting writer for {} because there are no MOB cells, store={}",
          mobFileWriter.getPath(), getStoreInfo());
        // Remove MOB file from reference set
        mobRefSet.get().remove(store.getTableName(), mobFileWriter.getPath().getName());
        abortWriter(mobFileWriter);
      }
    } else {
      LOG.debug("Mob file writer is null, skipping commit/abort, store={}", getStoreInfo());
    }
  }

  /**
   * Writes the compaction metadata and the collected MOB file references into the new store
   * file, closes it and clears the thread-local compaction state.
   */
  @Override
  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
    CompactionRequestImpl request) throws IOException {
    List<Path> newFiles = Lists.newArrayList(writer.getPath());
    writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
    writer.appendMobMetadata(mobRefSet.get());
    writer.close();
    clearThreadLocals();
    return newFiles;
  }
}