/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * Static helper methods that support WAL splitting: archiving split WALs, building
 * recovered-edits and recovered-hfiles paths, tracking region sequence id files and converting
 * WAL entries into replayable mutations.
 */
@InterfaceAudience.Private
public final class WALSplitUtil {
  private static final Logger LOG = LoggerFactory.getLogger(WALSplitUtil.class);

  /** Name pattern of a completed recovered edits file: an optionally negative decimal number. */
  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
  /** Suffix of a recovered edits file that is still being written. */
  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
  // Both sequence id suffixes have the same length, so this one length strips either of them.
  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();

  /** Utility class, not meant to be instantiated. */
  private WALSplitUtil() {
  }

  /**
   * Completes the work done by splitLogFile by archiving logs
   * <p>
   * It is invoked by SplitLogManager once it knows that one of the SplitLogWorkers have completed
   * the splitLogFile() part. If the master crashes then this function might get called multiple
   * times.
   * @param logfile the WAL file that was split; used as is when it already starts with the WAL
   *          root directory, otherwise resolved against the WAL root directory
   * @param conf configuration used to locate the WAL root directory and its file system
   * @throws IOException if a file system operation fails
   */
  public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException {
    Path walDir = CommonFSUtils.getWALRootDir(conf);
    Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
    Path walPath;
    if (CommonFSUtils.isStartingWithPath(walDir, logfile)) {
      walPath = new Path(logfile);
    } else {
      walPath = new Path(walDir, logfile);
    }
    finishSplitLogFile(walDir, oldLogDir, walPath, conf);
  }

  /**
   * Archives the given WAL (to the corrupt directory if {@link ZKSplitLog} reports it as
   * corrupted, to <code>oldWALDir</code> otherwise) and then recursively deletes the split staging
   * directory of that WAL.
   * @param walDir the WAL root directory
   * @param oldWALDir the directory processed WALs are archived to
   * @param walPath the WAL file that was split
   * @param conf configuration used to obtain the WAL file system
   * @throws IOException if a file system operation fails
   */
  static void finishSplitLogFile(Path walDir, Path oldWALDir, Path walPath,
      Configuration conf) throws IOException {
    List<Path> processedLogs = new ArrayList<>();
    List<Path> corruptedLogs = new ArrayList<>();
    FileSystem walFS = walDir.getFileSystem(conf);
    if (ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS)) {
      corruptedLogs.add(walPath);
    } else {
      processedLogs.add(walPath);
    }
    archiveWALs(corruptedLogs, processedLogs, oldWALDir, walFS, conf);
    Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName());
    walFS.delete(stagingDir, true);
  }

  /**
   * Moves processed logs to <code>oldWALDir</code> after successful processing. Moves corrupted
   * logs (any log that couldn't be successfully parsed) to the corrupt directory under the WAL
   * root directory for later investigation. Failed renames are only logged.
   */
  private static void archiveWALs(final List<Path> corruptedWALs, final List<Path> processedWALs,
      final Path oldWALDir, final FileSystem walFS, final Configuration conf) throws IOException {
    final Path corruptDir =
      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
        corruptDir);
    }
    if (!walFS.mkdirs(corruptDir)) {
      LOG.info("Unable to mkdir {}", corruptDir);
    }
    walFS.mkdirs(oldWALDir);

    // this method can get restarted or called multiple times for archiving
    // the same log files, hence the existence checks before each rename.
    for (Path corruptedWAL : corruptedWALs) {
      Path p = new Path(corruptDir, corruptedWAL.getName());
      if (walFS.exists(corruptedWAL)) {
        if (!walFS.rename(corruptedWAL, p)) {
          LOG.warn("Unable to move corrupted log {} to {}", corruptedWAL, p);
        } else {
          LOG.warn("Moved corrupted log {} to {}", corruptedWAL, p);
        }
      }
    }

    for (Path p : processedWALs) {
      Path newPath = AbstractFSWAL.getWALArchivePath(oldWALDir, p);
      if (walFS.exists(p)) {
        if (!CommonFSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
          LOG.warn("Unable to move {} to {}", p, newPath);
        } else {
          LOG.info("Archived processed log {} to {}", p, newPath);
        }
      }
    }
  }

  /**
   * Path to a temporary file under the RECOVERED_EDITS_DIR directory of the given region. The
   * file name is the zero padded <code>seqId</code>, followed by <code>-</code>, the name of the
   * WAL being split and the <code>.temp</code> suffix. This method also ensures existence of
   * RECOVERED_EDITS_DIR under the region, creating it if necessary. If a plain file is found
   * where the directory is expected, it is first moved aside into <code>tmpDirName</code>.
   * @param tableName the table name
   * @param encodedRegionName the encoded region name
   * @param seqId the sequence id which used to generate file name
   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
   * @param tmpDirName of the directory used to sideline old recovered edits file
   * @param conf configuration
   * @return Path to file into which to dump split log edits.
   * @throws IOException if a file system operation fails
   */
  @SuppressWarnings("deprecation")
  @VisibleForTesting
  static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId,
      String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException {
    FileSystem walFS = CommonFSUtils.getWALFileSystem(conf);
    Path tableDir = CommonFSUtils.getWALTableDir(conf, tableName);
    String encodedRegionNameStr = Bytes.toString(encodedRegionName);
    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionNameStr);
    Path dir = getRegionDirRecoveredEditsDir(regionDir);

    if (walFS.exists(dir) && walFS.isFile(dir)) {
      Path tmp = new Path(tmpDirName);
      if (!walFS.exists(tmp)) {
        walFS.mkdirs(tmp);
      }
      tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionNameStr);
      LOG.warn("Found existing old file: {}. It could be some "
        + "leftover of an old installation. It should be a folder instead. "
        + "So moving it to {}",
        dir, tmp);
      if (!walFS.rename(dir, tmp)) {
        LOG.warn("Failed to sideline old file {}", dir);
      }
    }

    if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
      LOG.warn("mkdir failed on {}", dir);
    }
    // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
    // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
    // region's replayRecoveredEdits will not delete it
    String fileName = formatRecoveredEditsFileName(seqId);
    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
    return new Path(dir, fileName);
  }

  /** Appends the temporary file suffix to the given recovered edits file name. */
  private static String getTmpRecoveredEditsFileName(String fileName) {
    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
  }

  /**
   * Get the completed recovered edits file path, renaming it to be by last edit in the file from
   * its first edit. Then we could use the name to skip recovered edits when doing
   * {@link HRegion#replayRecoveredEditsIfAny}.
   * @param srcPath the recovered edits file; the result is placed in the same parent directory
   * @param maximumEditWALSeqNum the sequence number used to build the new file name
   * @return dstPath take file's last edit log seq num as the name
   */
  static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) {
    String fileName = formatRecoveredEditsFileName(maximumEditWALSeqNum);
    return new Path(srcPath.getParent(), fileName);
  }

  /**
   * Formats a sequence id as a recovered edits file name: decimal, zero padded to a width of 19.
   * @param seqid the sequence id to format
   * @return the zero padded decimal representation of <code>seqid</code>
   */
  @VisibleForTesting
  static String formatRecoveredEditsFileName(final long seqid) {
    // %d output is localized; pin the locale so the name is always made of ASCII digits and
    // therefore always matches EDITFILES_NAME_PATTERN, whatever the JVM default locale is.
    return String.format(Locale.ROOT, "%019d", seqid);
  }

  /**
   * @param regionDir This regions directory in the filesystem.
   * @return The directory that holds recovered edits files for the region <code>regionDir</code>
   */
  public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
    return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
  }

  /**
   * Check whether there is recovered.edits in the region dir. Three locations are checked in
   * order: the WAL region dir, the region dir under the root dir, and the "wrong" WAL region dir.
   * @param conf conf
   * @param regionInfo the region to check
   * @return true if recovered.edits exist in the region dir
   * @throws IOException if a file system operation fails
   */
  public static boolean hasRecoveredEdits(final Configuration conf, final RegionInfo regionInfo)
      throws IOException {
    // No recovered.edits for non default replica regions
    if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
      return false;
    }
    // Only default replica region can reach here, so we can use regioninfo
    // directly without converting it to default replica's regioninfo.
    Path regionWALDir =
      CommonFSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
    Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), regionInfo);
    Path wrongRegionWALDir =
      CommonFSUtils.getWrongWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
    FileSystem walFs = CommonFSUtils.getWALFileSystem(conf);
    FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
    NavigableSet<Path> files = getSplitEditFilesSorted(walFs, regionWALDir);
    if (!files.isEmpty()) {
      return true;
    }
    files = getSplitEditFilesSorted(rootFs, regionDir);
    if (!files.isEmpty()) {
      return true;
    }
    files = getSplitEditFilesSorted(walFs, wrongRegionWALDir);
    return !files.isEmpty();
  }

  /**
   * This method will check 3 places for finding the max sequence id file. One is the expected
   * place, another is the old place under the region directory, and the last one is the wrong one
   * we introduced in HBASE-20734. See HBASE-22617 for more details.
   * <p>
   * Notice that, you should always call this method instead of
   * {@link #getMaxRegionSequenceId(FileSystem, Path)} until 4.0.0 release.
   * @param conf configuration used to resolve the directories
   * @param region the region whose max sequence id is wanted
   * @param rootFsSupplier supplies the file system of the root directory
   * @param walFsSupplier supplies the file system of the WAL directory
   * @return the largest sequence id found in the three places, -1 if none
   * @throws IOException if a file system operation fails
   * @deprecated Only for compatibility, will be removed in 4.0.0.
   */
  @Deprecated
  public static long getMaxRegionSequenceId(Configuration conf, RegionInfo region,
      IOExceptionSupplier<FileSystem> rootFsSupplier, IOExceptionSupplier<FileSystem> walFsSupplier)
      throws IOException {
    FileSystem rootFs = rootFsSupplier.get();
    FileSystem walFs = walFsSupplier.get();
    Path regionWALDir =
      CommonFSUtils.getWALRegionDir(conf, region.getTable(), region.getEncodedName());
    // This is the old place where we store max sequence id file
    Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), region);
    // This is for HBASE-20734, where we use a wrong directory, see HBASE-22617 for more details.
    Path wrongRegionWALDir =
      CommonFSUtils.getWrongWALRegionDir(conf, region.getTable(), region.getEncodedName());
    long maxSeqId = getMaxRegionSequenceId(walFs, regionWALDir);
    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(rootFs, regionDir));
    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(walFs, wrongRegionWALDir));
    return maxSeqId;
  }

  /**
   * Returns sorted set of edit files made by splitter, excluding files with '.temp' suffix.
   * @param walFS WAL FileSystem used to retrieving split edits files.
   * @param regionDir WAL region dir to look for recovered edits files under.
   * @return Files in passed <code>regionDir</code> as a sorted set; empty if the recovered edits
   *         directory does not exist.
   * @throws IOException if a file system operation fails
   */
  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
      final Path regionDir) throws IOException {
    NavigableSet<Path> filesSorted = new TreeSet<>();
    Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
    if (!walFS.exists(editsdir)) {
      return filesSorted;
    }
    FileStatus[] files = CommonFSUtils.listStatus(walFS, editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        // Return files and only files that match the editfile names pattern.
        // There can be other files in this directory other than edit files.
        // In particular, on error, we'll move aside the bad edit file giving
        // it a timestamp suffix. See moveAsideBadEditsFile.
        //
        // All name based checks run first: they are free, whereas isFile has to ask the file
        // system. The accept/reject outcome is the same whichever order the checks run in.
        String name = p.getName();
        // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
        // because it means splitwal thread is writing this file.
        if (name.endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
          return false;
        }
        // Skip SeqId Files
        if (isSequenceIdFile(p)) {
          return false;
        }
        Matcher m = EDITFILES_NAME_PATTERN.matcher(name);
        if (!m.matches()) {
          return false;
        }
        try {
          return walFS.isFile(p);
        } catch (IOException e) {
          // Best effort: a path we cannot check is treated as not being an edits file.
          LOG.warn("Failed isFile check on {}", p, e);
          return false;
        }
      }
    });
    if (ArrayUtils.isNotEmpty(files)) {
      Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
    }
    return filesSorted;
  }

  /**
   * Move aside a bad edits file by renaming it, in place, to its current name followed by a dot
   * and the current time in milliseconds. A failed rename is only logged.
   * @param fs the file system used to rename bad edits file.
   * @param edits Edits file to move aside.
   * @return The name of the moved aside file.
   * @throws IOException if the rename throws
   */
  public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
      throws IOException {
    Path moveAsideName =
      new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis());
    if (!fs.rename(edits, moveAsideName)) {
      LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
    }
    return moveAsideName;
  }

  /**
   * Is the given file a region open sequence id file.
   * @param file the path to check
   * @return true if the file name ends with the current or the old sequence id file suffix
   */
  @VisibleForTesting
  public static boolean isSequenceIdFile(final Path file) {
    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
      || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
  }

  /**
   * Lists the sequence id files in the recovered edits directory of the given region directory.
   * @return the sequence id files, an empty array if there are none or the directory is missing
   */
  private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
      throws IOException {
    // TODO: Why are we using a method in here as part of our normal region open where
    // there is no splitting involved? Fix. St.Ack 01/20/2017.
    Path editsDir = getRegionDirRecoveredEditsDir(regionDir);
    try {
      FileStatus[] files = walFS.listStatus(editsDir, WALSplitUtil::isSequenceIdFile);
      return files != null ? files : new FileStatus[0];
    } catch (FileNotFoundException e) {
      return new FileStatus[0];
    }
  }

  /**
   * Parses the sequence id out of each file name (the name minus its suffix) and returns the
   * largest one. Names that do not parse as a number are logged and skipped.
   * @return the largest sequence id, -1 if none could be parsed
   */
  private static long getMaxSequenceId(FileStatus[] files) {
    long maxSeqId = -1L;
    for (FileStatus file : files) {
      String fileName = file.getPath().getName();
      try {
        maxSeqId = Math.max(maxSeqId, Long
          .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
      } catch (NumberFormatException ex) {
        LOG.warn("Invalid SeqId File Name={}", fileName);
      }
    }
    return maxSeqId;
  }

  /**
   * Get the max sequence id which is stored in the region directory. -1 if none.
   */
  public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
    return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
  }

  /**
   * Create a file with name as region's max sequence id, then delete every other sequence id
   * file found in the recovered edits directory.
   * @param walFS the file system holding the region directory
   * @param regionDir the region directory
   * @param newMaxSeqId the sequence id to record
   * @throws IOException if <code>newMaxSeqId</code> is less than the currently recorded max
   *           sequence id, if the new file cannot be created, or if a file system operation fails
   */
  public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
      throws IOException {
    FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
    long maxSeqId = getMaxSequenceId(files);
    if (maxSeqId > newMaxSeqId) {
      throw new IOException("The new max sequence id " + newMaxSeqId
        + " is less than the old max sequence id " + maxSeqId);
    }
    // write a new seqId file
    Path newSeqIdFile =
      new Path(getRegionDirRecoveredEditsDir(regionDir), newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
    if (newMaxSeqId != maxSeqId) {
      try {
        if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
        }
        LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
          maxSeqId);
      } catch (FileAlreadyExistsException ignored) {
        // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
      }
    }
    // remove old ones
    for (FileStatus status : files) {
      if (!newSeqIdFile.equals(status.getPath())) {
        walFS.delete(status.getPath(), false);
      }
    }
  }

  /**
   * A struct used by getMutationsFromWALEntry. Ordering, equality and hash code are all delegated
   * to the wrapped mutation.
   */
  public static class MutationReplay implements Comparable<MutationReplay> {
    private final ClientProtos.MutationProto.MutationType type;
    public final Mutation mutation;
    public final long nonceGroup;
    public final long nonce;

    /**
     * Note that this constructor changes the durability of the passed in mutation to
     * {@link Durability#ASYNC_WAL} unless it is {@link Durability#SKIP_WAL}.
     */
    public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation,
        long nonceGroup, long nonce) {
      this.type = type;
      this.mutation = mutation;
      if (this.mutation.getDurability() != Durability.SKIP_WAL) {
        // using ASYNC_WAL for replay
        this.mutation.setDurability(Durability.ASYNC_WAL);
      }
      this.nonceGroup = nonceGroup;
      this.nonce = nonce;
    }

    @Override
    public int compareTo(final MutationReplay d) {
      return this.mutation.compareTo(d.mutation);
    }

    @Override
    public boolean equals(Object obj) {
      return obj instanceof MutationReplay && this.compareTo((MutationReplay) obj) == 0;
    }

    @Override
    public int hashCode() {
      return this.mutation.hashCode();
    }

    public ClientProtos.MutationProto.MutationType getType() {
      return type;
    }
  }

  /**
   * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey and
   * WALEdit from the passed in WALEntry. A new mutation is started whenever the row or the cell
   * type differs from the previous cell.
   * @param entry the WAL entry to convert; an empty list is returned when it is null
   * @param cells scanner over the cells associated with <code>entry</code>; it is advanced once
   *          per associated cell
   * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
   *          extracted from the passed in WALEntry. May be null, in which case nothing is
   *          reconstructed.
   * @param durability the durability set on every created mutation
   * @return list of {@link MutationReplay} to be replayed
   * @throws IOException if reading or adding a cell fails
   * @throws ArrayIndexOutOfBoundsException if <code>cells</code> holds fewer cells than the
   *           associated cell count of <code>entry</code>
   */
  public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry,
      CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
    if (entry == null) {
      // return an empty array
      return Collections.emptyList();
    }

    long replaySeqId =
      (entry.getKey().hasOrigSequenceNumber()) ? entry.getKey().getOrigSequenceNumber()
        : entry.getKey().getLogSequenceNumber();
    int count = entry.getAssociatedCellCount();
    List<MutationReplay> mutations = new ArrayList<>();
    Cell previousCell = null;
    Mutation m = null;
    // Only collect the cells into a WALEdit when the caller asked for the reconstructed entry.
    WALEdit val = null;
    if (logEntry != null) {
      val = new WALEdit();
    }

    for (int i = 0; i < count; i++) {
      // Throw index out of bounds if our cell count is off
      if (!cells.advance()) {
        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
      }
      Cell cell = cells.current();
      if (val != null) {
        val.add(cell);
      }

      boolean isNewRowOrType =
        previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
          || !CellUtil.matchingRows(previousCell, cell);
      if (isNewRowOrType) {
        // Create new mutation
        if (CellUtil.isDelete(cell)) {
          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Deletes don't have nonces.
          mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, m,
            HConstants.NO_NONCE, HConstants.NO_NONCE));
        } else {
          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Puts might come from increment or append, thus we need nonces.
          long nonceGroup =
            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          mutations.add(
            new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, m, nonceGroup, nonce));
        }
      }
      if (CellUtil.isDelete(cell)) {
        ((Delete) m).add(cell);
      } else {
        ((Put) m).add(cell);
      }
      // Set after the MutationReplay constructor ran, so the requested durability wins.
      m.setDurability(durability);
      previousCell = cell;
    }

    // reconstruct WALKey
    if (logEntry != null) {
      org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
        entry.getKey();
      List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
      for (HBaseProtos.UUID uuid : walKeyProto.getClusterIdsList()) {
        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
      }
      WALKeyImpl key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(),
        TableName.valueOf(walKeyProto.getTableName().toByteArray()), replaySeqId,
        walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(),
        walKeyProto.getNonce(), null);
      logEntry.setFirst(key);
      logEntry.setSecond(val);
    }

    return mutations;
  }

  /**
   * Return path to recovered.hfiles directory of the region's column family: e.g.
   * /hbase/some_table/2323432434/cf/recovered.hfiles/. This method also ensures existence of
   * recovered.hfiles directory under the region's column family, creating it if necessary. A
   * failed mkdir is only logged.
   * @param rootFS the root file system
   * @param conf configuration
   * @param tableName the table name
   * @param encodedRegionName the encoded region name
   * @param familyName the column family name
   * @return Path to recovered.hfiles directory of the region's column family.
   * @throws IOException if a file system operation fails
   */
  static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf,
      TableName tableName, String encodedRegionName, String familyName) throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
    Path regionDir = FSUtils.getRegionDirFromTableDir(tableDir, encodedRegionName);
    Path dir = getRecoveredHFilesDir(regionDir, familyName);
    if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) {
      LOG.warn("mkdir failed on {}, region {}, column family {}", dir, encodedRegionName,
        familyName);
    }
    return dir;
  }

  /**
   * @param regionDir This regions directory in the filesystem
   * @param familyName The column family name
   * @return The directory that holds recovered hfiles for the region's column family
   */
  private static Path getRecoveredHFilesDir(final Path regionDir, String familyName) {
    return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR);
  }

  /**
   * Lists the content of the recovered hfiles directory of a region's column family.
   * @param rootFS the root file system
   * @param regionDir This regions directory in the filesystem
   * @param familyName The column family name
   * @return whatever {@link CommonFSUtils#listStatus(FileSystem, Path)} returns for that directory
   * @throws IOException if listing the directory fails
   */
  public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS,
      final Path regionDir, String familyName) throws IOException {
    Path dir = getRecoveredHFilesDir(regionDir, familyName);
    return CommonFSUtils.listStatus(rootFS, dir);
  }
}