001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.List; 026import java.util.NavigableSet; 027import java.util.TreeSet; 028import java.util.UUID; 029import java.util.regex.Matcher; 030import java.util.regex.Pattern; 031import org.apache.commons.lang3.ArrayUtils; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileAlreadyExistsException; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.fs.PathFilter; 038import org.apache.hadoop.hbase.CellUtil; 039import org.apache.hadoop.hbase.ExtendedCell; 040import org.apache.hadoop.hbase.ExtendedCellScanner; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Durability; 045import org.apache.hadoop.hbase.client.Mutation; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.Row; 049import org.apache.hadoop.hbase.regionserver.HRegion; 050import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.CommonFSUtils; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.hbase.util.FSUtils; 055import org.apache.hadoop.hbase.util.IOExceptionSupplier; 056import org.apache.hadoop.hbase.util.Pair; 057import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; 058import org.apache.yetus.audience.InterfaceAudience; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 065 066/** 067 * This class provides static methods to support WAL splitting related works 068 */ 069@InterfaceAudience.Private 070public final class WALSplitUtil { 071 private static final Logger LOG = LoggerFactory.getLogger(WALSplitUtil.class); 072 073 private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+"); 074 private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; 075 private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid"; 076 private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid"; 077 private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length(); 078 079 private WALSplitUtil() { 080 } 081 082 /** 083 * Completes the work done by splitLogFile by archiving logs 084 * <p> 085 * It is invoked by SplitLogManager once it knows that one of the SplitLogWorkers have completed 086 * the splitLogFile() part. If the master crashes then this function might get called multiple 087 * times. 088 * <p> 089 */ 090 public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException { 091 Path walDir = CommonFSUtils.getWALRootDir(conf); 092 Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME); 093 Path walPath; 094 if (CommonFSUtils.isStartingWithPath(walDir, logfile)) { 095 walPath = new Path(logfile); 096 } else { 097 walPath = new Path(walDir, logfile); 098 } 099 FileSystem walFS = walDir.getFileSystem(conf); 100 boolean corrupt = ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS); 101 archive(walPath, corrupt, oldLogDir, walFS, conf); 102 Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName()); 103 walFS.delete(stagingDir, true); 104 } 105 106 /** 107 * Moves processed logs to a oldLogDir after successful processing Moves corrupted logs (any log 108 * that couldn't be successfully parsed to corruptDir (.corrupt) for later investigation 109 */ 110 static void archive(final Path wal, final boolean corrupt, final Path oldWALDir, 111 final FileSystem walFS, final Configuration conf) throws IOException { 112 Path dir; 113 Path target; 114 if (corrupt) { 115 dir = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); 116 if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) { 117 LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", dir); 118 } 119 target = new Path(dir, wal.getName()); 120 } else { 121 dir = oldWALDir; 122 target = AbstractFSWAL.getWALArchivePath(oldWALDir, wal); 123 } 124 mkdir(walFS, dir); 125 moveWAL(walFS, wal, target); 126 } 127 128 private static void mkdir(FileSystem fs, Path dir) throws IOException { 129 if (!fs.mkdirs(dir)) { 130 LOG.warn("Failed mkdir {}", dir); 131 } 132 } 133 134 /** 135 * Move WAL. Used to move processed WALs to archive or bad WALs to corrupt WAL dir. WAL may have 136 * already been moved; makes allowance. 137 */ 138 public static void moveWAL(FileSystem fs, Path p, Path targetDir) throws IOException { 139 if (fs.exists(p)) { 140 if (!CommonFSUtils.renameAndSetModifyTime(fs, p, targetDir)) { 141 LOG.warn("Failed move of {} to {}", p, targetDir); 142 } else { 143 LOG.info("Moved {} to {}", p, targetDir); 144 } 145 } 146 } 147 148 /** 149 * Path to a file under RECOVERED_EDITS_DIR directory of the region found in <code>logEntry</code> 150 * named for the sequenceid in the passed <code>logEntry</code>: e.g. 151 * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of 152 * RECOVERED_EDITS_DIR under the region creating it if necessary. And also set storage policy for 153 * RECOVERED_EDITS_DIR if WAL_STORAGE_POLICY is configured. 154 * @param tableName the table name 155 * @param encodedRegionName the encoded region name 156 * @param seqId the sequence id which used to generate file name 157 * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name. 158 * @param tmpDirName of the directory used to sideline old recovered edits file 159 * @param conf configuration 160 * @return Path to file into which to dump split log edits. 161 */ 162 @SuppressWarnings("deprecation") 163 static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId, 164 String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException { 165 FileSystem walFS = CommonFSUtils.getWALFileSystem(conf); 166 Path tableDir = CommonFSUtils.getWALTableDir(conf, tableName); 167 String encodedRegionNameStr = Bytes.toString(encodedRegionName); 168 Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionNameStr); 169 Path dir = getRegionDirRecoveredEditsDir(regionDir); 170 171 if (walFS.exists(dir) && walFS.isFile(dir)) { 172 Path tmp = new Path(tmpDirName); 173 if (!walFS.exists(tmp)) { 174 walFS.mkdirs(tmp); 175 } 176 tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionNameStr); 177 LOG.warn("Found existing old file: {}. It could be some " 178 + "leftover of an old installation. It should be a folder instead. " + "So moving it to {}", 179 dir, tmp); 180 if (!walFS.rename(dir, tmp)) { 181 LOG.warn("Failed to sideline old file {}", dir); 182 } 183 } 184 185 if (!walFS.exists(dir) && !walFS.mkdirs(dir)) { 186 LOG.warn("mkdir failed on {}", dir); 187 } else { 188 String storagePolicy = 189 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 190 CommonFSUtils.setStoragePolicy(walFS, dir, storagePolicy); 191 } 192 // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now. 193 // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure 194 // region's replayRecoveredEdits will not delete it 195 String fileName = formatRecoveredEditsFileName(seqId); 196 fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit); 197 return new Path(dir, fileName); 198 } 199 200 private static String getTmpRecoveredEditsFileName(String fileName) { 201 return fileName + RECOVERED_LOG_TMPFILE_SUFFIX; 202 } 203 204 /** 205 * Get the completed recovered edits file path, renaming it to be by last edit in the file from 206 * its first edit. Then we could use the name to skip recovered edits when doing 207 * HRegion#replayRecoveredEditsIfAny(Map, CancelableProgressable, MonitoredTask). 208 * @return dstPath take file's last edit log seq num as the name 209 */ 210 static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) { 211 String fileName = formatRecoveredEditsFileName(maximumEditWALSeqNum); 212 return new Path(srcPath.getParent(), fileName); 213 } 214 215 static String formatRecoveredEditsFileName(final long seqid) { 216 return String.format("%019d", seqid); 217 } 218 219 /** 220 * @param regionDir This regions directory in the filesystem. 221 * @return The directory that holds recovered edits files for the region <code>regionDir</code> 222 */ 223 public static Path getRegionDirRecoveredEditsDir(final Path regionDir) { 224 return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR); 225 } 226 227 /** 228 * Check whether there is recovered.edits in the region dir 229 * @param conf conf 230 * @param regionInfo the region to check 231 * @return true if recovered.edits exist in the region dir 232 */ 233 public static boolean hasRecoveredEdits(final Configuration conf, final RegionInfo regionInfo) 234 throws IOException { 235 // No recovered.edits for non default replica regions 236 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 237 return false; 238 } 239 // Only default replica region can reach here, so we can use regioninfo 240 // directly without converting it to default replica's regioninfo. 241 Path regionWALDir = 242 CommonFSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName()); 243 Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), regionInfo); 244 Path wrongRegionWALDir = 245 CommonFSUtils.getWrongWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName()); 246 FileSystem walFs = CommonFSUtils.getWALFileSystem(conf); 247 FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf); 248 NavigableSet<Path> files = getSplitEditFilesSorted(walFs, regionWALDir); 249 if (!files.isEmpty()) { 250 return true; 251 } 252 files = getSplitEditFilesSorted(rootFs, regionDir); 253 if (!files.isEmpty()) { 254 return true; 255 } 256 files = getSplitEditFilesSorted(walFs, wrongRegionWALDir); 257 return !files.isEmpty(); 258 } 259 260 /** 261 * This method will check 3 places for finding the max sequence id file. One is the expected 262 * place, another is the old place under the region directory, and the last one is the wrong one 263 * we introduced in HBASE-20734. See HBASE-22617 for more details. 264 * <p/> 265 * Notice that, you should always call this method instead of 266 * {@link #getMaxRegionSequenceId(FileSystem, Path)} until 4.0.0 release. 267 * @deprecated Only for compatibility, will be removed in 4.0.0. 268 */ 269 @Deprecated 270 public static long getMaxRegionSequenceId(Configuration conf, RegionInfo region, 271 IOExceptionSupplier<FileSystem> rootFsSupplier, IOExceptionSupplier<FileSystem> walFsSupplier) 272 throws IOException { 273 FileSystem rootFs = rootFsSupplier.get(); 274 FileSystem walFs = walFsSupplier.get(); 275 Path regionWALDir = 276 CommonFSUtils.getWALRegionDir(conf, region.getTable(), region.getEncodedName()); 277 // This is the old place where we store max sequence id file 278 Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), region); 279 // This is for HBASE-20734, where we use a wrong directory, see HBASE-22617 for more details. 280 Path wrongRegionWALDir = 281 CommonFSUtils.getWrongWALRegionDir(conf, region.getTable(), region.getEncodedName()); 282 long maxSeqId = getMaxRegionSequenceId(walFs, regionWALDir); 283 maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(rootFs, regionDir)); 284 maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(walFs, wrongRegionWALDir)); 285 return maxSeqId; 286 } 287 288 /** 289 * Returns sorted set of edit files made by splitter, excluding files with '.temp' suffix. 290 * @param walFS WAL FileSystem used to retrieving split edits files. 291 * @param regionDir WAL region dir to look for recovered edits files under. 292 * @return Files in passed <code>regionDir</code> as a sorted set. 293 */ 294 public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS, 295 final Path regionDir) throws IOException { 296 NavigableSet<Path> filesSorted = new TreeSet<>(); 297 Path editsdir = getRegionDirRecoveredEditsDir(regionDir); 298 if (!walFS.exists(editsdir)) { 299 return filesSorted; 300 } 301 FileStatus[] files = CommonFSUtils.listStatus(walFS, editsdir, new PathFilter() { 302 @Override 303 public boolean accept(Path p) { 304 boolean result = false; 305 try { 306 // Return files and only files that match the editfile names pattern. 307 // There can be other files in this directory other than edit files. 308 // In particular, on error, we'll move aside the bad edit file giving 309 // it a timestamp suffix. See moveAsideBadEditsFile. 310 Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName()); 311 result = walFS.isFile(p) && m.matches(); 312 // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX, 313 // because it means splitwal thread is writting this file. 314 if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) { 315 result = false; 316 } 317 // Skip SeqId Files 318 if (isSequenceIdFile(p)) { 319 result = false; 320 } 321 } catch (IOException e) { 322 LOG.warn("Failed isFile check on {}", p, e); 323 } 324 return result; 325 } 326 }); 327 if (ArrayUtils.isNotEmpty(files)) { 328 Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath())); 329 } 330 return filesSorted; 331 } 332 333 /** 334 * Move aside a bad edits file. 335 * @param fs the file system used to rename bad edits file. 336 * @param edits Edits file to move aside. 337 * @return The name of the moved aside file. 338 */ 339 public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits) 340 throws IOException { 341 Path moveAsideName = 342 new Path(edits.getParent(), edits.getName() + "." + EnvironmentEdgeManager.currentTime()); 343 if (!fs.rename(edits, moveAsideName)) { 344 LOG.warn("Rename failed from {} to {}", edits, moveAsideName); 345 } 346 return moveAsideName; 347 } 348 349 /** 350 * Is the given file a region open sequence id file. 351 */ 352 public static boolean isSequenceIdFile(final Path file) { 353 return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX) 354 || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX); 355 } 356 357 private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir) 358 throws IOException { 359 // TODO: Why are we using a method in here as part of our normal region open where 360 // there is no splitting involved? Fix. St.Ack 01/20/2017. 361 Path editsDir = getRegionDirRecoveredEditsDir(regionDir); 362 try { 363 FileStatus[] files = walFS.listStatus(editsDir, WALSplitUtil::isSequenceIdFile); 364 return files != null ? files : new FileStatus[0]; 365 } catch (FileNotFoundException e) { 366 return new FileStatus[0]; 367 } 368 } 369 370 private static long getMaxSequenceId(FileStatus[] files) { 371 long maxSeqId = -1L; 372 for (FileStatus file : files) { 373 String fileName = file.getPath().getName(); 374 try { 375 maxSeqId = Math.max(maxSeqId, Long 376 .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH))); 377 } catch (NumberFormatException ex) { 378 LOG.warn("Invalid SeqId File Name={}", fileName); 379 } 380 } 381 return maxSeqId; 382 } 383 384 /** 385 * Get the max sequence id which is stored in the region directory. -1 if none. 386 */ 387 public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException { 388 return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir)); 389 } 390 391 /** 392 * Create a file with name as region's max sequence id 393 */ 394 public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId) 395 throws IOException { 396 FileStatus[] files = getSequenceIdFiles(walFS, regionDir); 397 long maxSeqId = getMaxSequenceId(files); 398 if (maxSeqId > newMaxSeqId) { 399 throw new IOException("The new max sequence id " + newMaxSeqId 400 + " is less than the old max sequence id " + maxSeqId); 401 } 402 // write a new seqId file 403 Path newSeqIdFile = 404 new Path(getRegionDirRecoveredEditsDir(regionDir), newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX); 405 if (newMaxSeqId != maxSeqId) { 406 try { 407 if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) { 408 throw new IOException("Failed to create SeqId file:" + newSeqIdFile); 409 } 410 LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId, 411 maxSeqId); 412 } catch (FileAlreadyExistsException ignored) { 413 // latest hdfs throws this exception. it's all right if newSeqIdFile already exists 414 } 415 } 416 // remove old ones 417 for (FileStatus status : files) { 418 if (!newSeqIdFile.equals(status.getPath())) { 419 walFS.delete(status.getPath(), false); 420 } 421 } 422 } 423 424 /** A struct used by getMutationsFromWALEntry */ 425 public static class MutationReplay implements Comparable<MutationReplay> { 426 public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation, 427 long nonceGroup, long nonce) { 428 this.type = type; 429 this.mutation = mutation; 430 if (this.mutation.getDurability() != Durability.SKIP_WAL) { 431 // using ASYNC_WAL for relay 432 this.mutation.setDurability(Durability.ASYNC_WAL); 433 } 434 this.nonceGroup = nonceGroup; 435 this.nonce = nonce; 436 } 437 438 private final ClientProtos.MutationProto.MutationType type; 439 @SuppressWarnings("checkstyle:VisibilityModifier") 440 public final Mutation mutation; 441 @SuppressWarnings("checkstyle:VisibilityModifier") 442 public final long nonceGroup; 443 @SuppressWarnings("checkstyle:VisibilityModifier") 444 public final long nonce; 445 446 @Override 447 public int compareTo(final MutationReplay d) { 448 return Row.COMPARATOR.compare(mutation, d.mutation); 449 } 450 451 @Override 452 public boolean equals(Object obj) { 453 if (!(obj instanceof MutationReplay)) { 454 return false; 455 } else { 456 return this.compareTo((MutationReplay) obj) == 0; 457 } 458 } 459 460 @Override 461 public int hashCode() { 462 return this.mutation.hashCode(); 463 } 464 465 public ClientProtos.MutationProto.MutationType getType() { 466 return type; 467 } 468 } 469 470 /** 471 * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey & 472 * WALEdit from the passed in WALEntry 473 * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances 474 * extracted from the passed in WALEntry. 475 * @return list of Pair<MutationType, Mutation> to be replayed 476 * @deprecated Since 3.0.0, will be removed in 4.0.0. 477 */ 478 @Deprecated 479 public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry, 480 ExtendedCellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) 481 throws IOException { 482 if (entry == null) { 483 // return an empty array 484 return Collections.emptyList(); 485 } 486 487 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) 488 ? entry.getKey().getOrigSequenceNumber() 489 : entry.getKey().getLogSequenceNumber(); 490 int count = entry.getAssociatedCellCount(); 491 List<MutationReplay> mutations = new ArrayList<>(); 492 ExtendedCell previousCell = null; 493 Mutation m = null; 494 WALKeyImpl key = null; 495 WALEdit val = null; 496 if (logEntry != null) { 497 val = new WALEdit(); 498 } 499 500 for (int i = 0; i < count; i++) { 501 // Throw index out of bounds if our cell count is off 502 if (!cells.advance()) { 503 throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); 504 } 505 ExtendedCell cell = cells.current(); 506 if (val != null) { 507 WALEditInternalHelper.addExtendedCell(val, cell); 508 } 509 510 boolean isNewRowOrType = 511 previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() 512 || !CellUtil.matchingRows(previousCell, cell); 513 if (isNewRowOrType) { 514 // Create new mutation 515 if (CellUtil.isDelete(cell)) { 516 m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 517 // Deletes don't have nonces. 518 mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, m, 519 HConstants.NO_NONCE, HConstants.NO_NONCE)); 520 } else { 521 m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 522 // Puts might come from increment or append, thus we need nonces. 523 long nonceGroup = 524 entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; 525 long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; 526 mutations.add( 527 new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, m, nonceGroup, nonce)); 528 } 529 } 530 if (CellUtil.isDelete(cell)) { 531 ((Delete) m).add(cell); 532 } else { 533 ((Put) m).add(cell); 534 } 535 m.setDurability(durability); 536 previousCell = cell; 537 } 538 539 // reconstruct WALKey 540 if (logEntry != null) { 541 org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto = 542 entry.getKey(); 543 List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount()); 544 for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) { 545 clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits())); 546 } 547 key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), 548 TableName.valueOf(walKeyProto.getTableName().toByteArray()), replaySeqId, 549 walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), 550 null); 551 logEntry.setFirst(key); 552 logEntry.setSecond(val); 553 } 554 555 return mutations; 556 } 557 558 /** 559 * Return path to recovered.hfiles directory of the region's column family: e.g. 560 * /hbase/some_table/2323432434/cf/recovered.hfiles/. This method also ensures existence of 561 * recovered.hfiles directory under the region's column family, creating it if necessary. 562 * @param rootFS the root file system 563 * @param conf configuration 564 * @param tableName the table name 565 * @param encodedRegionName the encoded region name 566 * @param familyName the column family name 567 * @return Path to recovered.hfiles directory of the region's column family. 568 */ 569 static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf, 570 TableName tableName, String encodedRegionName, String familyName) throws IOException { 571 Path rootDir = CommonFSUtils.getRootDir(conf); 572 Path regionDir = FSUtils.getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, tableName), 573 encodedRegionName); 574 Path dir = getRecoveredHFilesDir(regionDir, familyName); 575 if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) { 576 LOG.warn("mkdir failed on {}, region {}, column family {}", dir, encodedRegionName, 577 familyName); 578 } 579 return dir; 580 } 581 582 /** 583 * @param regionDir This regions directory in the filesystem 584 * @param familyName The column family name 585 * @return The directory that holds recovered hfiles for the region's column family 586 */ 587 private static Path getRecoveredHFilesDir(final Path regionDir, String familyName) { 588 return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR); 589 } 590 591 public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS, final Path regionDir, 592 String familyName) throws IOException { 593 Path dir = getRecoveredHFilesDir(regionDir, familyName); 594 return CommonFSUtils.listStatus(rootFS, dir); 595 } 596}