001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.List;
026import java.util.NavigableSet;
027import java.util.TreeSet;
028import java.util.UUID;
029import java.util.regex.Matcher;
030import java.util.regex.Pattern;
031import org.apache.commons.lang3.ArrayUtils;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileAlreadyExistsException;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.fs.PathFilter;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CellScanner;
040import org.apache.hadoop.hbase.CellUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Durability;
045import org.apache.hadoop.hbase.client.Mutation;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.regionserver.HRegion;
049import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
052import org.apache.hadoop.hbase.util.FSUtils;
053import org.apache.hadoop.hbase.util.Pair;
054import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
060
061import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
064
/**
 * Static helper methods used while splitting WALs and while handling the resulting recovered
 * edits files and sequence id files.
 */
068@InterfaceAudience.Private
069public final class WALSplitUtil {
  private static final Logger LOG = LoggerFactory.getLogger(WALSplitUtil.class);

  // A completed recovered edits file is named by an optionally negative decimal number only;
  // any other name found in the recovered edits directory is not treated as an edits file.
  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
  // Suffix carried by a recovered edits file while the splitter is still writing it.
  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
  // Suffix of the marker file whose name encodes a region's max sequence id.
  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
  // Second suffix that isSequenceIdFile also recognises as a sequence id file.
  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
  // Number of trailing characters removed from a sequence id file name before parsing the id.
  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();

  // Static utility class, never instantiated.
  private WALSplitUtil() {
  }
080
081  /**
082   * Completes the work done by splitLogFile by archiving logs
083   * <p>
084   * It is invoked by SplitLogManager once it knows that one of the SplitLogWorkers have completed
085   * the splitLogFile() part. If the master crashes then this function might get called multiple
086   * times.
087   * <p>
088   * @param logfile
089   * @param conf
090   * @throws IOException
091   */
092  public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException {
093    Path walDir = FSUtils.getWALRootDir(conf);
094    Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
095    Path walPath;
096    if (FSUtils.isStartingWithPath(walDir, logfile)) {
097      walPath = new Path(logfile);
098    } else {
099      walPath = new Path(walDir, logfile);
100    }
101    finishSplitLogFile(walDir, oldLogDir, walPath, conf);
102  }
103
104  static void finishSplitLogFile(Path walDir, Path oldWALDir, Path walPath,
105      Configuration conf) throws IOException {
106    List<Path> processedLogs = new ArrayList<>();
107    List<Path> corruptedLogs = new ArrayList<>();
108    FileSystem walFS = walDir.getFileSystem(conf);
109    if (ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS)) {
110      corruptedLogs.add(walPath);
111    } else {
112      processedLogs.add(walPath);
113    }
114    archiveWALs(corruptedLogs, processedLogs, oldWALDir, walFS, conf);
115    Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName());
116    walFS.delete(stagingDir, true);
117  }
118
119  /**
120   * Moves processed logs to a oldLogDir after successful processing Moves corrupted logs (any log
121   * that couldn't be successfully parsed to corruptDir (.corrupt) for later investigation
122   */
123  private static void archiveWALs(final List<Path> corruptedWALs, final List<Path> processedWALs,
124      final Path oldWALDir, final FileSystem walFS, final Configuration conf) throws IOException {
125    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
126    if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
127      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
128        corruptDir);
129    }
130    if (!walFS.mkdirs(corruptDir)) {
131      LOG.info("Unable to mkdir {}", corruptDir);
132    }
133    walFS.mkdirs(oldWALDir);
134
135    // this method can get restarted or called multiple times for archiving
136    // the same log files.
137    for (Path corruptedWAL : corruptedWALs) {
138      Path p = new Path(corruptDir, corruptedWAL.getName());
139      if (walFS.exists(corruptedWAL)) {
140        if (!walFS.rename(corruptedWAL, p)) {
141          LOG.warn("Unable to move corrupted log {} to {}", corruptedWAL, p);
142        } else {
143          LOG.warn("Moved corrupted log {} to {}", corruptedWAL, p);
144        }
145      }
146    }
147
148    for (Path p : processedWALs) {
149      Path newPath = AbstractFSWAL.getWALArchivePath(oldWALDir, p);
150      if (walFS.exists(p)) {
151        if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
152          LOG.warn("Unable to move {} to {}", p, newPath);
153        } else {
154          LOG.info("Archived processed log {} to {}", p, newPath);
155        }
156      }
157    }
158  }
159
160  /**
161   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in <code>logEntry</code>
162   * named for the sequenceid in the passed <code>logEntry</code>: e.g.
163   * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of
164   * RECOVERED_EDITS_DIR under the region creating it if necessary.
165   * @param walEntry walEntry to recover
166   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
167   * @param tmpDirName of the directory used to sideline old recovered edits file
168   * @param conf configuration
169   * @return Path to file into which to dump split log edits.
170   * @throws IOException
171   */
172  @SuppressWarnings("deprecation")
173  @VisibleForTesting
174  static Path getRegionSplitEditsPath(final WAL.Entry walEntry, String fileNameBeingSplit,
175      String tmpDirName, Configuration conf) throws IOException {
176    FileSystem walFS = FSUtils.getWALFileSystem(conf);
177    Path tableDir = FSUtils.getWALTableDir(conf, walEntry.getKey().getTableName());
178    String encodedRegionName = Bytes.toString(walEntry.getKey().getEncodedRegionName());
179    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
180    Path dir = getRegionDirRecoveredEditsDir(regionDir);
181
182    if (walFS.exists(dir) && walFS.isFile(dir)) {
183      Path tmp = new Path(tmpDirName);
184      if (!walFS.exists(tmp)) {
185        walFS.mkdirs(tmp);
186      }
187      tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
188      LOG.warn("Found existing old file: {}. It could be some "
189          + "leftover of an old installation. It should be a folder instead. "
190          + "So moving it to {}",
191        dir, tmp);
192      if (!walFS.rename(dir, tmp)) {
193        LOG.warn("Failed to sideline old file {}", dir);
194      }
195    }
196
197    if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
198      LOG.warn("mkdir failed on {}", dir);
199    }
200    // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
201    // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
202    // region's replayRecoveredEdits will not delete it
203    String fileName = formatRecoveredEditsFileName(walEntry.getKey().getSequenceId());
204    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
205    return new Path(dir, fileName);
206  }
207
208  private static String getTmpRecoveredEditsFileName(String fileName) {
209    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
210  }
211
212  /**
213   * Get the completed recovered edits file path, renaming it to be by last edit in the file from
214   * its first edit. Then we could use the name to skip recovered edits when doing
215   * {@link HRegion#replayRecoveredEditsIfAny}.
216   * @return dstPath take file's last edit log seq num as the name
217   */
218  static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) {
219    String fileName = formatRecoveredEditsFileName(maximumEditWALSeqNum);
220    return new Path(srcPath.getParent(), fileName);
221  }
222
223  @VisibleForTesting
224  static String formatRecoveredEditsFileName(final long seqid) {
225    return String.format("%019d", seqid);
226  }
227
228  /**
229   * @param regionDir This regions directory in the filesystem.
230   * @return The directory that holds recovered edits files for the region <code>regionDir</code>
231   */
232  public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
233    return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
234  }
235
236  /**
237   * Check whether there is recovered.edits in the region dir
238   * @param conf conf
239   * @param regionInfo the region to check
240   * @return true if recovered.edits exist in the region dir
241   */
242  public static boolean hasRecoveredEdits(final Configuration conf, final RegionInfo regionInfo)
243      throws IOException {
244    // No recovered.edits for non default replica regions
245    if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
246      return false;
247    }
248    // Only default replica region can reach here, so we can use regioninfo
249    // directly without converting it to default replica's regioninfo.
250    Path regionWALDir =
251        FSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
252    Path regionDir = FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(conf), regionInfo);
253    Path wrongRegionWALDir =
254        FSUtils.getWrongWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
255    FileSystem walFs = FSUtils.getWALFileSystem(conf);
256    FileSystem rootFs = FSUtils.getRootDirFileSystem(conf);
257    NavigableSet<Path> files = getSplitEditFilesSorted(walFs, regionWALDir);
258    if (!files.isEmpty()) {
259      return true;
260    }
261    files = getSplitEditFilesSorted(rootFs, regionDir);
262    if (!files.isEmpty()) {
263      return true;
264    }
265    files = getSplitEditFilesSorted(walFs, wrongRegionWALDir);
266    return !files.isEmpty();
267  }
268
269  /**
270   * This method will check 3 places for finding the max sequence id file. One is the expected
271   * place, another is the old place under the region directory, and the last one is the wrong one
272   * we introduced in HBASE-20734. See HBASE-22617 for more details.
273   * <p/>
274   * Notice that, you should always call this method instead of
275   * {@link #getMaxRegionSequenceId(FileSystem, Path)} until 4.0.0 release.
276   * @deprecated Only for compatibility, will be removed in 4.0.0.
277   */
278  @Deprecated
279  public static long getMaxRegionSequenceId(Configuration conf, RegionInfo region,
280      IOExceptionSupplier<FileSystem> rootFsSupplier, IOExceptionSupplier<FileSystem> walFsSupplier)
281      throws IOException {
282    FileSystem rootFs = rootFsSupplier.get();
283    FileSystem walFs = walFsSupplier.get();
284    Path regionWALDir = FSUtils.getWALRegionDir(conf, region.getTable(), region.getEncodedName());
285    // This is the old place where we store max sequence id file
286    Path regionDir = FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(conf), region);
287    // This is for HBASE-20734, where we use a wrong directory, see HBASE-22617 for more details.
288    Path wrongRegionWALDir =
289      FSUtils.getWrongWALRegionDir(conf, region.getTable(), region.getEncodedName());
290    long maxSeqId = getMaxRegionSequenceId(walFs, regionWALDir);
291    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(rootFs, regionDir));
292    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(walFs, wrongRegionWALDir));
293    return maxSeqId;
294  }
295
296  /**
297   * Returns sorted set of edit files made by splitter, excluding files with '.temp' suffix.
298   * @param walFS WAL FileSystem used to retrieving split edits files.
299   * @param regionDir WAL region dir to look for recovered edits files under.
300   * @return Files in passed <code>regionDir</code> as a sorted set.
301   * @throws IOException
302   */
303  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
304      final Path regionDir) throws IOException {
305    NavigableSet<Path> filesSorted = new TreeSet<>();
306    Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
307    if (!walFS.exists(editsdir)) {
308      return filesSorted;
309    }
310    FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
311      @Override
312      public boolean accept(Path p) {
313        boolean result = false;
314        try {
315          // Return files and only files that match the editfile names pattern.
316          // There can be other files in this directory other than edit files.
317          // In particular, on error, we'll move aside the bad edit file giving
318          // it a timestamp suffix. See moveAsideBadEditsFile.
319          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
320          result = walFS.isFile(p) && m.matches();
321          // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
322          // because it means splitwal thread is writting this file.
323          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
324            result = false;
325          }
326          // Skip SeqId Files
327          if (isSequenceIdFile(p)) {
328            result = false;
329          }
330        } catch (IOException e) {
331          LOG.warn("Failed isFile check on {}", p, e);
332        }
333        return result;
334      }
335    });
336    if (ArrayUtils.isNotEmpty(files)) {
337      Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
338    }
339    return filesSorted;
340  }
341
342  /**
343   * Move aside a bad edits file.
344   * @param walFS WAL FileSystem used to rename bad edits file.
345   * @param edits Edits file to move aside.
346   * @return The name of the moved aside file.
347   * @throws IOException
348   */
349  public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
350      throws IOException {
351    Path moveAsideName =
352        new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis());
353    if (!walFS.rename(edits, moveAsideName)) {
354      LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
355    }
356    return moveAsideName;
357  }
358
359  /**
360   * Is the given file a region open sequence id file.
361   */
362  @VisibleForTesting
363  public static boolean isSequenceIdFile(final Path file) {
364    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
365        || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
366  }
367
368  private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
369      throws IOException {
370    // TODO: Why are we using a method in here as part of our normal region open where
371    // there is no splitting involved? Fix. St.Ack 01/20/2017.
372    Path editsDir = getRegionDirRecoveredEditsDir(regionDir);
373    try {
374      FileStatus[] files = walFS.listStatus(editsDir, WALSplitUtil::isSequenceIdFile);
375      return files != null ? files : new FileStatus[0];
376    } catch (FileNotFoundException e) {
377      return new FileStatus[0];
378    }
379  }
380
381  private static long getMaxSequenceId(FileStatus[] files) {
382    long maxSeqId = -1L;
383    for (FileStatus file : files) {
384      String fileName = file.getPath().getName();
385      try {
386        maxSeqId = Math.max(maxSeqId, Long
387            .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
388      } catch (NumberFormatException ex) {
389        LOG.warn("Invalid SeqId File Name={}", fileName);
390      }
391    }
392    return maxSeqId;
393  }
394
395  /**
396   * Get the max sequence id which is stored in the region directory. -1 if none.
397   */
398  public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
399    return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
400  }
401
402  /**
403   * Create a file with name as region's max sequence id
404   */
405  public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
406      throws IOException {
407    FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
408    long maxSeqId = getMaxSequenceId(files);
409    if (maxSeqId > newMaxSeqId) {
410      throw new IOException("The new max sequence id " + newMaxSeqId
411          + " is less than the old max sequence id " + maxSeqId);
412    }
413    // write a new seqId file
414    Path newSeqIdFile =
415        new Path(getRegionDirRecoveredEditsDir(regionDir), newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
416    if (newMaxSeqId != maxSeqId) {
417      try {
418        if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
419          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
420        }
421        LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
422          maxSeqId);
423      } catch (FileAlreadyExistsException ignored) {
424        // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
425      }
426    }
427    // remove old ones
428    for (FileStatus status : files) {
429      if (!newSeqIdFile.equals(status.getPath())) {
430        walFS.delete(status.getPath(), false);
431      }
432    }
433  }
434
435  /** A struct used by getMutationsFromWALEntry */
436  public static class MutationReplay implements Comparable<MutationReplay> {
437    public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation,
438        long nonceGroup, long nonce) {
439      this.type = type;
440      this.mutation = mutation;
441      if (this.mutation.getDurability() != Durability.SKIP_WAL) {
442        // using ASYNC_WAL for relay
443        this.mutation.setDurability(Durability.ASYNC_WAL);
444      }
445      this.nonceGroup = nonceGroup;
446      this.nonce = nonce;
447    }
448
449    private final ClientProtos.MutationProto.MutationType type;
450    public final Mutation mutation;
451    public final long nonceGroup;
452    public final long nonce;
453
454    @Override
455    public int compareTo(final MutationReplay d) {
456      return this.mutation.compareTo(d.mutation);
457    }
458
459    @Override
460    public boolean equals(Object obj) {
461      if (!(obj instanceof MutationReplay)) {
462        return false;
463      } else {
464        return this.compareTo((MutationReplay) obj) == 0;
465      }
466    }
467
468    @Override
469    public int hashCode() {
470      return this.mutation.hashCode();
471    }
472
473    public ClientProtos.MutationProto.MutationType getType() {
474      return type;
475    }
476  }
477
478  /**
479   * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &amp;
480   * WALEdit from the passed in WALEntry
481   * @param entry
482   * @param cells
483   * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
484   *          extracted from the passed in WALEntry.
485   * @return list of Pair&lt;MutationType, Mutation&gt; to be replayed
486   * @throws IOException
487   */
488  public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry,
489      CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
490    if (entry == null) {
491      // return an empty array
492      return Collections.emptyList();
493    }
494
495    long replaySeqId =
496        (entry.getKey().hasOrigSequenceNumber()) ? entry.getKey().getOrigSequenceNumber()
497            : entry.getKey().getLogSequenceNumber();
498    int count = entry.getAssociatedCellCount();
499    List<MutationReplay> mutations = new ArrayList<>();
500    Cell previousCell = null;
501    Mutation m = null;
502    WALKeyImpl key = null;
503    WALEdit val = null;
504    if (logEntry != null) {
505      val = new WALEdit();
506    }
507
508    for (int i = 0; i < count; i++) {
509      // Throw index out of bounds if our cell count is off
510      if (!cells.advance()) {
511        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
512      }
513      Cell cell = cells.current();
514      if (val != null) val.add(cell);
515
516      boolean isNewRowOrType =
517          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
518              || !CellUtil.matchingRows(previousCell, cell);
519      if (isNewRowOrType) {
520        // Create new mutation
521        if (CellUtil.isDelete(cell)) {
522          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
523          // Deletes don't have nonces.
524          mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, m,
525              HConstants.NO_NONCE, HConstants.NO_NONCE));
526        } else {
527          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
528          // Puts might come from increment or append, thus we need nonces.
529          long nonceGroup =
530              entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
531          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
532          mutations.add(
533            new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, m, nonceGroup, nonce));
534        }
535      }
536      if (CellUtil.isDelete(cell)) {
537        ((Delete) m).add(cell);
538      } else {
539        ((Put) m).add(cell);
540      }
541      m.setDurability(durability);
542      previousCell = cell;
543    }
544
545    // reconstruct WALKey
546    if (logEntry != null) {
547      org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
548          entry.getKey();
549      List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
550      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
551        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
552      }
553      key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(),
554          TableName.valueOf(walKeyProto.getTableName().toByteArray()), replaySeqId,
555          walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(),
556          walKeyProto.getNonce(), null);
557      logEntry.setFirst(key);
558      logEntry.setSecond(val);
559    }
560
561    return mutations;
562  }
563}