001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.List; 026import java.util.NavigableSet; 027import java.util.TreeSet; 028import java.util.UUID; 029import java.util.regex.Matcher; 030import java.util.regex.Pattern; 031import org.apache.commons.lang3.ArrayUtils; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileAlreadyExistsException; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.fs.PathFilter; 038import org.apache.hadoop.hbase.CellUtil; 039import org.apache.hadoop.hbase.ExtendedCell; 040import org.apache.hadoop.hbase.ExtendedCellScanner; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Durability; 045import org.apache.hadoop.hbase.client.Mutation; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.Row; 049import org.apache.hadoop.hbase.regionserver.HRegion; 050import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.CommonFSUtils; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.hbase.util.FSUtils; 055import org.apache.hadoop.hbase.util.IOExceptionSupplier; 056import org.apache.hadoop.hbase.util.Pair; 057import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; 058import org.apache.yetus.audience.InterfaceAudience; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 065 066/** 067 * This class provides static methods to support WAL splitting related works 068 */ 069@InterfaceAudience.Private 070public final class WALSplitUtil { 071 private static final Logger LOG = LoggerFactory.getLogger(WALSplitUtil.class); 072 073 private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+"); 074 private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; 075 private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid"; 076 private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid"; 077 private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length(); 078 079 private WALSplitUtil() { 080 } 081 082 /** 083 * Completes the work done by splitLogFile by archiving logs 084 * <p> 085 * It is invoked by SplitLogManager once it knows that one of the SplitLogWorkers have completed 086 * the splitLogFile() part. If the master crashes then this function might get called multiple 087 * times. 088 * <p> 089 */ 090 public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException { 091 Path walDir = CommonFSUtils.getWALRootDir(conf); 092 Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME); 093 Path walPath; 094 if (CommonFSUtils.isStartingWithPath(walDir, logfile)) { 095 walPath = new Path(logfile); 096 } else { 097 walPath = new Path(walDir, logfile); 098 } 099 FileSystem walFS = walDir.getFileSystem(conf); 100 boolean corrupt = ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS); 101 archive(walPath, corrupt, oldLogDir, walFS, conf); 102 Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName()); 103 walFS.delete(stagingDir, true); 104 } 105 106 /** 107 * Moves processed logs to a oldLogDir after successful processing Moves corrupted logs (any log 108 * that couldn't be successfully parsed to corruptDir (.corrupt) for later investigation 109 */ 110 static void archive(final Path wal, final boolean corrupt, final Path oldWALDir, 111 final FileSystem walFS, final Configuration conf) throws IOException { 112 Path dir; 113 Path target; 114 if (corrupt) { 115 dir = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); 116 if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) { 117 LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", dir); 118 } 119 target = new Path(dir, wal.getName()); 120 } else { 121 dir = oldWALDir; 122 target = AbstractFSWAL.getWALArchivePath(oldWALDir, wal); 123 } 124 mkdir(walFS, dir); 125 moveWAL(walFS, wal, target); 126 } 127 128 private static void mkdir(FileSystem fs, Path dir) throws IOException { 129 if (!fs.mkdirs(dir)) { 130 LOG.warn("Failed mkdir {}", dir); 131 } 132 } 133 134 /** 135 * Move WAL. Used to move processed WALs to archive or bad WALs to corrupt WAL dir. WAL may have 136 * already been moved; makes allowance. 137 */ 138 public static void moveWAL(FileSystem fs, Path p, Path targetDir) throws IOException { 139 if (fs.exists(p)) { 140 if (!CommonFSUtils.renameAndSetModifyTime(fs, p, targetDir)) { 141 LOG.warn("Failed move of {} to {}", p, targetDir); 142 } else { 143 LOG.info("Moved {} to {}", p, targetDir); 144 } 145 } 146 } 147 148 /** 149 * Path to a file under RECOVERED_EDITS_DIR directory of the region found in <code>logEntry</code> 150 * named for the sequenceid in the passed <code>logEntry</code>: e.g. 151 * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of 152 * RECOVERED_EDITS_DIR under the region creating it if necessary. And also set storage policy for 153 * RECOVERED_EDITS_DIR if WAL_STORAGE_POLICY is configured. 154 * @param tableName the table name 155 * @param encodedRegionName the encoded region name 156 * @param seqId the sequence id which used to generate file name 157 * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name. 158 * @param tmpDirName of the directory used to sideline old recovered edits file 159 * @param conf configuration 160 * @param workerNameComponent the worker name component for the file name 161 * @return Path to file into which to dump split log edits. 162 */ 163 @SuppressWarnings("deprecation") 164 static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId, 165 String fileNameBeingSplit, String tmpDirName, Configuration conf, String workerNameComponent) 166 throws IOException { 167 FileSystem walFS = CommonFSUtils.getWALFileSystem(conf); 168 Path tableDir = CommonFSUtils.getWALTableDir(conf, tableName); 169 String encodedRegionNameStr = Bytes.toString(encodedRegionName); 170 Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionNameStr); 171 Path dir = getRegionDirRecoveredEditsDir(regionDir); 172 173 if (walFS.exists(dir) && walFS.isFile(dir)) { 174 Path tmp = new Path(tmpDirName); 175 if (!walFS.exists(tmp)) { 176 walFS.mkdirs(tmp); 177 } 178 tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionNameStr); 179 LOG.warn("Found existing old file: {}. It could be some " 180 + "leftover of an old installation. It should be a folder instead. " + "So moving it to {}", 181 dir, tmp); 182 if (!walFS.rename(dir, tmp)) { 183 LOG.warn("Failed to sideline old file {}", dir); 184 } 185 } 186 187 if (!walFS.exists(dir) && !walFS.mkdirs(dir)) { 188 LOG.warn("mkdir failed on {}", dir); 189 } else { 190 String storagePolicy = 191 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 192 CommonFSUtils.setStoragePolicy(walFS, dir, storagePolicy); 193 } 194 // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now. 195 // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure 196 // region's replayRecoveredEdits will not delete it 197 String fileName = formatRecoveredEditsFileName(seqId); 198 fileName = 199 getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit + "-" + workerNameComponent); 200 return new Path(dir, fileName); 201 } 202 203 private static String getTmpRecoveredEditsFileName(String fileName) { 204 return fileName + RECOVERED_LOG_TMPFILE_SUFFIX; 205 } 206 207 /** 208 * Get the completed recovered edits file path, renaming it to be by last edit in the file from 209 * its first edit. Then we could use the name to skip recovered edits when doing 210 * HRegion#replayRecoveredEditsIfAny(Map, CancelableProgressable, MonitoredTask). 211 * @return dstPath take file's last edit log seq num as the name 212 */ 213 static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) { 214 String fileName = formatRecoveredEditsFileName(maximumEditWALSeqNum); 215 return new Path(srcPath.getParent(), fileName); 216 } 217 218 static String formatRecoveredEditsFileName(final long seqid) { 219 return String.format("%019d", seqid); 220 } 221 222 /** 223 * @param regionDir This regions directory in the filesystem. 224 * @return The directory that holds recovered edits files for the region <code>regionDir</code> 225 */ 226 public static Path getRegionDirRecoveredEditsDir(final Path regionDir) { 227 return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR); 228 } 229 230 /** 231 * Check whether there is recovered.edits in the region dir 232 * @param conf conf 233 * @param regionInfo the region to check 234 * @return true if recovered.edits exist in the region dir 235 */ 236 public static boolean hasRecoveredEdits(final Configuration conf, final RegionInfo regionInfo) 237 throws IOException { 238 // No recovered.edits for non default replica regions 239 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 240 return false; 241 } 242 // Only default replica region can reach here, so we can use regioninfo 243 // directly without converting it to default replica's regioninfo. 244 Path regionWALDir = 245 CommonFSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName()); 246 Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), regionInfo); 247 Path wrongRegionWALDir = 248 CommonFSUtils.getWrongWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName()); 249 FileSystem walFs = CommonFSUtils.getWALFileSystem(conf); 250 FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf); 251 NavigableSet<Path> files = getSplitEditFilesSorted(walFs, regionWALDir); 252 if (!files.isEmpty()) { 253 return true; 254 } 255 files = getSplitEditFilesSorted(rootFs, regionDir); 256 if (!files.isEmpty()) { 257 return true; 258 } 259 files = getSplitEditFilesSorted(walFs, wrongRegionWALDir); 260 return !files.isEmpty(); 261 } 262 263 /** 264 * This method will check 3 places for finding the max sequence id file. One is the expected 265 * place, another is the old place under the region directory, and the last one is the wrong one 266 * we introduced in HBASE-20734. See HBASE-22617 for more details. 267 * <p/> 268 * Notice that, you should always call this method instead of 269 * {@link #getMaxRegionSequenceId(FileSystem, Path)} until 4.0.0 release. 270 * @deprecated Only for compatibility, will be removed in 4.0.0. 271 */ 272 @Deprecated 273 public static long getMaxRegionSequenceId(Configuration conf, RegionInfo region, 274 IOExceptionSupplier<FileSystem> rootFsSupplier, IOExceptionSupplier<FileSystem> walFsSupplier) 275 throws IOException { 276 FileSystem rootFs = rootFsSupplier.get(); 277 FileSystem walFs = walFsSupplier.get(); 278 Path regionWALDir = 279 CommonFSUtils.getWALRegionDir(conf, region.getTable(), region.getEncodedName()); 280 // This is the old place where we store max sequence id file 281 Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), region); 282 // This is for HBASE-20734, where we use a wrong directory, see HBASE-22617 for more details. 283 Path wrongRegionWALDir = 284 CommonFSUtils.getWrongWALRegionDir(conf, region.getTable(), region.getEncodedName()); 285 long maxSeqId = getMaxRegionSequenceId(walFs, regionWALDir); 286 maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(rootFs, regionDir)); 287 maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(walFs, wrongRegionWALDir)); 288 return maxSeqId; 289 } 290 291 /** 292 * Returns sorted set of edit files made by splitter, excluding files with '.temp' suffix. 293 * @param walFS WAL FileSystem used to retrieving split edits files. 294 * @param regionDir WAL region dir to look for recovered edits files under. 295 * @return Files in passed <code>regionDir</code> as a sorted set. 296 */ 297 public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS, 298 final Path regionDir) throws IOException { 299 NavigableSet<Path> filesSorted = new TreeSet<>(); 300 Path editsdir = getRegionDirRecoveredEditsDir(regionDir); 301 if (!walFS.exists(editsdir)) { 302 return filesSorted; 303 } 304 FileStatus[] files = CommonFSUtils.listStatus(walFS, editsdir, new PathFilter() { 305 @Override 306 public boolean accept(Path p) { 307 boolean result = false; 308 try { 309 // Return files and only files that match the editfile names pattern. 310 // There can be other files in this directory other than edit files. 311 // In particular, on error, we'll move aside the bad edit file giving 312 // it a timestamp suffix. See moveAsideBadEditsFile. 313 Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName()); 314 result = walFS.isFile(p) && m.matches(); 315 // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX, 316 // because it means splitwal thread is writting this file. 317 if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) { 318 result = false; 319 } 320 // Skip SeqId Files 321 if (isSequenceIdFile(p)) { 322 result = false; 323 } 324 } catch (IOException e) { 325 LOG.warn("Failed isFile check on {}", p, e); 326 } 327 return result; 328 } 329 }); 330 if (ArrayUtils.isNotEmpty(files)) { 331 Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath())); 332 } 333 return filesSorted; 334 } 335 336 /** 337 * Move aside a bad edits file. 338 * @param fs the file system used to rename bad edits file. 339 * @param edits Edits file to move aside. 340 * @return The name of the moved aside file. 341 */ 342 public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits) 343 throws IOException { 344 Path moveAsideName = 345 new Path(edits.getParent(), edits.getName() + "." + EnvironmentEdgeManager.currentTime()); 346 if (!fs.rename(edits, moveAsideName)) { 347 LOG.warn("Rename failed from {} to {}", edits, moveAsideName); 348 } 349 return moveAsideName; 350 } 351 352 /** 353 * Is the given file a region open sequence id file. 354 */ 355 public static boolean isSequenceIdFile(final Path file) { 356 return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX) 357 || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX); 358 } 359 360 private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir) 361 throws IOException { 362 // TODO: Why are we using a method in here as part of our normal region open where 363 // there is no splitting involved? Fix. St.Ack 01/20/2017. 364 Path editsDir = getRegionDirRecoveredEditsDir(regionDir); 365 try { 366 FileStatus[] files = walFS.listStatus(editsDir, WALSplitUtil::isSequenceIdFile); 367 return files != null ? files : new FileStatus[0]; 368 } catch (FileNotFoundException e) { 369 return new FileStatus[0]; 370 } 371 } 372 373 private static long getMaxSequenceId(FileStatus[] files) { 374 long maxSeqId = -1L; 375 for (FileStatus file : files) { 376 String fileName = file.getPath().getName(); 377 try { 378 maxSeqId = Math.max(maxSeqId, Long 379 .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH))); 380 } catch (NumberFormatException ex) { 381 LOG.warn("Invalid SeqId File Name={}", fileName); 382 } 383 } 384 return maxSeqId; 385 } 386 387 /** 388 * Get the max sequence id which is stored in the region directory. -1 if none. 389 */ 390 public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException { 391 return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir)); 392 } 393 394 /** 395 * Create a file with name as region's max sequence id 396 */ 397 public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId) 398 throws IOException { 399 FileStatus[] files = getSequenceIdFiles(walFS, regionDir); 400 long maxSeqId = getMaxSequenceId(files); 401 if (maxSeqId > newMaxSeqId) { 402 throw new IOException("The new max sequence id " + newMaxSeqId 403 + " is less than the old max sequence id " + maxSeqId); 404 } 405 // write a new seqId file 406 Path newSeqIdFile = 407 new Path(getRegionDirRecoveredEditsDir(regionDir), newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX); 408 if (newMaxSeqId != maxSeqId) { 409 try { 410 if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) { 411 throw new IOException("Failed to create SeqId file:" + newSeqIdFile); 412 } 413 LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId, 414 maxSeqId); 415 } catch (FileAlreadyExistsException ignored) { 416 // latest hdfs throws this exception. it's all right if newSeqIdFile already exists 417 } 418 } 419 // remove old ones 420 for (FileStatus status : files) { 421 if (!newSeqIdFile.equals(status.getPath())) { 422 walFS.delete(status.getPath(), false); 423 } 424 } 425 } 426 427 /** A struct used by getMutationsFromWALEntry */ 428 public static class MutationReplay implements Comparable<MutationReplay> { 429 public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation, 430 long nonceGroup, long nonce) { 431 this.type = type; 432 this.mutation = mutation; 433 if (this.mutation.getDurability() != Durability.SKIP_WAL) { 434 // using ASYNC_WAL for relay 435 this.mutation.setDurability(Durability.ASYNC_WAL); 436 } 437 this.nonceGroup = nonceGroup; 438 this.nonce = nonce; 439 } 440 441 private final ClientProtos.MutationProto.MutationType type; 442 @SuppressWarnings("checkstyle:VisibilityModifier") 443 public final Mutation mutation; 444 @SuppressWarnings("checkstyle:VisibilityModifier") 445 public final long nonceGroup; 446 @SuppressWarnings("checkstyle:VisibilityModifier") 447 public final long nonce; 448 449 @Override 450 public int compareTo(final MutationReplay d) { 451 return Row.COMPARATOR.compare(mutation, d.mutation); 452 } 453 454 @Override 455 public boolean equals(Object obj) { 456 if (!(obj instanceof MutationReplay)) { 457 return false; 458 } else { 459 return this.compareTo((MutationReplay) obj) == 0; 460 } 461 } 462 463 @Override 464 public int hashCode() { 465 return this.mutation.hashCode(); 466 } 467 468 public ClientProtos.MutationProto.MutationType getType() { 469 return type; 470 } 471 } 472 473 /** 474 * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey & 475 * WALEdit from the passed in WALEntry 476 * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances 477 * extracted from the passed in WALEntry. 478 * @return list of Pair<MutationType, Mutation> to be replayed 479 * @deprecated Since 3.0.0, will be removed in 4.0.0. 480 */ 481 @Deprecated 482 public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry, 483 ExtendedCellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) 484 throws IOException { 485 if (entry == null) { 486 // return an empty array 487 return Collections.emptyList(); 488 } 489 490 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) 491 ? entry.getKey().getOrigSequenceNumber() 492 : entry.getKey().getLogSequenceNumber(); 493 int count = entry.getAssociatedCellCount(); 494 List<MutationReplay> mutations = new ArrayList<>(); 495 ExtendedCell previousCell = null; 496 Mutation m = null; 497 WALKeyImpl key = null; 498 WALEdit val = null; 499 if (logEntry != null) { 500 val = new WALEdit(); 501 } 502 503 for (int i = 0; i < count; i++) { 504 // Throw index out of bounds if our cell count is off 505 if (!cells.advance()) { 506 throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); 507 } 508 ExtendedCell cell = cells.current(); 509 if (val != null) { 510 WALEditInternalHelper.addExtendedCell(val, cell); 511 } 512 513 boolean isNewRowOrType = 514 previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() 515 || !CellUtil.matchingRows(previousCell, cell); 516 if (isNewRowOrType) { 517 // Create new mutation 518 if (CellUtil.isDelete(cell)) { 519 m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 520 // Deletes don't have nonces. 521 mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, m, 522 HConstants.NO_NONCE, HConstants.NO_NONCE)); 523 } else { 524 m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 525 // Puts might come from increment or append, thus we need nonces. 526 long nonceGroup = 527 entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; 528 long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; 529 mutations.add( 530 new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, m, nonceGroup, nonce)); 531 } 532 } 533 if (CellUtil.isDelete(cell)) { 534 ((Delete) m).add(cell); 535 } else { 536 ((Put) m).add(cell); 537 } 538 m.setDurability(durability); 539 previousCell = cell; 540 } 541 542 // reconstruct WALKey 543 if (logEntry != null) { 544 org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto = 545 entry.getKey(); 546 List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount()); 547 for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) { 548 clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits())); 549 } 550 key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), 551 TableName.valueOf(walKeyProto.getTableName().toByteArray()), replaySeqId, 552 walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), 553 null); 554 logEntry.setFirst(key); 555 logEntry.setSecond(val); 556 } 557 558 return mutations; 559 } 560 561 /** 562 * Return path to recovered.hfiles directory of the region's column family: e.g. 563 * /hbase/some_table/2323432434/cf/recovered.hfiles/. This method also ensures existence of 564 * recovered.hfiles directory under the region's column family, creating it if necessary. 565 * @param rootFS the root file system 566 * @param conf configuration 567 * @param tableName the table name 568 * @param encodedRegionName the encoded region name 569 * @param familyName the column family name 570 * @return Path to recovered.hfiles directory of the region's column family. 571 */ 572 static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf, 573 TableName tableName, String encodedRegionName, String familyName) throws IOException { 574 Path rootDir = CommonFSUtils.getRootDir(conf); 575 Path regionDir = FSUtils.getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, tableName), 576 encodedRegionName); 577 Path dir = getRecoveredHFilesDir(regionDir, familyName); 578 if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) { 579 LOG.warn("mkdir failed on {}, region {}, column family {}", dir, encodedRegionName, 580 familyName); 581 } 582 return dir; 583 } 584 585 /** 586 * @param regionDir This regions directory in the filesystem 587 * @param familyName The column family name 588 * @return The directory that holds recovered hfiles for the region's column family 589 */ 590 private static Path getRecoveredHFilesDir(final Path regionDir, String familyName) { 591 return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR); 592 } 593 594 public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS, final Path regionDir, 595 String familyName) throws IOException { 596 Path dir = getRecoveredHFilesDir(regionDir, familyName); 597 return CommonFSUtils.listStatus(rootFS, dir); 598 } 599}