001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.List;
026import java.util.NavigableSet;
027import java.util.TreeSet;
028import java.util.UUID;
029import java.util.regex.Matcher;
030import java.util.regex.Pattern;
031import org.apache.commons.lang3.ArrayUtils;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileAlreadyExistsException;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.fs.PathFilter;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CellScanner;
040import org.apache.hadoop.hbase.CellUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Durability;
045import org.apache.hadoop.hbase.client.Mutation;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.Row;
049import org.apache.hadoop.hbase.regionserver.HRegion;
050import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.CommonFSUtils;
053import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
054import org.apache.hadoop.hbase.util.FSUtils;
055import org.apache.hadoop.hbase.util.Pair;
056import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
062
063import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
066
/**
 * Static helper methods that support WAL splitting.
 */
070@InterfaceAudience.Private
071public final class WALSplitUtil {
  private static final Logger LOG = LoggerFactory.getLogger(WALSplitUtil.class);

  /** Matches names made up solely of digits, optionally preceded by a minus sign. */
  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
  /** Suffix given to recovered edits files while the splitter is still writing them. */
  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
  /** Suffix used when writing a sequence id file. */
  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
  /** Alternative sequence id file suffix that is still recognized when listing files. */
  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
  /** Number of trailing characters stripped from a sequence id file name before parsing it. */
  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();

  /** Static utility class; not meant to be instantiated. */
  private WALSplitUtil() {
  }
082
083  /**
084   * Completes the work done by splitLogFile by archiving logs
085   * <p>
086   * It is invoked by SplitLogManager once it knows that one of the SplitLogWorkers have completed
087   * the splitLogFile() part. If the master crashes then this function might get called multiple
088   * times.
089   * <p>
090   * @param logfile
091   * @param conf
092   * @throws IOException
093   */
094  public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException {
095    Path walDir = CommonFSUtils.getWALRootDir(conf);
096    Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
097    Path walPath;
098    if (CommonFSUtils.isStartingWithPath(walDir, logfile)) {
099      walPath = new Path(logfile);
100    } else {
101      walPath = new Path(walDir, logfile);
102    }
103    finishSplitLogFile(walDir, oldLogDir, walPath, conf);
104  }
105
106  static void finishSplitLogFile(Path walDir, Path oldWALDir, Path walPath,
107      Configuration conf) throws IOException {
108    List<Path> processedLogs = new ArrayList<>();
109    List<Path> corruptedLogs = new ArrayList<>();
110    FileSystem walFS = walDir.getFileSystem(conf);
111    if (ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS)) {
112      corruptedLogs.add(walPath);
113    } else {
114      processedLogs.add(walPath);
115    }
116    archiveWALs(corruptedLogs, processedLogs, oldWALDir, walFS, conf);
117    Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName());
118    walFS.delete(stagingDir, true);
119  }
120
121  /**
122   * Moves processed logs to a oldLogDir after successful processing Moves corrupted logs (any log
123   * that couldn't be successfully parsed to corruptDir (.corrupt) for later investigation
124   */
125  private static void archiveWALs(final List<Path> corruptedWALs, final List<Path> processedWALs,
126      final Path oldWALDir, final FileSystem walFS, final Configuration conf) throws IOException {
127    final Path corruptDir =
128      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
129    if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
130      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
131        corruptDir);
132    }
133    if (!walFS.mkdirs(corruptDir)) {
134      LOG.info("Unable to mkdir {}", corruptDir);
135    }
136    walFS.mkdirs(oldWALDir);
137
138    // this method can get restarted or called multiple times for archiving
139    // the same log files.
140    for (Path corruptedWAL : corruptedWALs) {
141      Path p = new Path(corruptDir, corruptedWAL.getName());
142      if (walFS.exists(corruptedWAL)) {
143        if (!walFS.rename(corruptedWAL, p)) {
144          LOG.warn("Unable to move corrupted log {} to {}", corruptedWAL, p);
145        } else {
146          LOG.warn("Moved corrupted log {} to {}", corruptedWAL, p);
147        }
148      }
149    }
150
151    for (Path p : processedWALs) {
152      Path newPath = AbstractFSWAL.getWALArchivePath(oldWALDir, p);
153      if (walFS.exists(p)) {
154        if (!CommonFSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
155          LOG.warn("Unable to move {} to {}", p, newPath);
156        } else {
157          LOG.info("Archived processed log {} to {}", p, newPath);
158        }
159      }
160    }
161  }
162
163  /**
164   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in <code>logEntry</code>
165   * named for the sequenceid in the passed <code>logEntry</code>: e.g.
166   * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of
167   * RECOVERED_EDITS_DIR under the region creating it if necessary.
168   * @param tableName the table name
169   * @param encodedRegionName the encoded region name
170   * @param seqId the sequence id which used to generate file name
171   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
172   * @param tmpDirName of the directory used to sideline old recovered edits file
173   * @param conf configuration
174   * @return Path to file into which to dump split log edits.
175   * @throws IOException
176   */
177  @SuppressWarnings("deprecation")
178  @VisibleForTesting
179  static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId,
180      String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException {
181    FileSystem walFS = CommonFSUtils.getWALFileSystem(conf);
182    Path tableDir = CommonFSUtils.getWALTableDir(conf, tableName);
183    String encodedRegionNameStr = Bytes.toString(encodedRegionName);
184    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionNameStr);
185    Path dir = getRegionDirRecoveredEditsDir(regionDir);
186
187    if (walFS.exists(dir) && walFS.isFile(dir)) {
188      Path tmp = new Path(tmpDirName);
189      if (!walFS.exists(tmp)) {
190        walFS.mkdirs(tmp);
191      }
192      tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionNameStr);
193      LOG.warn("Found existing old file: {}. It could be some "
194          + "leftover of an old installation. It should be a folder instead. "
195          + "So moving it to {}",
196        dir, tmp);
197      if (!walFS.rename(dir, tmp)) {
198        LOG.warn("Failed to sideline old file {}", dir);
199      }
200    }
201
202    if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
203      LOG.warn("mkdir failed on {}", dir);
204    }
205    // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
206    // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
207    // region's replayRecoveredEdits will not delete it
208    String fileName = formatRecoveredEditsFileName(seqId);
209    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
210    return new Path(dir, fileName);
211  }
212
213  private static String getTmpRecoveredEditsFileName(String fileName) {
214    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
215  }
216
217  /**
218   * Get the completed recovered edits file path, renaming it to be by last edit in the file from
219   * its first edit. Then we could use the name to skip recovered edits when doing
220   * {@link HRegion#replayRecoveredEditsIfAny}.
221   * @return dstPath take file's last edit log seq num as the name
222   */
223  static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) {
224    String fileName = formatRecoveredEditsFileName(maximumEditWALSeqNum);
225    return new Path(srcPath.getParent(), fileName);
226  }
227
228  @VisibleForTesting
229  static String formatRecoveredEditsFileName(final long seqid) {
230    return String.format("%019d", seqid);
231  }
232
233  /**
234   * @param regionDir This regions directory in the filesystem.
235   * @return The directory that holds recovered edits files for the region <code>regionDir</code>
236   */
237  public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
238    return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
239  }
240
241  /**
242   * Check whether there is recovered.edits in the region dir
243   * @param conf conf
244   * @param regionInfo the region to check
245   * @return true if recovered.edits exist in the region dir
246   */
247  public static boolean hasRecoveredEdits(final Configuration conf, final RegionInfo regionInfo)
248      throws IOException {
249    // No recovered.edits for non default replica regions
250    if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
251      return false;
252    }
253    // Only default replica region can reach here, so we can use regioninfo
254    // directly without converting it to default replica's regioninfo.
255    Path regionWALDir =
256      CommonFSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
257    Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), regionInfo);
258    Path wrongRegionWALDir =
259      CommonFSUtils.getWrongWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
260    FileSystem walFs = CommonFSUtils.getWALFileSystem(conf);
261    FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
262    NavigableSet<Path> files = getSplitEditFilesSorted(walFs, regionWALDir);
263    if (!files.isEmpty()) {
264      return true;
265    }
266    files = getSplitEditFilesSorted(rootFs, regionDir);
267    if (!files.isEmpty()) {
268      return true;
269    }
270    files = getSplitEditFilesSorted(walFs, wrongRegionWALDir);
271    return !files.isEmpty();
272  }
273
274  /**
275   * This method will check 3 places for finding the max sequence id file. One is the expected
276   * place, another is the old place under the region directory, and the last one is the wrong one
277   * we introduced in HBASE-20734. See HBASE-22617 for more details.
278   * <p/>
279   * Notice that, you should always call this method instead of
280   * {@link #getMaxRegionSequenceId(FileSystem, Path)} until 4.0.0 release.
281   * @deprecated Only for compatibility, will be removed in 4.0.0.
282   */
283  @Deprecated
284  public static long getMaxRegionSequenceId(Configuration conf, RegionInfo region,
285    IOExceptionSupplier<FileSystem> rootFsSupplier, IOExceptionSupplier<FileSystem> walFsSupplier)
286    throws IOException {
287    FileSystem rootFs = rootFsSupplier.get();
288    FileSystem walFs = walFsSupplier.get();
289    Path regionWALDir =
290      CommonFSUtils.getWALRegionDir(conf, region.getTable(), region.getEncodedName());
291    // This is the old place where we store max sequence id file
292    Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), region);
293    // This is for HBASE-20734, where we use a wrong directory, see HBASE-22617 for more details.
294    Path wrongRegionWALDir =
295      CommonFSUtils.getWrongWALRegionDir(conf, region.getTable(), region.getEncodedName());
296    long maxSeqId = getMaxRegionSequenceId(walFs, regionWALDir);
297    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(rootFs, regionDir));
298    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(walFs, wrongRegionWALDir));
299    return maxSeqId;
300  }
301
302  /**
303   * Returns sorted set of edit files made by splitter, excluding files with '.temp' suffix.
304   * @param walFS WAL FileSystem used to retrieving split edits files.
305   * @param regionDir WAL region dir to look for recovered edits files under.
306   * @return Files in passed <code>regionDir</code> as a sorted set.
307   * @throws IOException
308   */
309  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
310      final Path regionDir) throws IOException {
311    NavigableSet<Path> filesSorted = new TreeSet<>();
312    Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
313    if (!walFS.exists(editsdir)) {
314      return filesSorted;
315    }
316    FileStatus[] files = CommonFSUtils.listStatus(walFS, editsdir, new PathFilter() {
317      @Override
318      public boolean accept(Path p) {
319        boolean result = false;
320        try {
321          // Return files and only files that match the editfile names pattern.
322          // There can be other files in this directory other than edit files.
323          // In particular, on error, we'll move aside the bad edit file giving
324          // it a timestamp suffix. See moveAsideBadEditsFile.
325          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
326          result = walFS.isFile(p) && m.matches();
327          // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
328          // because it means splitwal thread is writting this file.
329          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
330            result = false;
331          }
332          // Skip SeqId Files
333          if (isSequenceIdFile(p)) {
334            result = false;
335          }
336        } catch (IOException e) {
337          LOG.warn("Failed isFile check on {}", p, e);
338        }
339        return result;
340      }
341    });
342    if (ArrayUtils.isNotEmpty(files)) {
343      Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
344    }
345    return filesSorted;
346  }
347
348  /**
349   * Move aside a bad edits file.
350   * @param fs the file system used to rename bad edits file.
351   * @param edits Edits file to move aside.
352   * @return The name of the moved aside file.
353   * @throws IOException
354   */
355  public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
356      throws IOException {
357    Path moveAsideName =
358        new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis());
359    if (!fs.rename(edits, moveAsideName)) {
360      LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
361    }
362    return moveAsideName;
363  }
364
365  /**
366   * Is the given file a region open sequence id file.
367   */
368  @VisibleForTesting
369  public static boolean isSequenceIdFile(final Path file) {
370    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
371        || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
372  }
373
374  private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
375      throws IOException {
376    // TODO: Why are we using a method in here as part of our normal region open where
377    // there is no splitting involved? Fix. St.Ack 01/20/2017.
378    Path editsDir = getRegionDirRecoveredEditsDir(regionDir);
379    try {
380      FileStatus[] files = walFS.listStatus(editsDir, WALSplitUtil::isSequenceIdFile);
381      return files != null ? files : new FileStatus[0];
382    } catch (FileNotFoundException e) {
383      return new FileStatus[0];
384    }
385  }
386
387  private static long getMaxSequenceId(FileStatus[] files) {
388    long maxSeqId = -1L;
389    for (FileStatus file : files) {
390      String fileName = file.getPath().getName();
391      try {
392        maxSeqId = Math.max(maxSeqId, Long
393            .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
394      } catch (NumberFormatException ex) {
395        LOG.warn("Invalid SeqId File Name={}", fileName);
396      }
397    }
398    return maxSeqId;
399  }
400
401  /**
402   * Get the max sequence id which is stored in the region directory. -1 if none.
403   */
404  public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
405    return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
406  }
407
408  /**
409   * Create a file with name as region's max sequence id
410   */
411  public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
412      throws IOException {
413    FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
414    long maxSeqId = getMaxSequenceId(files);
415    if (maxSeqId > newMaxSeqId) {
416      throw new IOException("The new max sequence id " + newMaxSeqId
417          + " is less than the old max sequence id " + maxSeqId);
418    }
419    // write a new seqId file
420    Path newSeqIdFile =
421        new Path(getRegionDirRecoveredEditsDir(regionDir), newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
422    if (newMaxSeqId != maxSeqId) {
423      try {
424        if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
425          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
426        }
427        LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
428          maxSeqId);
429      } catch (FileAlreadyExistsException ignored) {
430        // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
431      }
432    }
433    // remove old ones
434    for (FileStatus status : files) {
435      if (!newSeqIdFile.equals(status.getPath())) {
436        walFS.delete(status.getPath(), false);
437      }
438    }
439  }
440
441  /** A struct used by getMutationsFromWALEntry */
442  public static class MutationReplay implements Comparable<MutationReplay> {
443    public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation,
444        long nonceGroup, long nonce) {
445      this.type = type;
446      this.mutation = mutation;
447      if (this.mutation.getDurability() != Durability.SKIP_WAL) {
448        // using ASYNC_WAL for relay
449        this.mutation.setDurability(Durability.ASYNC_WAL);
450      }
451      this.nonceGroup = nonceGroup;
452      this.nonce = nonce;
453    }
454
455    private final ClientProtos.MutationProto.MutationType type;
456    public final Mutation mutation;
457    public final long nonceGroup;
458    public final long nonce;
459
460    @Override
461    public int compareTo(final MutationReplay d) {
462      return Row.COMPARATOR.compare(mutation, d.mutation);
463    }
464
465    @Override
466    public boolean equals(Object obj) {
467      if (!(obj instanceof MutationReplay)) {
468        return false;
469      } else {
470        return this.compareTo((MutationReplay) obj) == 0;
471      }
472    }
473
474    @Override
475    public int hashCode() {
476      return this.mutation.hashCode();
477    }
478
479    public ClientProtos.MutationProto.MutationType getType() {
480      return type;
481    }
482  }
483
484  /**
485   * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &amp;
486   * WALEdit from the passed in WALEntry
487   * @param entry
488   * @param cells
489   * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
490   *          extracted from the passed in WALEntry.
491   * @return list of Pair&lt;MutationType, Mutation&gt; to be replayed
492   * @throws IOException
493   */
494  public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry,
495      CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
496    if (entry == null) {
497      // return an empty array
498      return Collections.emptyList();
499    }
500
501    long replaySeqId =
502        (entry.getKey().hasOrigSequenceNumber()) ? entry.getKey().getOrigSequenceNumber()
503            : entry.getKey().getLogSequenceNumber();
504    int count = entry.getAssociatedCellCount();
505    List<MutationReplay> mutations = new ArrayList<>();
506    Cell previousCell = null;
507    Mutation m = null;
508    WALKeyImpl key = null;
509    WALEdit val = null;
510    if (logEntry != null) {
511      val = new WALEdit();
512    }
513
514    for (int i = 0; i < count; i++) {
515      // Throw index out of bounds if our cell count is off
516      if (!cells.advance()) {
517        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
518      }
519      Cell cell = cells.current();
520      if (val != null) val.add(cell);
521
522      boolean isNewRowOrType =
523          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
524              || !CellUtil.matchingRows(previousCell, cell);
525      if (isNewRowOrType) {
526        // Create new mutation
527        if (CellUtil.isDelete(cell)) {
528          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
529          // Deletes don't have nonces.
530          mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, m,
531              HConstants.NO_NONCE, HConstants.NO_NONCE));
532        } else {
533          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
534          // Puts might come from increment or append, thus we need nonces.
535          long nonceGroup =
536              entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
537          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
538          mutations.add(
539            new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, m, nonceGroup, nonce));
540        }
541      }
542      if (CellUtil.isDelete(cell)) {
543        ((Delete) m).add(cell);
544      } else {
545        ((Put) m).add(cell);
546      }
547      m.setDurability(durability);
548      previousCell = cell;
549    }
550
551    // reconstruct WALKey
552    if (logEntry != null) {
553      org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
554          entry.getKey();
555      List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
556      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
557        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
558      }
559      key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(),
560          TableName.valueOf(walKeyProto.getTableName().toByteArray()), replaySeqId,
561          walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(),
562          walKeyProto.getNonce(), null);
563      logEntry.setFirst(key);
564      logEntry.setSecond(val);
565    }
566
567    return mutations;
568  }
569
570  /**
571   * Return path to recovered.hfiles directory of the region's column family: e.g.
572   * /hbase/some_table/2323432434/cf/recovered.hfiles/. This method also ensures existence of
573   * recovered.hfiles directory under the region's column family, creating it if necessary.
574   * @param rootFS the root file system
575   * @param conf configuration
576   * @param tableName the table name
577   * @param encodedRegionName the encoded region name
578   * @param familyName the column family name
579   * @param seqId the sequence id which used to generate file name
580   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name
581   * @return Path to recovered.hfiles directory of the region's column family.
582   */
583  static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf,
584      TableName tableName, String encodedRegionName, String familyName) throws IOException {
585    Path rootDir = CommonFSUtils.getRootDir(conf);
586    Path regionDir = FSUtils.getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, tableName),
587      encodedRegionName);
588    Path dir = getRecoveredHFilesDir(regionDir, familyName);
589    if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) {
590      LOG.warn("mkdir failed on {}, region {}, column family {}", dir, encodedRegionName,
591        familyName);
592    }
593    return dir;
594  }
595
596  /**
597   * @param regionDir  This regions directory in the filesystem
598   * @param familyName The column family name
599   * @return The directory that holds recovered hfiles for the region's column family
600   */
601  private static Path getRecoveredHFilesDir(final Path regionDir, String familyName) {
602    return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR);
603  }
604
605  public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS,
606      final Path regionDir, String familyName) throws IOException {
607    Path dir = getRecoveredHFilesDir(regionDir, familyName);
608    return CommonFSUtils.listStatus(rootFS, dir);
609  }
610}