001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mob.compactions; 020 021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY; 022import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT; 023import static org.apache.hadoop.hbase.regionserver.HStoreFile.SKIP_RESET_SEQ_ID; 024 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Calendar; 029import java.util.Collection; 030import java.util.Collections; 031import java.util.Comparator; 032import java.util.Date; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.NavigableMap; 038import java.util.Objects; 039import java.util.TreeMap; 040import java.util.concurrent.Callable; 041import java.util.concurrent.ExecutorService; 042import java.util.concurrent.Future; 043 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileStatus; 046import org.apache.hadoop.fs.FileSystem; 047import org.apache.hadoop.fs.Path; 048import org.apache.hadoop.hbase.ArrayBackedTag; 049import 
org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellComparator; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.Tag; 054import org.apache.hadoop.hbase.TagType; 055import org.apache.hadoop.hbase.TagUtil; 056import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 057import org.apache.hadoop.hbase.client.Connection; 058import org.apache.hadoop.hbase.client.ConnectionFactory; 059import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy; 060import org.apache.hadoop.hbase.client.Table; 061import org.apache.hadoop.hbase.io.HFileLink; 062import org.apache.hadoop.hbase.io.crypto.Encryption; 063import org.apache.hadoop.hbase.io.hfile.CacheConfig; 064import org.apache.hadoop.hbase.io.hfile.HFile; 065import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 066import org.apache.hadoop.hbase.mob.MobConstants; 067import org.apache.hadoop.hbase.mob.MobFileName; 068import org.apache.hadoop.hbase.mob.MobUtils; 069import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType; 070import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition; 071import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartitionId; 072import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition; 073import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId; 074import org.apache.hadoop.hbase.regionserver.BloomType; 075import org.apache.hadoop.hbase.regionserver.HStore; 076import org.apache.hadoop.hbase.regionserver.HStoreFile; 077import org.apache.hadoop.hbase.regionserver.ScanInfo; 078import org.apache.hadoop.hbase.regionserver.ScanType; 079import org.apache.hadoop.hbase.regionserver.ScannerContext; 080import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 081import org.apache.hadoop.hbase.regionserver.StoreFileScanner; 
082import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 083import org.apache.hadoop.hbase.regionserver.StoreScanner; 084import org.apache.hadoop.hbase.security.EncryptionUtil; 085import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 086import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 087import org.apache.hadoop.hbase.util.Bytes; 088import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 089import org.apache.hadoop.hbase.util.Pair; 090import org.apache.yetus.audience.InterfaceAudience; 091import org.slf4j.Logger; 092import org.slf4j.LoggerFactory; 093 094/** 095 * An implementation of {@link MobCompactor} that compacts the mob files in partitions. 096 */ 097@InterfaceAudience.Private 098public class PartitionedMobCompactor extends MobCompactor { 099 100 private static final Logger LOG = LoggerFactory.getLogger(PartitionedMobCompactor.class); 101 protected long mergeableSize; 102 protected int delFileMaxCount; 103 /** The number of files compacted in a batch */ 104 protected int compactionBatchSize; 105 protected int compactionKVMax; 106 107 private final Path tempPath; 108 private final Path bulkloadPath; 109 private final CacheConfig compactionCacheConfig; 110 private final byte[] refCellTags; 111 private Encryption.Context cryptoContext = Encryption.Context.NONE; 112 113 public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName, 114 ColumnFamilyDescriptor column, ExecutorService pool) throws IOException { 115 super(conf, fs, tableName, column, pool); 116 mergeableSize = conf.getLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 117 MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD); 118 delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT, 119 MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); 120 // default is 100 121 compactionBatchSize = conf.getInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, 122 MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); 123 tempPath = new 
Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME); 124 bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path( 125 tableName.getNamespaceAsString(), tableName.getQualifierAsString()))); 126 compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 127 HConstants.COMPACTION_KV_MAX_DEFAULT); 128 Configuration copyOfConf = new Configuration(conf); 129 copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); 130 compactionCacheConfig = new CacheConfig(copyOfConf); 131 List<Tag> tags = new ArrayList<>(2); 132 tags.add(MobConstants.MOB_REF_TAG); 133 Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName()); 134 tags.add(tableNameTag); 135 this.refCellTags = TagUtil.fromList(tags); 136 cryptoContext = EncryptionUtil.createEncryptionContext(copyOfConf, column); 137 } 138 139 @Override 140 public List<Path> compact(List<FileStatus> files, boolean allFiles) throws IOException { 141 if (files == null || files.isEmpty()) { 142 LOG.info("No candidate mob files"); 143 return null; 144 } 145 LOG.info("is allFiles: " + allFiles); 146 147 // find the files to compact. 148 PartitionedMobCompactionRequest request = select(files, allFiles); 149 // compact the files. 150 return performCompaction(request); 151 } 152 153 /** 154 * Selects the compacted mob/del files. 155 * Iterates the candidates to find out all the del files and small mob files. 156 * @param candidates All the candidates. 157 * @param allFiles Whether add all mob files into the compaction. 158 * @return A compaction request. 
159 * @throws IOException if IO failure is encountered 160 */ 161 protected PartitionedMobCompactionRequest select(List<FileStatus> candidates, 162 boolean allFiles) throws IOException { 163 final Map<CompactionPartitionId, CompactionPartition> filesToCompact = new HashMap<>(); 164 final CompactionPartitionId id = new CompactionPartitionId(); 165 final NavigableMap<CompactionDelPartitionId, CompactionDelPartition> delFilesToCompact = new TreeMap<>(); 166 final CompactionDelPartitionId delId = new CompactionDelPartitionId(); 167 final ArrayList<CompactionDelPartition> allDelPartitions = new ArrayList<>(); 168 int selectedFileCount = 0; 169 int irrelevantFileCount = 0; 170 int totalDelFiles = 0; 171 MobCompactPartitionPolicy policy = column.getMobCompactPartitionPolicy(); 172 173 Calendar calendar = Calendar.getInstance(); 174 Date currentDate = new Date(); 175 Date firstDayOfCurrentMonth = null; 176 Date firstDayOfCurrentWeek = null; 177 178 if (policy == MobCompactPartitionPolicy.MONTHLY) { 179 firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, currentDate); 180 firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate); 181 } else if (policy == MobCompactPartitionPolicy.WEEKLY) { 182 firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate); 183 } 184 185 // We check if there is any del files so the logic can be optimized for the following processing 186 // First step is to check if there is any delete files. If there is any delete files, 187 // For each Partition, it needs to read its startKey and endKey from files. 188 // If there is no delete file, there is no need to read startKey and endKey from files, this 189 // is an optimization. 190 boolean withDelFiles = false; 191 for (FileStatus file : candidates) { 192 if (!file.isFile()) { 193 continue; 194 } 195 // group the del files and small files. 
196 FileStatus linkedFile = file; 197 if (HFileLink.isHFileLink(file.getPath())) { 198 HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); 199 linkedFile = getLinkedFileStatus(link); 200 if (linkedFile == null) { 201 continue; 202 } 203 } 204 if (StoreFileInfo.isDelFile(linkedFile.getPath())) { 205 withDelFiles = true; 206 break; 207 } 208 } 209 210 for (FileStatus file : candidates) { 211 if (!file.isFile()) { 212 irrelevantFileCount++; 213 continue; 214 } 215 // group the del files and small files. 216 FileStatus linkedFile = file; 217 if (HFileLink.isHFileLink(file.getPath())) { 218 HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); 219 linkedFile = getLinkedFileStatus(link); 220 if (linkedFile == null) { 221 // If the linked file cannot be found, regard it as an irrelevantFileCount file 222 irrelevantFileCount++; 223 continue; 224 } 225 } 226 if (withDelFiles && StoreFileInfo.isDelFile(linkedFile.getPath())) { 227 // File in the Del Partition List 228 229 // Get delId from the file 230 try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) { 231 delId.setStartKey(reader.getFirstRowKey().get()); 232 delId.setEndKey(reader.getLastRowKey().get()); 233 } 234 CompactionDelPartition delPartition = delFilesToCompact.get(delId); 235 if (delPartition == null) { 236 CompactionDelPartitionId newDelId = 237 new CompactionDelPartitionId(delId.getStartKey(), delId.getEndKey()); 238 delPartition = new CompactionDelPartition(newDelId); 239 delFilesToCompact.put(newDelId, delPartition); 240 } 241 delPartition.addDelFile(file); 242 totalDelFiles ++; 243 } else { 244 String fileName = linkedFile.getPath().getName(); 245 String date = MobFileName.getDateFromName(fileName); 246 boolean skipCompaction = MobUtils 247 .fillPartitionId(id, firstDayOfCurrentMonth, firstDayOfCurrentWeek, date, policy, 248 calendar, mergeableSize); 249 if (allFiles || (!skipCompaction && (linkedFile.getLen() < id.getThreshold()))) { 250 // 
add all files if allFiles is true, 251 // otherwise add the small files to the merge pool 252 // filter out files which are not supposed to be compacted with the 253 // current policy 254 255 id.setStartKey(MobFileName.getStartKeyFromName(fileName)); 256 CompactionPartition compactionPartition = filesToCompact.get(id); 257 if (compactionPartition == null) { 258 CompactionPartitionId newId = new CompactionPartitionId(id.getStartKey(), id.getDate()); 259 compactionPartition = new CompactionPartition(newId); 260 compactionPartition.addFile(file); 261 filesToCompact.put(newId, compactionPartition); 262 newId.updateLatestDate(date); 263 } else { 264 compactionPartition.addFile(file); 265 compactionPartition.getPartitionId().updateLatestDate(date); 266 } 267 268 if (withDelFiles) { 269 // get startKey and endKey from the file and update partition 270 // TODO: is it possible to skip read of most hfiles? 271 try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) { 272 compactionPartition.setStartKey(reader.getFirstRowKey().get()); 273 compactionPartition.setEndKey(reader.getLastRowKey().get()); 274 } 275 } 276 277 selectedFileCount++; 278 } 279 } 280 } 281 282 /* 283 * Merge del files so there are only non-overlapped del file lists 284 */ 285 for(Map.Entry<CompactionDelPartitionId, CompactionDelPartition> entry : delFilesToCompact.entrySet()) { 286 if (allDelPartitions.size() > 0) { 287 // check if the current key range overlaps the previous one 288 CompactionDelPartition prev = allDelPartitions.get(allDelPartitions.size() - 1); 289 if (Bytes.compareTo(prev.getId().getEndKey(), entry.getKey().getStartKey()) >= 0) { 290 // merge them together 291 prev.getId().setEndKey(entry.getValue().getId().getEndKey()); 292 prev.addDelFileList(entry.getValue().listDelFiles()); 293 294 } else { 295 allDelPartitions.add(entry.getValue()); 296 } 297 } else { 298 allDelPartitions.add(entry.getValue()); 299 } 300 } 301 302 PartitionedMobCompactionRequest request = new 
PartitionedMobCompactionRequest( 303 filesToCompact.values(), allDelPartitions); 304 if (candidates.size() == (totalDelFiles + selectedFileCount + irrelevantFileCount)) { 305 // all the files are selected 306 request.setCompactionType(CompactionType.ALL_FILES); 307 } 308 LOG.info("The compaction type is {}, the request has {} del files, {} selected files, and {} " + 309 "irrelevant files table '{}' and column '{}'", request.getCompactionType(), totalDelFiles, 310 selectedFileCount, irrelevantFileCount, tableName, column.getNameAsString()); 311 return request; 312 } 313 314 /** 315 * Performs the compaction on the selected files. 316 * <ol> 317 * <li>Compacts the del files.</li> 318 * <li>Compacts the selected small mob files and all the del files.</li> 319 * <li>If all the candidates are selected, delete the del files.</li> 320 * </ol> 321 * @param request The compaction request. 322 * @return The paths of new mob files generated in the compaction. 323 * @throws IOException if IO failure is encountered 324 */ 325 protected List<Path> performCompaction(PartitionedMobCompactionRequest request) 326 throws IOException { 327 328 // merge the del files, it is per del partition 329 for (CompactionDelPartition delPartition : request.getDelPartitions()) { 330 if (delPartition.getDelFileCount() <= 1) continue; 331 List<Path> newDelPaths = compactDelFiles(request, delPartition.listDelFiles()); 332 delPartition.cleanDelFiles(); 333 delPartition.addDelFileList(newDelPaths); 334 } 335 336 List<Path> paths = null; 337 int totalDelFileCount = 0; 338 try { 339 for (CompactionDelPartition delPartition : request.getDelPartitions()) { 340 for (Path newDelPath : delPartition.listDelFiles()) { 341 HStoreFile sf = 342 new HStoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE, true); 343 // pre-create reader of a del file to avoid race condition when opening the reader in each 344 // partition. 
345 sf.initReader(); 346 delPartition.addStoreFile(sf); 347 totalDelFileCount++; 348 } 349 } 350 LOG.info("After merging, there are {} del files. table='{}' column='{}'", totalDelFileCount, 351 tableName, column.getNameAsString()); 352 // compact the mob files by partitions. 353 paths = compactMobFiles(request); 354 LOG.info("After compaction, there are {} mob files. table='{}' column='{}'", paths.size(), 355 tableName, column.getNameAsString()); 356 } finally { 357 for (CompactionDelPartition delPartition : request.getDelPartitions()) { 358 closeStoreFileReaders(delPartition.getStoreFiles()); 359 } 360 } 361 362 // archive the del files if all the mob files are selected. 363 if (request.type == CompactionType.ALL_FILES && !request.getDelPartitions().isEmpty()) { 364 LOG.info("After a mob compaction with all files selected, archiving the del files for " + 365 "table='{}' and column='{}'", tableName, column.getNameAsString()); 366 for (CompactionDelPartition delPartition : request.getDelPartitions()) { 367 LOG.info(Objects.toString(delPartition.listDelFiles())); 368 try { 369 MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), 370 delPartition.getStoreFiles()); 371 } catch (IOException e) { 372 LOG.error("Failed to archive the del files {} for partition {} table='{}' and " + 373 "column='{}'", delPartition.getStoreFiles(), delPartition.getId(), tableName, 374 column.getNameAsString(), e); 375 } 376 } 377 } 378 return paths; 379 } 380 381 static class DelPartitionComparator implements Comparator<CompactionDelPartition> { 382 private boolean compareStartKey; 383 384 DelPartitionComparator(boolean compareStartKey) { 385 this.compareStartKey = compareStartKey; 386 } 387 388 public boolean getCompareStartKey() { 389 return this.compareStartKey; 390 } 391 392 public void setCompareStartKey(final boolean compareStartKey) { 393 this.compareStartKey = compareStartKey; 394 } 395 396 @Override 397 public int compare(CompactionDelPartition o1, 
CompactionDelPartition o2) { 398 399 if (compareStartKey) { 400 return Bytes.compareTo(o1.getId().getStartKey(), o2.getId().getStartKey()); 401 } else { 402 return Bytes.compareTo(o1.getId().getEndKey(), o2.getId().getEndKey()); 403 } 404 } 405 } 406 407 @VisibleForTesting 408 List<HStoreFile> getListOfDelFilesForPartition(final CompactionPartition partition, 409 final List<CompactionDelPartition> delPartitions) { 410 // Binary search for startKey and endKey 411 412 List<HStoreFile> result = new ArrayList<>(); 413 414 DelPartitionComparator comparator = new DelPartitionComparator(false); 415 CompactionDelPartitionId id = new CompactionDelPartitionId(null, partition.getStartKey()); 416 CompactionDelPartition target = new CompactionDelPartition(id); 417 int start = Collections.binarySearch(delPartitions, target, comparator); 418 419 // Get the start index for partition 420 if (start < 0) { 421 // Calculate the insert point 422 start = (start + 1) * (-1); 423 if (start == delPartitions.size()) { 424 // no overlap 425 return result; 426 } else { 427 // Check another case which has no overlap 428 if (Bytes.compareTo(partition.getEndKey(), delPartitions.get(start).getId().getStartKey()) < 0) { 429 return result; 430 } 431 } 432 } 433 434 // Search for end index for the partition 435 comparator.setCompareStartKey(true); 436 id.setStartKey(partition.getEndKey()); 437 int end = Collections.binarySearch(delPartitions, target, comparator); 438 439 if (end < 0) { 440 end = (end + 1) * (-1); 441 if (end == 0) { 442 return result; 443 } else { 444 --end; 445 if (Bytes.compareTo(partition.getStartKey(), delPartitions.get(end).getId().getEndKey()) > 0) { 446 return result; 447 } 448 } 449 } 450 451 for (int i = start; i <= end; ++i) { 452 result.addAll(delPartitions.get(i).getStoreFiles()); 453 } 454 455 return result; 456 } 457 458 /** 459 * Compacts the selected small mob files and all the del files. 460 * @param request The compaction request. 
461 * @return The paths of new mob files after compactions. 462 * @throws IOException if IO failure is encountered 463 */ 464 protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request) 465 throws IOException { 466 Collection<CompactionPartition> partitions = request.compactionPartitions; 467 if (partitions == null || partitions.isEmpty()) { 468 LOG.info("No partitions of mob files in table='{}' and column='{}'", tableName, 469 column.getNameAsString()); 470 return Collections.emptyList(); 471 } 472 List<Path> paths = new ArrayList<>(); 473 final Connection c = ConnectionFactory.createConnection(conf); 474 final Table table = c.getTable(tableName); 475 476 try { 477 Map<CompactionPartitionId, Future<List<Path>>> results = new HashMap<>(); 478 // compact the mob files by partitions in parallel. 479 for (final CompactionPartition partition : partitions) { 480 481 // How to efficiently come up a list of delFiles for one partition? 482 // Search the delPartitions and collect all the delFiles for the partition 483 // One optimization can do is that if there is no del file, we do not need to 484 // come up with startKey/endKey. 485 List<HStoreFile> delFiles = getListOfDelFilesForPartition(partition, 486 request.getDelPartitions()); 487 488 results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() { 489 @Override 490 public List<Path> call() throws Exception { 491 LOG.info("Compacting mob files for partition {} for table='{}' and column='{}'", 492 partition.getPartitionId(), tableName, column.getNameAsString()); 493 return compactMobFilePartition(request, partition, delFiles, c, table); 494 } 495 })); 496 } 497 // compact the partitions in parallel. 
498 List<CompactionPartitionId> failedPartitions = new ArrayList<>(); 499 for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) { 500 try { 501 paths.addAll(result.getValue().get()); 502 } catch (Exception e) { 503 // just log the error 504 LOG.error("Failed to compact the partition {} for table='{}' and column='{}'", 505 result.getKey(), tableName, column.getNameAsString(), e); 506 failedPartitions.add(result.getKey()); 507 } 508 } 509 if (!failedPartitions.isEmpty()) { 510 // if any partition fails in the compaction, directly throw an exception. 511 throw new IOException("Failed to compact the partitions " + failedPartitions + 512 " for table='" + tableName + "' column='" + column.getNameAsString() + "'"); 513 } 514 } finally { 515 try { 516 table.close(); 517 } catch (IOException e) { 518 LOG.error("Failed to close the Table", e); 519 } 520 } 521 return paths; 522 } 523 524 /** 525 * Compacts a partition of selected small mob files and all the del files. 526 * @param request The compaction request. 527 * @param partition A compaction partition. 528 * @param delFiles The del files. 529 * @param connection The connection to use. 530 * @param table The current table. 531 * @return The paths of new mob files after compactions. 532 * @throws IOException if IO failure is encountered 533 */ 534 private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request, 535 CompactionPartition partition, 536 List<HStoreFile> delFiles, 537 Connection connection, 538 Table table) throws IOException { 539 if (MobUtils.isMobFileExpired(column, EnvironmentEdgeManager.currentTime(), 540 partition.getPartitionId().getDate())) { 541 // If the files in the partition are expired, do not compact them and directly 542 // return an empty list. 
543 return Collections.emptyList(); 544 } 545 List<Path> newFiles = new ArrayList<>(); 546 List<FileStatus> files = partition.listFiles(); 547 int offset = 0; 548 Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString()); 549 Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString()); 550 while (offset < files.size()) { 551 int batch = compactionBatchSize; 552 if (files.size() - offset < compactionBatchSize) { 553 batch = files.size() - offset; 554 } 555 if (batch == 1 && delFiles.isEmpty()) { 556 // only one file left and no del files, do not compact it, 557 // and directly add it to the new files. 558 newFiles.add(files.get(offset).getPath()); 559 offset++; 560 continue; 561 } 562 // clean the bulkload directory to avoid loading old files. 563 fs.delete(bulkloadPathOfPartition, true); 564 // add the selected mob files and del files into filesToCompact 565 List<HStoreFile> filesToCompact = new ArrayList<>(); 566 for (int i = offset; i < batch + offset; i++) { 567 HStoreFile sf = new HStoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig, 568 BloomType.NONE, true); 569 filesToCompact.add(sf); 570 } 571 filesToCompact.addAll(delFiles); 572 // compact the mob files in a batch. 573 compactMobFilesInBatch(request, partition, connection, table, filesToCompact, batch, 574 bulkloadPathOfPartition, bulkloadColumnPath, newFiles); 575 // move to the next batch. 576 offset += batch; 577 } 578 LOG.info("Compaction is finished. The number of mob files is changed from {} to {} for " + 579 "partition={} for table='{}' and column='{}'", files.size(), newFiles.size(), 580 partition.getPartitionId(), tableName, column.getNameAsString()); 581 return newFiles; 582 } 583 584 /** 585 * Closes the readers of store files. 586 * @param storeFiles The store files to be closed. 
*/
  private void closeStoreFileReaders(List<HStoreFile> storeFiles) {
    for (HStoreFile storeFile : storeFiles) {
      try {
        storeFile.closeStoreFile(true);
      } catch (IOException e) {
        // A failure on one file is logged only, so the remaining files are still closed.
        LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e);
      }
    }
  }

  /**
   * Compacts a partition of selected small mob files and all the del files in a batch.
   * The cells scanned from the input files are written to a new mob file under the temp
   * directory, and one reference cell per mob cell is written to a ref file under the bulkload
   * directory. If at least one cell was written, the mob file is committed and the ref file is
   * bulkloaded. Finally the old mob files (not the del files) are archived.
   * @param request The compaction request.
   * @param partition A compaction partition.
   * @param connection To use for transport
   * @param table The current table.
   * @param filesToCompact The files to be compacted; the first {@code batch} entries are the
   *   mob files, the remaining entries are treated as del files.
   * @param batch The number of mob files to be compacted in a batch.
   * @param bulkloadPathOfPartition The directory where the bulkload column of the current
   *   partition is saved.
   * @param bulkloadColumnPath The directory where the bulkload files of current partition
   *   are saved.
   * @param newFiles The paths of new mob files after compactions.
   * @throws IOException if IO failure is encountered
   */
  private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
                                      CompactionPartition partition,
                                      Connection connection, Table table,
                                      List<HStoreFile> filesToCompact, int batch,
                                      Path bulkloadPathOfPartition, Path bulkloadColumnPath,
                                      List<Path> newFiles)
      throws IOException {
    // open scanner to the selected mob files and del files.
    // NOTE(review): the scanner is opened before the try block below, so it looks like it is
    // not closed if getFileInfo throws -- confirm.
    StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
    // the mob files to be compacted, not include the del files.
    List<HStoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
    // Pair(maxSeqId, cellsCount)
    Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
    // open writers for the mob files and new ref store files.
    StoreFileWriter writer = null;
    StoreFileWriter refFileWriter = null;
    Path filePath = null;
    long mobCells = 0;
    // Cleanup flags consumed by the finally blocks below:
    // - cleanupTmpMobFile: the mob file writer exists and its temp file is not committed yet.
    // - cleanupBulkloadDirOfPartition: the ref file writer exists; never reset, so the bulkload
    //   directory of the partition is always deleted once the ref writer was created.
    // - cleanupCommittedMobFile: the mob file is committed but the bulkload has not finished.
    // - closeReaders: the readers of the mob files have not been closed yet.
    boolean cleanupTmpMobFile = false;
    boolean cleanupBulkloadDirOfPartition = false;
    boolean cleanupCommittedMobFile = false;
    boolean closeReaders= true;

    try {
      try {
        writer = MobUtils
            .createWriter(conf, fs, column, partition.getPartitionId().getLatestDate(), tempPath,
                Long.MAX_VALUE, column.getCompactionCompressionType(),
                partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext,
                true);
        cleanupTmpMobFile = true;
        filePath = writer.getPath();
        // The name of the new mob file is what every reference cell points to.
        byte[] fileName = Bytes.toBytes(filePath.getName());
        // create a temp file and open a writer for it in the bulkloadPath
        refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath,
            fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext, true);
        cleanupBulkloadDirOfPartition = true;
        List<Cell> cells = new ArrayList<>();
        boolean hasMore;
        // Each call to scanner.next is given a batch limit of compactionKVMax.
        ScannerContext scannerContext =
            ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
        do {
          hasMore = scanner.next(cells, scannerContext);
          for (Cell cell : cells) {
            // write the mob cell to the mob file.
            writer.append(cell);
            // write the new reference cell to the store file.
            Cell reference = MobUtils.createMobRefCell(cell, fileName, this.refCellTags);
            refFileWriter.append(reference);
            mobCells++;
          }
          cells.clear();
        } while (hasMore);
      } finally {
        // close the scanner.
        scanner.close();

        if (cleanupTmpMobFile) {
          // append metadata to the mob file, and close the mob file writer.
          closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
        }

        // NOTE(review): if closing the mob file writer above throws, the ref file writer below
        // is not closed -- confirm whether that is acceptable.
        if (cleanupBulkloadDirOfPartition) {
          // append metadata and bulkload info to the ref mob file, and close the writer.
          closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
        }
      }

      // With no cell written, nothing is committed: cleanupTmpMobFile stays set and the temp
      // mob file is deleted in the outer finally.
      if (mobCells > 0) {
        // commit mob file
        MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
        cleanupTmpMobFile = false;
        cleanupCommittedMobFile = true;
        // bulkload the ref file
        LOG.info("start MOB ref bulkload for partition {} table='{}' column='{}'",
            partition.getPartitionId(), tableName, column.getNameAsString());
        bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
        cleanupCommittedMobFile = false;
        LOG.info("end MOB ref bulkload for partition {} table='{}' column='{}'",
            partition.getPartitionId(), tableName, column.getNameAsString());
        newFiles.add(new Path(mobFamilyDir, filePath.getName()));
      }

      // archive the old mob files, do not archive the del files.
      try {
        closeStoreFileReaders(mobFilesToCompact);
        closeReaders = false;
        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
      } catch (IOException e) {
        // An archive failure is logged only; the batch is still considered compacted.
        LOG.error("Failed to archive the files " + mobFilesToCompact, e);
      }
    } finally {
      if (closeReaders) {
        closeStoreFileReaders(mobFilesToCompact);
      }

      if (cleanupTmpMobFile) {
        deletePath(filePath);
      }

      if (cleanupBulkloadDirOfPartition) {
        // delete the bulkload files in bulkloadPath
        deletePath(bulkloadPathOfPartition);
      }

      if (cleanupCommittedMobFile) {
        // Still set only when the bulkload did not complete: remove the committed mob file
        // again.
        LOG.error("failed MOB ref bulkload for partition {} table='{}' column='{}'",
            partition.getPartitionId(), tableName, column.getNameAsString());
        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(),
            Collections.singletonList(new HStoreFile(fs, new Path(mobFamilyDir, filePath.getName()),
                conf, compactionCacheConfig, BloomType.NONE, true)));
      }
    }
  }

  /**
   * Compacts the del files in batches which avoids opening too many files.
730 * @param request The compaction request. 731 * @param delFilePaths Del file paths to compact 732 * @return The paths of new del files after merging or the original files if no merging 733 * is necessary. 734 * @throws IOException if IO failure is encountered 735 */ 736 protected List<Path> compactDelFiles(PartitionedMobCompactionRequest request, 737 List<Path> delFilePaths) throws IOException { 738 if (delFilePaths.size() <= delFileMaxCount) { 739 return delFilePaths; 740 } 741 // when there are more del files than the number that is allowed, merge it firstly. 742 int offset = 0; 743 List<Path> paths = new ArrayList<>(); 744 while (offset < delFilePaths.size()) { 745 // get the batch 746 int batch = compactionBatchSize; 747 if (delFilePaths.size() - offset < compactionBatchSize) { 748 batch = delFilePaths.size() - offset; 749 } 750 List<HStoreFile> batchedDelFiles = new ArrayList<>(); 751 if (batch == 1) { 752 // only one file left, do not compact it, directly add it to the new files. 753 paths.add(delFilePaths.get(offset)); 754 offset++; 755 continue; 756 } 757 for (int i = offset; i < batch + offset; i++) { 758 batchedDelFiles.add(new HStoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig, 759 BloomType.NONE, true)); 760 } 761 // compact the del files in a batch. 762 paths.add(compactDelFilesInBatch(request, batchedDelFiles)); 763 // move to the next batch. 764 offset += batch; 765 } 766 return compactDelFiles(request, paths); 767 } 768 769 /** 770 * Compacts the del file in a batch. 771 * @param request The compaction request. 772 * @param delFiles The del files. 773 * @return The path of new del file after merging. 774 * @throws IOException if IO failure is encountered 775 */ 776 private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request, 777 List<HStoreFile> delFiles) throws IOException { 778 // create a scanner for the del files. 
779 StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES); 780 StoreFileWriter writer = null; 781 Path filePath = null; 782 try { 783 writer = MobUtils.createDelFileWriter(conf, fs, column, 784 MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE, 785 column.getCompactionCompressionType(), HConstants.EMPTY_START_ROW, compactionCacheConfig, 786 cryptoContext); 787 filePath = writer.getPath(); 788 List<Cell> cells = new ArrayList<>(); 789 boolean hasMore; 790 ScannerContext scannerContext = 791 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 792 do { 793 hasMore = scanner.next(cells, scannerContext); 794 for (Cell cell : cells) { 795 writer.append(cell); 796 } 797 cells.clear(); 798 } while (hasMore); 799 } finally { 800 scanner.close(); 801 if (writer != null) { 802 try { 803 writer.close(); 804 } catch (IOException e) { 805 LOG.error("Failed to close the writer of the file " + filePath, e); 806 } 807 } 808 } 809 // commit the new del file 810 Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig); 811 // archive the old del files 812 try { 813 MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles); 814 } catch (IOException e) { 815 LOG.error("Failed to archive the old del files " + delFiles, e); 816 } 817 return path; 818 } 819 820 /** 821 * Creates a store scanner. 822 * @param filesToCompact The files to be compacted. 823 * @param scanType The scan type. 824 * @return The store scanner. 
825 * @throws IOException if IO failure is encountered 826 */ 827 private StoreScanner createScanner(List<HStoreFile> filesToCompact, ScanType scanType) 828 throws IOException { 829 List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, 830 false, true, false, false, HConstants.LATEST_TIMESTAMP); 831 long ttl = HStore.determineTTLFromFamily(column); 832 ScanInfo scanInfo = new ScanInfo(conf, column, ttl, 0, CellComparator.getInstance()); 833 return new StoreScanner(scanInfo, scanType, scanners); 834 } 835 836 /** 837 * Bulkloads the current file. 838 * 839 * @param connection to use to get admin/RegionLocator 840 * @param table The current table. 841 * @param bulkloadDirectory The path of bulkload directory. 842 * @param fileName The current file name. 843 * @throws IOException if IO failure is encountered 844 */ 845 private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory, 846 String fileName) 847 throws IOException { 848 // bulkload the ref file 849 try { 850 LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); 851 bulkload.disableReplication(); 852 bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table, 853 connection.getRegionLocator(table.getName())); 854 } catch (Exception e) { 855 throw new IOException(e); 856 } 857 } 858 859 /** 860 * Closes the mob file writer. 861 * @param writer The mob file writer. 862 * @param maxSeqId Maximum sequence id. 863 * @param mobCellsCount The number of mob cells. 864 * @throws IOException if IO failure is encountered 865 */ 866 private void closeMobFileWriter(StoreFileWriter writer, long maxSeqId, long mobCellsCount) 867 throws IOException { 868 if (writer != null) { 869 writer.appendMetadata(maxSeqId, false, mobCellsCount); 870 try { 871 writer.close(); 872 } catch (IOException e) { 873 LOG.error("Failed to close the writer of the file " + writer.getPath(), e); 874 } 875 } 876 } 877 878 /** 879 * Closes the ref file writer. 
880 * @param writer The ref file writer. 881 * @param maxSeqId Maximum sequence id. 882 * @param bulkloadTime The timestamp at which the bulk load file is created. 883 * @throws IOException if IO failure is encountered 884 */ 885 private void closeRefFileWriter(StoreFileWriter writer, long maxSeqId, long bulkloadTime) 886 throws IOException { 887 if (writer != null) { 888 writer.appendMetadata(maxSeqId, false); 889 writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime)); 890 writer.appendFileInfo(SKIP_RESET_SEQ_ID, Bytes.toBytes(true)); 891 try { 892 writer.close(); 893 } catch (IOException e) { 894 LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e); 895 } 896 } 897 } 898 899 /** 900 * Gets the max seqId and number of cells of the store files. 901 * @param storeFiles The store files. 902 * @return The pair of the max seqId and number of cells of the store files. 903 * @throws IOException if IO failure is encountered 904 */ 905 private Pair<Long, Long> getFileInfo(List<HStoreFile> storeFiles) throws IOException { 906 long maxSeqId = 0; 907 long maxKeyCount = 0; 908 for (HStoreFile sf : storeFiles) { 909 // the readers will be closed later after the merge. 910 maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId()); 911 sf.initReader(); 912 byte[] count = sf.getReader().loadFileInfo().get(MOB_CELLS_COUNT); 913 if (count != null) { 914 maxKeyCount += Bytes.toLong(count); 915 } 916 } 917 return new Pair<>(maxSeqId, maxKeyCount); 918 } 919 920 /** 921 * Deletes a file. 922 * @param path The path of the file to be deleted. 
*/
  private void deletePath(Path path) {
    LOG.debug("Cleanup, delete path '{}'", path);
    if (path == null) {
      // nothing to delete
      return;
    }
    try {
      // recursive delete
      fs.delete(path, true);
    } catch (IOException deleteFailure) {
      // cleanup is best-effort: report the failure and carry on
      LOG.error("Failed to delete the file " + path, deleteFailure);
    }
  }

  /**
   * Looks up the status of the file a link points to.
   * <p>The locations of the link are probed in the order returned by
   * {@code link.getLocations()} and the first status found is returned.
   * @param link The link to resolve.
   * @return The status of the linked file, or null if no location yields one.
   * @throws IOException if IO failure is encountered
   */
  private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
    for (Path candidate : link.getLocations()) {
      if (candidate == null) {
        continue;
      }
      try {
        FileStatus status = fs.getFileStatus(candidate);
        if (status != null) {
          return status;
        }
      } catch (FileNotFoundException ignored) {
        // the file is not at this location, try the next one
      }
    }
    LOG.warn("The file " + link + " links to can not be found");
    return null;
  }
}