001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.List; 026import java.util.NavigableSet; 027import java.util.TreeSet; 028import java.util.UUID; 029import java.util.regex.Matcher; 030import java.util.regex.Pattern; 031import org.apache.commons.lang3.ArrayUtils; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileAlreadyExistsException; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.fs.PathFilter; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.CellScanner; 040import org.apache.hadoop.hbase.CellUtil; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Durability; 045import org.apache.hadoop.hbase.client.Mutation; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.regionserver.HRegion; 049import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.CommonFSUtils; 052import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier; 053import org.apache.hadoop.hbase.util.FSUtils; 054import org.apache.hadoop.hbase.util.Pair; 055import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; 056import org.apache.yetus.audience.InterfaceAudience; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 063 064/** 065 * This class provides static methods to support WAL splitting related works 066 */ 067@InterfaceAudience.Private 068public final class WALSplitUtil { 069 private static final Logger LOG = LoggerFactory.getLogger(WALSplitUtil.class); 070 071 private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+"); 072 private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; 073 private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid"; 074 private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid"; 075 private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length(); 076 077 private WALSplitUtil() { 078 } 079 080 /** 081 * Completes the work done by splitLogFile by archiving logs 082 * <p> 083 * It is invoked by SplitLogManager once it knows that one of the SplitLogWorkers have completed 084 * the splitLogFile() part. If the master crashes then this function might get called multiple 085 * times. 086 * <p> 087 */ 088 public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException { 089 Path walDir = CommonFSUtils.getWALRootDir(conf); 090 Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME); 091 Path walPath; 092 if (CommonFSUtils.isStartingWithPath(walDir, logfile)) { 093 walPath = new Path(logfile); 094 } else { 095 walPath = new Path(walDir, logfile); 096 } 097 FileSystem walFS = walDir.getFileSystem(conf); 098 boolean corrupt = ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS); 099 archive(walPath, corrupt, oldLogDir, walFS, conf); 100 Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName()); 101 walFS.delete(stagingDir, true); 102 } 103 104 /** 105 * Moves processed logs to a oldLogDir after successful processing Moves corrupted logs (any log 106 * that couldn't be successfully parsed to corruptDir (.corrupt) for later investigation 107 */ 108 static void archive(final Path wal, final boolean corrupt, final Path oldWALDir, 109 final FileSystem walFS, final Configuration conf) throws IOException { 110 Path dir; 111 Path target; 112 if (corrupt) { 113 dir = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); 114 if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) { 115 LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", dir); 116 } 117 target = new Path(dir, wal.getName()); 118 } else { 119 dir = oldWALDir; 120 target = AbstractFSWAL.getWALArchivePath(oldWALDir, wal); 121 } 122 mkdir(walFS, dir); 123 moveWAL(walFS, wal, target); 124 } 125 126 private static void mkdir(FileSystem fs, Path dir) throws IOException { 127 if (!fs.mkdirs(dir)) { 128 LOG.warn("Failed mkdir {}", dir); 129 } 130 } 131 132 /** 133 * Move WAL. Used to move processed WALs to archive or bad WALs to corrupt WAL dir. 134 * WAL may have already been moved; makes allowance. 135 */ 136 public static void moveWAL(FileSystem fs, Path p, Path targetDir) throws IOException { 137 if (fs.exists(p)) { 138 if (!CommonFSUtils.renameAndSetModifyTime(fs, p, targetDir)) { 139 LOG.warn("Failed move of {} to {}", p, targetDir); 140 } else { 141 LOG.info("Moved {} to {}", p, targetDir); 142 } 143 } 144 } 145 146 /** 147 * Path to a file under RECOVERED_EDITS_DIR directory of the region found in <code>logEntry</code> 148 * named for the sequenceid in the passed <code>logEntry</code>: e.g. 149 * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of 150 * RECOVERED_EDITS_DIR under the region creating it if necessary. 151 * @param tableName the table name 152 * @param encodedRegionName the encoded region name 153 * @param seqId the sequence id which used to generate file name 154 * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name. 155 * @param tmpDirName of the directory used to sideline old recovered edits file 156 * @param conf configuration 157 * @return Path to file into which to dump split log edits. 158 */ 159 @SuppressWarnings("deprecation") 160 static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId, 161 String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException { 162 FileSystem walFS = CommonFSUtils.getWALFileSystem(conf); 163 Path tableDir = CommonFSUtils.getWALTableDir(conf, tableName); 164 String encodedRegionNameStr = Bytes.toString(encodedRegionName); 165 Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionNameStr); 166 Path dir = getRegionDirRecoveredEditsDir(regionDir); 167 168 if (walFS.exists(dir) && walFS.isFile(dir)) { 169 Path tmp = new Path(tmpDirName); 170 if (!walFS.exists(tmp)) { 171 walFS.mkdirs(tmp); 172 } 173 tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionNameStr); 174 LOG.warn("Found existing old file: {}. It could be some " 175 + "leftover of an old installation. It should be a folder instead. " 176 + "So moving it to {}", 177 dir, tmp); 178 if (!walFS.rename(dir, tmp)) { 179 LOG.warn("Failed to sideline old file {}", dir); 180 } 181 } 182 183 if (!walFS.exists(dir) && !walFS.mkdirs(dir)) { 184 LOG.warn("mkdir failed on {}", dir); 185 } 186 // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now. 187 // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure 188 // region's replayRecoveredEdits will not delete it 189 String fileName = formatRecoveredEditsFileName(seqId); 190 fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit); 191 return new Path(dir, fileName); 192 } 193 194 private static String getTmpRecoveredEditsFileName(String fileName) { 195 return fileName + RECOVERED_LOG_TMPFILE_SUFFIX; 196 } 197 198 /** 199 * Get the completed recovered edits file path, renaming it to be by last edit in the file from 200 * its first edit. Then we could use the name to skip recovered edits when doing 201 * HRegion#replayRecoveredEditsIfAny(Map, CancelableProgressable, MonitoredTask). 202 * @return dstPath take file's last edit log seq num as the name 203 */ 204 static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) { 205 String fileName = formatRecoveredEditsFileName(maximumEditWALSeqNum); 206 return new Path(srcPath.getParent(), fileName); 207 } 208 209 static String formatRecoveredEditsFileName(final long seqid) { 210 return String.format("%019d", seqid); 211 } 212 213 /** 214 * @param regionDir This regions directory in the filesystem. 215 * @return The directory that holds recovered edits files for the region <code>regionDir</code> 216 */ 217 public static Path getRegionDirRecoveredEditsDir(final Path regionDir) { 218 return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR); 219 } 220 221 /** 222 * Check whether there is recovered.edits in the region dir 223 * @param conf conf 224 * @param regionInfo the region to check 225 * @return true if recovered.edits exist in the region dir 226 */ 227 public static boolean hasRecoveredEdits(final Configuration conf, final RegionInfo regionInfo) 228 throws IOException { 229 // No recovered.edits for non default replica regions 230 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 231 return false; 232 } 233 // Only default replica region can reach here, so we can use regioninfo 234 // directly without converting it to default replica's regioninfo. 235 Path regionWALDir = 236 CommonFSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName()); 237 Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), regionInfo); 238 Path wrongRegionWALDir = 239 CommonFSUtils.getWrongWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName()); 240 FileSystem walFs = CommonFSUtils.getWALFileSystem(conf); 241 FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf); 242 NavigableSet<Path> files = getSplitEditFilesSorted(walFs, regionWALDir); 243 if (!files.isEmpty()) { 244 return true; 245 } 246 files = getSplitEditFilesSorted(rootFs, regionDir); 247 if (!files.isEmpty()) { 248 return true; 249 } 250 files = getSplitEditFilesSorted(walFs, wrongRegionWALDir); 251 return !files.isEmpty(); 252 } 253 254 /** 255 * This method will check 3 places for finding the max sequence id file. One is the expected 256 * place, another is the old place under the region directory, and the last one is the wrong one 257 * we introduced in HBASE-20734. See HBASE-22617 for more details. 258 * <p/> 259 * Notice that, you should always call this method instead of 260 * {@link #getMaxRegionSequenceId(FileSystem, Path)} until 4.0.0 release. 261 * @deprecated Only for compatibility, will be removed in 4.0.0. 262 */ 263 @Deprecated 264 public static long getMaxRegionSequenceId(Configuration conf, RegionInfo region, 265 IOExceptionSupplier<FileSystem> rootFsSupplier, IOExceptionSupplier<FileSystem> walFsSupplier) 266 throws IOException { 267 FileSystem rootFs = rootFsSupplier.get(); 268 FileSystem walFs = walFsSupplier.get(); 269 Path regionWALDir = 270 CommonFSUtils.getWALRegionDir(conf, region.getTable(), region.getEncodedName()); 271 // This is the old place where we store max sequence id file 272 Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), region); 273 // This is for HBASE-20734, where we use a wrong directory, see HBASE-22617 for more details. 274 Path wrongRegionWALDir = 275 CommonFSUtils.getWrongWALRegionDir(conf, region.getTable(), region.getEncodedName()); 276 long maxSeqId = getMaxRegionSequenceId(walFs, regionWALDir); 277 maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(rootFs, regionDir)); 278 maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(walFs, wrongRegionWALDir)); 279 return maxSeqId; 280 } 281 282 /** 283 * Returns sorted set of edit files made by splitter, excluding files with '.temp' suffix. 284 * @param walFS WAL FileSystem used to retrieving split edits files. 285 * @param regionDir WAL region dir to look for recovered edits files under. 286 * @return Files in passed <code>regionDir</code> as a sorted set. 287 */ 288 public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS, 289 final Path regionDir) throws IOException { 290 NavigableSet<Path> filesSorted = new TreeSet<>(); 291 Path editsdir = getRegionDirRecoveredEditsDir(regionDir); 292 if (!walFS.exists(editsdir)) { 293 return filesSorted; 294 } 295 FileStatus[] files = CommonFSUtils.listStatus(walFS, editsdir, new PathFilter() { 296 @Override 297 public boolean accept(Path p) { 298 boolean result = false; 299 try { 300 // Return files and only files that match the editfile names pattern. 301 // There can be other files in this directory other than edit files. 302 // In particular, on error, we'll move aside the bad edit file giving 303 // it a timestamp suffix. See moveAsideBadEditsFile. 304 Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName()); 305 result = walFS.isFile(p) && m.matches(); 306 // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX, 307 // because it means splitwal thread is writting this file. 308 if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) { 309 result = false; 310 } 311 // Skip SeqId Files 312 if (isSequenceIdFile(p)) { 313 result = false; 314 } 315 } catch (IOException e) { 316 LOG.warn("Failed isFile check on {}", p, e); 317 } 318 return result; 319 } 320 }); 321 if (ArrayUtils.isNotEmpty(files)) { 322 Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath())); 323 } 324 return filesSorted; 325 } 326 327 /** 328 * Move aside a bad edits file. 329 * @param fs the file system used to rename bad edits file. 330 * @param edits Edits file to move aside. 331 * @return The name of the moved aside file. 332 */ 333 public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits) 334 throws IOException { 335 Path moveAsideName = 336 new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis()); 337 if (!fs.rename(edits, moveAsideName)) { 338 LOG.warn("Rename failed from {} to {}", edits, moveAsideName); 339 } 340 return moveAsideName; 341 } 342 343 /** 344 * Is the given file a region open sequence id file. 345 */ 346 public static boolean isSequenceIdFile(final Path file) { 347 return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX) 348 || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX); 349 } 350 351 private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir) 352 throws IOException { 353 // TODO: Why are we using a method in here as part of our normal region open where 354 // there is no splitting involved? Fix. St.Ack 01/20/2017. 355 Path editsDir = getRegionDirRecoveredEditsDir(regionDir); 356 try { 357 FileStatus[] files = walFS.listStatus(editsDir, WALSplitUtil::isSequenceIdFile); 358 return files != null ? files : new FileStatus[0]; 359 } catch (FileNotFoundException e) { 360 return new FileStatus[0]; 361 } 362 } 363 364 private static long getMaxSequenceId(FileStatus[] files) { 365 long maxSeqId = -1L; 366 for (FileStatus file : files) { 367 String fileName = file.getPath().getName(); 368 try { 369 maxSeqId = Math.max(maxSeqId, Long 370 .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH))); 371 } catch (NumberFormatException ex) { 372 LOG.warn("Invalid SeqId File Name={}", fileName); 373 } 374 } 375 return maxSeqId; 376 } 377 378 /** 379 * Get the max sequence id which is stored in the region directory. -1 if none. 380 */ 381 public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException { 382 return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir)); 383 } 384 385 /** 386 * Create a file with name as region's max sequence id 387 */ 388 public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId) 389 throws IOException { 390 FileStatus[] files = getSequenceIdFiles(walFS, regionDir); 391 long maxSeqId = getMaxSequenceId(files); 392 if (maxSeqId > newMaxSeqId) { 393 throw new IOException("The new max sequence id " + newMaxSeqId 394 + " is less than the old max sequence id " + maxSeqId); 395 } 396 // write a new seqId file 397 Path newSeqIdFile = 398 new Path(getRegionDirRecoveredEditsDir(regionDir), newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX); 399 if (newMaxSeqId != maxSeqId) { 400 try { 401 if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) { 402 throw new IOException("Failed to create SeqId file:" + newSeqIdFile); 403 } 404 LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId, 405 maxSeqId); 406 } catch (FileAlreadyExistsException ignored) { 407 // latest hdfs throws this exception. it's all right if newSeqIdFile already exists 408 } 409 } 410 // remove old ones 411 for (FileStatus status : files) { 412 if (!newSeqIdFile.equals(status.getPath())) { 413 walFS.delete(status.getPath(), false); 414 } 415 } 416 } 417 418 /** A struct used by getMutationsFromWALEntry */ 419 public static class MutationReplay implements Comparable<MutationReplay> { 420 public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation, 421 long nonceGroup, long nonce) { 422 this.type = type; 423 this.mutation = mutation; 424 if (this.mutation.getDurability() != Durability.SKIP_WAL) { 425 // using ASYNC_WAL for relay 426 this.mutation.setDurability(Durability.ASYNC_WAL); 427 } 428 this.nonceGroup = nonceGroup; 429 this.nonce = nonce; 430 } 431 432 private final ClientProtos.MutationProto.MutationType type; 433 @SuppressWarnings("checkstyle:VisibilityModifier") public final Mutation mutation; 434 @SuppressWarnings("checkstyle:VisibilityModifier") public final long nonceGroup; 435 @SuppressWarnings("checkstyle:VisibilityModifier") public final long nonce; 436 437 @Override 438 public int compareTo(final MutationReplay d) { 439 return this.mutation.compareTo(d.mutation); 440 } 441 442 @Override 443 public boolean equals(Object obj) { 444 if (!(obj instanceof MutationReplay)) { 445 return false; 446 } else { 447 return this.compareTo((MutationReplay) obj) == 0; 448 } 449 } 450 451 @Override 452 public int hashCode() { 453 return this.mutation.hashCode(); 454 } 455 456 public ClientProtos.MutationProto.MutationType getType() { 457 return type; 458 } 459 } 460 461 /** 462 * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey & 463 * WALEdit from the passed in WALEntry 464 * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances 465 * extracted from the passed in WALEntry. 466 * @return list of Pair<MutationType, Mutation> to be replayed 467 */ 468 public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry, 469 CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException { 470 if (entry == null) { 471 // return an empty array 472 return Collections.emptyList(); 473 } 474 475 long replaySeqId = 476 (entry.getKey().hasOrigSequenceNumber()) ? entry.getKey().getOrigSequenceNumber() 477 : entry.getKey().getLogSequenceNumber(); 478 int count = entry.getAssociatedCellCount(); 479 List<MutationReplay> mutations = new ArrayList<>(); 480 Cell previousCell = null; 481 Mutation m = null; 482 WALKeyImpl key = null; 483 WALEdit val = null; 484 if (logEntry != null) { 485 val = new WALEdit(); 486 } 487 488 for (int i = 0; i < count; i++) { 489 // Throw index out of bounds if our cell count is off 490 if (!cells.advance()) { 491 throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); 492 } 493 Cell cell = cells.current(); 494 if (val != null) { 495 val.add(cell); 496 } 497 498 boolean isNewRowOrType = 499 previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() 500 || !CellUtil.matchingRows(previousCell, cell); 501 if (isNewRowOrType) { 502 // Create new mutation 503 if (CellUtil.isDelete(cell)) { 504 m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 505 // Deletes don't have nonces. 506 mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, m, 507 HConstants.NO_NONCE, HConstants.NO_NONCE)); 508 } else { 509 m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 510 // Puts might come from increment or append, thus we need nonces. 511 long nonceGroup = 512 entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; 513 long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; 514 mutations.add( 515 new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, m, nonceGroup, nonce)); 516 } 517 } 518 if (CellUtil.isDelete(cell)) { 519 ((Delete) m).add(cell); 520 } else { 521 ((Put) m).add(cell); 522 } 523 m.setDurability(durability); 524 previousCell = cell; 525 } 526 527 // reconstruct WALKey 528 if (logEntry != null) { 529 org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto = 530 entry.getKey(); 531 List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount()); 532 for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) { 533 clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits())); 534 } 535 key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), 536 TableName.valueOf(walKeyProto.getTableName().toByteArray()), replaySeqId, 537 walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(), 538 walKeyProto.getNonce(), null); 539 logEntry.setFirst(key); 540 logEntry.setSecond(val); 541 } 542 543 return mutations; 544 } 545 546 /** 547 * Return path to recovered.hfiles directory of the region's column family: e.g. 548 * /hbase/some_table/2323432434/cf/recovered.hfiles/. This method also ensures existence of 549 * recovered.hfiles directory under the region's column family, creating it if necessary. 550 * @param rootFS the root file system 551 * @param conf configuration 552 * @param tableName the table name 553 * @param encodedRegionName the encoded region name 554 * @param familyName the column family name 555 * @return Path to recovered.hfiles directory of the region's column family. 556 */ 557 static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf, 558 TableName tableName, String encodedRegionName, String familyName) throws IOException { 559 Path rootDir = CommonFSUtils.getRootDir(conf); 560 Path regionDir = FSUtils.getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, tableName), 561 encodedRegionName); 562 Path dir = getRecoveredHFilesDir(regionDir, familyName); 563 if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) { 564 LOG.warn("mkdir failed on {}, region {}, column family {}", dir, encodedRegionName, 565 familyName); 566 } 567 return dir; 568 } 569 570 /** 571 * @param regionDir This regions directory in the filesystem 572 * @param familyName The column family name 573 * @return The directory that holds recovered hfiles for the region's column family 574 */ 575 private static Path getRecoveredHFilesDir(final Path regionDir, String familyName) { 576 return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR); 577 } 578 579 public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS, 580 final Path regionDir, String familyName) throws IOException { 581 Path dir = getRecoveredHFilesDir(regionDir, familyName); 582 return CommonFSUtils.listStatus(rootFS, dir); 583 } 584}