001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mob.compactions; 020 021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY; 022import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT; 023import static org.apache.hadoop.hbase.regionserver.HStoreFile.SKIP_RESET_SEQ_ID; 024 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Calendar; 029import java.util.Collection; 030import java.util.Collections; 031import java.util.Comparator; 032import java.util.Date; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.NavigableMap; 038import java.util.Objects; 039import java.util.TreeMap; 040import java.util.concurrent.Callable; 041import java.util.concurrent.ExecutorService; 042import java.util.concurrent.Future; 043 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileStatus; 046import org.apache.hadoop.fs.FileSystem; 047import org.apache.hadoop.fs.Path; 048import org.apache.hadoop.hbase.ArrayBackedTag; 049import 
org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellComparator; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.Tag; 054import org.apache.hadoop.hbase.TagType; 055import org.apache.hadoop.hbase.TagUtil; 056import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 057import org.apache.hadoop.hbase.client.Connection; 058import org.apache.hadoop.hbase.client.ConnectionFactory; 059import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy; 060import org.apache.hadoop.hbase.client.Table; 061import org.apache.hadoop.hbase.io.HFileLink; 062import org.apache.hadoop.hbase.io.crypto.Encryption; 063import org.apache.hadoop.hbase.io.hfile.CacheConfig; 064import org.apache.hadoop.hbase.io.hfile.HFile; 065import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 066import org.apache.hadoop.hbase.mob.MobConstants; 067import org.apache.hadoop.hbase.mob.MobFileName; 068import org.apache.hadoop.hbase.mob.MobUtils; 069import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType; 070import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition; 071import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartitionId; 072import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition; 073import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId; 074import org.apache.hadoop.hbase.regionserver.BloomType; 075import org.apache.hadoop.hbase.regionserver.HStore; 076import org.apache.hadoop.hbase.regionserver.HStoreFile; 077import org.apache.hadoop.hbase.regionserver.ScanInfo; 078import org.apache.hadoop.hbase.regionserver.ScanType; 079import org.apache.hadoop.hbase.regionserver.ScannerContext; 080import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 081import org.apache.hadoop.hbase.regionserver.StoreFileScanner; 
082import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 083import org.apache.hadoop.hbase.regionserver.StoreScanner; 084import org.apache.hadoop.hbase.security.EncryptionUtil; 085import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 086import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 087import org.apache.hadoop.hbase.util.Bytes; 088import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 089import org.apache.hadoop.hbase.util.Pair; 090import org.apache.yetus.audience.InterfaceAudience; 091import org.slf4j.Logger; 092import org.slf4j.LoggerFactory; 093 094/** 095 * An implementation of {@link MobCompactor} that compacts the mob files in partitions. 096 */ 097@InterfaceAudience.Private 098public class PartitionedMobCompactor extends MobCompactor { 099 100 private static final Logger LOG = LoggerFactory.getLogger(PartitionedMobCompactor.class); 101 protected long mergeableSize; 102 protected int delFileMaxCount; 103 /** The number of files compacted in a batch */ 104 protected int compactionBatchSize; 105 protected int compactionKVMax; 106 107 private final Path tempPath; 108 private final Path bulkloadPath; 109 private final CacheConfig compactionCacheConfig; 110 private final byte[] refCellTags; 111 private Encryption.Context cryptoContext = Encryption.Context.NONE; 112 113 public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName, 114 ColumnFamilyDescriptor column, ExecutorService pool) throws IOException { 115 super(conf, fs, tableName, column, pool); 116 mergeableSize = conf.getLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 117 MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD); 118 delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT, 119 MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); 120 // default is 100 121 compactionBatchSize = conf.getInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, 122 MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); 123 tempPath = new 
Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME); 124 bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path( 125 tableName.getNamespaceAsString(), tableName.getQualifierAsString()))); 126 compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 127 HConstants.COMPACTION_KV_MAX_DEFAULT); 128 Configuration copyOfConf = new Configuration(conf); 129 copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); 130 compactionCacheConfig = new CacheConfig(copyOfConf); 131 List<Tag> tags = new ArrayList<>(2); 132 tags.add(MobConstants.MOB_REF_TAG); 133 Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName()); 134 tags.add(tableNameTag); 135 this.refCellTags = TagUtil.fromList(tags); 136 cryptoContext = EncryptionUtil.createEncryptionContext(copyOfConf, column); 137 } 138 139 @Override 140 public List<Path> compact(List<FileStatus> files, boolean allFiles) throws IOException { 141 if (files == null || files.isEmpty()) { 142 LOG.info("No candidate mob files"); 143 return null; 144 } 145 LOG.info("is allFiles: " + allFiles); 146 147 // find the files to compact. 148 PartitionedMobCompactionRequest request = select(files, allFiles); 149 // compact the files. 150 return performCompaction(request); 151 } 152 153 /** 154 * Selects the compacted mob/del files. 155 * Iterates the candidates to find out all the del files and small mob files. 156 * @param candidates All the candidates. 157 * @param allFiles Whether add all mob files into the compaction. 158 * @return A compaction request. 
159 * @throws IOException if IO failure is encountered 160 */ 161 protected PartitionedMobCompactionRequest select(List<FileStatus> candidates, 162 boolean allFiles) throws IOException { 163 final Map<CompactionPartitionId, CompactionPartition> filesToCompact = new HashMap<>(); 164 final CompactionPartitionId id = new CompactionPartitionId(); 165 final NavigableMap<CompactionDelPartitionId, CompactionDelPartition> delFilesToCompact = new TreeMap<>(); 166 final CompactionDelPartitionId delId = new CompactionDelPartitionId(); 167 final ArrayList<CompactionDelPartition> allDelPartitions = new ArrayList<>(); 168 int selectedFileCount = 0; 169 int irrelevantFileCount = 0; 170 int totalDelFiles = 0; 171 MobCompactPartitionPolicy policy = column.getMobCompactPartitionPolicy(); 172 173 Calendar calendar = Calendar.getInstance(); 174 Date currentDate = new Date(); 175 Date firstDayOfCurrentMonth = null; 176 Date firstDayOfCurrentWeek = null; 177 178 if (policy == MobCompactPartitionPolicy.MONTHLY) { 179 firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, currentDate); 180 firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate); 181 } else if (policy == MobCompactPartitionPolicy.WEEKLY) { 182 firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate); 183 } 184 185 // We check if there is any del files so the logic can be optimized for the following processing 186 // First step is to check if there is any delete files. If there is any delete files, 187 // For each Partition, it needs to read its startKey and endKey from files. 188 // If there is no delete file, there is no need to read startKey and endKey from files, this 189 // is an optimization. 190 boolean withDelFiles = false; 191 for (FileStatus file : candidates) { 192 if (!file.isFile()) { 193 continue; 194 } 195 // group the del files and small files. 
196 FileStatus linkedFile = file; 197 if (HFileLink.isHFileLink(file.getPath())) { 198 HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); 199 linkedFile = getLinkedFileStatus(link); 200 if (linkedFile == null) { 201 continue; 202 } 203 } 204 if (StoreFileInfo.isDelFile(linkedFile.getPath())) { 205 withDelFiles = true; 206 break; 207 } 208 } 209 210 for (FileStatus file : candidates) { 211 if (!file.isFile()) { 212 irrelevantFileCount++; 213 continue; 214 } 215 // group the del files and small files. 216 FileStatus linkedFile = file; 217 if (HFileLink.isHFileLink(file.getPath())) { 218 HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); 219 linkedFile = getLinkedFileStatus(link); 220 if (linkedFile == null) { 221 // If the linked file cannot be found, regard it as an irrelevantFileCount file 222 irrelevantFileCount++; 223 continue; 224 } 225 } 226 if (withDelFiles && StoreFileInfo.isDelFile(linkedFile.getPath())) { 227 // File in the Del Partition List 228 229 // Get delId from the file 230 try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) { 231 delId.setStartKey(reader.getFirstRowKey().get()); 232 delId.setEndKey(reader.getLastRowKey().get()); 233 } 234 CompactionDelPartition delPartition = delFilesToCompact.get(delId); 235 if (delPartition == null) { 236 CompactionDelPartitionId newDelId = 237 new CompactionDelPartitionId(delId.getStartKey(), delId.getEndKey()); 238 delPartition = new CompactionDelPartition(newDelId); 239 delFilesToCompact.put(newDelId, delPartition); 240 } 241 delPartition.addDelFile(file); 242 totalDelFiles ++; 243 } else { 244 String fileName = linkedFile.getPath().getName(); 245 String date = MobFileName.getDateFromName(fileName); 246 boolean skipCompaction = MobUtils 247 .fillPartitionId(id, firstDayOfCurrentMonth, firstDayOfCurrentWeek, date, policy, 248 calendar, mergeableSize); 249 if (allFiles || (!skipCompaction && (linkedFile.getLen() < id.getThreshold()))) { 250 // 
add all files if allFiles is true, 251 // otherwise add the small files to the merge pool 252 // filter out files which are not supposed to be compacted with the 253 // current policy 254 255 id.setStartKey(MobFileName.getStartKeyFromName(fileName)); 256 CompactionPartition compactionPartition = filesToCompact.get(id); 257 if (compactionPartition == null) { 258 CompactionPartitionId newId = new CompactionPartitionId(id.getStartKey(), id.getDate()); 259 compactionPartition = new CompactionPartition(newId); 260 compactionPartition.addFile(file); 261 filesToCompact.put(newId, compactionPartition); 262 newId.updateLatestDate(date); 263 } else { 264 compactionPartition.addFile(file); 265 compactionPartition.getPartitionId().updateLatestDate(date); 266 } 267 268 if (withDelFiles) { 269 // get startKey and endKey from the file and update partition 270 // TODO: is it possible to skip read of most hfiles? 271 try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) { 272 compactionPartition.setStartKey(reader.getFirstRowKey().get()); 273 compactionPartition.setEndKey(reader.getLastRowKey().get()); 274 } 275 } 276 277 selectedFileCount++; 278 } 279 } 280 } 281 282 /* 283 * Merge del files so there are only non-overlapped del file lists 284 */ 285 for(Map.Entry<CompactionDelPartitionId, CompactionDelPartition> entry : delFilesToCompact.entrySet()) { 286 if (allDelPartitions.size() > 0) { 287 // check if the current key range overlaps the previous one 288 CompactionDelPartition prev = allDelPartitions.get(allDelPartitions.size() - 1); 289 if (Bytes.compareTo(prev.getId().getEndKey(), entry.getKey().getStartKey()) >= 0) { 290 // merge them together 291 prev.getId().setEndKey(entry.getValue().getId().getEndKey()); 292 prev.addDelFileList(entry.getValue().listDelFiles()); 293 294 } else { 295 allDelPartitions.add(entry.getValue()); 296 } 297 } else { 298 allDelPartitions.add(entry.getValue()); 299 } 300 } 301 302 PartitionedMobCompactionRequest request = new 
PartitionedMobCompactionRequest( 303 filesToCompact.values(), allDelPartitions); 304 if (candidates.size() == (totalDelFiles + selectedFileCount + irrelevantFileCount)) { 305 // all the files are selected 306 request.setCompactionType(CompactionType.ALL_FILES); 307 } 308 LOG.info("The compaction type is " + request.getCompactionType() + ", the request has " 309 + totalDelFiles + " del files, " + selectedFileCount + " selected files, and " 310 + irrelevantFileCount + " irrelevant files"); 311 return request; 312 } 313 314 /** 315 * Performs the compaction on the selected files. 316 * <ol> 317 * <li>Compacts the del files.</li> 318 * <li>Compacts the selected small mob files and all the del files.</li> 319 * <li>If all the candidates are selected, delete the del files.</li> 320 * </ol> 321 * @param request The compaction request. 322 * @return The paths of new mob files generated in the compaction. 323 * @throws IOException if IO failure is encountered 324 */ 325 protected List<Path> performCompaction(PartitionedMobCompactionRequest request) 326 throws IOException { 327 328 // merge the del files, it is per del partition 329 for (CompactionDelPartition delPartition : request.getDelPartitions()) { 330 if (delPartition.getDelFileCount() <= 1) continue; 331 List<Path> newDelPaths = compactDelFiles(request, delPartition.listDelFiles()); 332 delPartition.cleanDelFiles(); 333 delPartition.addDelFileList(newDelPaths); 334 } 335 336 List<Path> paths = null; 337 int totalDelFileCount = 0; 338 try { 339 for (CompactionDelPartition delPartition : request.getDelPartitions()) { 340 for (Path newDelPath : delPartition.listDelFiles()) { 341 HStoreFile sf = 342 new HStoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE, true); 343 // pre-create reader of a del file to avoid race condition when opening the reader in each 344 // partition. 
345 sf.initReader(); 346 delPartition.addStoreFile(sf); 347 totalDelFileCount++; 348 } 349 } 350 LOG.info("After merging, there are " + totalDelFileCount + " del files"); 351 // compact the mob files by partitions. 352 paths = compactMobFiles(request); 353 LOG.info("After compaction, there are " + paths.size() + " mob files"); 354 } finally { 355 for (CompactionDelPartition delPartition : request.getDelPartitions()) { 356 closeStoreFileReaders(delPartition.getStoreFiles()); 357 } 358 } 359 360 // archive the del files if all the mob files are selected. 361 if (request.type == CompactionType.ALL_FILES && !request.getDelPartitions().isEmpty()) { 362 LOG.info( 363 "After a mob compaction with all files selected, archiving the del files "); 364 for (CompactionDelPartition delPartition : request.getDelPartitions()) { 365 LOG.info(Objects.toString(delPartition.listDelFiles())); 366 try { 367 MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), 368 delPartition.getStoreFiles()); 369 } catch (IOException e) { 370 LOG.error("Failed to archive the del files " + delPartition.getStoreFiles(), e); 371 } 372 } 373 } 374 return paths; 375 } 376 377 static class DelPartitionComparator implements Comparator<CompactionDelPartition> { 378 private boolean compareStartKey; 379 380 DelPartitionComparator(boolean compareStartKey) { 381 this.compareStartKey = compareStartKey; 382 } 383 384 public boolean getCompareStartKey() { 385 return this.compareStartKey; 386 } 387 388 public void setCompareStartKey(final boolean compareStartKey) { 389 this.compareStartKey = compareStartKey; 390 } 391 392 @Override 393 public int compare(CompactionDelPartition o1, CompactionDelPartition o2) { 394 395 if (compareStartKey) { 396 return Bytes.compareTo(o1.getId().getStartKey(), o2.getId().getStartKey()); 397 } else { 398 return Bytes.compareTo(o1.getId().getEndKey(), o2.getId().getEndKey()); 399 } 400 } 401 } 402 403 @VisibleForTesting 404 List<HStoreFile> 
getListOfDelFilesForPartition(final CompactionPartition partition, 405 final List<CompactionDelPartition> delPartitions) { 406 // Binary search for startKey and endKey 407 408 List<HStoreFile> result = new ArrayList<>(); 409 410 DelPartitionComparator comparator = new DelPartitionComparator(false); 411 CompactionDelPartitionId id = new CompactionDelPartitionId(null, partition.getStartKey()); 412 CompactionDelPartition target = new CompactionDelPartition(id); 413 int start = Collections.binarySearch(delPartitions, target, comparator); 414 415 // Get the start index for partition 416 if (start < 0) { 417 // Calculate the insert point 418 start = (start + 1) * (-1); 419 if (start == delPartitions.size()) { 420 // no overlap 421 return result; 422 } else { 423 // Check another case which has no overlap 424 if (Bytes.compareTo(partition.getEndKey(), delPartitions.get(start).getId().getStartKey()) < 0) { 425 return result; 426 } 427 } 428 } 429 430 // Search for end index for the partition 431 comparator.setCompareStartKey(true); 432 id.setStartKey(partition.getEndKey()); 433 int end = Collections.binarySearch(delPartitions, target, comparator); 434 435 if (end < 0) { 436 end = (end + 1) * (-1); 437 if (end == 0) { 438 return result; 439 } else { 440 --end; 441 if (Bytes.compareTo(partition.getStartKey(), delPartitions.get(end).getId().getEndKey()) > 0) { 442 return result; 443 } 444 } 445 } 446 447 for (int i = start; i <= end; ++i) { 448 result.addAll(delPartitions.get(i).getStoreFiles()); 449 } 450 451 return result; 452 } 453 454 /** 455 * Compacts the selected small mob files and all the del files. 456 * @param request The compaction request. 457 * @return The paths of new mob files after compactions. 
458 * @throws IOException if IO failure is encountered 459 */ 460 protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request) 461 throws IOException { 462 Collection<CompactionPartition> partitions = request.compactionPartitions; 463 if (partitions == null || partitions.isEmpty()) { 464 LOG.info("No partitions of mob files"); 465 return Collections.emptyList(); 466 } 467 List<Path> paths = new ArrayList<>(); 468 final Connection c = ConnectionFactory.createConnection(conf); 469 final Table table = c.getTable(tableName); 470 471 try { 472 Map<CompactionPartitionId, Future<List<Path>>> results = new HashMap<>(); 473 // compact the mob files by partitions in parallel. 474 for (final CompactionPartition partition : partitions) { 475 476 // How to efficiently come up a list of delFiles for one partition? 477 // Search the delPartitions and collect all the delFiles for the partition 478 // One optimization can do is that if there is no del file, we do not need to 479 // come up with startKey/endKey. 480 List<HStoreFile> delFiles = getListOfDelFilesForPartition(partition, 481 request.getDelPartitions()); 482 483 results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() { 484 @Override 485 public List<Path> call() throws Exception { 486 LOG.info("Compacting mob files for partition " + partition.getPartitionId()); 487 return compactMobFilePartition(request, partition, delFiles, c, table); 488 } 489 })); 490 } 491 // compact the partitions in parallel. 
492 List<CompactionPartitionId> failedPartitions = new ArrayList<>(); 493 for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) { 494 try { 495 paths.addAll(result.getValue().get()); 496 } catch (Exception e) { 497 // just log the error 498 LOG.error("Failed to compact the partition " + result.getKey(), e); 499 failedPartitions.add(result.getKey()); 500 } 501 } 502 if (!failedPartitions.isEmpty()) { 503 // if any partition fails in the compaction, directly throw an exception. 504 throw new IOException("Failed to compact the partitions " + failedPartitions); 505 } 506 } finally { 507 try { 508 table.close(); 509 } catch (IOException e) { 510 LOG.error("Failed to close the Table", e); 511 } 512 } 513 return paths; 514 } 515 516 /** 517 * Compacts a partition of selected small mob files and all the del files. 518 * @param request The compaction request. 519 * @param partition A compaction partition. 520 * @param delFiles The del files. 521 * @param connection The connection to use. 522 * @param table The current table. 523 * @return The paths of new mob files after compactions. 524 * @throws IOException if IO failure is encountered 525 */ 526 private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request, 527 CompactionPartition partition, 528 List<HStoreFile> delFiles, 529 Connection connection, 530 Table table) throws IOException { 531 if (MobUtils.isMobFileExpired(column, EnvironmentEdgeManager.currentTime(), 532 partition.getPartitionId().getDate())) { 533 // If the files in the partition are expired, do not compact them and directly 534 // return an empty list. 
535 return Collections.emptyList(); 536 } 537 List<Path> newFiles = new ArrayList<>(); 538 List<FileStatus> files = partition.listFiles(); 539 int offset = 0; 540 Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString()); 541 Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString()); 542 while (offset < files.size()) { 543 int batch = compactionBatchSize; 544 if (files.size() - offset < compactionBatchSize) { 545 batch = files.size() - offset; 546 } 547 if (batch == 1 && delFiles.isEmpty()) { 548 // only one file left and no del files, do not compact it, 549 // and directly add it to the new files. 550 newFiles.add(files.get(offset).getPath()); 551 offset++; 552 continue; 553 } 554 // clean the bulkload directory to avoid loading old files. 555 fs.delete(bulkloadPathOfPartition, true); 556 // add the selected mob files and del files into filesToCompact 557 List<HStoreFile> filesToCompact = new ArrayList<>(); 558 for (int i = offset; i < batch + offset; i++) { 559 HStoreFile sf = new HStoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig, 560 BloomType.NONE, true); 561 filesToCompact.add(sf); 562 } 563 filesToCompact.addAll(delFiles); 564 // compact the mob files in a batch. 565 compactMobFilesInBatch(request, partition, connection, table, filesToCompact, batch, 566 bulkloadPathOfPartition, bulkloadColumnPath, newFiles); 567 // move to the next batch. 568 offset += batch; 569 } 570 LOG.info("Compaction is finished. The number of mob files is changed from " + files.size() 571 + " to " + newFiles.size()); 572 return newFiles; 573 } 574 575 /** 576 * Closes the readers of store files. 577 * @param storeFiles The store files to be closed. 
578 */ 579 private void closeStoreFileReaders(List<HStoreFile> storeFiles) { 580 for (HStoreFile storeFile : storeFiles) { 581 try { 582 storeFile.closeStoreFile(true); 583 } catch (IOException e) { 584 LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e); 585 } 586 } 587 } 588 589 /** 590 * Compacts a partition of selected small mob files and all the del files in a batch. 591 * @param request The compaction request. 592 * @param partition A compaction partition. 593 * @param connection To use for transport 594 * @param table The current table. 595 * @param filesToCompact The files to be compacted. 596 * @param batch The number of mob files to be compacted in a batch. 597 * @param bulkloadPathOfPartition The directory where the bulkload column of the current 598 * partition is saved. 599 * @param bulkloadColumnPath The directory where the bulkload files of current partition 600 * are saved. 601 * @param newFiles The paths of new mob files after compactions. 602 * @throws IOException if IO failure is encountered 603 */ 604 private void compactMobFilesInBatch(PartitionedMobCompactionRequest request, 605 CompactionPartition partition, 606 Connection connection, Table table, 607 List<HStoreFile> filesToCompact, int batch, 608 Path bulkloadPathOfPartition, Path bulkloadColumnPath, 609 List<Path> newFiles) 610 throws IOException { 611 // open scanner to the selected mob files and del files. 612 StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES); 613 // the mob files to be compacted, not include the del files. 614 List<HStoreFile> mobFilesToCompact = filesToCompact.subList(0, batch); 615 // Pair(maxSeqId, cellsCount) 616 Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact); 617 // open writers for the mob files and new ref store files. 
618 StoreFileWriter writer = null; 619 StoreFileWriter refFileWriter = null; 620 Path filePath = null; 621 long mobCells = 0; 622 boolean cleanupTmpMobFile = false; 623 boolean cleanupBulkloadDirOfPartition = false; 624 boolean cleanupCommittedMobFile = false; 625 boolean closeReaders= true; 626 627 try { 628 try { 629 writer = MobUtils 630 .createWriter(conf, fs, column, partition.getPartitionId().getLatestDate(), tempPath, 631 Long.MAX_VALUE, column.getCompactionCompressionType(), 632 partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext, 633 true); 634 cleanupTmpMobFile = true; 635 filePath = writer.getPath(); 636 byte[] fileName = Bytes.toBytes(filePath.getName()); 637 // create a temp file and open a writer for it in the bulkloadPath 638 refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, 639 fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext, true); 640 cleanupBulkloadDirOfPartition = true; 641 List<Cell> cells = new ArrayList<>(); 642 boolean hasMore; 643 ScannerContext scannerContext = 644 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 645 do { 646 hasMore = scanner.next(cells, scannerContext); 647 for (Cell cell : cells) { 648 // write the mob cell to the mob file. 649 writer.append(cell); 650 // write the new reference cell to the store file. 651 Cell reference = MobUtils.createMobRefCell(cell, fileName, this.refCellTags); 652 refFileWriter.append(reference); 653 mobCells++; 654 } 655 cells.clear(); 656 } while (hasMore); 657 } finally { 658 // close the scanner. 659 scanner.close(); 660 661 if (cleanupTmpMobFile) { 662 // append metadata to the mob file, and close the mob file writer. 663 closeMobFileWriter(writer, fileInfo.getFirst(), mobCells); 664 } 665 666 if (cleanupBulkloadDirOfPartition) { 667 // append metadata and bulkload info to the ref mob file, and close the writer. 
668 closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime); 669 } 670 } 671 672 if (mobCells > 0) { 673 // commit mob file 674 MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig); 675 cleanupTmpMobFile = false; 676 cleanupCommittedMobFile = true; 677 // bulkload the ref file 678 bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName()); 679 cleanupCommittedMobFile = false; 680 newFiles.add(new Path(mobFamilyDir, filePath.getName())); 681 } 682 683 // archive the old mob files, do not archive the del files. 684 try { 685 closeStoreFileReaders(mobFilesToCompact); 686 closeReaders = false; 687 MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact); 688 } catch (IOException e) { 689 LOG.error("Failed to archive the files " + mobFilesToCompact, e); 690 } 691 } finally { 692 if (closeReaders) { 693 closeStoreFileReaders(mobFilesToCompact); 694 } 695 696 if (cleanupTmpMobFile) { 697 deletePath(filePath); 698 } 699 700 if (cleanupBulkloadDirOfPartition) { 701 // delete the bulkload files in bulkloadPath 702 deletePath(bulkloadPathOfPartition); 703 } 704 705 if (cleanupCommittedMobFile) { 706 deletePath(new Path(mobFamilyDir, filePath.getName())); 707 } 708 } 709 } 710 711 /** 712 * Compacts the del files in batches which avoids opening too many files. 713 * @param request The compaction request. 714 * @param delFilePaths Del file paths to compact 715 * @return The paths of new del files after merging or the original files if no merging 716 * is necessary. 717 * @throws IOException if IO failure is encountered 718 */ 719 protected List<Path> compactDelFiles(PartitionedMobCompactionRequest request, 720 List<Path> delFilePaths) throws IOException { 721 if (delFilePaths.size() <= delFileMaxCount) { 722 return delFilePaths; 723 } 724 // when there are more del files than the number that is allowed, merge it firstly. 
725 int offset = 0; 726 List<Path> paths = new ArrayList<>(); 727 while (offset < delFilePaths.size()) { 728 // get the batch 729 int batch = compactionBatchSize; 730 if (delFilePaths.size() - offset < compactionBatchSize) { 731 batch = delFilePaths.size() - offset; 732 } 733 List<HStoreFile> batchedDelFiles = new ArrayList<>(); 734 if (batch == 1) { 735 // only one file left, do not compact it, directly add it to the new files. 736 paths.add(delFilePaths.get(offset)); 737 offset++; 738 continue; 739 } 740 for (int i = offset; i < batch + offset; i++) { 741 batchedDelFiles.add(new HStoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig, 742 BloomType.NONE, true)); 743 } 744 // compact the del files in a batch. 745 paths.add(compactDelFilesInBatch(request, batchedDelFiles)); 746 // move to the next batch. 747 offset += batch; 748 } 749 return compactDelFiles(request, paths); 750 } 751 752 /** 753 * Compacts the del file in a batch. 754 * @param request The compaction request. 755 * @param delFiles The del files. 756 * @return The path of new del file after merging. 757 * @throws IOException if IO failure is encountered 758 */ 759 private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request, 760 List<HStoreFile> delFiles) throws IOException { 761 // create a scanner for the del files. 
762 StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES); 763 StoreFileWriter writer = null; 764 Path filePath = null; 765 try { 766 writer = MobUtils.createDelFileWriter(conf, fs, column, 767 MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE, 768 column.getCompactionCompressionType(), HConstants.EMPTY_START_ROW, compactionCacheConfig, 769 cryptoContext); 770 filePath = writer.getPath(); 771 List<Cell> cells = new ArrayList<>(); 772 boolean hasMore; 773 ScannerContext scannerContext = 774 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 775 do { 776 hasMore = scanner.next(cells, scannerContext); 777 for (Cell cell : cells) { 778 writer.append(cell); 779 } 780 cells.clear(); 781 } while (hasMore); 782 } finally { 783 scanner.close(); 784 if (writer != null) { 785 try { 786 writer.close(); 787 } catch (IOException e) { 788 LOG.error("Failed to close the writer of the file " + filePath, e); 789 } 790 } 791 } 792 // commit the new del file 793 Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig); 794 // archive the old del files 795 try { 796 MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles); 797 } catch (IOException e) { 798 LOG.error("Failed to archive the old del files " + delFiles, e); 799 } 800 return path; 801 } 802 803 /** 804 * Creates a store scanner. 805 * @param filesToCompact The files to be compacted. 806 * @param scanType The scan type. 807 * @return The store scanner. 
808 * @throws IOException if IO failure is encountered 809 */ 810 private StoreScanner createScanner(List<HStoreFile> filesToCompact, ScanType scanType) 811 throws IOException { 812 List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, 813 false, true, false, false, HConstants.LATEST_TIMESTAMP); 814 long ttl = HStore.determineTTLFromFamily(column); 815 ScanInfo scanInfo = new ScanInfo(conf, column, ttl, 0, CellComparator.getInstance()); 816 return new StoreScanner(scanInfo, scanType, scanners); 817 } 818 819 /** 820 * Bulkloads the current file. 821 * 822 * @param connection to use to get admin/RegionLocator 823 * @param table The current table. 824 * @param bulkloadDirectory The path of bulkload directory. 825 * @param fileName The current file name. 826 * @throws IOException if IO failure is encountered 827 */ 828 private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory, 829 String fileName) 830 throws IOException { 831 // bulkload the ref file 832 try { 833 LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); 834 bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table, 835 connection.getRegionLocator(table.getName())); 836 } catch (Exception e) { 837 throw new IOException(e); 838 } 839 } 840 841 /** 842 * Closes the mob file writer. 843 * @param writer The mob file writer. 844 * @param maxSeqId Maximum sequence id. 845 * @param mobCellsCount The number of mob cells. 846 * @throws IOException if IO failure is encountered 847 */ 848 private void closeMobFileWriter(StoreFileWriter writer, long maxSeqId, long mobCellsCount) 849 throws IOException { 850 if (writer != null) { 851 writer.appendMetadata(maxSeqId, false, mobCellsCount); 852 try { 853 writer.close(); 854 } catch (IOException e) { 855 LOG.error("Failed to close the writer of the file " + writer.getPath(), e); 856 } 857 } 858 } 859 860 /** 861 * Closes the ref file writer. 862 * @param writer The ref file writer. 
863 * @param maxSeqId Maximum sequence id. 864 * @param bulkloadTime The timestamp at which the bulk load file is created. 865 * @throws IOException if IO failure is encountered 866 */ 867 private void closeRefFileWriter(StoreFileWriter writer, long maxSeqId, long bulkloadTime) 868 throws IOException { 869 if (writer != null) { 870 writer.appendMetadata(maxSeqId, false); 871 writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime)); 872 writer.appendFileInfo(SKIP_RESET_SEQ_ID, Bytes.toBytes(true)); 873 try { 874 writer.close(); 875 } catch (IOException e) { 876 LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e); 877 } 878 } 879 } 880 881 /** 882 * Gets the max seqId and number of cells of the store files. 883 * @param storeFiles The store files. 884 * @return The pair of the max seqId and number of cells of the store files. 885 * @throws IOException if IO failure is encountered 886 */ 887 private Pair<Long, Long> getFileInfo(List<HStoreFile> storeFiles) throws IOException { 888 long maxSeqId = 0; 889 long maxKeyCount = 0; 890 for (HStoreFile sf : storeFiles) { 891 // the readers will be closed later after the merge. 892 maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId()); 893 sf.initReader(); 894 byte[] count = sf.getReader().loadFileInfo().get(MOB_CELLS_COUNT); 895 if (count != null) { 896 maxKeyCount += Bytes.toLong(count); 897 } 898 } 899 return new Pair<>(maxSeqId, maxKeyCount); 900 } 901 902 /** 903 * Deletes a file. 904 * @param path The path of the file to be deleted. 
*/
  private void deletePath(Path path) {
    // Best-effort cleanup: a null path is a no-op, and delete failures are logged rather than
    // propagated. The delete is requested with recursive = true.
    try {
      if (path != null) {
        fs.delete(path, true);
      }
    } catch (IOException e) {
      LOG.error("Failed to delete the file " + path, e);
    }
  }

  /**
   * Resolves an {@link HFileLink} to the status of the first of its candidate locations that
   * exists.
   * <p>
   * Locations are probed in the order returned by {@link HFileLink#getLocations()}. {@code null}
   * entries and locations that raise {@link FileNotFoundException} are skipped.
   * @param link The link to resolve.
   * @return The {@link FileStatus} of the first existing location, or {@code null} (after logging
   *         a warning) when none of the locations can be found.
   * @throws IOException if an IO failure other than a missing file is encountered
   */
  private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
    for (Path location : link.getLocations()) {
      if (location == null) {
        continue;
      }
      try {
        FileStatus file = fs.getFileStatus(location);
        if (file != null) {
          return file;
        }
      } catch (FileNotFoundException ignored) {
        // Deliberately ignored: the file is not at this location, so try the next candidate.
      }
    }
    LOG.warn("The file " + link + " links to can not be found");
    return null;
  }
}