001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.List;
026import java.util.NavigableSet;
027import java.util.TreeSet;
028import java.util.UUID;
029import java.util.regex.Matcher;
030import java.util.regex.Pattern;
031import org.apache.commons.lang3.ArrayUtils;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileAlreadyExistsException;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.fs.PathFilter;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CellScanner;
040import org.apache.hadoop.hbase.CellUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Durability;
045import org.apache.hadoop.hbase.client.Mutation;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.regionserver.HRegion;
049import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.CommonFSUtils;
052import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
053import org.apache.hadoop.hbase.util.FSUtils;
054import org.apache.hadoop.hbase.util.Pair;
055import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
056import org.apache.yetus.audience.InterfaceAudience;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
063
064/**
 * This class provides static methods to support WAL splitting related work.
066 */
067@InterfaceAudience.Private
068public final class WALSplitUtil {
069  private static final Logger LOG = LoggerFactory.getLogger(WALSplitUtil.class);
070
071  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
072  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
073  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
074  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
075  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
076
077  private WALSplitUtil() {
078  }
079
080  /**
   * Completes the work done by splitLogFile by archiving logs.
   * <p>
   * It is invoked by SplitLogManager once it knows that one of the SplitLogWorkers has completed
   * the splitLogFile() part. If the master crashes then this function might get called multiple
   * times.
086   * <p>
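   * A minimal usage sketch (the WAL file name below is assumed):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * WALSplitUtil.finishSplitLogFile("server%2C16020%2C1590000000000.1590000001234", conf);
   * }</pre>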
087   */
088  public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException {
089    Path walDir = CommonFSUtils.getWALRootDir(conf);
090    Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
091    Path walPath;
092    if (CommonFSUtils.isStartingWithPath(walDir, logfile)) {
093      walPath = new Path(logfile);
094    } else {
095      walPath = new Path(walDir, logfile);
096    }
097    FileSystem walFS = walDir.getFileSystem(conf);
098    boolean corrupt = ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS);
099    archive(walPath, corrupt, oldLogDir, walFS, conf);
100    Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName());
101    walFS.delete(stagingDir, true);
102  }
103
104  /**
   * Moves processed logs to the oldLogDir after successful processing. Moves corrupted logs (any
   * log that couldn't be successfully parsed) to the corrupt dir (.corrupt) for later
   * investigation.
107   */
108  static void archive(final Path wal, final boolean corrupt, final Path oldWALDir,
109      final FileSystem walFS, final Configuration conf) throws IOException {
110    Path dir;
111    Path target;
112    if (corrupt) {
113      dir = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
114      if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
115        LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", dir);
116      }
117      target = new Path(dir, wal.getName());
118    } else {
119      dir = oldWALDir;
120      target = AbstractFSWAL.getWALArchivePath(oldWALDir, wal);
121    }
122    mkdir(walFS, dir);
123    moveWAL(walFS, wal, target);
124  }
125
126  private static void mkdir(FileSystem fs, Path dir) throws IOException {
127    if (!fs.mkdirs(dir)) {
128      LOG.warn("Failed mkdir {}", dir);
129    }
130  }
131
132  /**
   * Move a WAL. Used to move processed WALs to the archive dir or bad WALs to the corrupt WAL
   * dir. The WAL may have already been moved; this method makes allowance for that.
135   */
136  public static void moveWAL(FileSystem fs, Path p, Path targetDir) throws IOException {
137    if (fs.exists(p)) {
138      if (!CommonFSUtils.renameAndSetModifyTime(fs, p, targetDir)) {
139        LOG.warn("Failed move of {} to {}", p, targetDir);
140      } else {
141        LOG.info("Moved {} to {}", p, targetDir);
142      }
143    }
144  }
145
146  /**
   * Path to a file under the RECOVERED_EDITS_DIR directory of the region identified by
   * <code>tableName</code> and <code>encodedRegionName</code>, named for the passed
   * <code>seqId</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332. This method also
   * ensures existence of RECOVERED_EDITS_DIR under the region, creating it if necessary.
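   * <p>
   * A minimal usage sketch (the table, region, and file names below are assumed):
   * <pre>{@code
   * Path edits = getRegionSplitEditsPath(TableName.valueOf("some_table"),
   *   Bytes.toBytes("2323432434"), 2332L, "wal-file-being-split", "/hbase/.tmp", conf);
   * // edits is e.g.
   * //   .../some_table/2323432434/recovered.edits/0000000000000002332-wal-file-being-split.temp
   * }</pre>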
151   * @param tableName the table name
152   * @param encodedRegionName the encoded region name
   * @param seqId the sequence id used to generate the file name
   * @param fileNameBeingSplit the file being split currently. Used to generate the tmp file name.
   * @param tmpDirName the name of the directory used to sideline an old recovered edits file
156   * @param conf configuration
157   * @return Path to file into which to dump split log edits.
158   */
159  @SuppressWarnings("deprecation")
160  static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId,
161      String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException {
162    FileSystem walFS = CommonFSUtils.getWALFileSystem(conf);
163    Path tableDir = CommonFSUtils.getWALTableDir(conf, tableName);
164    String encodedRegionNameStr = Bytes.toString(encodedRegionName);
165    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionNameStr);
166    Path dir = getRegionDirRecoveredEditsDir(regionDir);
167
168    if (walFS.exists(dir) && walFS.isFile(dir)) {
169      Path tmp = new Path(tmpDirName);
170      if (!walFS.exists(tmp)) {
171        walFS.mkdirs(tmp);
172      }
173      tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionNameStr);
174      LOG.warn("Found existing old file: {}. It could be some "
175          + "leftover of an old installation. It should be a folder instead. "
176          + "So moving it to {}",
177        dir, tmp);
178      if (!walFS.rename(dir, tmp)) {
179        LOG.warn("Failed to sideline old file {}", dir);
180      }
181    }
182
183    if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
184      LOG.warn("mkdir failed on {}", dir);
185    }
    // Append fileNameBeingSplit to prevent name conflicts, since we may have duplicate wal
    // entries now.
    // Make the file name end with RECOVERED_LOG_TMPFILE_SUFFIX so that the region's
    // replayRecoveredEdits will not delete it.
189    String fileName = formatRecoveredEditsFileName(seqId);
190    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
191    return new Path(dir, fileName);
192  }
193
194  private static String getTmpRecoveredEditsFileName(String fileName) {
195    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
196  }
197
198  /**
   * Get the completed recovered edits file path, renaming the file so that it is named for the
   * last edit in the file rather than its first edit. The name can then be used to skip
   * recovered edits when doing HRegion#replayRecoveredEditsIfAny(Map, CancelableProgressable,
   * MonitoredTask).
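   * <p>
   * A minimal sketch of the renaming (the paths and sequence numbers below are assumed):
   * <pre>{@code
   * Path tmp = new Path(".../recovered.edits/0000000000000002332-walname.temp");
   * Path done = getCompletedRecoveredEditsFilePath(tmp, 2400L);
   * // done is .../recovered.edits/0000000000000002400
   * }</pre>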
   * @return the destination path, named with the file's last edit sequence number
203   */
204  static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) {
205    String fileName = formatRecoveredEditsFileName(maximumEditWALSeqNum);
206    return new Path(srcPath.getParent(), fileName);
207  }
208
209  static String formatRecoveredEditsFileName(final long seqid) {
210    return String.format("%019d", seqid);
211  }
212
213  /**
   * @param regionDir This region's directory in the filesystem.
215   * @return The directory that holds recovered edits files for the region <code>regionDir</code>
216   */
217  public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
218    return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
219  }
220
221  /**
   * Check whether there are recovered.edits files in the region dir
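   * <p>
   * A minimal usage sketch (conf and regionInfo are assumed to come from the caller):
   * <pre>{@code
   * if (WALSplitUtil.hasRecoveredEdits(conf, regionInfo)) {
   *   // e.g. replay the recovered edits before bringing the region online
   * }
   * }</pre>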
   * @param conf the configuration
   * @param regionInfo the region to check
   * @return true if recovered.edits files exist in the region dir
226   */
227  public static boolean hasRecoveredEdits(final Configuration conf, final RegionInfo regionInfo)
228      throws IOException {
    // No recovered.edits for non-default replica regions
230    if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
231      return false;
232    }
233    // Only default replica region can reach here, so we can use regioninfo
234    // directly without converting it to default replica's regioninfo.
235    Path regionWALDir =
236      CommonFSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
237    Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), regionInfo);
238    Path wrongRegionWALDir =
239      CommonFSUtils.getWrongWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
240    FileSystem walFs = CommonFSUtils.getWALFileSystem(conf);
241    FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
242    NavigableSet<Path> files = getSplitEditFilesSorted(walFs, regionWALDir);
243    if (!files.isEmpty()) {
244      return true;
245    }
246    files = getSplitEditFilesSorted(rootFs, regionDir);
247    if (!files.isEmpty()) {
248      return true;
249    }
250    files = getSplitEditFilesSorted(walFs, wrongRegionWALDir);
251    return !files.isEmpty();
252  }
253
254  /**
255   * This method will check 3 places for finding the max sequence id file. One is the expected
256   * place, another is the old place under the region directory, and the last one is the wrong one
257   * we introduced in HBASE-20734. See HBASE-22617 for more details.
258   * <p/>
   * Notice that you should always call this method instead of
260   * {@link #getMaxRegionSequenceId(FileSystem, Path)} until 4.0.0 release.
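   * <p/>
   * A minimal usage sketch (conf and region are assumed to come from the caller):
   * <pre>{@code
   * long maxSeqId = WALSplitUtil.getMaxRegionSequenceId(conf, region,
   *   () -> CommonFSUtils.getRootDirFileSystem(conf),
   *   () -> CommonFSUtils.getWALFileSystem(conf));
   * }</pre>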
261   * @deprecated Only for compatibility, will be removed in 4.0.0.
262   */
263  @Deprecated
264  public static long getMaxRegionSequenceId(Configuration conf, RegionInfo region,
265    IOExceptionSupplier<FileSystem> rootFsSupplier, IOExceptionSupplier<FileSystem> walFsSupplier)
266    throws IOException {
267    FileSystem rootFs = rootFsSupplier.get();
268    FileSystem walFs = walFsSupplier.get();
269    Path regionWALDir =
270      CommonFSUtils.getWALRegionDir(conf, region.getTable(), region.getEncodedName());
271    // This is the old place where we store max sequence id file
272    Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), region);
273    // This is for HBASE-20734, where we use a wrong directory, see HBASE-22617 for more details.
274    Path wrongRegionWALDir =
275      CommonFSUtils.getWrongWALRegionDir(conf, region.getTable(), region.getEncodedName());
276    long maxSeqId = getMaxRegionSequenceId(walFs, regionWALDir);
277    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(rootFs, regionDir));
278    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(walFs, wrongRegionWALDir));
279    return maxSeqId;
280  }
281
282  /**
   * Returns a sorted set of edit files made by the splitter, excluding files with a '.temp'
   * suffix and sequence id files.
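   * <p>
   * A minimal usage sketch (conf and regionWALDir are assumed to come from the caller):
   * <pre>{@code
   * FileSystem walFS = CommonFSUtils.getWALFileSystem(conf);
   * for (Path edits : WALSplitUtil.getSplitEditFilesSorted(walFS, regionWALDir)) {
   *   // visit edits files from the lowest sequence id to the highest
   * }
   * }</pre>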
   * @param walFS WAL FileSystem used to retrieve the split edits files.
285   * @param regionDir WAL region dir to look for recovered edits files under.
286   * @return Files in passed <code>regionDir</code> as a sorted set.
287   */
288  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
289      final Path regionDir) throws IOException {
290    NavigableSet<Path> filesSorted = new TreeSet<>();
291    Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
292    if (!walFS.exists(editsdir)) {
293      return filesSorted;
294    }
295    FileStatus[] files = CommonFSUtils.listStatus(walFS, editsdir, new PathFilter() {
296      @Override
297      public boolean accept(Path p) {
298        boolean result = false;
299        try {
300          // Return files and only files that match the editfile names pattern.
301          // There can be other files in this directory other than edit files.
302          // In particular, on error, we'll move aside the bad edit file giving
303          // it a timestamp suffix. See moveAsideBadEditsFile.
304          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
305          result = walFS.isFile(p) && m.matches();
          // Skip files whose names end with RECOVERED_LOG_TMPFILE_SUFFIX,
          // because that means a splitwal thread is still writing the file.
308          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
309            result = false;
310          }
311          // Skip SeqId Files
312          if (isSequenceIdFile(p)) {
313            result = false;
314          }
315        } catch (IOException e) {
316          LOG.warn("Failed isFile check on {}", p, e);
317        }
318        return result;
319      }
320    });
321    if (ArrayUtils.isNotEmpty(files)) {
322      Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
323    }
324    return filesSorted;
325  }
326
327  /**
328   * Move aside a bad edits file.
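   * <p>
   * A minimal usage sketch (fs and badEdits are assumed to come from the caller):
   * <pre>{@code
   * Path sidelined = WALSplitUtil.moveAsideBadEditsFile(fs, badEdits);
   * // e.g. .../recovered.edits/0000000000000002332.1590000001234
   * }</pre>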
   * @param fs the file system used to rename the bad edits file.
330   * @param edits Edits file to move aside.
   * @return The path of the moved-aside file.
332   */
333  public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
334      throws IOException {
335    Path moveAsideName =
336        new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis());
337    if (!fs.rename(edits, moveAsideName)) {
338      LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
339    }
340    return moveAsideName;
341  }
342
343  /**
   * Checks whether the given file is a region open sequence id file.
345   */
346  public static boolean isSequenceIdFile(final Path file) {
347    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
348        || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
349  }
350
351  private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
352      throws IOException {
353    // TODO: Why are we using a method in here as part of our normal region open where
354    // there is no splitting involved? Fix. St.Ack 01/20/2017.
355    Path editsDir = getRegionDirRecoveredEditsDir(regionDir);
356    try {
357      FileStatus[] files = walFS.listStatus(editsDir, WALSplitUtil::isSequenceIdFile);
358      return files != null ? files : new FileStatus[0];
359    } catch (FileNotFoundException e) {
360      return new FileStatus[0];
361    }
362  }
363
364  private static long getMaxSequenceId(FileStatus[] files) {
365    long maxSeqId = -1L;
366    for (FileStatus file : files) {
367      String fileName = file.getPath().getName();
368      try {
369        maxSeqId = Math.max(maxSeqId, Long
370            .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
371      } catch (NumberFormatException ex) {
372        LOG.warn("Invalid SeqId File Name={}", fileName);
373      }
374    }
375    return maxSeqId;
376  }
377
378  /**
379   * Get the max sequence id which is stored in the region directory. -1 if none.
380   */
381  public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
382    return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
383  }
384
385  /**
   * Create a file whose name is the region's max sequence id
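   * <p>
   * A minimal usage sketch (walFS, regionWALDir and maxSeqId are assumed to come from the
   * caller):
   * <pre>{@code
   * WALSplitUtil.writeRegionSequenceIdFile(walFS, regionWALDir, maxSeqId);
   * long recorded = WALSplitUtil.getMaxRegionSequenceId(walFS, regionWALDir); // read it back
   * }</pre>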
387   */
388  public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
389      throws IOException {
390    FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
391    long maxSeqId = getMaxSequenceId(files);
392    if (maxSeqId > newMaxSeqId) {
393      throw new IOException("The new max sequence id " + newMaxSeqId
394          + " is less than the old max sequence id " + maxSeqId);
395    }
396    // write a new seqId file
397    Path newSeqIdFile =
398        new Path(getRegionDirRecoveredEditsDir(regionDir), newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
399    if (newMaxSeqId != maxSeqId) {
400      try {
401        if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
402          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
403        }
404        LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
405          maxSeqId);
406      } catch (FileAlreadyExistsException ignored) {
        // Recent HDFS versions throw this exception; it's all right if newSeqIdFile already exists
408      }
409    }
410    // remove old ones
411    for (FileStatus status : files) {
412      if (!newSeqIdFile.equals(status.getPath())) {
413        walFS.delete(status.getPath(), false);
414      }
415    }
416  }
417
418  /** A struct used by getMutationsFromWALEntry */
419  public static class MutationReplay implements Comparable<MutationReplay> {
420    public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation,
421        long nonceGroup, long nonce) {
422      this.type = type;
423      this.mutation = mutation;
424      if (this.mutation.getDurability() != Durability.SKIP_WAL) {
        // using ASYNC_WAL for replay
426        this.mutation.setDurability(Durability.ASYNC_WAL);
427      }
428      this.nonceGroup = nonceGroup;
429      this.nonce = nonce;
430    }
431
432    private final ClientProtos.MutationProto.MutationType type;
433    @SuppressWarnings("checkstyle:VisibilityModifier") public final Mutation mutation;
434    @SuppressWarnings("checkstyle:VisibilityModifier") public final long nonceGroup;
435    @SuppressWarnings("checkstyle:VisibilityModifier") public final long nonce;
436
437    @Override
438    public int compareTo(final MutationReplay d) {
439      return this.mutation.compareTo(d.mutation);
440    }
441
442    @Override
443    public boolean equals(Object obj) {
444      if (!(obj instanceof MutationReplay)) {
445        return false;
446      } else {
447        return this.compareTo((MutationReplay) obj) == 0;
448      }
449    }
450
451    @Override
452    public int hashCode() {
453      return this.mutation.hashCode();
454    }
455
456    public ClientProtos.MutationProto.MutationType getType() {
457      return type;
458    }
459  }
460
461  /**
   * Constructs mutations from a WALEntry. It also reconstructs the WALKey &amp; WALEdit from the
   * passed-in WALEntry.
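   * <p>
   * A minimal usage sketch (the entry and cells are assumed to come from, e.g., a replay
   * request):
   * <pre>{@code
   * Pair<WALKey, WALEdit> walEntry = new Pair<>();
   * List<MutationReplay> mutations =
   *   WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, Durability.USE_DEFAULT);
   * for (MutationReplay replay : mutations) {
   *   // apply replay.mutation; nonces matter for PUTs that came from increment/append
   * }
   * }</pre>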
   * @param entry the WALEntry to extract mutations from
   * @param cells scanner over the Cells associated with <code>entry</code>
   * @param logEntry pair into which the WALKey and WALEdit instances extracted from the passed-in
   *          WALEntry are stored; may be null if the caller does not need them
   * @param durability the durability to set on the reconstructed mutations
   * @return list of {@link MutationReplay} to be replayed
467   */
468  public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry,
469      CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
470    if (entry == null) {
      // return an empty list
472      return Collections.emptyList();
473    }
474
475    long replaySeqId =
476        (entry.getKey().hasOrigSequenceNumber()) ? entry.getKey().getOrigSequenceNumber()
477            : entry.getKey().getLogSequenceNumber();
478    int count = entry.getAssociatedCellCount();
479    List<MutationReplay> mutations = new ArrayList<>();
480    Cell previousCell = null;
481    Mutation m = null;
482    WALKeyImpl key = null;
483    WALEdit val = null;
484    if (logEntry != null) {
485      val = new WALEdit();
486    }
487
488    for (int i = 0; i < count; i++) {
489      // Throw index out of bounds if our cell count is off
490      if (!cells.advance()) {
491        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
492      }
493      Cell cell = cells.current();
494      if (val != null) {
495        val.add(cell);
496      }
497
498      boolean isNewRowOrType =
499          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
500              || !CellUtil.matchingRows(previousCell, cell);
501      if (isNewRowOrType) {
502        // Create new mutation
503        if (CellUtil.isDelete(cell)) {
504          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
505          // Deletes don't have nonces.
506          mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, m,
507              HConstants.NO_NONCE, HConstants.NO_NONCE));
508        } else {
509          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
510          // Puts might come from increment or append, thus we need nonces.
511          long nonceGroup =
512              entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
513          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
514          mutations.add(
515            new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, m, nonceGroup, nonce));
516        }
517      }
518      if (CellUtil.isDelete(cell)) {
519        ((Delete) m).add(cell);
520      } else {
521        ((Put) m).add(cell);
522      }
523      m.setDurability(durability);
524      previousCell = cell;
525    }
526
527    // reconstruct WALKey
528    if (logEntry != null) {
529      org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
530          entry.getKey();
531      List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
532      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
533        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
534      }
535      key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(),
536          TableName.valueOf(walKeyProto.getTableName().toByteArray()), replaySeqId,
537          walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(),
538          walKeyProto.getNonce(), null);
539      logEntry.setFirst(key);
540      logEntry.setSecond(val);
541    }
542
543    return mutations;
544  }
545
546  /**
   * Return the path to the recovered.hfiles directory of the region's column family: e.g.
   * /hbase/some_table/2323432434/cf/recovered.hfiles/. This method also ensures existence of the
   * recovered.hfiles directory under the region's column family, creating it if necessary.
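   * <p>
   * A minimal usage sketch (the table, region and family names below are assumed):
   * <pre>{@code
   * Path dir = tryCreateRecoveredHFilesDir(rootFS, conf, TableName.valueOf("some_table"),
   *   "2323432434", "cf");
   * // dir is e.g. /hbase/some_table/2323432434/cf/recovered.hfiles
   * }</pre>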
550   * @param rootFS the root file system
551   * @param conf configuration
552   * @param tableName the table name
553   * @param encodedRegionName the encoded region name
554   * @param familyName the column family name
555   * @return Path to recovered.hfiles directory of the region's column family.
556   */
557  static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf,
558      TableName tableName, String encodedRegionName, String familyName) throws IOException {
559    Path rootDir = CommonFSUtils.getRootDir(conf);
560    Path regionDir = FSUtils.getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, tableName),
561      encodedRegionName);
562    Path dir = getRecoveredHFilesDir(regionDir, familyName);
563    if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) {
564      LOG.warn("mkdir failed on {}, region {}, column family {}", dir, encodedRegionName,
565        familyName);
566    }
567    return dir;
568  }
569
570  /**
   * @param regionDir  This region's directory in the filesystem
572   * @param familyName The column family name
573   * @return The directory that holds recovered hfiles for the region's column family
574   */
575  private static Path getRecoveredHFilesDir(final Path regionDir, String familyName) {
576    return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR);
577  }
578
579  public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS,
580      final Path regionDir, String familyName) throws IOException {
581    Path dir = getRecoveredHFilesDir(regionDir, familyName);
582    return CommonFSUtils.listStatus(rootFS, dir);
583  }
584}