/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.List;
026import java.util.NavigableSet;
027import java.util.TreeSet;
028import java.util.UUID;
029import java.util.regex.Matcher;
030import java.util.regex.Pattern;
031import org.apache.commons.lang3.ArrayUtils;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileAlreadyExistsException;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.fs.PathFilter;
038import org.apache.hadoop.hbase.CellUtil;
039import org.apache.hadoop.hbase.ExtendedCell;
040import org.apache.hadoop.hbase.ExtendedCellScanner;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Durability;
045import org.apache.hadoop.hbase.client.Mutation;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.Row;
049import org.apache.hadoop.hbase.regionserver.HRegion;
050import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.CommonFSUtils;
053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
054import org.apache.hadoop.hbase.util.FSUtils;
055import org.apache.hadoop.hbase.util.IOExceptionSupplier;
056import org.apache.hadoop.hbase.util.Pair;
057import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
058import org.apache.yetus.audience.InterfaceAudience;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
065
066/**
067 * This class provides static methods to support WAL splitting related works
068 */
069@InterfaceAudience.Private
070public final class WALSplitUtil {
071  private static final Logger LOG = LoggerFactory.getLogger(WALSplitUtil.class);
072
073  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
074  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
075  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
076  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
077  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
078
079  private WALSplitUtil() {
080  }
081
082  /**
083   * Completes the work done by splitLogFile by archiving logs
084   * <p>
085   * It is invoked by SplitLogManager once it knows that one of the SplitLogWorkers have completed
086   * the splitLogFile() part. If the master crashes then this function might get called multiple
087   * times.
088   * <p>
089   */
090  public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException {
091    Path walDir = CommonFSUtils.getWALRootDir(conf);
092    Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
093    Path walPath;
094    if (CommonFSUtils.isStartingWithPath(walDir, logfile)) {
095      walPath = new Path(logfile);
096    } else {
097      walPath = new Path(walDir, logfile);
098    }
099    FileSystem walFS = walDir.getFileSystem(conf);
100    boolean corrupt = ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS);
101    archive(walPath, corrupt, oldLogDir, walFS, conf);
102    Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName());
103    walFS.delete(stagingDir, true);
104  }
105
106  /**
107   * Moves processed logs to a oldLogDir after successful processing Moves corrupted logs (any log
108   * that couldn't be successfully parsed to corruptDir (.corrupt) for later investigation
109   */
110  static void archive(final Path wal, final boolean corrupt, final Path oldWALDir,
111    final FileSystem walFS, final Configuration conf) throws IOException {
112    Path dir;
113    Path target;
114    if (corrupt) {
115      dir = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
116      if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
117        LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", dir);
118      }
119      target = new Path(dir, wal.getName());
120    } else {
121      dir = oldWALDir;
122      target = AbstractFSWAL.getWALArchivePath(oldWALDir, wal);
123    }
124    mkdir(walFS, dir);
125    moveWAL(walFS, wal, target);
126  }
127
128  private static void mkdir(FileSystem fs, Path dir) throws IOException {
129    if (!fs.mkdirs(dir)) {
130      LOG.warn("Failed mkdir {}", dir);
131    }
132  }
133
134  /**
135   * Move WAL. Used to move processed WALs to archive or bad WALs to corrupt WAL dir. WAL may have
136   * already been moved; makes allowance.
137   */
138  public static void moveWAL(FileSystem fs, Path p, Path targetDir) throws IOException {
139    if (fs.exists(p)) {
140      if (!CommonFSUtils.renameAndSetModifyTime(fs, p, targetDir)) {
141        LOG.warn("Failed move of {} to {}", p, targetDir);
142      } else {
143        LOG.info("Moved {} to {}", p, targetDir);
144      }
145    }
146  }
147
148  /**
149   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in <code>logEntry</code>
150   * named for the sequenceid in the passed <code>logEntry</code>: e.g.
151   * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of
152   * RECOVERED_EDITS_DIR under the region creating it if necessary. And also set storage policy for
153   * RECOVERED_EDITS_DIR if WAL_STORAGE_POLICY is configured.
154   * @param tableName           the table name
155   * @param encodedRegionName   the encoded region name
156   * @param seqId               the sequence id which used to generate file name
157   * @param fileNameBeingSplit  the file being split currently. Used to generate tmp file name.
158   * @param tmpDirName          of the directory used to sideline old recovered edits file
159   * @param conf                configuration
160   * @param workerNameComponent the worker name component for the file name
161   * @return Path to file into which to dump split log edits.
162   */
163  @SuppressWarnings("deprecation")
164  static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId,
165    String fileNameBeingSplit, String tmpDirName, Configuration conf, String workerNameComponent)
166    throws IOException {
167    FileSystem walFS = CommonFSUtils.getWALFileSystem(conf);
168    Path tableDir = CommonFSUtils.getWALTableDir(conf, tableName);
169    String encodedRegionNameStr = Bytes.toString(encodedRegionName);
170    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionNameStr);
171    Path dir = getRegionDirRecoveredEditsDir(regionDir);
172
173    if (walFS.exists(dir) && walFS.isFile(dir)) {
174      Path tmp = new Path(tmpDirName);
175      if (!walFS.exists(tmp)) {
176        walFS.mkdirs(tmp);
177      }
178      tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionNameStr);
179      LOG.warn("Found existing old file: {}. It could be some "
180        + "leftover of an old installation. It should be a folder instead. " + "So moving it to {}",
181        dir, tmp);
182      if (!walFS.rename(dir, tmp)) {
183        LOG.warn("Failed to sideline old file {}", dir);
184      }
185    }
186
187    if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
188      LOG.warn("mkdir failed on {}", dir);
189    } else {
190      String storagePolicy =
191        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
192      CommonFSUtils.setStoragePolicy(walFS, dir, storagePolicy);
193    }
194    // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
195    // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
196    // region's replayRecoveredEdits will not delete it
197    String fileName = formatRecoveredEditsFileName(seqId);
198    fileName =
199      getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit + "-" + workerNameComponent);
200    return new Path(dir, fileName);
201  }
202
203  private static String getTmpRecoveredEditsFileName(String fileName) {
204    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
205  }
206
207  /**
208   * Get the completed recovered edits file path, renaming it to be by last edit in the file from
209   * its first edit. Then we could use the name to skip recovered edits when doing
210   * HRegion#replayRecoveredEditsIfAny(Map, CancelableProgressable, MonitoredTask).
211   * @return dstPath take file's last edit log seq num as the name
212   */
213  static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) {
214    String fileName = formatRecoveredEditsFileName(maximumEditWALSeqNum);
215    return new Path(srcPath.getParent(), fileName);
216  }
217
218  static String formatRecoveredEditsFileName(final long seqid) {
219    return String.format("%019d", seqid);
220  }
221
222  /**
223   * @param regionDir This regions directory in the filesystem.
224   * @return The directory that holds recovered edits files for the region <code>regionDir</code>
225   */
226  public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
227    return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
228  }
229
230  /**
231   * Check whether there is recovered.edits in the region dir
232   * @param conf       conf
233   * @param regionInfo the region to check
234   * @return true if recovered.edits exist in the region dir
235   */
236  public static boolean hasRecoveredEdits(final Configuration conf, final RegionInfo regionInfo)
237    throws IOException {
238    // No recovered.edits for non default replica regions
239    if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
240      return false;
241    }
242    // Only default replica region can reach here, so we can use regioninfo
243    // directly without converting it to default replica's regioninfo.
244    Path regionWALDir =
245      CommonFSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
246    Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), regionInfo);
247    Path wrongRegionWALDir =
248      CommonFSUtils.getWrongWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
249    FileSystem walFs = CommonFSUtils.getWALFileSystem(conf);
250    FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
251    NavigableSet<Path> files = getSplitEditFilesSorted(walFs, regionWALDir);
252    if (!files.isEmpty()) {
253      return true;
254    }
255    files = getSplitEditFilesSorted(rootFs, regionDir);
256    if (!files.isEmpty()) {
257      return true;
258    }
259    files = getSplitEditFilesSorted(walFs, wrongRegionWALDir);
260    return !files.isEmpty();
261  }
262
263  /**
264   * This method will check 3 places for finding the max sequence id file. One is the expected
265   * place, another is the old place under the region directory, and the last one is the wrong one
266   * we introduced in HBASE-20734. See HBASE-22617 for more details.
267   * <p/>
268   * Notice that, you should always call this method instead of
269   * {@link #getMaxRegionSequenceId(FileSystem, Path)} until 4.0.0 release.
270   * @deprecated Only for compatibility, will be removed in 4.0.0.
271   */
272  @Deprecated
273  public static long getMaxRegionSequenceId(Configuration conf, RegionInfo region,
274    IOExceptionSupplier<FileSystem> rootFsSupplier, IOExceptionSupplier<FileSystem> walFsSupplier)
275    throws IOException {
276    FileSystem rootFs = rootFsSupplier.get();
277    FileSystem walFs = walFsSupplier.get();
278    Path regionWALDir =
279      CommonFSUtils.getWALRegionDir(conf, region.getTable(), region.getEncodedName());
280    // This is the old place where we store max sequence id file
281    Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), region);
282    // This is for HBASE-20734, where we use a wrong directory, see HBASE-22617 for more details.
283    Path wrongRegionWALDir =
284      CommonFSUtils.getWrongWALRegionDir(conf, region.getTable(), region.getEncodedName());
285    long maxSeqId = getMaxRegionSequenceId(walFs, regionWALDir);
286    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(rootFs, regionDir));
287    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(walFs, wrongRegionWALDir));
288    return maxSeqId;
289  }
290
291  /**
292   * Returns sorted set of edit files made by splitter, excluding files with '.temp' suffix.
293   * @param walFS     WAL FileSystem used to retrieving split edits files.
294   * @param regionDir WAL region dir to look for recovered edits files under.
295   * @return Files in passed <code>regionDir</code> as a sorted set.
296   */
297  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
298    final Path regionDir) throws IOException {
299    NavigableSet<Path> filesSorted = new TreeSet<>();
300    Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
301    if (!walFS.exists(editsdir)) {
302      return filesSorted;
303    }
304    FileStatus[] files = CommonFSUtils.listStatus(walFS, editsdir, new PathFilter() {
305      @Override
306      public boolean accept(Path p) {
307        boolean result = false;
308        try {
309          // Return files and only files that match the editfile names pattern.
310          // There can be other files in this directory other than edit files.
311          // In particular, on error, we'll move aside the bad edit file giving
312          // it a timestamp suffix. See moveAsideBadEditsFile.
313          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
314          result = walFS.isFile(p) && m.matches();
315          // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
316          // because it means splitwal thread is writting this file.
317          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
318            result = false;
319          }
320          // Skip SeqId Files
321          if (isSequenceIdFile(p)) {
322            result = false;
323          }
324        } catch (IOException e) {
325          LOG.warn("Failed isFile check on {}", p, e);
326        }
327        return result;
328      }
329    });
330    if (ArrayUtils.isNotEmpty(files)) {
331      Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
332    }
333    return filesSorted;
334  }
335
336  /**
337   * Move aside a bad edits file.
338   * @param fs    the file system used to rename bad edits file.
339   * @param edits Edits file to move aside.
340   * @return The name of the moved aside file.
341   */
342  public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
343    throws IOException {
344    Path moveAsideName =
345      new Path(edits.getParent(), edits.getName() + "." + EnvironmentEdgeManager.currentTime());
346    if (!fs.rename(edits, moveAsideName)) {
347      LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
348    }
349    return moveAsideName;
350  }
351
352  /**
353   * Is the given file a region open sequence id file.
354   */
355  public static boolean isSequenceIdFile(final Path file) {
356    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
357      || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
358  }
359
360  private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
361    throws IOException {
362    // TODO: Why are we using a method in here as part of our normal region open where
363    // there is no splitting involved? Fix. St.Ack 01/20/2017.
364    Path editsDir = getRegionDirRecoveredEditsDir(regionDir);
365    try {
366      FileStatus[] files = walFS.listStatus(editsDir, WALSplitUtil::isSequenceIdFile);
367      return files != null ? files : new FileStatus[0];
368    } catch (FileNotFoundException e) {
369      return new FileStatus[0];
370    }
371  }
372
373  private static long getMaxSequenceId(FileStatus[] files) {
374    long maxSeqId = -1L;
375    for (FileStatus file : files) {
376      String fileName = file.getPath().getName();
377      try {
378        maxSeqId = Math.max(maxSeqId, Long
379          .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
380      } catch (NumberFormatException ex) {
381        LOG.warn("Invalid SeqId File Name={}", fileName);
382      }
383    }
384    return maxSeqId;
385  }
386
387  /**
388   * Get the max sequence id which is stored in the region directory. -1 if none.
389   */
390  public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
391    return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
392  }
393
394  /**
395   * Create a file with name as region's max sequence id
396   */
397  public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
398    throws IOException {
399    FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
400    long maxSeqId = getMaxSequenceId(files);
401    if (maxSeqId > newMaxSeqId) {
402      throw new IOException("The new max sequence id " + newMaxSeqId
403        + " is less than the old max sequence id " + maxSeqId);
404    }
405    // write a new seqId file
406    Path newSeqIdFile =
407      new Path(getRegionDirRecoveredEditsDir(regionDir), newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
408    if (newMaxSeqId != maxSeqId) {
409      try {
410        if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
411          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
412        }
413        LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
414          maxSeqId);
415      } catch (FileAlreadyExistsException ignored) {
416        // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
417      }
418    }
419    // remove old ones
420    for (FileStatus status : files) {
421      if (!newSeqIdFile.equals(status.getPath())) {
422        walFS.delete(status.getPath(), false);
423      }
424    }
425  }
426
427  /** A struct used by getMutationsFromWALEntry */
428  public static class MutationReplay implements Comparable<MutationReplay> {
429    public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation,
430      long nonceGroup, long nonce) {
431      this.type = type;
432      this.mutation = mutation;
433      if (this.mutation.getDurability() != Durability.SKIP_WAL) {
434        // using ASYNC_WAL for relay
435        this.mutation.setDurability(Durability.ASYNC_WAL);
436      }
437      this.nonceGroup = nonceGroup;
438      this.nonce = nonce;
439    }
440
441    private final ClientProtos.MutationProto.MutationType type;
442    @SuppressWarnings("checkstyle:VisibilityModifier")
443    public final Mutation mutation;
444    @SuppressWarnings("checkstyle:VisibilityModifier")
445    public final long nonceGroup;
446    @SuppressWarnings("checkstyle:VisibilityModifier")
447    public final long nonce;
448
449    @Override
450    public int compareTo(final MutationReplay d) {
451      return Row.COMPARATOR.compare(mutation, d.mutation);
452    }
453
454    @Override
455    public boolean equals(Object obj) {
456      if (!(obj instanceof MutationReplay)) {
457        return false;
458      } else {
459        return this.compareTo((MutationReplay) obj) == 0;
460      }
461    }
462
463    @Override
464    public int hashCode() {
465      return this.mutation.hashCode();
466    }
467
468    public ClientProtos.MutationProto.MutationType getType() {
469      return type;
470    }
471  }
472
473  /**
474   * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &amp;
475   * WALEdit from the passed in WALEntry
476   * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
477   *                 extracted from the passed in WALEntry.
478   * @return list of Pair&lt;MutationType, Mutation&gt; to be replayed
479   * @deprecated Since 3.0.0, will be removed in 4.0.0.
480   */
481  @Deprecated
482  public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry,
483    ExtendedCellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability)
484    throws IOException {
485    if (entry == null) {
486      // return an empty array
487      return Collections.emptyList();
488    }
489
490    long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
491      ? entry.getKey().getOrigSequenceNumber()
492      : entry.getKey().getLogSequenceNumber();
493    int count = entry.getAssociatedCellCount();
494    List<MutationReplay> mutations = new ArrayList<>();
495    ExtendedCell previousCell = null;
496    Mutation m = null;
497    WALKeyImpl key = null;
498    WALEdit val = null;
499    if (logEntry != null) {
500      val = new WALEdit();
501    }
502
503    for (int i = 0; i < count; i++) {
504      // Throw index out of bounds if our cell count is off
505      if (!cells.advance()) {
506        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
507      }
508      ExtendedCell cell = cells.current();
509      if (val != null) {
510        WALEditInternalHelper.addExtendedCell(val, cell);
511      }
512
513      boolean isNewRowOrType =
514        previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
515          || !CellUtil.matchingRows(previousCell, cell);
516      if (isNewRowOrType) {
517        // Create new mutation
518        if (CellUtil.isDelete(cell)) {
519          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
520          // Deletes don't have nonces.
521          mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, m,
522            HConstants.NO_NONCE, HConstants.NO_NONCE));
523        } else {
524          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
525          // Puts might come from increment or append, thus we need nonces.
526          long nonceGroup =
527            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
528          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
529          mutations.add(
530            new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, m, nonceGroup, nonce));
531        }
532      }
533      if (CellUtil.isDelete(cell)) {
534        ((Delete) m).add(cell);
535      } else {
536        ((Put) m).add(cell);
537      }
538      m.setDurability(durability);
539      previousCell = cell;
540    }
541
542    // reconstruct WALKey
543    if (logEntry != null) {
544      org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
545        entry.getKey();
546      List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
547      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
548        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
549      }
550      key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(),
551        TableName.valueOf(walKeyProto.getTableName().toByteArray()), replaySeqId,
552        walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(),
553        null);
554      logEntry.setFirst(key);
555      logEntry.setSecond(val);
556    }
557
558    return mutations;
559  }
560
561  /**
562   * Return path to recovered.hfiles directory of the region's column family: e.g.
563   * /hbase/some_table/2323432434/cf/recovered.hfiles/. This method also ensures existence of
564   * recovered.hfiles directory under the region's column family, creating it if necessary.
565   * @param rootFS            the root file system
566   * @param conf              configuration
567   * @param tableName         the table name
568   * @param encodedRegionName the encoded region name
569   * @param familyName        the column family name
570   * @return Path to recovered.hfiles directory of the region's column family.
571   */
572  static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf,
573    TableName tableName, String encodedRegionName, String familyName) throws IOException {
574    Path rootDir = CommonFSUtils.getRootDir(conf);
575    Path regionDir = FSUtils.getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, tableName),
576      encodedRegionName);
577    Path dir = getRecoveredHFilesDir(regionDir, familyName);
578    if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) {
579      LOG.warn("mkdir failed on {}, region {}, column family {}", dir, encodedRegionName,
580        familyName);
581    }
582    return dir;
583  }
584
585  /**
586   * @param regionDir  This regions directory in the filesystem
587   * @param familyName The column family name
588   * @return The directory that holds recovered hfiles for the region's column family
589   */
590  private static Path getRecoveredHFilesDir(final Path regionDir, String familyName) {
591    return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR);
592  }
593
594  public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS, final Path regionDir,
595    String familyName) throws IOException {
596    Path dir = getRecoveredHFilesDir(regionDir, familyName);
597    return CommonFSUtils.listStatus(rootFS, dir);
598  }
599}