001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.EOFException;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.text.ParseException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.NavigableSet;
032import java.util.Set;
033import java.util.TreeMap;
034import java.util.TreeSet;
035import java.util.UUID;
036import java.util.concurrent.Callable;
037import java.util.concurrent.CompletionService;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ExecutionException;
040import java.util.concurrent.ExecutorCompletionService;
041import java.util.concurrent.Future;
042import java.util.concurrent.ThreadFactory;
043import java.util.concurrent.ThreadPoolExecutor;
044import java.util.concurrent.TimeUnit;
045import java.util.concurrent.atomic.AtomicLong;
046import java.util.concurrent.atomic.AtomicReference;
047import java.util.regex.Matcher;
048import java.util.regex.Pattern;
049import org.apache.commons.lang3.ArrayUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.fs.FileAlreadyExistsException;
052import org.apache.hadoop.fs.FileStatus;
053import org.apache.hadoop.fs.FileSystem;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.fs.PathFilter;
056import org.apache.hadoop.hbase.Cell;
057import org.apache.hadoop.hbase.CellScanner;
058import org.apache.hadoop.hbase.CellUtil;
059import org.apache.hadoop.hbase.HBaseConfiguration;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.TableName;
062import org.apache.hadoop.hbase.client.Delete;
063import org.apache.hadoop.hbase.client.Durability;
064import org.apache.hadoop.hbase.client.Mutation;
065import org.apache.hadoop.hbase.client.Put;
066import org.apache.hadoop.hbase.client.RegionInfo;
067import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
068import org.apache.hadoop.hbase.io.HeapSize;
069import org.apache.hadoop.hbase.log.HBaseMarkers;
070import org.apache.hadoop.hbase.master.SplitLogManager;
071import org.apache.hadoop.hbase.monitoring.MonitoredTask;
072import org.apache.hadoop.hbase.monitoring.TaskMonitor;
073import org.apache.hadoop.hbase.regionserver.HRegion;
074import org.apache.hadoop.hbase.regionserver.LastSequenceId;
075import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
076import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
077import org.apache.hadoop.hbase.util.Bytes;
078import org.apache.hadoop.hbase.util.CancelableProgressable;
079import org.apache.hadoop.hbase.util.ClassSize;
080import org.apache.hadoop.hbase.util.FSUtils;
081import org.apache.hadoop.hbase.util.Pair;
082import org.apache.hadoop.hbase.util.Threads;
083import org.apache.hadoop.hbase.wal.WAL.Entry;
084import org.apache.hadoop.hbase.wal.WAL.Reader;
085import org.apache.hadoop.hbase.wal.WALProvider.Writer;
086import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
087import org.apache.hadoop.io.MultipleIOException;
088import org.apache.hadoop.ipc.RemoteException;
089import org.apache.yetus.audience.InterfaceAudience;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
094import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
095import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
096import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
097import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
098import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
099
100import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
/**
 * This class is responsible for splitting up a bunch of regionserver commit log
 * files that are no longer being written to, into new files, one per region, so
 * the edits can be replayed when regions are re-opened on startup. The old log
 * files are deleted once splitting completes.
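 * <p>
 * For illustration only, a rough sketch of the tools-style entry point (here {@code logDir} is a
 * directory of WAL files to split and {@code walFactory} is assumed to exist already):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * FileSystem walFS = FSUtils.getWALFileSystem(conf);
 * Path walRootDir = FSUtils.getWALRootDir(conf);
 * Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
 * List<Path> recoveredEdits =
 *     WALSplitter.split(walRootDir, logDir, oldLogDir, walFS, conf, walFactory);
 * }</pre>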
 */
110@InterfaceAudience.Private
111public class WALSplitter {
112  private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);
113
114  /** By default we retry errors in splitting, rather than skipping. */
115  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
116
117  // Parameters for split process
118  protected final Path walDir;
119  protected final FileSystem walFS;
120  protected final Configuration conf;
121
122  // Major subcomponents of the split process.
123  // These are separated into inner classes to make testing easier.
124  OutputSink outputSink;
125  private EntryBuffers entryBuffers;
126
127  private SplitLogWorkerCoordination splitLogWorkerCoordination;
128  private final WALFactory walFactory;
129
130  private MonitoredTask status;
131
132  // For checking the latest flushed sequence id
133  protected final LastSequenceId sequenceIdChecker;
134
135  // Map encodedRegionName -> lastFlushedSequenceId
136  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>();
137
138  // Map encodedRegionName -> maxSeqIdInStores
139  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();
140
141  // the file being split currently
142  private FileStatus fileBeingSplit;
143
  // whether we limit the number of writers opened for sinking recovered edits
145  private final boolean splitWriterCreationBounded;
146
147  public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
148
149
150  @VisibleForTesting
151  WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
152      FileSystem walFS, LastSequenceId idChecker,
153      SplitLogWorkerCoordination splitLogWorkerCoordination) {
154    this.conf = HBaseConfiguration.create(conf);
155    String codecClassName = conf
156        .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
157    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
158    this.walDir = walDir;
159    this.walFS = walFS;
160    this.sequenceIdChecker = idChecker;
161    this.splitLogWorkerCoordination = splitLogWorkerCoordination;
162
163    this.walFactory = factory;
164    PipelineController controller = new PipelineController();
165
166    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
167
168    entryBuffers = new EntryBuffers(controller,
169        this.conf.getLong("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
170        splitWriterCreationBounded);
171
172    int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
173    if(splitWriterCreationBounded){
174      outputSink = new BoundedLogWriterCreationOutputSink(
175          controller, entryBuffers, numWriterThreads);
176    }else {
177      outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
178    }
179  }
180
  /**
   * Splits a WAL file, writing the edits for each region into that region's
   * recovered-edits directory.
   * This is the main entry point for distributed log splitting, as used by SplitLogWorker.
   * <p>
   * If the log file has N regions then N recovered.edits files will be produced.
   * <p>
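   * A minimal sketch of a call (all parameter objects are assumed to have been set up by the
   * caller, as SplitLogWorker does; the names below are placeholders):
   * <pre>{@code
   * boolean finished = WALSplitter.splitLogFile(walDir, walFileStatus, walFS, conf, reporter,
   *     sequenceIdChecker, coordination, walFactory);
   * }</pre>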
   * @return false if it was interrupted by the CancelableProgressable reporter.
   */
189  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
190      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
191      SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
192      throws IOException {
193    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker,
194        splitLogWorkerCoordination);
195    return s.splitLogFile(logfile, reporter);
196  }
197
198  // A wrapper to split one log folder using the method used by distributed
199  // log splitting. Used by tools and unit tests. It should be package private.
200  // It is public only because TestWALObserver is in a different package,
201  // which uses this method to do log splitting.
202  @VisibleForTesting
203  public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
204      FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException {
205    final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
206        Collections.singletonList(logDir), null);
207    List<Path> splits = new ArrayList<>();
208    if (ArrayUtils.isNotEmpty(logfiles)) {
209      for (FileStatus logfile: logfiles) {
210        WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null);
211        if (s.splitLogFile(logfile, null)) {
212          finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
213          if (s.outputSink.splits != null) {
214            splits.addAll(s.outputSink.splits);
215          }
216        }
217      }
218    }
219    if (!walFS.delete(logDir, true)) {
220      throw new IOException("Unable to delete src dir: " + logDir);
221    }
222    return splits;
223  }
224
  /**
   * Log splitting implementation that splits one log file.
   * @param logfile should be an actual log file.
   */
229  @VisibleForTesting
230  boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
231    Preconditions.checkState(status == null);
232    Preconditions.checkArgument(logfile.isFile(),
233        "passed in file status is for something other than a regular file.");
234    boolean isCorrupted = false;
235    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
236      SPLIT_SKIP_ERRORS_DEFAULT);
237    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
238    Path logPath = logfile.getPath();
239    boolean outputSinkStarted = false;
240    boolean progress_failed = false;
241    int editsCount = 0;
242    int editsSkipped = 0;
243
    status = TaskMonitor.get().createStatus(
          "Splitting log file " + logfile.getPath() + " into a temporary staging area.");
246    Reader logFileReader = null;
247    this.fileBeingSplit = logfile;
248    try {
249      long logLength = logfile.getLen();
250      LOG.info("Splitting WAL={}, length={}", logPath, logLength);
251      status.setStatus("Opening log file");
252      if (reporter != null && !reporter.progress()) {
253        progress_failed = true;
254        return false;
255      }
256      logFileReader = getReader(logfile, skipErrors, reporter);
257      if (logFileReader == null) {
258        LOG.warn("Nothing to split in WAL={}", logPath);
259        return true;
260      }
261      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
262      int numOpenedFilesLastCheck = 0;
263      outputSink.setReporter(reporter);
264      outputSink.startWriterThreads();
265      outputSinkStarted = true;
266      Entry entry;
267      Long lastFlushedSequenceId = -1L;
268      while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
269        byte[] region = entry.getKey().getEncodedRegionName();
270        String encodedRegionNameAsStr = Bytes.toString(region);
271        lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
272        if (lastFlushedSequenceId == null) {
273          if (sequenceIdChecker != null) {
274            RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
275            Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
276            for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
277              maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
278                storeSeqId.getSequenceId());
279            }
280            regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
281            lastFlushedSequenceId = ids.getLastFlushedSequenceId();
282            if (LOG.isDebugEnabled()) {
283              LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
284                  TextFormat.shortDebugString(ids));
285            }
286          }
287          if (lastFlushedSequenceId == null) {
288            lastFlushedSequenceId = -1L;
289          }
290          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
291        }
292        if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
293          editsSkipped++;
294          continue;
295        }
296        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
297        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
298          editsSkipped++;
299          continue;
300        }
301        entryBuffers.appendEntry(entry);
302        editsCount++;
303        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
304        // If sufficient edits have passed, check if we should report progress.
305        if (editsCount % interval == 0
306            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
307          numOpenedFilesLastCheck = this.getNumOpenWriters();
308          String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
309              + " edits, skipped " + editsSkipped + " edits.";
310          status.setStatus("Split " + countsStr);
311          if (reporter != null && !reporter.progress()) {
312            progress_failed = true;
313            return false;
314          }
315        }
316      }
317    } catch (InterruptedException ie) {
318      IOException iie = new InterruptedIOException();
319      iie.initCause(ie);
320      throw iie;
321    } catch (CorruptedLogFileException e) {
322      LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
323      if (splitLogWorkerCoordination != null) {
        // Some tests pass in a null coordination manager.
325        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
326      } else {
327        // for tests only
328        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
329      }
330      isCorrupted = true;
331    } catch (IOException e) {
332      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
333      throw e;
334    } finally {
335      LOG.debug("Finishing writing output logs and closing down");
336      try {
337        if (null != logFileReader) {
338          logFileReader.close();
339        }
340      } catch (IOException exception) {
341        LOG.warn("Could not close WAL reader", exception);
342      }
343      try {
344        if (outputSinkStarted) {
          // Set progress_failed to true first; the statement immediately following will reset it.
          // If finishWritingAndClose() throws an exception, progress_failed keeps the right value.
347          progress_failed = true;
348          progress_failed = outputSink.finishWritingAndClose() == null;
349        }
350      } finally {
351        String msg =
352            "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
353                + " regions; edits skipped=" + editsSkipped + "; log file=" + logPath +
354                ", length=" + logfile.getLen() + // See if length got updated post lease recovery
355                ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
356        LOG.info(msg);
357        status.markComplete(msg);
358      }
359    }
360    return !progress_failed;
361  }
362
  /**
   * Completes the work done by splitLogFile by archiving logs.
   * <p>
   * It is invoked by SplitLogManager once it knows that one of the
   * SplitLogWorkers has completed the splitLogFile() part. If the master
   * crashes then this function might get called multiple times.
   * <p>
   * @param logfile the WAL file that was split, either as an absolute path or relative to the
   *          WAL root directory
   * @param conf configuration used to locate the WAL root and archive directories
   * @throws IOException if archiving or cleanup of the split staging directory fails
   */
374  public static void finishSplitLogFile(String logfile,
375      Configuration conf)  throws IOException {
376    Path walDir = FSUtils.getWALRootDir(conf);
377    Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
378    Path logPath;
379    if (FSUtils.isStartingWithPath(walDir, logfile)) {
380      logPath = new Path(logfile);
381    } else {
382      logPath = new Path(walDir, logfile);
383    }
384    finishSplitLogFile(walDir, oldLogDir, logPath, conf);
385  }
386
387  private static void finishSplitLogFile(Path walDir, Path oldLogDir,
388      Path logPath, Configuration conf) throws IOException {
389    List<Path> processedLogs = new ArrayList<>();
390    List<Path> corruptedLogs = new ArrayList<>();
391    FileSystem walFS = walDir.getFileSystem(conf);
392    if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) {
393      corruptedLogs.add(logPath);
394    } else {
395      processedLogs.add(logPath);
396    }
397    archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf);
398    Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName());
399    walFS.delete(stagingDir, true);
400  }
401
  /**
   * Moves processed logs to the oldLogDir after successful processing. Moves
   * corrupted logs (any log that couldn't be successfully parsed) to the corrupt
   * directory (.corrupt) for later investigation.
   *
   * @param corruptedLogs logs that could not be parsed successfully
   * @param processedLogs logs that were split successfully
   * @param oldLogDir directory to archive successfully processed logs into
   * @param walFS WAL FileSystem to archive files on.
   * @param conf configuration
   * @throws IOException on filesystem errors while archiving
   */
414  private static void archiveLogs(
415      final List<Path> corruptedLogs,
416      final List<Path> processedLogs, final Path oldLogDir,
417      final FileSystem walFS, final Configuration conf) throws IOException {
418    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
419    if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
420      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
421          corruptDir);
422    }
423    if (!walFS.mkdirs(corruptDir)) {
424      LOG.info("Unable to mkdir {}", corruptDir);
425    }
426    walFS.mkdirs(oldLogDir);
427
428    // this method can get restarted or called multiple times for archiving
429    // the same log files.
430    for (Path corrupted : corruptedLogs) {
431      Path p = new Path(corruptDir, corrupted.getName());
432      if (walFS.exists(corrupted)) {
433        if (!walFS.rename(corrupted, p)) {
434          LOG.warn("Unable to move corrupted log {} to {}", corrupted, p);
435        } else {
436          LOG.warn("Moved corrupted log {} to {}", corrupted, p);
437        }
438      }
439    }
440
441    for (Path p : processedLogs) {
442      Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
443      if (walFS.exists(p)) {
444        if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
445          LOG.warn("Unable to move {} to {}", p, newPath);
446        } else {
447          LOG.info("Archived processed log {} to {}", p, newPath);
448        }
449      }
450    }
451  }
452
  /**
   * Returns the path to a file, under the RECOVERED_EDITS_DIR directory of the region referenced
   * by <code>logEntry</code>, named for the sequence id in that entry:
   * e.g. /hbase/some_table/2323432434/recovered.edits/2332.
   * This method also ensures that the RECOVERED_EDITS_DIR exists under the region directory,
   * creating it if necessary.
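   * <p>
   * Note that the returned path is a temporary file: the formatted sequence id is suffixed with
   * the name of the WAL being split and <code>.temp</code>, e.g.
   * <code>0000000000000002332-&lt;wal-file-name&gt;.temp</code> (the placeholder stands in for
   * the actual WAL file name).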
   * @param logEntry entry whose region and sequence id determine the output path
   * @param fileNameBeingSplit the name of the WAL file being split; used to generate the tmp
   *          file name
   * @param tmpDirName name of the directory used to sideline an old recovered edits file
   * @param conf configuration
   * @return Path to the file into which to dump the split log edits.
   * @throws IOException on filesystem errors
   */
466  @SuppressWarnings("deprecation")
467  @VisibleForTesting
468  static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit,
469      String tmpDirName, Configuration conf) throws IOException {
470    FileSystem walFS = FSUtils.getWALFileSystem(conf);
471    Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTableName());
472    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
473    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
474    Path dir = getRegionDirRecoveredEditsDir(regionDir);
475
476
477    if (walFS.exists(dir) && walFS.isFile(dir)) {
478      Path tmp = new Path(tmpDirName);
479      if (!walFS.exists(tmp)) {
480        walFS.mkdirs(tmp);
481      }
482      tmp = new Path(tmp,
483        HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
484      LOG.warn("Found existing old file: {}. It could be some "
485        + "leftover of an old installation. It should be a folder instead. "
486        + "So moving it to {}", dir, tmp);
487      if (!walFS.rename(dir, tmp)) {
488        LOG.warn("Failed to sideline old file {}", dir);
489      }
490    }
491
492    if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
493      LOG.warn("mkdir failed on {}", dir);
494    }
    // Append the name of the file being split to prevent name conflicts, since we may now have
    // duplicate wal entries. The appended file name ends with RECOVERED_LOG_TMPFILE_SUFFIX so
    // that the region's replayRecoveredEdits will not delete it while it is being written.
498    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getSequenceId());
499    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
500    return new Path(dir, fileName);
501  }
502
503  private static String getTmpRecoveredEditsFileName(String fileName) {
504    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
505  }
506
  /**
   * Get the completed recovered edits file path, renaming the file so that its name is taken
   * from the last edit in the file rather than its first edit. The name can then be used to skip
   * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
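   * <p>
   * For example (sequence ids are illustrative), a temporary file written as
   * <code>0000000000000002332-&lt;wal-file-name&gt;.temp</code> whose last edit has sequence id
   * 5234 ends up named <code>0000000000000005234</code>.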
   * @param srcPath the temporary recovered edits file written by the splitter
   * @param maximumEditLogSeqNum the highest sequence id written into the file
   * @return dstPath which takes the file's last edit log sequence number as its name
   */
515  private static Path getCompletedRecoveredEditsFilePath(Path srcPath,
516      long maximumEditLogSeqNum) {
517    String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
518    return new Path(srcPath.getParent(), fileName);
519  }
520
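  /**
   * Format a sequence id as the zero-padded, 19-digit recovered edits file name; for example,
   * {@code formatRecoveredEditsFileName(2332)} returns {@code "0000000000000002332"}.
   */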
521  @VisibleForTesting
522  static String formatRecoveredEditsFileName(final long seqid) {
523    return String.format("%019d", seqid);
524  }
525
526  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
527  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
528
  /**
   * @param regionDir
   *          This region's directory in the filesystem.
   * @return The directory that holds recovered edits files for the region
   *         <code>regionDir</code>
   */
535  public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
536    return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
537  }
538
  /**
   * Check whether there is recovered.edits in the region dir
   * @param conf the configuration to use
   * @param regionInfo the region to check
   * @throws IOException on filesystem errors
   * @return true if recovered.edits exist in the region dir
   */
546  public static boolean hasRecoveredEdits(final Configuration conf,
547    final RegionInfo regionInfo) throws IOException {
548    // No recovered.edits for non default replica regions
549    if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
550      return false;
551    }
552    //Only default replica region can reach here, so we can use regioninfo
553    //directly without converting it to default replica's regioninfo.
554    Path regionDir = FSUtils.getWALRegionDir(conf, regionInfo.getTable(),
555        regionInfo.getEncodedName());
556    NavigableSet<Path> files = getSplitEditFilesSorted(FSUtils.getWALFileSystem(conf), regionDir);
557    return files != null && !files.isEmpty();
558  }
559
560
561  /**
562   * Returns sorted set of edit files made by splitter, excluding files
563   * with '.temp' suffix.
564   *
   * @param walFS WAL FileSystem used to retrieve split edits files.
566   * @param regionDir WAL region dir to look for recovered edits files under.
567   * @return Files in passed <code>regionDir</code> as a sorted set.
568   * @throws IOException
569   */
570  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
571      final Path regionDir) throws IOException {
572    NavigableSet<Path> filesSorted = new TreeSet<>();
573    Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
574    if (!walFS.exists(editsdir)) {
575      return filesSorted;
576    }
577    FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
578      @Override
579      public boolean accept(Path p) {
580        boolean result = false;
581        try {
          // Return only files that match the edit-file name pattern.
          // There can be files in this directory other than edit files.
584          // In particular, on error, we'll move aside the bad edit file giving
585          // it a timestamp suffix. See moveAsideBadEditsFile.
586          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
587          result = walFS.isFile(p) && m.matches();
          // Skip files whose names end with RECOVERED_LOG_TMPFILE_SUFFIX,
          // because it means the split-WAL thread is still writing the file.
590          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
591            result = false;
592          }
593          // Skip SeqId Files
594          if (isSequenceIdFile(p)) {
595            result = false;
596          }
597        } catch (IOException e) {
598          LOG.warn("Failed isFile check on {}", p, e);
599        }
600        return result;
601      }
602    });
603    if (ArrayUtils.isNotEmpty(files)) {
604      Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
605    }
606    return filesSorted;
607  }
608
  /**
   * Move aside a bad edits file.
   *
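   * The file is renamed in place by appending the current time in milliseconds, e.g. (timestamp
   * illustrative) {@code 0000000000000002332} becomes {@code 0000000000000002332.1610000000000}.
   *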
   * @param walFS WAL FileSystem used to rename the bad edits file.
   * @param edits Edits file to move aside.
   * @return The name of the moved aside file.
   * @throws IOException on filesystem errors
   */
618  public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
619      throws IOException {
620    Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
621        + System.currentTimeMillis());
622    if (!walFS.rename(edits, moveAsideName)) {
623      LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
624    }
625    return moveAsideName;
626  }
627
628  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
629  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
630  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
631
632  /**
633   * Is the given file a region open sequence id file.
634   */
635  @VisibleForTesting
636  public static boolean isSequenceIdFile(final Path file) {
637    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
638        || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
639  }
640
641  private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
642      throws IOException {
643    // TODO: Why are we using a method in here as part of our normal region open where
644    // there is no splitting involved? Fix. St.Ack 01/20/2017.
645    Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
646    try {
647      FileStatus[] files = walFS.listStatus(editsDir, WALSplitter::isSequenceIdFile);
648      return files != null ? files : new FileStatus[0];
649    } catch (FileNotFoundException e) {
650      return new FileStatus[0];
651    }
652  }
653
654  private static long getMaxSequenceId(FileStatus[] files) {
655    long maxSeqId = -1L;
656    for (FileStatus file : files) {
657      String fileName = file.getPath().getName();
658      try {
659        maxSeqId = Math.max(maxSeqId, Long
660          .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
661      } catch (NumberFormatException ex) {
662        LOG.warn("Invalid SeqId File Name={}", fileName);
663      }
664    }
665    return maxSeqId;
666  }
667
668  /**
669   * Get the max sequence id which is stored in the region directory. -1 if none.
670   */
671  public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
672    return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
673  }
674
  /**
   * Create a file whose name is the region's max sequence id.
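   * <p>
   * For example, with a {@code newMaxSeqId} of 123 the file created under the region's
   * recovered.edits directory is named {@code 123.seqid}.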
   */
678  public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
679      throws IOException {
680    FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
681    long maxSeqId = getMaxSequenceId(files);
682    if (maxSeqId > newMaxSeqId) {
683      throw new IOException("The new max sequence id " + newMaxSeqId +
684        " is less than the old max sequence id " + maxSeqId);
685    }
686    // write a new seqId file
687    Path newSeqIdFile = new Path(WALSplitter.getRegionDirRecoveredEditsDir(regionDir),
688      newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
689    if (newMaxSeqId != maxSeqId) {
690      try {
691        if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
692          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
693        }
694        LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
695          maxSeqId);
696      } catch (FileAlreadyExistsException ignored) {
697        // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
698      }
699    }
700    // remove old ones
701    for (FileStatus status : files) {
702      if (!newSeqIdFile.equals(status.getPath())) {
703        walFS.delete(status.getPath(), false);
704      }
705    }
706  }
707
708  /**
709   * Create a new {@link Reader} for reading logs to split.
710   *
711   * @param file
712   * @return A new Reader instance, caller should close
713   * @throws IOException
714   * @throws CorruptedLogFileException
715   */
716  protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
717      throws IOException, CorruptedLogFileException {
718    Path path = file.getPath();
719    long length = file.getLen();
720    Reader in;
721
722    // Check for possibly empty file. With appends, currently Hadoop reports a
723    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
724    // HDFS-878 is committed.
725    if (length <= 0) {
726      LOG.warn("File {} might be still open, length is 0", path);
727    }
728
729    try {
730      FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter);
731      try {
732        in = getReader(path, reporter);
733      } catch (EOFException e) {
734        if (length <= 0) {
735          // TODO should we ignore an empty, not-last log file if skip.errors
736          // is false? Either way, the caller should decide what to do. E.g.
737          // ignore if this is the last log in sequence.
738          // TODO is this scenario still possible if the log has been
739          // recovered (i.e. closed)
740          LOG.warn("Could not open {} for reading. File is empty", path, e);
741        }
742        // EOFException being ignored
743        return null;
744      }
745    } catch (IOException e) {
746      if (e instanceof FileNotFoundException) {
747        // A wal file may not exist anymore. Nothing can be recovered so move on
748        LOG.warn("File {} does not exist anymore", path, e);
749        return null;
750      }
751      if (!skipErrors || e instanceof InterruptedIOException) {
752        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
753      }
754      CorruptedLogFileException t =
755        new CorruptedLogFileException("skipErrors=true Could not open wal " +
756            path + " ignoring");
757      t.initCause(e);
758      throw t;
759    }
760    return in;
761  }
762
763  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
764  throws CorruptedLogFileException, IOException {
765    try {
766      return in.next();
767    } catch (EOFException eof) {
768      // truncated files are expected if a RS crashes (see HBASE-2643)
769      LOG.info("EOF from wal {}. Continuing.", path);
770      return null;
771    } catch (IOException e) {
772      // If the IOE resulted from bad file format,
773      // then this problem is idempotent and retrying won't help
774      if (e.getCause() != null &&
775          (e.getCause() instanceof ParseException ||
776           e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
777        LOG.warn("Parse exception from wal {}. Continuing", path, e);
778        return null;
779      }
780      if (!skipErrors) {
781        throw e;
782      }
783      CorruptedLogFileException t =
784        new CorruptedLogFileException("skipErrors=true Ignoring exception" +
785            " while parsing wal " + path + ". Marking as corrupted");
786      t.initCause(e);
787      throw t;
788    }
789  }
790
791  /**
792   * Create a new {@link Writer} for writing log splits.
793   * @return a new Writer instance, caller should close
794   */
795  protected Writer createWriter(Path logfile)
796      throws IOException {
797    return walFactory.createRecoveredEditsWriter(walFS, logfile);
798  }
799
800  /**
801   * Create a new {@link Reader} for reading logs to split.
802   * @return new Reader instance, caller should close
803   */
804  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
805    return walFactory.createReader(walFS, curLogFile, reporter);
806  }
807
  /**
   * Get the number of currently open writers.
   */
811  private int getNumOpenWriters() {
812    int result = 0;
813    if (this.outputSink != null) {
814      result += this.outputSink.getNumOpenWriters();
815    }
816    return result;
817  }
818
819  /**
820   * Contains some methods to control WAL-entries producer / consumer interactions
821   */
822  public static class PipelineController {
823    // If an exception is thrown by one of the other threads, it will be
824    // stored here.
825    AtomicReference<Throwable> thrown = new AtomicReference<>();
826
827    // Wait/notify for when data has been produced by the writer thread,
828    // consumed by the reader thread, or an exception occurred
829    public final Object dataAvailable = new Object();
830
831    void writerThreadError(Throwable t) {
832      thrown.compareAndSet(null, t);
833    }
834
835    /**
836     * Check for errors in the writer threads. If any is found, rethrow it.
837     */
838    void checkForErrors() throws IOException {
839      Throwable thrown = this.thrown.get();
840      if (thrown == null) return;
841      if (thrown instanceof IOException) {
842        throw new IOException(thrown);
843      } else {
844        throw new RuntimeException(thrown);
845      }
846    }
847  }
848
  /**
   * Class which accumulates edits and separates them into a buffer per region while
   * simultaneously accounting for RAM usage. Blocks if the RAM usage crosses a predefined
   * threshold.
   *
   * Writer threads then pull region-specific buffers from this class.
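   * <p>
   * A rough sketch of the producer/consumer flow (illustrative only; the real callers are the
   * split loop in {@code splitLogFile} and {@link WriterThread}):
   * <pre>{@code
   * entryBuffers.appendEntry(entry);         // reader side; may block while over the heap limit
   * RegionEntryBuffer chunk = entryBuffers.getChunkToWrite();  // writer side
   * if (chunk != null) {
   *   try {
   *     outputSink.append(chunk);
   *   } finally {
   *     entryBuffers.doneWriting(chunk);
   *   }
   * }
   * }</pre>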
   */
856  public static class EntryBuffers {
857    PipelineController controller;
858
859    Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
860
861    /* Track which regions are currently in the middle of writing. We don't allow
862       an IO thread to pick up bytes from a region if we're already writing
863       data for that region in a different IO thread. */
864    Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
865
866    long totalBuffered = 0;
867    long maxHeapUsage;
868    boolean splitWriterCreationBounded;
869
870    public EntryBuffers(PipelineController controller, long maxHeapUsage) {
871      this(controller, maxHeapUsage, false);
872    }
873
874    public EntryBuffers(PipelineController controller, long maxHeapUsage,
875        boolean splitWriterCreationBounded){
876      this.controller = controller;
877      this.maxHeapUsage = maxHeapUsage;
878      this.splitWriterCreationBounded = splitWriterCreationBounded;
879    }
880
881    /**
882     * Append a log entry into the corresponding region buffer.
883     * Blocks if the total heap usage has crossed the specified threshold.
884     *
885     * @throws InterruptedException
886     * @throws IOException
887     */
888    public void appendEntry(Entry entry) throws InterruptedException, IOException {
889      WALKey key = entry.getKey();
890
891      RegionEntryBuffer buffer;
892      long incrHeap;
893      synchronized (this) {
894        buffer = buffers.get(key.getEncodedRegionName());
895        if (buffer == null) {
896          buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
897          buffers.put(key.getEncodedRegionName(), buffer);
898        }
899        incrHeap= buffer.appendEntry(entry);
900      }
901
902      // If we crossed the chunk threshold, wait for more space to be available
903      synchronized (controller.dataAvailable) {
904        totalBuffered += incrHeap;
905        while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
906          LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
907          controller.dataAvailable.wait(2000);
908        }
909        controller.dataAvailable.notifyAll();
910      }
911      controller.checkForErrors();
912    }
913
914    /**
915     * @return RegionEntryBuffer a buffer of edits to be written.
916     */
917    synchronized RegionEntryBuffer getChunkToWrite() {
      // The core of bounding writer creation: when bounded, only hand out a chunk once the heap
      // usage is over maxHeapUsage, so we don't create a writer for every region while splitting.
      // All buffered logs are then flushed through a thread pool after splitting, which keeps
      // the number of writers created under control.
922      if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
923        return null;
924      }
925      long biggestSize = 0;
926      byte[] biggestBufferKey = null;
927
928      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
929        long size = entry.getValue().heapSize();
930        if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
931          biggestSize = size;
932          biggestBufferKey = entry.getKey();
933        }
934      }
935      if (biggestBufferKey == null) {
936        return null;
937      }
938
939      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
940      currentlyWriting.add(biggestBufferKey);
941      return buffer;
942    }
943
944    void doneWriting(RegionEntryBuffer buffer) {
945      synchronized (this) {
946        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
947        assert removed;
948      }
949      long size = buffer.heapSize();
950
951      synchronized (controller.dataAvailable) {
952        totalBuffered -= size;
953        // We may unblock writers
954        controller.dataAvailable.notifyAll();
955      }
956    }
957
958    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
959      return currentlyWriting.contains(region);
960    }
961
962    public void waitUntilDrained() {
963      synchronized (controller.dataAvailable) {
964        while (totalBuffered > 0) {
965          try {
966            controller.dataAvailable.wait(2000);
967          } catch (InterruptedException e) {
968            LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
969            Thread.interrupted();
970            break;
971          }
972        }
973      }
974    }
975  }
976
977  /**
978   * A buffer of some number of edits for a given region.
979   * This accumulates edits and also provides a memory optimization in order to
980   * share a single byte array instance for the table and region name.
981   * Also tracks memory usage of the accumulated edits.
982   */
983  public static class RegionEntryBuffer implements HeapSize {
984    long heapInBuffer = 0;
985    List<Entry> entryBuffer;
986    TableName tableName;
987    byte[] encodedRegionName;
988
989    RegionEntryBuffer(TableName tableName, byte[] region) {
990      this.tableName = tableName;
991      this.encodedRegionName = region;
992      this.entryBuffer = new ArrayList<>();
993    }
994
995    long appendEntry(Entry entry) {
996      internify(entry);
997      entryBuffer.add(entry);
998      long incrHeap = entry.getEdit().heapSize() +
999        ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers
1000        0; // TODO linkedlist entry
1001      heapInBuffer += incrHeap;
1002      return incrHeap;
1003    }
1004
1005    private void internify(Entry entry) {
1006      WALKeyImpl k = entry.getKey();
1007      k.internTableName(this.tableName);
1008      k.internEncodedRegionName(this.encodedRegionName);
1009    }
1010
1011    @Override
1012    public long heapSize() {
1013      return heapInBuffer;
1014    }
1015
1016    public byte[] getEncodedRegionName() {
1017      return encodedRegionName;
1018    }
1019
1020    public List<Entry> getEntryBuffer() {
1021      return entryBuffer;
1022    }
1023
1024    public TableName getTableName() {
1025      return tableName;
1026    }
1027  }
1028
1029  public static class WriterThread extends Thread {
1030    private volatile boolean shouldStop = false;
1031    private PipelineController controller;
1032    private EntryBuffers entryBuffers;
1033    private OutputSink outputSink = null;
1034
1035    WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
1036      super(Thread.currentThread().getName() + "-Writer-" + i);
1037      this.controller = controller;
1038      this.entryBuffers = entryBuffers;
1039      outputSink = sink;
1040    }
1041
1042    @Override
1043    public void run()  {
1044      try {
1045        doRun();
1046      } catch (Throwable t) {
1047        LOG.error("Exiting thread", t);
1048        controller.writerThreadError(t);
1049      }
1050    }
1051
1052    private void doRun() throws IOException {
1053      LOG.trace("Writer thread starting");
1054      while (true) {
1055        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
1056        if (buffer == null) {
1057          // No data currently available, wait on some more to show up
1058          synchronized (controller.dataAvailable) {
1059            if (shouldStop && !this.outputSink.flush()) {
1060              return;
1061            }
1062            try {
1063              controller.dataAvailable.wait(500);
1064            } catch (InterruptedException ie) {
1065              if (!shouldStop) {
1066                throw new RuntimeException(ie);
1067              }
1068            }
1069          }
1070          continue;
1071        }
1072
1073        assert buffer != null;
1074        try {
1075          writeBuffer(buffer);
1076        } finally {
1077          entryBuffers.doneWriting(buffer);
1078        }
1079      }
1080    }
1081
1082    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
1083      outputSink.append(buffer);
1084    }
1085
1086    void finish() {
1087      synchronized (controller.dataAvailable) {
1088        shouldStop = true;
1089        controller.dataAvailable.notifyAll();
1090      }
1091    }
1092  }
1093
  /**
   * An abstract class that provides a common interface for the different ways of consuming
   * recovered edits.
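   * <p>
   * The lifecycle, as driven by {@code splitLogFile}, is roughly: {@code setReporter},
   * {@code startWriterThreads}, repeated {@code append} calls from the writer threads, and
   * finally {@code finishWritingAndClose}.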
   */
1098  public static abstract class OutputSink {
1099
1100    protected PipelineController controller;
1101    protected EntryBuffers entryBuffers;
1102
1103    protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<>();
1104    protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
1105        new ConcurrentHashMap<>();
1106
1107
1108    protected final List<WriterThread> writerThreads = Lists.newArrayList();
1109
1110    /* Set of regions which we've decided should not output edits */
1111    protected final Set<byte[]> blacklistedRegions = Collections
1112        .synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR));
1113
1114    protected boolean closeAndCleanCompleted = false;
1115
1116    protected boolean writersClosed = false;
1117
1118    protected final int numThreads;
1119
1120    protected CancelableProgressable reporter = null;
1121
1122    protected AtomicLong skippedEdits = new AtomicLong();
1123
1124    protected List<Path> splits = null;
1125
1126    public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
1127      numThreads = numWriters;
1128      this.controller = controller;
1129      this.entryBuffers = entryBuffers;
1130    }
1131
1132    void setReporter(CancelableProgressable reporter) {
1133      this.reporter = reporter;
1134    }
1135
1136    /**
1137     * Start the threads that will pump data from the entryBuffers to the output files.
1138     */
1139    public synchronized void startWriterThreads() {
1140      for (int i = 0; i < numThreads; i++) {
1141        WriterThread t = new WriterThread(controller, entryBuffers, this, i);
1142        t.start();
1143        writerThreads.add(t);
1144      }
1145    }
1146
    /**
     * Update the region's maximum edit log sequence number.
     */
1151    void updateRegionMaximumEditLogSeqNum(Entry entry) {
1152      synchronized (regionMaximumEditLogSeqNum) {
1153        String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
1154        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
1155        if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
1156          regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
1157        }
1158      }
1159    }
1160
1161    /**
1162     * @return the number of currently opened writers
1163     */
1164    int getNumOpenWriters() {
1165      return this.writers.size();
1166    }
1167
1168    long getSkippedEdits() {
1169      return this.skippedEdits.get();
1170    }
1171
1172    /**
1173     * Wait for writer threads to dump all info to the sink
1174     * @return true when there is no error
1175     * @throws IOException
1176     */
1177    protected boolean finishWriting(boolean interrupt) throws IOException {
1178      LOG.debug("Waiting for split writer threads to finish");
1179      boolean progress_failed = false;
1180      for (WriterThread t : writerThreads) {
1181        t.finish();
1182      }
1183      if (interrupt) {
1184        for (WriterThread t : writerThreads) {
1185          t.interrupt(); // interrupt the writer threads. We are stopping now.
1186        }
1187      }
1188
1189      for (WriterThread t : writerThreads) {
1190        if (!progress_failed && reporter != null && !reporter.progress()) {
1191          progress_failed = true;
1192        }
1193        try {
1194          t.join();
1195        } catch (InterruptedException ie) {
1196          IOException iie = new InterruptedIOException();
1197          iie.initCause(ie);
1198          throw iie;
1199        }
1200      }
1201      controller.checkForErrors();
1202      LOG.info("{} split writers finished; closing.", this.writerThreads.size());
1203      return (!progress_failed);
1204    }
1205
1206    public abstract List<Path> finishWritingAndClose() throws IOException;
1207
1208    /**
1209     * @return a map from encoded region ID to the number of edits written out for that region.
1210     */
1211    public abstract Map<byte[], Long> getOutputCounts();
1212
1213    /**
1214     * @return number of regions we've recovered
1215     */
1216    public abstract int getNumberOfRecoveredRegions();
1217
    /**
     * @param buffer A buffer of WAL entries, all for the same region, to be written out.
     * @throws IOException if the write fails
     */
1222    public abstract void append(RegionEntryBuffer buffer) throws IOException;
1223
    /**
     * WriterThreads call this function to flush any remaining edits buffered internally before
     * closing.
     * @return true when the underlying sink had something to flush
     */
1228    public boolean flush() throws IOException {
1229      return false;
1230    }
1231
    /**
     * Some WALEdits contain only KVs that record what happened to a region, for example
     * compaction, close, or open region events. Not all sinks will want to get all of those
     * edits.
     *
     * @return true if this sink wants to accept this region-level WALEdit.
     */
1238    public abstract boolean keepRegionEvent(Entry entry);
1239  }
1240
1241  /**
1242   * Class that manages the output streams from the log splitting process.
1243   */
1244  class LogRecoveredEditsOutputSink extends OutputSink {
1245
1246    public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1247        int numWriters) {
      // More threads could potentially write faster at the expense of causing more disk seeks
      // as the logs are split. Beyond a certain setting (probably around 3 threads) the process
      // will be bound on the reader in the current implementation anyway.
1253      super(controller, entryBuffers, numWriters);
1254    }
1255
1256    /**
     * @return null if we failed to report progress
1258     * @throws IOException
1259     */
1260    @Override
1261    public List<Path> finishWritingAndClose() throws IOException {
1262      boolean isSuccessful = false;
1263      List<Path> result = null;
1264      try {
1265        isSuccessful = finishWriting(false);
1266      } finally {
1267        result = close();
1268        List<IOException> thrown = closeLogWriters(null);
1269        if (CollectionUtils.isNotEmpty(thrown)) {
1270          throw MultipleIOException.createIOException(thrown);
1271        }
1272      }
1273      if (isSuccessful) {
1274        splits = result;
1275      }
1276      return splits;
1277    }
1278
1279    // delete the one with fewer wal entries
1280    private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst)
1281        throws IOException {
1282      long dstMinLogSeqNum = -1L;
1283      try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
1284        WAL.Entry entry = reader.next();
1285        if (entry != null) {
1286          dstMinLogSeqNum = entry.getKey().getSequenceId();
1287        }
1288      } catch (EOFException e) {
1289        LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?",
1290            dst, e);
1291      }
1292      if (wap.minLogSeqNum < dstMinLogSeqNum) {
1293        LOG.warn("Found existing old edits file. It could be the result of a previous failed"
1294            + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
1295            + walFS.getFileStatus(dst).getLen());
1296        if (!walFS.delete(dst, false)) {
1297          LOG.warn("Failed deleting of old {}", dst);
1298          throw new IOException("Failed deleting of old " + dst);
1299        }
1300      } else {
1301        LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
1302            + ", length=" + walFS.getFileStatus(wap.p).getLen());
1303        if (!walFS.delete(wap.p, false)) {
1304          LOG.warn("Failed deleting of {}", wap.p);
1305          throw new IOException("Failed deleting of " + wap.p);
1306        }
1307      }
1308    }
1309
1310    /**
1311     * Close all of the output streams.
1312     * @return the list of paths written.
1313     */
1314    List<Path> close() throws IOException {
1315      Preconditions.checkState(!closeAndCleanCompleted);
1316
1317      final List<Path> paths = new ArrayList<>();
1318      final List<IOException> thrown = Lists.newArrayList();
1319      ThreadPoolExecutor closeThreadPool = Threads
1320          .getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
1321            private int count = 1;
1322
1323            @Override public Thread newThread(Runnable r) {
1324              Thread t = new Thread(r, "split-log-closeStream-" + count++);
1325              return t;
1326            }
1327          });
1328      CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
1329      boolean progress_failed;
1330      try {
1331        progress_failed = executeCloseTask(completionService, thrown, paths);
1332      } catch (InterruptedException e) {
1333        IOException iie = new InterruptedIOException();
1334        iie.initCause(e);
1335        throw iie;
1336      } catch (ExecutionException e) {
1337        throw new IOException(e.getCause());
1338      } finally {
1339        closeThreadPool.shutdownNow();
1340      }
1341      if (!thrown.isEmpty()) {
1342        throw MultipleIOException.createIOException(thrown);
1343      }
1344      writersClosed = true;
1345      closeAndCleanCompleted = true;
1346      if (progress_failed) {
1347        return null;
1348      }
1349      return paths;
1350    }
1351
    /**
     * @param completionService thread pool used to execute the closing tasks
     * @param thrown list that collects any exceptions thrown while closing
     * @param paths list that collects the paths written
     * @return true if progress reporting failed while the close tasks were executing
     */
1358    boolean executeCloseTask(CompletionService<Void> completionService,
1359        List<IOException> thrown, List<Path> paths)
1360        throws InterruptedException, ExecutionException {
1361      for (final Map.Entry<String, SinkWriter> writersEntry : writers.entrySet()) {
1362        if (LOG.isTraceEnabled()) {
1363          LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p);
1364        }
1365        completionService.submit(new Callable<Void>() {
1366          @Override public Void call() throws Exception {
1367            WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1368            Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
1369            paths.add(dst);
1370            return null;
1371          }
1372        });
1373      }
1374      boolean progress_failed = false;
1375      for (int i = 0, n = this.writers.size(); i < n; i++) {
1376        Future<Void> future = completionService.take();
1377        future.get();
1378        if (!progress_failed && reporter != null && !reporter.progress()) {
1379          progress_failed = true;
1380        }
1381      }
1382      return progress_failed;
1383    }
1384
1385    Path closeWriter(String encodedRegionName, WriterAndPath wap,
1386        List<IOException> thrown) throws IOException{
1387      LOG.trace("Closing " + wap.p);
1388      try {
1389        wap.w.close();
1390      } catch (IOException ioe) {
1391        LOG.error("Couldn't close log at " + wap.p, ioe);
1392        thrown.add(ioe);
1393        return null;
1394      }
1395      if (LOG.isDebugEnabled()) {
        LOG.debug("Closed " + wap.p + " (wrote " + wap.editsWritten
            + " edits, skipped " + wap.editsSkipped + " edits in "
            + (wap.nanosSpent / 1000 / 1000) + "ms)");
1399      }
1400      if (wap.editsWritten == 0) {
1401        // just remove the empty recovered.edits file
1402        if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) {
1403          LOG.warn("Failed deleting empty " + wap.p);
          throw new IOException("Failed deleting empty " + wap.p);
1405        }
1406        return null;
1407      }
1408
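      // The completed recovered.edits file path is derived from the region's maximum
      // edit sequence id seen during the split.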
1409      Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1410          regionMaximumEditLogSeqNum.get(encodedRegionName));
1411      try {
1412        if (!dst.equals(wap.p) && walFS.exists(dst)) {
1413          deleteOneWithFewerEntries(wap, dst);
1414        }
        // Some unit tests create a splitter that reads and writes the data without touching
        // disk (e.g. TestHLogSplit#testThreading), so the source file may not exist; only
        // rename when it does.
1418        if (walFS.exists(wap.p)) {
1419          if (!walFS.rename(wap.p, dst)) {
1420            throw new IOException("Failed renaming " + wap.p + " to " + dst);
1421          }
          LOG.info("Renamed " + wap.p + " to " + dst);
1423        }
1424      } catch (IOException ioe) {
1425        LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1426        thrown.add(ioe);
1427        return null;
1428      }
1429      return dst;
1430    }
1431
1432    private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1433      if (writersClosed) {
1434        return thrown;
1435      }
1436      if (thrown == null) {
1437        thrown = Lists.newArrayList();
1438      }
1439      try {
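        // Ask each writer thread to stop, interrupting and re-joining until it exits.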
1440        for (WriterThread t : writerThreads) {
1441          while (t.isAlive()) {
1442            t.shouldStop = true;
1443            t.interrupt();
1444            try {
1445              t.join(10);
1446            } catch (InterruptedException e) {
1447              IOException iie = new InterruptedIOException();
1448              iie.initCause(e);
1449              throw iie;
1450            }
1451          }
1452        }
1453      } finally {
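        // Best-effort close of any writers that are still open; exceptions are collected
        // rather than thrown so that every writer gets a chance to close.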
1454        WriterAndPath wap = null;
1455        for (SinkWriter tmpWAP : writers.values()) {
1456          try {
1457            wap = (WriterAndPath) tmpWAP;
1458            wap.w.close();
1459          } catch (IOException ioe) {
1460            LOG.error("Couldn't close log at " + wap.p, ioe);
1461            thrown.add(ioe);
1462            continue;
1463          }
1464          LOG.info(
1465              "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
1466                  / 1000 / 1000) + "ms)");
1467        }
1468        writersClosed = true;
1469      }
1470
1471      return thrown;
1472    }
1473
1474    /**
1475     * Get a writer and path for a log starting at the given entry. This function is threadsafe so
1476     * long as multiple threads are always acting on different regions.
1477     * @return null if this region shouldn't output any logs
1478     */
1479    WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException {
      byte[] region = entry.getKey().getEncodedRegionName();
1481      String regionName = Bytes.toString(region);
1482      WriterAndPath ret = (WriterAndPath) writers.get(regionName);
1483      if (ret != null) {
1484        return ret;
1485      }
1486      // If we already decided that this region doesn't get any output
1487      // we don't need to check again.
1488      if (blacklistedRegions.contains(region)) {
1489        return null;
1490      }
1491      ret = createWAP(region, entry);
1492      if (ret == null) {
1493        blacklistedRegions.add(region);
1494        return null;
1495      }
      if (reusable) {
1497        writers.put(regionName, ret);
1498      }
1499      return ret;
1500    }
1501
    /**
     * @return a writer and the path it writes to; the caller is responsible for closing the
     *         writer.
     */
1505    WriterAndPath createWAP(byte[] region, Entry entry) throws IOException {
1506      String tmpDirName = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
1507        HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
1508      Path regionedits = getRegionSplitEditsPath(entry,
1509          fileBeingSplit.getPath().getName(), tmpDirName, conf);
1510      if (regionedits == null) {
1511        return null;
1512      }
1513      FileSystem walFs = FSUtils.getWALFileSystem(conf);
1514      if (walFs.exists(regionedits)) {
1515        LOG.warn("Found old edits file. It could be the "
1516            + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1517            + walFs.getFileStatus(regionedits).getLen());
1518        if (!walFs.delete(regionedits, false)) {
1519          LOG.warn("Failed delete of old {}", regionedits);
1520        }
1521      }
1522      Writer w = createWriter(regionedits);
1523      LOG.debug("Creating writer path={}", regionedits);
1524      return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
1525    }
1526
1527    void filterCellByStore(Entry logEntry) {
1528      Map<byte[], Long> maxSeqIdInStores =
1529          regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
1530      if (MapUtils.isEmpty(maxSeqIdInStores)) {
1531        return;
1532      }
1533      // Create the array list for the cells that aren't filtered.
1534      // We make the assumption that most cells will be kept.
1535      ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
1536      for (Cell cell : logEntry.getEdit().getCells()) {
1537        if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
1538          keptCells.add(cell);
1539        } else {
1540          byte[] family = CellUtil.cloneFamily(cell);
1541          Long maxSeqId = maxSeqIdInStores.get(family);
          // Do not skip the cell even if maxSeqId is null. We may be in the middle of a rolling
          // upgrade, or the master may have crashed and the information is not available.
1544          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
1545            keptCells.add(cell);
1546          }
1547        }
1548      }
1549
1550      // Anything in the keptCells array list is still live.
1551      // So rather than removing the cells from the array list
1552      // which would be an O(n^2) operation, we just replace the list
1553      logEntry.getEdit().setCells(keptCells);
1554    }
1555
1556    @Override
1557    public void append(RegionEntryBuffer buffer) throws IOException {
1558      appendBuffer(buffer, true);
1559    }
1560
1561    WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{
1562      List<Entry> entries = buffer.entryBuffer;
1563      if (entries.isEmpty()) {
        LOG.warn("Got an empty buffer, skipping");
1565        return null;
1566      }
1567
1568      WriterAndPath wap = null;
1569
1570      long startTime = System.nanoTime();
1571      try {
1572        int editsCount = 0;
1573
1574        for (Entry logEntry : entries) {
1575          if (wap == null) {
1576            wap = getWriterAndPath(logEntry, reusable);
1577            if (wap == null) {
1578              if (LOG.isTraceEnabled()) {
                // This log spews the full edit, which can be massive. Enable only when
                // debugging lost-edit issues in the WAL.
1581                LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
1582              }
1583              return null;
1584            }
1585          }
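          // Drop cells whose column family has already been flushed to or beyond this
          // entry's sequence id, so they are not replayed again.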
1586          filterCellByStore(logEntry);
1587          if (!logEntry.getEdit().isEmpty()) {
1588            wap.w.append(logEntry);
1589            this.updateRegionMaximumEditLogSeqNum(logEntry);
1590            editsCount++;
1591          } else {
1592            wap.incrementSkippedEdits(1);
1593          }
1594        }
1595        // Pass along summary statistics
1596        wap.incrementEdits(editsCount);
1597        wap.incrementNanoTime(System.nanoTime() - startTime);
1598      } catch (IOException e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error(HBaseMarkers.FATAL, "Got an exception while writing a log entry", e);
1602        throw e;
1603      }
1604      return wap;
1605    }
1606
1607    @Override
1608    public boolean keepRegionEvent(Entry entry) {
1609      ArrayList<Cell> cells = entry.getEdit().getCells();
1610      for (Cell cell : cells) {
1611        if (WALEdit.isCompactionMarker(cell)) {
1612          return true;
1613        }
1614      }
1615      return false;
1616    }
1617
1618    /**
1619     * @return a map from encoded region ID to the number of edits written out for that region.
1620     */
1621    @Override
1622    public Map<byte[], Long> getOutputCounts() {
1623      TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
1624      for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) {
1625        ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
1626      }
1627      return ret;
1628    }
1629
1630    @Override
1631    public int getNumberOfRecoveredRegions() {
1632      return writers.size();
1633    }
1634  }
1635
  /**
   * An output sink that bounds the number of concurrently open writers: each region's buffered
   * edits are written to a fresh recovered-edits file whose writer is closed as soon as the
   * buffer has been flushed, rather than keeping one writer open per region for the whole split.
   */
1639  class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
1640
1641    private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
1642
1643    public BoundedLogWriterCreationOutputSink(PipelineController controller,
1644        EntryBuffers entryBuffers, int numWriters) {
1645      super(controller, entryBuffers, numWriters);
1646    }
1647
1648    @Override
1649    public List<Path> finishWritingAndClose() throws IOException {
1650      boolean isSuccessful;
1651      List<Path> result;
1652      try {
1653        isSuccessful = finishWriting(false);
1654      } finally {
1655        result = close();
1656      }
1657      if (isSuccessful) {
1658        splits = result;
1659      }
1660      return splits;
1661    }
1662
1663    @Override
1664    boolean executeCloseTask(CompletionService<Void> completionService,
1665        List<IOException> thrown, List<Path> paths)
1666        throws InterruptedException, ExecutionException {
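      // Unlike the parent sink, writers are not kept open across buffers; each region that
      // still has buffered entries is written out and its writer closed within a single task.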
1667      for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
1668        LOG.info("Submitting writeThenClose of {}",
1669            Arrays.toString(buffer.getValue().encodedRegionName));
1670        completionService.submit(new Callable<Void>() {
1671          @Override
1672          public Void call() throws Exception {
1673            Path dst = writeThenClose(buffer.getValue());
1674            paths.add(dst);
1675            return null;
1676          }
1677        });
1678      }
1679      boolean progress_failed = false;
1680      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
1681        Future<Void> future = completionService.take();
1682        future.get();
1683        if (!progress_failed && reporter != null && !reporter.progress()) {
1684          progress_failed = true;
1685        }
1686      }
1687
1688      return progress_failed;
1689    }
1690
1691    /**
     * Since the splitting process may create multiple output files per region, we use
     * regionRecoverStatMap to track the cumulative output count for each region.
1694     * @return a map from encoded region ID to the number of edits written out for that region.
1695     */
1696    @Override
1697    public Map<byte[], Long> getOutputCounts() {
1698      Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
      for (Map.Entry<String, Long> entry : regionRecoverStatMap.entrySet()) {
1700        regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
1701      }
1702      return regionRecoverStatMapResult;
1703    }
1704
1705    /**
1706     * @return the number of recovered regions
1707     */
1708    @Override
1709    public int getNumberOfRecoveredRegions() {
1710      return regionRecoverStatMap.size();
1711    }
1712
1713    /**
     * Append the buffer to a new recovered edits file, then close the writer once the whole
     * buffer has been written.
     * @param buffer buffer containing all the entries for a single region
     * @throws IOException if the writer cannot be written to or closed
1717     */
1718    @Override
1719    public void append(RegionEntryBuffer buffer) throws IOException {
1720      writeThenClose(buffer);
1721    }
1722
1723    private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
1724      WriterAndPath wap = appendBuffer(buffer, false);
      if (wap != null) {
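        // A region's edits may arrive in more than one buffer, so accumulate the per-region
        // edit count instead of overwriting it.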
1726        String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
1727        Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
1728        if (value != null) {
1729          Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
1730          regionRecoverStatMap.put(encodedRegionName, newValue);
1731        }
1732      }
1733
1734      Path dst = null;
1735      List<IOException> thrown = new ArrayList<>();
      if (wap != null) {
1737        dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
1738      }
1739      if (!thrown.isEmpty()) {
1740        throw MultipleIOException.createIOException(thrown);
1741      }
1742      return dst;
1743    }
1744  }
1745
1746  /**
   * Wraps the actual writer that writes data out, along with statistics about the data written.
1748   */
1749  public abstract static class SinkWriter {
1750    /* Count of edits written to this path */
1751    long editsWritten = 0;
    /* Count of edits skipped while writing to this path */
1753    long editsSkipped = 0;
1754    /* Number of nanos spent writing to this log */
1755    long nanosSpent = 0;
1756
1757    void incrementEdits(int edits) {
1758      editsWritten += edits;
1759    }
1760
1761    void incrementSkippedEdits(int skipped) {
1762      editsSkipped += skipped;
1763    }
1764
1765    void incrementNanoTime(long nanos) {
1766      nanosSpent += nanos;
1767    }
1768  }
1769
1770  /**
1771   * Private data structure that wraps a Writer and its Path, also collecting statistics about the
1772   * data written to this output.
1773   */
1774  private final static class WriterAndPath extends SinkWriter {
1775    final Path p;
1776    final Writer w;
1777    final long minLogSeqNum;
1778
1779    WriterAndPath(final Path p, final Writer w, final long minLogSeqNum) {
1780      this.p = p;
1781      this.w = w;
1782      this.minLogSeqNum = minLogSeqNum;
1783    }
1784  }
1785
1786  static class CorruptedLogFileException extends Exception {
1787    private static final long serialVersionUID = 1L;
1788
1789    CorruptedLogFileException(String s) {
1790      super(s);
1791    }
1792  }
1793
1794  /** A struct used by getMutationsFromWALEntry */
1795  public static class MutationReplay implements Comparable<MutationReplay> {
1796    public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
1797      this.type = type;
1798      this.mutation = mutation;
      if (this.mutation.getDurability() != Durability.SKIP_WAL) {
        // using ASYNC_WAL for replay
1801        this.mutation.setDurability(Durability.ASYNC_WAL);
1802      }
1803      this.nonceGroup = nonceGroup;
1804      this.nonce = nonce;
1805    }
1806
1807    public final MutationType type;
1808    public final Mutation mutation;
1809    public final long nonceGroup;
1810    public final long nonce;
1811
1812    @Override
1813    public int compareTo(final MutationReplay d) {
1814      return this.mutation.compareTo(d.mutation);
1815    }
1816
1817    @Override
1818    public boolean equals(Object obj) {
      if (!(obj instanceof MutationReplay)) {
1820        return false;
1821      } else {
1822        return this.compareTo((MutationReplay)obj) == 0;
1823      }
1824    }
1825
1826    @Override
1827    public int hashCode() {
1828      return this.mutation.hashCode();
1829    }
1830  }
1831
1832  /**
   * Constructs mutations from a WALEntry and, optionally, reconstructs the WALKey and WALEdit
   * from the same entry.
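   * <p>
   * A minimal usage sketch; {@code walEntry} and {@code cellScanner} are hypothetical
   * placeholders for an entry and its cells as received from a replay or replication request:
   * <pre>{@code
   * Pair<WALKey, WALEdit> reconstructed = new Pair<>();
   * List<MutationReplay> mutations =
   *   getMutationsFromWALEntry(walEntry, cellScanner, reconstructed, Durability.USE_DEFAULT);
   * // reconstructed.getFirst() holds the rebuilt WALKey; reconstructed.getSecond() the WALEdit.
   * }</pre>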
   * @param entry the WALEntry to convert into mutations
   * @param cells a CellScanner positioned over the cells associated with the entry
   * @param logEntry pair that receives the WALKey and WALEdit instances reconstructed from the
   *          passed in WALEntry; may be null if the caller does not need them
   * @param durability the durability to set on the reconstructed mutations
   * @return list of MutationReplay instances to be replayed
   * @throws IOException if advancing the cell scanner fails
1841   */
1842  public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
1843      Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
1844    if (entry == null) {
      // return an empty list
1846      return Collections.emptyList();
1847    }
1848
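    // Prefer the original sequence number recorded for the entry, if any; otherwise fall back
    // to the entry's log sequence number.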
1849    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
1850      entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
1851    int count = entry.getAssociatedCellCount();
1852    List<MutationReplay> mutations = new ArrayList<>();
1853    Cell previousCell = null;
1854    Mutation m = null;
1855    WALKeyImpl key = null;
1856    WALEdit val = null;
1857    if (logEntry != null) {
1858      val = new WALEdit();
1859    }
1860
1861    for (int i = 0; i < count; i++) {
1862      // Throw index out of bounds if our cell count is off
1863      if (!cells.advance()) {
1864        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
1865      }
1866      Cell cell = cells.current();
      if (val != null) {
        val.add(cell);
      }
1868
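      // Cells that share the same row and type are folded into the current mutation;
      // a change in row or cell type starts a new Put or Delete.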
1869      boolean isNewRowOrType =
1870          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
1871              || !CellUtil.matchingRows(previousCell, cell);
1872      if (isNewRowOrType) {
1873        // Create new mutation
1874        if (CellUtil.isDelete(cell)) {
1875          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
1876          // Deletes don't have nonces.
1877          mutations.add(new MutationReplay(
1878              MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
1879        } else {
1880          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
1881          // Puts might come from increment or append, thus we need nonces.
1882          long nonceGroup = entry.getKey().hasNonceGroup()
1883              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1884          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1885          mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
1886        }
1887      }
1888      if (CellUtil.isDelete(cell)) {
1889        ((Delete) m).add(cell);
1890      } else {
1891        ((Put) m).add(cell);
1892      }
1893      m.setDurability(durability);
1894      previousCell = cell;
1895    }
1896
1897    // reconstruct WALKey
1898    if (logEntry != null) {
1899      org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
1900          entry.getKey();
1901      List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
1902      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
1903        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
1904      }
1905      key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
1906              walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
1907              clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
1908      logEntry.setFirst(key);
1909      logEntry.setSecond(val);
1910    }
1911
1912    return mutations;
1913  }
1914}