/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

/**
 * Compact passed set of files in the mob-enabled column family.
 * <p>
 * While the regular store files are being compacted, cells whose value exceeds the column family
 * MOB threshold are written to a separate MOB file and replaced in the store file by a reference
 * cell. The names of all MOB files referenced by the resulting store file are collected in
 * {@link #mobRefSet} and written into the store file metadata by
 * {@link #commitWriter(StoreFileWriter, FileDetails, CompactionRequestImpl)}.
 * <p>
 * Per-compaction state is kept in static {@link ThreadLocal}s, so one compaction is expected to
 * run from {@code compact} through {@code commitWriter} on a single thread.
 */
@InterfaceAudience.Private
public class DefaultMobStoreCompactor extends DefaultCompactor {

  private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreCompactor.class);

  /** Column family MOB threshold; larger values are stored in MOB files. */
  protected long mobSizeThreshold;

  /** The store being compacted, viewed as a {@link HMobStore}. */
  protected HMobStore mobStore;

  /** Whether the configured MOB compaction type is the I/O optimized one. */
  protected boolean ioOptimizedMode = false;

  /*
   * MOB file reference set thread local variable. It contains set of a MOB file names, which newly
   * compacted store file has references to. This variable is populated during compaction and the
   * content of it is written into meta section of a newly created store file at the final step of
   * compaction process.
   */
  static ThreadLocal<SetMultimap<TableName, String>> mobRefSet =
    ThreadLocal.withInitial(HashMultimap::create);

  /*
   * Is it user or system-originated request.
   */
  static ThreadLocal<Boolean> userRequest = ThreadLocal.withInitial(() -> Boolean.FALSE);

  /*
   * Disable IO mode. IO mode can be forcefully disabled if compactor finds old MOB file
   * (pre-distributed compaction). This means that migration has not been completed yet. During data
   * migration (upgrade) process only general compaction is allowed.
   */
  static ThreadLocal<Boolean> disableIO = ThreadLocal.withInitial(() -> Boolean.FALSE);

  /*
   * Map : MOB file name - file length. Can be expensive for large amount of MOB files.
   */
  static ThreadLocal<HashMap<String, Long>> mobLengthMap = ThreadLocal.withInitial(HashMap::new);

  /**
   * Scanner factory handed to the generic compaction driver: drops deletes when all files are
   * compacted and retains them otherwise.
   */
  private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  /** Writer factory handed to the generic compaction driver for the output store file. */
  private final CellSinkFactory<StoreFileWriter> writerFactory =
    new CellSinkFactory<StoreFileWriter>() {
      @Override
      public StoreFileWriter createWriter(InternalScanner scanner,
        org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
        boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
        throws IOException {
        // make this writer with tags always because of possible new cells with tags.
        return store.getStoreEngine()
          .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)
            .includeMVCCReadpoint(true).includesTag(true));
      }
    };

  /**
   * Creates a compactor for a MOB-enabled store.
   * @param conf  configuration; read for the MOB compaction type
   * @param store the store to compact, which must be a {@link HMobStore}
   * @throws IllegalArgumentException if {@code store} is not a {@link HMobStore}
   */
  public DefaultMobStoreCompactor(Configuration conf, HStore store) {
    super(conf, store);
    // The mob cells reside in the mob-enabled column family which is held by HMobStore.
    // During the compaction, the compactor reads the cells from the mob files and
    // probably creates new mob files. All of these operations are included in HMobStore,
    // so we need to cast the Store to HMobStore.
    if (!(store instanceof HMobStore)) {
      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
    }
    this.mobStore = (HMobStore) store;
    this.mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
    this.ioOptimizedMode =
      conf.get(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.DEFAULT_MOB_COMPACTION_TYPE)
        .equals(MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
  }

  /**
   * Runs a compaction for the given request.
   * <p>
   * Records in {@link #userRequest} whether the request has user priority. When I/O optimized mode
   * is configured and the request is a user-priority major compaction, the MOB file references
   * stored in the metadata of the input store files are collected and the length of every
   * referenced MOB file is looked up, so that {@code performCompaction} can decide per MOB file
   * whether to rewrite it.
   * @param request              the compaction request
   * @param throughputController the compaction throughput controller
   * @param user                 the user on whose behalf the compaction runs
   * @return paths of the files produced by the compaction
   * @throws IOException if the MOB references cannot be read, a referenced MOB file cannot be
   *                     found, or the compaction itself fails
   */
  @Override
  public List<Path> compact(CompactionRequestImpl request,
    ThroughputController throughputController, User user) throws IOException {
    String tableName = store.getTableName().toString();
    String regionName = store.getRegionInfo().getRegionNameAsString();
    String familyName = store.getColumnFamilyName();
    LOG.info(
      "MOB compaction: major={} isAll={} priority={} throughput controller={}"
        + " table={} cf={} region={}",
      request.isMajor(), request.isAllFiles(), request.getPriority(), throughputController,
      tableName, familyName, regionName);
    boolean isUserPriority = request.getPriority() == HStore.PRIORITY_USER;
    userRequest.set(isUserPriority ? Boolean.TRUE : Boolean.FALSE);
    LOG.debug("MOB compaction table={} cf={} region={} files: {}", tableName, familyName,
      regionName, request.getFiles());
    // Check if I/O optimized MOB compaction.
    // NOTE(review): this check uses request.isMajor(), whereas performCompaction derives its
    // "major" flag from request.isAllFiles().
    if (ioOptimizedMode && request.isMajor() && isUserPriority) {
      try {
        // Merge the MOB file references recorded in the metadata of every input store file.
        final SetMultimap<TableName, String> mobRefs = request.getFiles().stream().map(file -> {
          byte[] value = file.getMetadataValue(HStoreFile.MOB_FILE_REFS);
          ImmutableSetMultimap.Builder<TableName, String> builder;
          if (value == null) {
            builder = ImmutableSetMultimap.builder();
          } else {
            try {
              builder = MobUtils.deserializeMobFileRefs(value);
            } catch (RuntimeException exception) {
              throw new RuntimeException("failure getting mob references for hfile " + file,
                exception);
            }
          }
          return builder;
        }).reduce((a, b) -> a.putAll(b.build())).orElseGet(ImmutableSetMultimap::builder).build();
        // reset disableIO
        disableIO.set(Boolean.FALSE);
        if (!mobRefs.isEmpty()) {
          calculateMobLengthMap(mobRefs);
        }
        LOG.info(
          "Table={} cf={} region={}. I/O optimized MOB compaction. "
            + "Total referenced MOB files: {}",
          tableName, familyName, regionName, mobRefs.size());
      } catch (RuntimeException exception) {
        throw new IOException("Failed to get list of referenced hfiles for request " + request,
          exception);
      }
    }

    return compact(request, scannerFactory, writerFactory, throughputController, user);
  }

  /**
   * Fills the thread-local {@link #mobLengthMap} with the length of every referenced MOB file. If
   * any referenced file has an old-style MOB file name, {@link #disableIO} is set so that I/O
   * optimized mode is not used for this compaction.
   * @param mobRefs multimap of original table name -&gt; mob hfile
   * @throws FileNotFoundException if a referenced MOB file is in none of the locations returned by
   *                               the MOB store for its table
   * @throws IOException           on other file system errors
   */
  private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throws IOException {
    FileSystem fs = store.getFileSystem();
    HashMap<String, Long> map = mobLengthMap.get();
    map.clear();
    for (Entry<TableName, String> reference : mobRefs.entries()) {
      final TableName table = reference.getKey();
      final String mobfile = reference.getValue();
      if (MobFileName.isOldMobFileName(mobfile)) {
        disableIO.set(Boolean.TRUE);
      }
      List<Path> locations = mobStore.getLocations(table);
      // Probe the candidate locations in order and stop at the first one that has the file.
      for (Path p : locations) {
        try {
          FileStatus st = fs.getFileStatus(new Path(p, mobfile));
          long size = st.getLen();
          LOG.debug("Referenced MOB file={} size={}", mobfile, size);
          map.put(mobfile, size);
          break;
        } catch (FileNotFoundException exception) {
          LOG.debug("Mob file {} was not in location {}. May have other locations to try.", mobfile,
            p);
        }
      }
      if (!map.containsKey(mobfile)) {
        throw new FileNotFoundException("Could not find mob file " + mobfile + " in the list of "
          + "expected locations: " + locations);
      }
    }
  }

  /**
   * Performs compaction on a column family with the mob flag enabled. MOB data itself is only
   * compacted when all files are being compacted and the request has user priority; there are two
   * modes of a MOB compaction:
   * <ul>
   * <li>1. Full mode - when all MOB data for a region is compacted into a single MOB file.
   * <li>2. I/O optimized mode - for use cases with no or infrequent updates/deletes of a MOB data.
   * The main idea behind i/o optimized compaction is to limit maximum size of a MOB file produced
   * during compaction and to limit I/O write/read amplification.
   * </ul>
   * The basic algorithm of compaction is the following: <br>
   * 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file.
   * <ol>
   * <li>If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
   * directly copy the (with mob tag) cell into the new store file.</li>
   * <li>Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into the
   * new store file.</li>
   * </ol>
   * 2. If the Put cell doesn't have a reference tag.
   * <ol>
   * <li>If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
   * write this cell to a mob file, and write the path of this mob file to the store file.</li>
   * <li>Otherwise, directly write this cell into the store file.</li>
   * </ol>
   * @param fd                   File details
   * @param scanner              Where to read from.
   * @param writer               Where to write to.
   * @param smallestReadPoint    Smallest read point.
   * @param cleanSeqId           When true, remove seqId(used to be mvcc) value which is &lt;=
   *                             smallestReadPoint
   * @param throughputController The compaction throughput controller.
   * @param request              compaction request.
   * @param progress             Progress reporter.
   * @return Whether compaction ended; false if it was interrupted for any reason.
   * @throws InterruptedIOException if the thread is interrupted while being throttled
   * @throws IOException            if reading, resolving or writing cells fails; wraps the original
   *                                failure
   */
  @Override
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Clear old mob references
    mobRefSet.get().clear();
    boolean isUserRequest = userRequest.get();
    boolean major = request.isAllFiles();
    boolean compactMOBs = major && isUserRequest;
    boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
      MobConstants.DEFAULT_MOB_DISCARD_MISS);
    if (discardMobMiss) {
      LOG.warn("{}=true. This is unsafe setting recommended only when first upgrading to a version"
        + " with the distributed mob compaction feature on a cluster that has experienced MOB data "
        + "corruption.", MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY);
    }
    long maxMobFileSize = conf.getLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY,
      MobConstants.DEFAULT_MOB_COMPACTION_MAX_FILE_SIZE);
    // Effective mode for this run: configured mode unless an old-style MOB file disabled it.
    boolean ioOptimizedMode = this.ioOptimizedMode && !disableIO.get();
    LOG.info(
      "Compact MOB={} optimized configured={} optimized enabled={} maximum MOB file size={}"
        + " major={} store={}",
      compactMOBs, this.ioOptimizedMode, ioOptimizedMode, maxMobFileSize, major, getStoreInfo());
    List<Cell> cells = new ArrayList<>();
    long currentTime = EnvironmentEdgeManager.currentTime();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = currentTime;
    }
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    byte[] fileName = null;
    StoreFileWriter mobFileWriter = null;
    /*
     * mobCells are used only to decide if we need to commit or abort current MOB output file.
     */
    long mobCells = 0;
    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
    boolean finished = false;

    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
    ScannerContext scannerContext =
      ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
    long shippedCallSizeLimit =
      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();

    Cell mobCell = null;
    // Names of MOB files already committed by this run; deleted again if the compaction fails.
    List<String> committedMobWriterFileNames = new ArrayList<>();
    try {

      mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
      fileName = Bytes.toBytes(mobFileWriter.getPath().getName());

      // Since scanner.next() can return 'false' but still be delivering data,
      // we have to use a do/while loop.
      do {
        hasMore = scanner.next(cells, scannerContext);
        currentTime = EnvironmentEdgeManager.currentTime();
        if (LOG.isDebugEnabled()) {
          now = currentTime;
        }
        if (closeChecker.isTimeLimit(store, currentTime)) {
          progress.cancel();
          return false;
        }
        for (Cell c : cells) {
          if (compactMOBs) {
            if (MobUtils.isMobReferenceCell(c)) {
              String fName = MobUtils.getMobFileName(c);
              // Added to support migration
              try {
                mobCell = mobStore.resolve(c, true, false).getCell();
              } catch (DoNotRetryIOException e) {
                if (discardMobMiss && e.getCause() instanceof FileNotFoundException) {
                  LOG.error("Missing MOB cell: file={} not found cell={}", fName, c);
                  continue;
                } else {
                  throw e;
                }
              }

              if (discardMobMiss && mobCell.getValueLength() == 0) {
                LOG.error("Missing MOB cell value: file={} mob cell={} cell={}", fName, mobCell, c);
                continue;
              } else if (mobCell.getValueLength() == 0) {
                String errMsg =
                  String.format("Found 0 length MOB cell in a file=%s mob cell=%s " + " cell=%s",
                    fName, mobCell, c);
                throw new IOException(errMsg);
              }

              if (mobCell.getValueLength() > mobSizeThreshold) {
                // put the mob data back to the MOB store file
                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
                if (!ioOptimizedMode) {
                  mobFileWriter.append(mobCell);
                  mobCells++;
                  writer.append(
                    MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
                } else {
                  // I/O optimized mode
                  // Check if MOB cell origin file size is
                  // greater than threshold
                  Long size = mobLengthMap.get().get(fName);
                  if (size == null) {
                    // FATAL error (we should never get here though), abort compaction
                    // This error means that meta section of store file does not contain
                    // MOB file, which has references in at least one cell from this store file
                    String msg = String.format(
                      "Found an unexpected MOB file during compaction %s, aborting compaction %s",
                      fName, getStoreInfo());
                    throw new IOException(msg);
                  }
                  // Can not be null
                  if (size < maxMobFileSize) {
                    // If MOB cell origin file is below threshold
                    // it is get compacted
                    mobFileWriter.append(mobCell);
                    // Update number of mobCells in a current mob writer
                    mobCells++;
                    writer.append(
                      MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
                    // Update total size of the output (we do not take into account
                    // file compression yet)
                    long len = mobFileWriter.getPos();
                    if (len > maxMobFileSize) {
                      LOG.debug("Closing output MOB File, length={} file={}, store={}", len,
                        mobFileWriter.getPath().getName(), getStoreInfo());
                      mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major,
                        request, committedMobWriterFileNames);
                      fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                      mobCells = 0;
                    }
                  } else {
                    // We leave large MOB file as is (is not compacted),
                    // then we update set of MOB file references
                    // and append mob cell directly to the store's writer
                    Optional<TableName> refTable = MobUtils.getTableName(c);
                    if (refTable.isPresent()) {
                      mobRefSet.get().put(refTable.get(), fName);
                      writer.append(c);
                    } else {
                      throw new IOException(String.format("MOB cell did not contain a tablename "
                        + "tag. should not be possible. see ref guide on mob troubleshooting. "
                        + "store=%s cell=%s", getStoreInfo(), c));
                    }
                  }
                }
              } else {
                // If MOB value is less than threshold, append it directly to a store file
                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
                writer.append(mobCell);
                cellsCountCompactedFromMob++;
                cellsSizeCompactedFromMob += mobCell.getValueLength();
              }
            } else {
              // Not a MOB reference cell
              int size = c.getValueLength();
              if (size > mobSizeThreshold) {
                // This MOB cell comes from a regular store file
                // therefore we store it into original mob output
                mobFileWriter.append(c);
                writer
                  .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()));
                mobCells++;
                cellsCountCompactedToMob++;
                cellsSizeCompactedToMob += c.getValueLength();
                if (ioOptimizedMode) {
                  // Update total size of the output (we do not take into account
                  // file compression yet)
                  long len = mobFileWriter.getPos();
                  if (len > maxMobFileSize) {
                    mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major,
                      request, committedMobWriterFileNames);
                    fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                    mobCells = 0;
                  }
                }
              } else {
                // Not a MOB cell, write it directly to a store file
                writer.append(c);
              }
            }
          } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) {
            // Not a major compaction or major with MOB disabled
            // If the kv type is not put, directly write the cell
            // to the store file.
            writer.append(c);
          } else if (MobUtils.isMobReferenceCell(c)) {
            // Not a major MOB compaction, Put MOB reference
            if (MobUtils.hasValidMobRefCellValue(c)) {
              // We do not check mobSizeThreshold during normal compaction,
              // leaving it to a MOB compaction run
              Optional<TableName> refTable = MobUtils.getTableName(c);
              if (refTable.isPresent()) {
                mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
                writer.append(c);
              } else {
                throw new IOException(String.format("MOB cell did not contain a tablename "
                  + "tag. should not be possible. see ref guide on mob troubleshooting. "
                  + "store=%s cell=%s", getStoreInfo(), c));
              }
            } else {
              String errMsg = String.format("Corrupted MOB reference: %s", c.toString());
              throw new IOException(errMsg);
            }
          } else if (c.getValueLength() <= mobSizeThreshold) {
            // If the value size of a cell is not larger than the threshold, directly write it to
            // the store file.
            writer.append(c);
          } else {
            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
            // write this cell to a mob file, and write the path to the store file.
            mobCells++;
            // append the original keyValue in the mob file.
            mobFileWriter.append(c);
            Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
            // write the cell whose value is the path of a mob file to the store file.
            writer.append(reference);
            cellsCountCompactedToMob++;
            cellsSizeCompactedToMob += c.getValueLength();
            if (ioOptimizedMode) {
              long len = mobFileWriter.getPos();
              if (len > maxMobFileSize) {
                mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major, request,
                  committedMobWriterFileNames);
                fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                mobCells = 0;
              }
            }
          }

          // Per-cell accounting: progress, throttling, size-based close check and shipping.
          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          if (closeChecker.isSizeLimit(store, len)) {
            progress.cancel();
            return false;
          }
          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
            ((ShipperListener) writer).beforeShipped();
            kvs.shipped();
            bytesWrittenProgressForShippedCall = 0;
          }
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
      // Commit last MOB writer
      commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
      finished = true;
    } catch (InterruptedException e) {
      progress.cancel();
      InterruptedIOException iioe = new InterruptedIOException(
        "Interrupted while control throughput of compacting " + compactionName);
      // InterruptedIOException has no cause-taking constructor; attach the cause explicitly.
      iioe.initCause(e);
      throw iioe;
    } catch (IOException t) {
      String msg = "Mob compaction failed for region: " + store.getRegionInfo().getEncodedName();
      throw new IOException(msg, t);
    } finally {
      // Clone last cell in the final because writer will append last cell when committing. If
      // don't clone here and once the scanner get closed, then the memory of last cell will be
      // released. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
      if (!finished && mobFileWriter != null) {
        // Remove all MOB references because compaction failed
        clearThreadLocals();
        // Abort writer
        LOG.debug("Aborting writer for {} because of a compaction failure, Store {}",
          mobFileWriter.getPath(), getStoreInfo());
        abortWriter(mobFileWriter);
        deleteCommittedMobFiles(committedMobWriterFileNames);
      }
    }

    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
    progress.complete();
    return true;
  }

  /**
   * Returns a short description of the store for log and error messages.
   * @return a string of the form {@code [table=... family=... region=...]}
   */
  protected String getStoreInfo() {
    return String.format("[table=%s family=%s region=%s]", store.getTableName().getNameAsString(),
      store.getColumnFamilyName(), store.getRegionInfo().getEncodedName());
  }

  /** Clears the thread-local MOB reference set and the MOB file length map. */
  private void clearThreadLocals() {
    mobRefSet.get().clear();
    HashMap<String, Long> map = mobLengthMap.get();
    if (map != null) {
      map.clear();
    }
  }

  /**
   * Creates a new MOB file writer and registers its file name in {@link #mobRefSet}. The writer is
   * created in the tmp directory when the store engine requires writing to tmp first.
   * @param fd                    file details supplying the latest put timestamp and max key count
   * @param major                 selects the major or minor compaction compression
   * @param writerCreationTracker tracker passed through when the writer is not created in tmp
   * @return the new MOB file writer
   * @throws IOException if the writer cannot be created; wraps the original failure
   */
  private StoreFileWriter newMobWriter(FileDetails fd, boolean major,
    Consumer<Path> writerCreationTracker) throws IOException {
    try {
      StoreFileWriter mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
        ? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
          major ? majorCompactionCompression : minorCompactionCompression,
          store.getRegionInfo().getStartKey(), true)
        : mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount,
          major ? majorCompactionCompression : minorCompactionCompression,
          store.getRegionInfo().getStartKey(), true, writerCreationTracker);
      LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(),
        getStoreInfo());
      // Add reference we get for compact MOB
      mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
      return mobFileWriter;
    } catch (IOException e) {
      // Bailing out
      throw new IOException(String.format("Failed to create mob writer, store=%s", getStoreInfo()),
        e);
    }
  }

  /**
   * Commits the MOB writer if it received at least one cell; otherwise aborts it and removes its
   * file name from {@link #mobRefSet}. Does nothing but log when the writer is {@code null}.
   * @param mobFileWriter the MOB writer to finish, may be {@code null}
   * @param maxSeqId      maximum sequence id written into the MOB file metadata
   * @param mobCells      number of cells appended to the writer
   * @param major         whether this is a major compaction; written into the MOB file metadata
   * @throws IOException if appending metadata, closing or committing the file fails
   */
  private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId, long mobCells,
    boolean major) throws IOException {
    // Commit or abort major mob writer
    // If IOException happens during below operation, some
    // MOB files can be committed partially, but corresponding
    // store file won't be committed, therefore these MOB files
    // become orphans and will be deleted during next MOB cleaning chore cycle

    if (mobFileWriter != null) {
      LOG.debug("Commit or abort size={} mobCells={} major={} file={}, store={}",
        mobFileWriter.getPos(), mobCells, major, mobFileWriter.getPath().getName(), getStoreInfo());
      Path path =
        MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
      if (mobCells > 0) {
        // If the mob file is not empty, commit it.
        mobFileWriter.appendMetadata(maxSeqId, major, mobCells);
        mobFileWriter.close();
        mobStore.commitFile(mobFileWriter.getPath(), path);
      } else {
        // If the mob file is empty, delete it instead of committing.
        LOG.debug("Aborting writer for {} because there are no MOB cells, store={}",
          mobFileWriter.getPath(), getStoreInfo());
        // Remove MOB file from reference set
        mobRefSet.get().remove(store.getTableName(), mobFileWriter.getPath().getName());
        abortWriter(mobFileWriter);
      }
    } else {
      // The "{}" placeholder is required for the store info argument to appear in the message.
      LOG.debug("Mob file writer is null, skipping commit/abort, store={}", getStoreInfo());
    }
  }

  /**
   * Finishes the output store file: appends the regular metadata and the collected MOB file
   * references, closes the writer and clears the thread-local state.
   * @param writer  the store file writer to finish
   * @param fd      file details supplying the maximum sequence id
   * @param request the compaction request
   * @return a list containing the path of the written store file
   * @throws IOException if appending metadata or closing the writer fails
   */
  @Override
  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
    CompactionRequestImpl request) throws IOException {
    List<Path> newFiles = Lists.newArrayList(writer.getPath());
    writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
    writer.appendMobMetadata(mobRefSet.get());
    writer.close();
    clearThreadLocals();
    return newFiles;
  }

  /**
   * Finishes the current MOB writer, records its file name in {@code committedMobWriterFileNames}
   * and opens a new MOB writer.
   * @param mobFileWriter               the current MOB writer
   * @param fd                          file details
   * @param mobCells                    number of cells appended to the current writer
   * @param major                       whether this is a major compaction
   * @param request                     the compaction request, supplying the writer creation
   *                                    tracker
   * @param committedMobWriterFileNames list that receives the name of the finished MOB file
   * @return the new MOB writer
   * @throws IOException if finishing the old writer or creating the new one fails
   */
  private StoreFileWriter switchToNewMobWriter(StoreFileWriter mobFileWriter, FileDetails fd,
    long mobCells, boolean major, CompactionRequestImpl request,
    List<String> committedMobWriterFileNames) throws IOException {
    commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
    committedMobWriterFileNames.add(mobFileWriter.getPath().getName());
    return newMobWriter(fd, major, request.getWriterCreationTracker());
  }

  /**
   * Best-effort removal of the named MOB files from the MOB family directory of this store. A
   * failure to delete a file is logged and does not stop the remaining deletions.
   * @param fileNames names of the MOB files to delete; {@code null} entries are skipped
   */
  private void deleteCommittedMobFiles(List<String> fileNames) {
    if (fileNames.isEmpty()) {
      return;
    }
    Path mobColumnFamilyPath =
      MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
    for (String fileName : fileNames) {
      if (fileName == null) {
        continue;
      }
      Path path = new Path(mobColumnFamilyPath, fileName);
      try {
        if (store.getFileSystem().exists(path)) {
          store.getFileSystem().delete(path, false);
        }
      } catch (IOException e) {
        LOG.warn("Failed to delete the mob file {} for an failed mob compaction.", path, e);
      }
    }
  }

}