001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mob.compactions; 020 021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY; 022import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT; 023import static org.apache.hadoop.hbase.regionserver.HStoreFile.SKIP_RESET_SEQ_ID; 024 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Calendar; 029import java.util.Collection; 030import java.util.Collections; 031import java.util.Comparator; 032import java.util.Date; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.NavigableMap; 038import java.util.Objects; 039import java.util.TreeMap; 040import java.util.concurrent.Callable; 041import java.util.concurrent.ExecutorService; 042import java.util.concurrent.Future; 043 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileStatus; 046import org.apache.hadoop.fs.FileSystem; 047import org.apache.hadoop.fs.Path; 048import org.apache.hadoop.hbase.ArrayBackedTag; 049import 
org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellComparator; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.Tag; 054import org.apache.hadoop.hbase.TagType; 055import org.apache.hadoop.hbase.TagUtil; 056import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 057import org.apache.hadoop.hbase.client.Connection; 058import org.apache.hadoop.hbase.client.ConnectionFactory; 059import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy; 060import org.apache.hadoop.hbase.client.Table; 061import org.apache.hadoop.hbase.io.HFileLink; 062import org.apache.hadoop.hbase.io.crypto.Encryption; 063import org.apache.hadoop.hbase.io.hfile.CacheConfig; 064import org.apache.hadoop.hbase.io.hfile.HFile; 065import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 066import org.apache.hadoop.hbase.mob.MobConstants; 067import org.apache.hadoop.hbase.mob.MobFileName; 068import org.apache.hadoop.hbase.mob.MobUtils; 069import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType; 070import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition; 071import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartitionId; 072import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition; 073import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId; 074import org.apache.hadoop.hbase.regionserver.BloomType; 075import org.apache.hadoop.hbase.regionserver.HStore; 076import org.apache.hadoop.hbase.regionserver.HStoreFile; 077import org.apache.hadoop.hbase.regionserver.ScanInfo; 078import org.apache.hadoop.hbase.regionserver.ScanType; 079import org.apache.hadoop.hbase.regionserver.ScannerContext; 080import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 081import org.apache.hadoop.hbase.regionserver.StoreFileScanner; 
082import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 083import org.apache.hadoop.hbase.regionserver.StoreScanner; 084import org.apache.hadoop.hbase.security.EncryptionUtil; 085import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 086import org.apache.hadoop.hbase.util.Bytes; 087import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 088import org.apache.hadoop.hbase.util.Pair; 089import org.apache.yetus.audience.InterfaceAudience; 090import org.slf4j.Logger; 091import org.slf4j.LoggerFactory; 092 093/** 094 * An implementation of {@link MobCompactor} that compacts the mob files in partitions. 095 */ 096@InterfaceAudience.Private 097public class PartitionedMobCompactor extends MobCompactor { 098 099 private static final Logger LOG = LoggerFactory.getLogger(PartitionedMobCompactor.class); 100 protected long mergeableSize; 101 protected int delFileMaxCount; 102 /** The number of files compacted in a batch */ 103 protected int compactionBatchSize; 104 protected int compactionKVMax; 105 106 private final Path tempPath; 107 private final Path bulkloadPath; 108 private final CacheConfig compactionCacheConfig; 109 private final byte[] refCellTags; 110 private Encryption.Context cryptoContext = Encryption.Context.NONE; 111 112 public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName, 113 ColumnFamilyDescriptor column, ExecutorService pool) throws IOException { 114 super(conf, fs, tableName, column, pool); 115 mergeableSize = conf.getLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 116 MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD); 117 delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT, 118 MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); 119 // default is 100 120 compactionBatchSize = conf.getInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, 121 MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); 122 tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME); 123 bulkloadPath = new 
Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path( 124 tableName.getNamespaceAsString(), tableName.getQualifierAsString()))); 125 compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 126 HConstants.COMPACTION_KV_MAX_DEFAULT); 127 Configuration copyOfConf = new Configuration(conf); 128 copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); 129 compactionCacheConfig = new CacheConfig(copyOfConf); 130 List<Tag> tags = new ArrayList<>(2); 131 tags.add(MobConstants.MOB_REF_TAG); 132 Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName()); 133 tags.add(tableNameTag); 134 this.refCellTags = TagUtil.fromList(tags); 135 cryptoContext = EncryptionUtil.createEncryptionContext(copyOfConf, column); 136 } 137 138 @Override 139 public List<Path> compact(List<FileStatus> files, boolean allFiles) throws IOException { 140 if (files == null || files.isEmpty()) { 141 LOG.info("No candidate mob files"); 142 return null; 143 } 144 LOG.info("is allFiles: " + allFiles); 145 146 // find the files to compact. 147 PartitionedMobCompactionRequest request = select(files, allFiles); 148 // compact the files. 149 return performCompaction(request); 150 } 151 152 /** 153 * Selects the compacted mob/del files. 154 * Iterates the candidates to find out all the del files and small mob files. 155 * @param candidates All the candidates. 156 * @param allFiles Whether add all mob files into the compaction. 157 * @return A compaction request. 
158 * @throws IOException if IO failure is encountered 159 */ 160 protected PartitionedMobCompactionRequest select(List<FileStatus> candidates, 161 boolean allFiles) throws IOException { 162 final Map<CompactionPartitionId, CompactionPartition> filesToCompact = new HashMap<>(); 163 final CompactionPartitionId id = new CompactionPartitionId(); 164 final NavigableMap<CompactionDelPartitionId, CompactionDelPartition> delFilesToCompact = new TreeMap<>(); 165 final CompactionDelPartitionId delId = new CompactionDelPartitionId(); 166 final ArrayList<CompactionDelPartition> allDelPartitions = new ArrayList<>(); 167 int selectedFileCount = 0; 168 int irrelevantFileCount = 0; 169 int totalDelFiles = 0; 170 MobCompactPartitionPolicy policy = column.getMobCompactPartitionPolicy(); 171 172 Calendar calendar = Calendar.getInstance(); 173 Date currentDate = new Date(); 174 Date firstDayOfCurrentMonth = null; 175 Date firstDayOfCurrentWeek = null; 176 177 if (policy == MobCompactPartitionPolicy.MONTHLY) { 178 firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, currentDate); 179 firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate); 180 } else if (policy == MobCompactPartitionPolicy.WEEKLY) { 181 firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate); 182 } 183 184 // We check if there is any del files so the logic can be optimized for the following processing 185 // First step is to check if there is any delete files. If there is any delete files, 186 // For each Partition, it needs to read its startKey and endKey from files. 187 // If there is no delete file, there is no need to read startKey and endKey from files, this 188 // is an optimization. 189 boolean withDelFiles = false; 190 for (FileStatus file : candidates) { 191 if (!file.isFile()) { 192 continue; 193 } 194 // group the del files and small files. 
195 FileStatus linkedFile = file; 196 if (HFileLink.isHFileLink(file.getPath())) { 197 HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); 198 linkedFile = getLinkedFileStatus(link); 199 if (linkedFile == null) { 200 continue; 201 } 202 } 203 if (StoreFileInfo.isDelFile(linkedFile.getPath())) { 204 withDelFiles = true; 205 break; 206 } 207 } 208 209 for (FileStatus file : candidates) { 210 if (!file.isFile()) { 211 irrelevantFileCount++; 212 continue; 213 } 214 // group the del files and small files. 215 FileStatus linkedFile = file; 216 if (HFileLink.isHFileLink(file.getPath())) { 217 HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); 218 linkedFile = getLinkedFileStatus(link); 219 if (linkedFile == null) { 220 // If the linked file cannot be found, regard it as an irrelevantFileCount file 221 irrelevantFileCount++; 222 continue; 223 } 224 } 225 if (withDelFiles && StoreFileInfo.isDelFile(linkedFile.getPath())) { 226 // File in the Del Partition List 227 228 // Get delId from the file 229 try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) { 230 delId.setStartKey(reader.getFirstRowKey().get()); 231 delId.setEndKey(reader.getLastRowKey().get()); 232 } 233 CompactionDelPartition delPartition = delFilesToCompact.get(delId); 234 if (delPartition == null) { 235 CompactionDelPartitionId newDelId = 236 new CompactionDelPartitionId(delId.getStartKey(), delId.getEndKey()); 237 delPartition = new CompactionDelPartition(newDelId); 238 delFilesToCompact.put(newDelId, delPartition); 239 } 240 delPartition.addDelFile(file); 241 totalDelFiles ++; 242 } else { 243 String fileName = linkedFile.getPath().getName(); 244 String date = MobFileName.getDateFromName(fileName); 245 boolean skipCompaction = MobUtils 246 .fillPartitionId(id, firstDayOfCurrentMonth, firstDayOfCurrentWeek, date, policy, 247 calendar, mergeableSize); 248 if (allFiles || (!skipCompaction && (linkedFile.getLen() < id.getThreshold()))) { 249 // 
add all files if allFiles is true, 250 // otherwise add the small files to the merge pool 251 // filter out files which are not supposed to be compacted with the 252 // current policy 253 254 id.setStartKey(MobFileName.getStartKeyFromName(fileName)); 255 CompactionPartition compactionPartition = filesToCompact.get(id); 256 if (compactionPartition == null) { 257 CompactionPartitionId newId = new CompactionPartitionId(id.getStartKey(), id.getDate()); 258 compactionPartition = new CompactionPartition(newId); 259 compactionPartition.addFile(file); 260 filesToCompact.put(newId, compactionPartition); 261 newId.updateLatestDate(date); 262 } else { 263 compactionPartition.addFile(file); 264 compactionPartition.getPartitionId().updateLatestDate(date); 265 } 266 267 if (withDelFiles) { 268 // get startKey and endKey from the file and update partition 269 // TODO: is it possible to skip read of most hfiles? 270 try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) { 271 compactionPartition.setStartKey(reader.getFirstRowKey().get()); 272 compactionPartition.setEndKey(reader.getLastRowKey().get()); 273 } 274 } 275 276 selectedFileCount++; 277 } 278 } 279 } 280 281 /* 282 * Merge del files so there are only non-overlapped del file lists 283 */ 284 for(Map.Entry<CompactionDelPartitionId, CompactionDelPartition> entry : delFilesToCompact.entrySet()) { 285 if (allDelPartitions.size() > 0) { 286 // check if the current key range overlaps the previous one 287 CompactionDelPartition prev = allDelPartitions.get(allDelPartitions.size() - 1); 288 if (Bytes.compareTo(prev.getId().getEndKey(), entry.getKey().getStartKey()) >= 0) { 289 // merge them together 290 prev.getId().setEndKey(entry.getValue().getId().getEndKey()); 291 prev.addDelFileList(entry.getValue().listDelFiles()); 292 293 } else { 294 allDelPartitions.add(entry.getValue()); 295 } 296 } else { 297 allDelPartitions.add(entry.getValue()); 298 } 299 } 300 301 PartitionedMobCompactionRequest request = new 
PartitionedMobCompactionRequest( 302 filesToCompact.values(), allDelPartitions); 303 if (candidates.size() == (totalDelFiles + selectedFileCount + irrelevantFileCount)) { 304 // all the files are selected 305 request.setCompactionType(CompactionType.ALL_FILES); 306 } 307 LOG.info("The compaction type is {}, the request has {} del files, {} selected files, and {} " + 308 "irrelevant files table '{}' and column '{}'", request.getCompactionType(), totalDelFiles, 309 selectedFileCount, irrelevantFileCount, tableName, column.getNameAsString()); 310 return request; 311 } 312 313 /** 314 * Performs the compaction on the selected files. 315 * <ol> 316 * <li>Compacts the del files.</li> 317 * <li>Compacts the selected small mob files and all the del files.</li> 318 * <li>If all the candidates are selected, delete the del files.</li> 319 * </ol> 320 * @param request The compaction request. 321 * @return The paths of new mob files generated in the compaction. 322 * @throws IOException if IO failure is encountered 323 */ 324 protected List<Path> performCompaction(PartitionedMobCompactionRequest request) 325 throws IOException { 326 327 // merge the del files, it is per del partition 328 for (CompactionDelPartition delPartition : request.getDelPartitions()) { 329 if (delPartition.getDelFileCount() <= 1) continue; 330 List<Path> newDelPaths = compactDelFiles(request, delPartition.listDelFiles()); 331 delPartition.cleanDelFiles(); 332 delPartition.addDelFileList(newDelPaths); 333 } 334 335 List<Path> paths = null; 336 int totalDelFileCount = 0; 337 try { 338 for (CompactionDelPartition delPartition : request.getDelPartitions()) { 339 for (Path newDelPath : delPartition.listDelFiles()) { 340 HStoreFile sf = 341 new HStoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE, true); 342 // pre-create reader of a del file to avoid race condition when opening the reader in each 343 // partition. 
344 sf.initReader(); 345 delPartition.addStoreFile(sf); 346 totalDelFileCount++; 347 } 348 } 349 LOG.info("After merging, there are {} del files. table='{}' column='{}'", totalDelFileCount, 350 tableName, column.getNameAsString()); 351 // compact the mob files by partitions. 352 paths = compactMobFiles(request); 353 LOG.info("After compaction, there are {} mob files. table='{}' column='{}'", paths.size(), 354 tableName, column.getNameAsString()); 355 } finally { 356 for (CompactionDelPartition delPartition : request.getDelPartitions()) { 357 closeStoreFileReaders(delPartition.getStoreFiles()); 358 } 359 } 360 361 // archive the del files if all the mob files are selected. 362 if (request.type == CompactionType.ALL_FILES && !request.getDelPartitions().isEmpty()) { 363 LOG.info("After a mob compaction with all files selected, archiving the del files for " + 364 "table='{}' and column='{}'", tableName, column.getNameAsString()); 365 for (CompactionDelPartition delPartition : request.getDelPartitions()) { 366 LOG.info(Objects.toString(delPartition.listDelFiles())); 367 try { 368 MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), 369 delPartition.getStoreFiles()); 370 } catch (IOException e) { 371 LOG.error("Failed to archive the del files {} for partition {} table='{}' and " + 372 "column='{}'", delPartition.getStoreFiles(), delPartition.getId(), tableName, 373 column.getNameAsString(), e); 374 } 375 } 376 } 377 return paths; 378 } 379 380 static class DelPartitionComparator implements Comparator<CompactionDelPartition> { 381 private boolean compareStartKey; 382 383 DelPartitionComparator(boolean compareStartKey) { 384 this.compareStartKey = compareStartKey; 385 } 386 387 public boolean getCompareStartKey() { 388 return this.compareStartKey; 389 } 390 391 public void setCompareStartKey(final boolean compareStartKey) { 392 this.compareStartKey = compareStartKey; 393 } 394 395 @Override 396 public int compare(CompactionDelPartition o1, 
CompactionDelPartition o2) { 397 398 if (compareStartKey) { 399 return Bytes.compareTo(o1.getId().getStartKey(), o2.getId().getStartKey()); 400 } else { 401 return Bytes.compareTo(o1.getId().getEndKey(), o2.getId().getEndKey()); 402 } 403 } 404 } 405 406 List<HStoreFile> getListOfDelFilesForPartition(final CompactionPartition partition, 407 final List<CompactionDelPartition> delPartitions) { 408 // Binary search for startKey and endKey 409 410 List<HStoreFile> result = new ArrayList<>(); 411 412 DelPartitionComparator comparator = new DelPartitionComparator(false); 413 CompactionDelPartitionId id = new CompactionDelPartitionId(null, partition.getStartKey()); 414 CompactionDelPartition target = new CompactionDelPartition(id); 415 int start = Collections.binarySearch(delPartitions, target, comparator); 416 417 // Get the start index for partition 418 if (start < 0) { 419 // Calculate the insert point 420 start = (start + 1) * (-1); 421 if (start == delPartitions.size()) { 422 // no overlap 423 return result; 424 } else { 425 // Check another case which has no overlap 426 if (Bytes.compareTo(partition.getEndKey(), delPartitions.get(start).getId().getStartKey()) < 0) { 427 return result; 428 } 429 } 430 } 431 432 // Search for end index for the partition 433 comparator.setCompareStartKey(true); 434 id.setStartKey(partition.getEndKey()); 435 int end = Collections.binarySearch(delPartitions, target, comparator); 436 437 if (end < 0) { 438 end = (end + 1) * (-1); 439 if (end == 0) { 440 return result; 441 } else { 442 --end; 443 if (Bytes.compareTo(partition.getStartKey(), delPartitions.get(end).getId().getEndKey()) > 0) { 444 return result; 445 } 446 } 447 } 448 449 for (int i = start; i <= end; ++i) { 450 result.addAll(delPartitions.get(i).getStoreFiles()); 451 } 452 453 return result; 454 } 455 456 /** 457 * Compacts the selected small mob files and all the del files. 458 * @param request The compaction request. 
459 * @return The paths of new mob files after compactions. 460 * @throws IOException if IO failure is encountered 461 */ 462 protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request) 463 throws IOException { 464 Collection<CompactionPartition> partitions = request.compactionPartitions; 465 if (partitions == null || partitions.isEmpty()) { 466 LOG.info("No partitions of mob files in table='{}' and column='{}'", tableName, 467 column.getNameAsString()); 468 return Collections.emptyList(); 469 } 470 List<Path> paths = new ArrayList<>(); 471 final Connection c = ConnectionFactory.createConnection(conf); 472 final Table table = c.getTable(tableName); 473 474 try { 475 Map<CompactionPartitionId, Future<List<Path>>> results = new HashMap<>(); 476 // compact the mob files by partitions in parallel. 477 for (final CompactionPartition partition : partitions) { 478 479 // How to efficiently come up a list of delFiles for one partition? 480 // Search the delPartitions and collect all the delFiles for the partition 481 // One optimization can do is that if there is no del file, we do not need to 482 // come up with startKey/endKey. 483 List<HStoreFile> delFiles = getListOfDelFilesForPartition(partition, 484 request.getDelPartitions()); 485 486 results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() { 487 @Override 488 public List<Path> call() throws Exception { 489 LOG.info("Compacting mob files for partition {} for table='{}' and column='{}'", 490 partition.getPartitionId(), tableName, column.getNameAsString()); 491 return compactMobFilePartition(request, partition, delFiles, c, table); 492 } 493 })); 494 } 495 // compact the partitions in parallel. 
496 List<CompactionPartitionId> failedPartitions = new ArrayList<>(); 497 for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) { 498 try { 499 paths.addAll(result.getValue().get()); 500 } catch (Exception e) { 501 // just log the error 502 LOG.error("Failed to compact the partition {} for table='{}' and column='{}'", 503 result.getKey(), tableName, column.getNameAsString(), e); 504 failedPartitions.add(result.getKey()); 505 } 506 } 507 if (!failedPartitions.isEmpty()) { 508 // if any partition fails in the compaction, directly throw an exception. 509 throw new IOException("Failed to compact the partitions " + failedPartitions + 510 " for table='" + tableName + "' column='" + column.getNameAsString() + "'"); 511 } 512 } finally { 513 try { 514 table.close(); 515 } catch (IOException e) { 516 LOG.error("Failed to close the Table", e); 517 } 518 } 519 return paths; 520 } 521 522 /** 523 * Compacts a partition of selected small mob files and all the del files. 524 * @param request The compaction request. 525 * @param partition A compaction partition. 526 * @param delFiles The del files. 527 * @param connection The connection to use. 528 * @param table The current table. 529 * @return The paths of new mob files after compactions. 530 * @throws IOException if IO failure is encountered 531 */ 532 private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request, 533 CompactionPartition partition, 534 List<HStoreFile> delFiles, 535 Connection connection, 536 Table table) throws IOException { 537 if (MobUtils.isMobFileExpired(column, EnvironmentEdgeManager.currentTime(), 538 partition.getPartitionId().getDate())) { 539 // If the files in the partition are expired, do not compact them and directly 540 // return an empty list. 
541 return Collections.emptyList(); 542 } 543 List<Path> newFiles = new ArrayList<>(); 544 List<FileStatus> files = partition.listFiles(); 545 int offset = 0; 546 Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString()); 547 Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString()); 548 while (offset < files.size()) { 549 int batch = compactionBatchSize; 550 if (files.size() - offset < compactionBatchSize) { 551 batch = files.size() - offset; 552 } 553 if (batch == 1 && delFiles.isEmpty()) { 554 // only one file left and no del files, do not compact it, 555 // and directly add it to the new files. 556 newFiles.add(files.get(offset).getPath()); 557 offset++; 558 continue; 559 } 560 // clean the bulkload directory to avoid loading old files. 561 fs.delete(bulkloadPathOfPartition, true); 562 // add the selected mob files and del files into filesToCompact 563 List<HStoreFile> filesToCompact = new ArrayList<>(); 564 for (int i = offset; i < batch + offset; i++) { 565 HStoreFile sf = new HStoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig, 566 BloomType.NONE, true); 567 filesToCompact.add(sf); 568 } 569 filesToCompact.addAll(delFiles); 570 // compact the mob files in a batch. 571 compactMobFilesInBatch(request, partition, connection, table, filesToCompact, batch, 572 bulkloadPathOfPartition, bulkloadColumnPath, newFiles); 573 // move to the next batch. 574 offset += batch; 575 } 576 LOG.info("Compaction is finished. The number of mob files is changed from {} to {} for " + 577 "partition={} for table='{}' and column='{}'", files.size(), newFiles.size(), 578 partition.getPartitionId(), tableName, column.getNameAsString()); 579 return newFiles; 580 } 581 582 /** 583 * Closes the readers of store files. 584 * @param storeFiles The store files to be closed. 
585 */ 586 private void closeStoreFileReaders(List<HStoreFile> storeFiles) { 587 for (HStoreFile storeFile : storeFiles) { 588 try { 589 storeFile.closeStoreFile(true); 590 } catch (IOException e) { 591 LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e); 592 } 593 } 594 } 595 596 /** 597 * Compacts a partition of selected small mob files and all the del files in a batch. 598 * @param request The compaction request. 599 * @param partition A compaction partition. 600 * @param connection To use for transport 601 * @param table The current table. 602 * @param filesToCompact The files to be compacted. 603 * @param batch The number of mob files to be compacted in a batch. 604 * @param bulkloadPathOfPartition The directory where the bulkload column of the current 605 * partition is saved. 606 * @param bulkloadColumnPath The directory where the bulkload files of current partition 607 * are saved. 608 * @param newFiles The paths of new mob files after compactions. 609 * @throws IOException if IO failure is encountered 610 */ 611 private void compactMobFilesInBatch(PartitionedMobCompactionRequest request, 612 CompactionPartition partition, 613 Connection connection, Table table, 614 List<HStoreFile> filesToCompact, int batch, 615 Path bulkloadPathOfPartition, Path bulkloadColumnPath, 616 List<Path> newFiles) 617 throws IOException { 618 // open scanner to the selected mob files and del files. 619 StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES); 620 // the mob files to be compacted, not include the del files. 621 List<HStoreFile> mobFilesToCompact = filesToCompact.subList(0, batch); 622 // Pair(maxSeqId, cellsCount) 623 Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact); 624 // open writers for the mob files and new ref store files. 
625 StoreFileWriter writer = null; 626 StoreFileWriter refFileWriter = null; 627 Path filePath = null; 628 long mobCells = 0; 629 boolean cleanupTmpMobFile = false; 630 boolean cleanupBulkloadDirOfPartition = false; 631 boolean cleanupCommittedMobFile = false; 632 boolean closeReaders= true; 633 634 try { 635 try { 636 writer = MobUtils 637 .createWriter(conf, fs, column, partition.getPartitionId().getLatestDate(), tempPath, 638 Long.MAX_VALUE, column.getCompactionCompressionType(), 639 partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext, 640 true); 641 cleanupTmpMobFile = true; 642 filePath = writer.getPath(); 643 byte[] fileName = Bytes.toBytes(filePath.getName()); 644 // create a temp file and open a writer for it in the bulkloadPath 645 refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, 646 fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext, true); 647 cleanupBulkloadDirOfPartition = true; 648 List<Cell> cells = new ArrayList<>(); 649 boolean hasMore; 650 ScannerContext scannerContext = 651 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 652 do { 653 hasMore = scanner.next(cells, scannerContext); 654 for (Cell cell : cells) { 655 // write the mob cell to the mob file. 656 writer.append(cell); 657 // write the new reference cell to the store file. 658 Cell reference = MobUtils.createMobRefCell(cell, fileName, this.refCellTags); 659 refFileWriter.append(reference); 660 mobCells++; 661 } 662 cells.clear(); 663 } while (hasMore); 664 } finally { 665 // close the scanner. 666 scanner.close(); 667 668 if (cleanupTmpMobFile) { 669 // append metadata to the mob file, and close the mob file writer. 670 closeMobFileWriter(writer, fileInfo.getFirst(), mobCells); 671 } 672 673 if (cleanupBulkloadDirOfPartition) { 674 // append metadata and bulkload info to the ref mob file, and close the writer. 
675 closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime); 676 } 677 } 678 679 if (mobCells > 0) { 680 // commit mob file 681 MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig); 682 cleanupTmpMobFile = false; 683 cleanupCommittedMobFile = true; 684 // bulkload the ref file 685 LOG.info("start MOB ref bulkload for partition {} table='{}' column='{}'", 686 partition.getPartitionId(), tableName, column.getNameAsString()); 687 bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName()); 688 cleanupCommittedMobFile = false; 689 LOG.info("end MOB ref bulkload for partition {} table='{}' column='{}'", 690 partition.getPartitionId(), tableName, column.getNameAsString()); 691 newFiles.add(new Path(mobFamilyDir, filePath.getName())); 692 } 693 694 // archive the old mob files, do not archive the del files. 695 try { 696 closeStoreFileReaders(mobFilesToCompact); 697 closeReaders = false; 698 MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact); 699 } catch (IOException e) { 700 LOG.error("Failed to archive the files " + mobFilesToCompact, e); 701 } 702 } finally { 703 if (closeReaders) { 704 closeStoreFileReaders(mobFilesToCompact); 705 } 706 707 if (cleanupTmpMobFile) { 708 deletePath(filePath); 709 } 710 711 if (cleanupBulkloadDirOfPartition) { 712 // delete the bulkload files in bulkloadPath 713 deletePath(bulkloadPathOfPartition); 714 } 715 716 if (cleanupCommittedMobFile) { 717 LOG.error("failed MOB ref bulkload for partition {} table='{}' column='{}'", 718 partition.getPartitionId(), tableName, column.getNameAsString()); 719 MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), 720 Collections.singletonList(new HStoreFile(fs, new Path(mobFamilyDir, filePath.getName()), 721 conf, compactionCacheConfig, BloomType.NONE, true))); 722 } 723 } 724 } 725 726 /** 727 * Compacts the del files in batches which avoids opening too many files. 
728 * @param request The compaction request. 729 * @param delFilePaths Del file paths to compact 730 * @return The paths of new del files after merging or the original files if no merging 731 * is necessary. 732 * @throws IOException if IO failure is encountered 733 */ 734 protected List<Path> compactDelFiles(PartitionedMobCompactionRequest request, 735 List<Path> delFilePaths) throws IOException { 736 if (delFilePaths.size() <= delFileMaxCount) { 737 return delFilePaths; 738 } 739 // when there are more del files than the number that is allowed, merge it firstly. 740 int offset = 0; 741 List<Path> paths = new ArrayList<>(); 742 while (offset < delFilePaths.size()) { 743 // get the batch 744 int batch = compactionBatchSize; 745 if (delFilePaths.size() - offset < compactionBatchSize) { 746 batch = delFilePaths.size() - offset; 747 } 748 List<HStoreFile> batchedDelFiles = new ArrayList<>(); 749 if (batch == 1) { 750 // only one file left, do not compact it, directly add it to the new files. 751 paths.add(delFilePaths.get(offset)); 752 offset++; 753 continue; 754 } 755 for (int i = offset; i < batch + offset; i++) { 756 batchedDelFiles.add(new HStoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig, 757 BloomType.NONE, true)); 758 } 759 // compact the del files in a batch. 760 paths.add(compactDelFilesInBatch(request, batchedDelFiles)); 761 // move to the next batch. 762 offset += batch; 763 } 764 return compactDelFiles(request, paths); 765 } 766 767 /** 768 * Compacts the del file in a batch. 769 * @param request The compaction request. 770 * @param delFiles The del files. 771 * @return The path of new del file after merging. 772 * @throws IOException if IO failure is encountered 773 */ 774 private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request, 775 List<HStoreFile> delFiles) throws IOException { 776 // create a scanner for the del files. 
777 StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES); 778 StoreFileWriter writer = null; 779 Path filePath = null; 780 try { 781 writer = MobUtils.createDelFileWriter(conf, fs, column, 782 MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE, 783 column.getCompactionCompressionType(), HConstants.EMPTY_START_ROW, compactionCacheConfig, 784 cryptoContext); 785 filePath = writer.getPath(); 786 List<Cell> cells = new ArrayList<>(); 787 boolean hasMore; 788 ScannerContext scannerContext = 789 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 790 do { 791 hasMore = scanner.next(cells, scannerContext); 792 for (Cell cell : cells) { 793 writer.append(cell); 794 } 795 cells.clear(); 796 } while (hasMore); 797 } finally { 798 scanner.close(); 799 if (writer != null) { 800 try { 801 writer.close(); 802 } catch (IOException e) { 803 LOG.error("Failed to close the writer of the file " + filePath, e); 804 } 805 } 806 } 807 // commit the new del file 808 Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig); 809 // archive the old del files 810 try { 811 MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles); 812 } catch (IOException e) { 813 LOG.error("Failed to archive the old del files " + delFiles, e); 814 } 815 return path; 816 } 817 818 /** 819 * Creates a store scanner. 820 * @param filesToCompact The files to be compacted. 821 * @param scanType The scan type. 822 * @return The store scanner. 
823 * @throws IOException if IO failure is encountered 824 */ 825 private StoreScanner createScanner(List<HStoreFile> filesToCompact, ScanType scanType) 826 throws IOException { 827 List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, 828 false, true, false, false, HConstants.LATEST_TIMESTAMP); 829 long ttl = HStore.determineTTLFromFamily(column); 830 ScanInfo scanInfo = new ScanInfo(conf, column, ttl, 0, CellComparator.getInstance()); 831 return new StoreScanner(scanInfo, scanType, scanners); 832 } 833 834 /** 835 * Bulkloads the current file. 836 * 837 * @param connection to use to get admin/RegionLocator 838 * @param table The current table. 839 * @param bulkloadDirectory The path of bulkload directory. 840 * @param fileName The current file name. 841 * @throws IOException if IO failure is encountered 842 */ 843 private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory, 844 String fileName) 845 throws IOException { 846 // bulkload the ref file 847 try { 848 LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); 849 bulkload.disableReplication(); 850 bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table, 851 connection.getRegionLocator(table.getName())); 852 } catch (Exception e) { 853 throw new IOException(e); 854 } 855 } 856 857 /** 858 * Closes the mob file writer. 859 * @param writer The mob file writer. 860 * @param maxSeqId Maximum sequence id. 861 * @param mobCellsCount The number of mob cells. 862 * @throws IOException if IO failure is encountered 863 */ 864 private void closeMobFileWriter(StoreFileWriter writer, long maxSeqId, long mobCellsCount) 865 throws IOException { 866 if (writer != null) { 867 writer.appendMetadata(maxSeqId, false, mobCellsCount); 868 try { 869 writer.close(); 870 } catch (IOException e) { 871 LOG.error("Failed to close the writer of the file " + writer.getPath(), e); 872 } 873 } 874 } 875 876 /** 877 * Closes the ref file writer. 
878 * @param writer The ref file writer. 879 * @param maxSeqId Maximum sequence id. 880 * @param bulkloadTime The timestamp at which the bulk load file is created. 881 * @throws IOException if IO failure is encountered 882 */ 883 private void closeRefFileWriter(StoreFileWriter writer, long maxSeqId, long bulkloadTime) 884 throws IOException { 885 if (writer != null) { 886 writer.appendMetadata(maxSeqId, false); 887 writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime)); 888 writer.appendFileInfo(SKIP_RESET_SEQ_ID, Bytes.toBytes(true)); 889 try { 890 writer.close(); 891 } catch (IOException e) { 892 LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e); 893 } 894 } 895 } 896 897 /** 898 * Gets the max seqId and number of cells of the store files. 899 * @param storeFiles The store files. 900 * @return The pair of the max seqId and number of cells of the store files. 901 * @throws IOException if IO failure is encountered 902 */ 903 private Pair<Long, Long> getFileInfo(List<HStoreFile> storeFiles) throws IOException { 904 long maxSeqId = 0; 905 long maxKeyCount = 0; 906 for (HStoreFile sf : storeFiles) { 907 // the readers will be closed later after the merge. 908 maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId()); 909 sf.initReader(); 910 byte[] count = sf.getReader().loadFileInfo().get(MOB_CELLS_COUNT); 911 if (count != null) { 912 maxKeyCount += Bytes.toLong(count); 913 } 914 } 915 return new Pair<>(maxSeqId, maxKeyCount); 916 } 917 918 /** 919 * Deletes a file. 920 * @param path The path of the file to be deleted. 
*/
  private void deletePath(Path path) {
    LOG.debug("Cleanup, delete path '{}'", path);
    if (path == null) {
      // nothing to clean up.
      return;
    }
    try {
      // recursive delete; a failure is logged and not propagated.
      fs.delete(path, true);
    } catch (IOException e) {
      LOG.error("Failed to delete the file " + path, e);
    }
  }

  /**
   * Looks up the file status of the first location of the link that exists.
   * @param link The link whose locations are probed in order.
   * @return The status of the first location found, or null (after logging a warning) when
   *   none of the locations exists.
   * @throws IOException if IO failure other than a missing file is encountered
   */
  private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
    for (Path candidate : link.getLocations()) {
      if (candidate == null) {
        continue;
      }
      try {
        FileStatus status = fs.getFileStatus(candidate);
        if (status != null) {
          return status;
        }
      } catch (FileNotFoundException ignored) {
        // the file is not at this location, probe the next one.
      }
    }
    LOG.warn("The file " + link + " links to can not be found");
    return null;
  }
}