001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.EOFException;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.text.ParseException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.NavigableSet;
032import java.util.Set;
033import java.util.TreeMap;
034import java.util.TreeSet;
035import java.util.UUID;
036import java.util.concurrent.Callable;
037import java.util.concurrent.CompletionService;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ExecutionException;
040import java.util.concurrent.ExecutorCompletionService;
041import java.util.concurrent.Future;
042import java.util.concurrent.ThreadFactory;
043import java.util.concurrent.ThreadPoolExecutor;
044import java.util.concurrent.TimeUnit;
045import java.util.concurrent.atomic.AtomicLong;
046import java.util.concurrent.atomic.AtomicReference;
047import java.util.regex.Matcher;
048import java.util.regex.Pattern;
049import org.apache.commons.lang3.ArrayUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.fs.FileAlreadyExistsException;
052import org.apache.hadoop.fs.FileStatus;
053import org.apache.hadoop.fs.FileSystem;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.fs.PathFilter;
056import org.apache.hadoop.hbase.Cell;
057import org.apache.hadoop.hbase.CellScanner;
058import org.apache.hadoop.hbase.CellUtil;
059import org.apache.hadoop.hbase.HBaseConfiguration;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.TableName;
062import org.apache.hadoop.hbase.client.Delete;
063import org.apache.hadoop.hbase.client.Durability;
064import org.apache.hadoop.hbase.client.Mutation;
065import org.apache.hadoop.hbase.client.Put;
066import org.apache.hadoop.hbase.client.RegionInfo;
067import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
068import org.apache.hadoop.hbase.io.HeapSize;
069import org.apache.hadoop.hbase.log.HBaseMarkers;
070import org.apache.hadoop.hbase.master.SplitLogManager;
071import org.apache.hadoop.hbase.monitoring.MonitoredTask;
072import org.apache.hadoop.hbase.monitoring.TaskMonitor;
073import org.apache.hadoop.hbase.regionserver.HRegion;
074import org.apache.hadoop.hbase.regionserver.LastSequenceId;
075import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
076import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
077import org.apache.hadoop.hbase.util.Bytes;
078import org.apache.hadoop.hbase.util.CancelableProgressable;
079import org.apache.hadoop.hbase.util.ClassSize;
080import org.apache.hadoop.hbase.util.CollectionUtils.IOExceptionSupplier;
081import org.apache.hadoop.hbase.util.FSUtils;
082import org.apache.hadoop.hbase.util.Pair;
083import org.apache.hadoop.hbase.util.Threads;
084import org.apache.hadoop.hbase.wal.WAL.Entry;
085import org.apache.hadoop.hbase.wal.WAL.Reader;
086import org.apache.hadoop.hbase.wal.WALProvider.Writer;
087import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
088import org.apache.hadoop.io.MultipleIOException;
089import org.apache.hadoop.ipc.RemoteException;
090import org.apache.yetus.audience.InterfaceAudience;
091import org.slf4j.Logger;
092import org.slf4j.LoggerFactory;
093
094import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
095import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
096import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
097import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
098import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
099import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
100
101import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
106/**
107 * This class is responsible for splitting up a bunch of regionserver commit log
108 * files that are no longer being written to, into new files, one per region, for
109 * recovering data on startup. Delete the old log files when finished.
110 */
111@InterfaceAudience.Private
112public class WALSplitter {
113  private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);
114
115  /** By default we retry errors in splitting, rather than skipping. */
116  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
117
118  // Parameters for split process
119  protected final Path walDir;
120  protected final FileSystem walFS;
121  protected final Configuration conf;
122
123  // Major subcomponents of the split process.
124  // These are separated into inner classes to make testing easier.
125  OutputSink outputSink;
126  private EntryBuffers entryBuffers;
127
128  private SplitLogWorkerCoordination splitLogWorkerCoordination;
129  private final WALFactory walFactory;
130
131  private MonitoredTask status;
132
133  // For checking the latest flushed sequence id
134  protected final LastSequenceId sequenceIdChecker;
135
136  // Map encodedRegionName -> lastFlushedSequenceId
137  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>();
138
139  // Map encodedRegionName -> maxSeqIdInStores
140  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();
141
142  // the file being split currently
143  private FileStatus fileBeingSplit;
144
145  // if we limit the number of writers opened for sinking recovered edits
146  private final boolean splitWriterCreationBounded;
147
148  public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
149
150
151  @VisibleForTesting
152  WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
153      FileSystem walFS, LastSequenceId idChecker,
154      SplitLogWorkerCoordination splitLogWorkerCoordination) {
155    this.conf = HBaseConfiguration.create(conf);
156    String codecClassName = conf
157        .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
158    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
159    this.walDir = walDir;
160    this.walFS = walFS;
161    this.sequenceIdChecker = idChecker;
162    this.splitLogWorkerCoordination = splitLogWorkerCoordination;
163
164    this.walFactory = factory;
165    PipelineController controller = new PipelineController();
166
167    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
168
169    entryBuffers = new EntryBuffers(controller,
170        this.conf.getLong("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
171        splitWriterCreationBounded);
172
173    int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
174    if(splitWriterCreationBounded){
175      outputSink = new BoundedLogWriterCreationOutputSink(
176          controller, entryBuffers, numWriterThreads);
177    }else {
178      outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
179    }
180  }
181
182  /**
183   * Splits a WAL file into region's recovered-edits directory.
184   * This is the main entry point for distributed log splitting from SplitLogWorker.
185   * <p>
186   * If the log file has N regions then N recovered.edits files will be produced.
187   * <p>
188   * @return false if it is interrupted by the progress-able.
189   */
190  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
191      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
192      SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
193      throws IOException {
194    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker,
195        splitLogWorkerCoordination);
196    return s.splitLogFile(logfile, reporter);
197  }
198
199  // A wrapper to split one log folder using the method used by distributed
200  // log splitting. Used by tools and unit tests. It should be package private.
201  // It is public only because TestWALObserver is in a different package,
202  // which uses this method to do log splitting.
203  @VisibleForTesting
204  public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
205      FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException {
206    final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
207        Collections.singletonList(logDir), null);
208    List<Path> splits = new ArrayList<>();
209    if (ArrayUtils.isNotEmpty(logfiles)) {
210      for (FileStatus logfile: logfiles) {
211        WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null);
212        if (s.splitLogFile(logfile, null)) {
213          finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
214          if (s.outputSink.splits != null) {
215            splits.addAll(s.outputSink.splits);
216          }
217        }
218      }
219    }
220    if (!walFS.delete(logDir, true)) {
221      throw new IOException("Unable to delete src dir: " + logDir);
222    }
223    return splits;
224  }
225
226  /**
227   * log splitting implementation, splits one log file.
228   * @param logfile should be an actual log file.
229   */
230  @VisibleForTesting
231  boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
232    Preconditions.checkState(status == null);
233    Preconditions.checkArgument(logfile.isFile(),
234        "passed in file status is for something other than a regular file.");
235    boolean isCorrupted = false;
236    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
237      SPLIT_SKIP_ERRORS_DEFAULT);
238    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
239    Path logPath = logfile.getPath();
240    boolean outputSinkStarted = false;
241    boolean progress_failed = false;
242    int editsCount = 0;
243    int editsSkipped = 0;
244
245    status = TaskMonitor.get().createStatus(
246          "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
247    Reader logFileReader = null;
248    this.fileBeingSplit = logfile;
249    try {
250      long logLength = logfile.getLen();
251      LOG.info("Splitting WAL={}, length={}", logPath, logLength);
252      status.setStatus("Opening log file");
253      if (reporter != null && !reporter.progress()) {
254        progress_failed = true;
255        return false;
256      }
257      logFileReader = getReader(logfile, skipErrors, reporter);
258      if (logFileReader == null) {
259        LOG.warn("Nothing to split in WAL={}", logPath);
260        return true;
261      }
262      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
263      int numOpenedFilesLastCheck = 0;
264      outputSink.setReporter(reporter);
265      outputSink.startWriterThreads();
266      outputSinkStarted = true;
267      Entry entry;
268      Long lastFlushedSequenceId = -1L;
269      while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
270        byte[] region = entry.getKey().getEncodedRegionName();
271        String encodedRegionNameAsStr = Bytes.toString(region);
272        lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
273        if (lastFlushedSequenceId == null) {
274          if (sequenceIdChecker != null) {
275            RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
276            Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
277            for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
278              maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
279                storeSeqId.getSequenceId());
280            }
281            regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
282            lastFlushedSequenceId = ids.getLastFlushedSequenceId();
283            if (LOG.isDebugEnabled()) {
284              LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
285                  TextFormat.shortDebugString(ids));
286            }
287          }
288          if (lastFlushedSequenceId == null) {
289            lastFlushedSequenceId = -1L;
290          }
291          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
292        }
293        if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
294          editsSkipped++;
295          continue;
296        }
297        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
298        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
299          editsSkipped++;
300          continue;
301        }
302        entryBuffers.appendEntry(entry);
303        editsCount++;
304        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
305        // If sufficient edits have passed, check if we should report progress.
306        if (editsCount % interval == 0
307            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
308          numOpenedFilesLastCheck = this.getNumOpenWriters();
309          String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
310              + " edits, skipped " + editsSkipped + " edits.";
311          status.setStatus("Split " + countsStr);
312          if (reporter != null && !reporter.progress()) {
313            progress_failed = true;
314            return false;
315          }
316        }
317      }
318    } catch (InterruptedException ie) {
319      IOException iie = new InterruptedIOException();
320      iie.initCause(ie);
321      throw iie;
322    } catch (CorruptedLogFileException e) {
323      LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
324      if (splitLogWorkerCoordination != null) {
325        // Some tests pass in a csm of null.
326        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
327      } else {
328        // for tests only
329        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
330      }
331      isCorrupted = true;
332    } catch (IOException e) {
333      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
334      throw e;
335    } finally {
336      LOG.debug("Finishing writing output logs and closing down");
337      try {
338        if (null != logFileReader) {
339          logFileReader.close();
340        }
341      } catch (IOException exception) {
342        LOG.warn("Could not close WAL reader", exception);
343      }
344      try {
345        if (outputSinkStarted) {
346          // Set progress_failed to true as the immediate following statement will reset its value
347          // when finishWritingAndClose() throws exception, progress_failed has the right value
348          progress_failed = true;
349          progress_failed = outputSink.finishWritingAndClose() == null;
350        }
351      } finally {
352        String msg =
353            "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
354                + " regions; edits skipped=" + editsSkipped + "; log file=" + logPath +
355                ", length=" + logfile.getLen() + // See if length got updated post lease recovery
356                ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
357        LOG.info(msg);
358        status.markComplete(msg);
359      }
360    }
361    return !progress_failed;
362  }
363
364  /**
365   * Completes the work done by splitLogFile by archiving logs
366   * <p>
367   * It is invoked by SplitLogManager once it knows that one of the
368   * SplitLogWorkers have completed the splitLogFile() part. If the master
369   * crashes then this function might get called multiple times.
370   * <p>
371   * @param logfile
372   * @param conf
373   * @throws IOException
374   */
375  public static void finishSplitLogFile(String logfile,
376      Configuration conf)  throws IOException {
377    Path walDir = FSUtils.getWALRootDir(conf);
378    Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
379    Path logPath;
380    if (FSUtils.isStartingWithPath(walDir, logfile)) {
381      logPath = new Path(logfile);
382    } else {
383      logPath = new Path(walDir, logfile);
384    }
385    finishSplitLogFile(walDir, oldLogDir, logPath, conf);
386  }
387
388  private static void finishSplitLogFile(Path walDir, Path oldLogDir,
389      Path logPath, Configuration conf) throws IOException {
390    List<Path> processedLogs = new ArrayList<>();
391    List<Path> corruptedLogs = new ArrayList<>();
392    FileSystem walFS = walDir.getFileSystem(conf);
393    if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) {
394      corruptedLogs.add(logPath);
395    } else {
396      processedLogs.add(logPath);
397    }
398    archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf);
399    Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName());
400    walFS.delete(stagingDir, true);
401  }
402
403  /**
404   * Moves processed logs to a oldLogDir after successful processing Moves
405   * corrupted logs (any log that couldn't be successfully parsed to corruptDir
406   * (.corrupt) for later investigation
407   *
408   * @param corruptedLogs
409   * @param processedLogs
410   * @param oldLogDir
411   * @param walFS WAL FileSystem to archive files on.
412   * @param conf
413   * @throws IOException
414   */
415  private static void archiveLogs(
416      final List<Path> corruptedLogs,
417      final List<Path> processedLogs, final Path oldLogDir,
418      final FileSystem walFS, final Configuration conf) throws IOException {
419    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
420    if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
421      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
422          corruptDir);
423    }
424    if (!walFS.mkdirs(corruptDir)) {
425      LOG.info("Unable to mkdir {}", corruptDir);
426    }
427    walFS.mkdirs(oldLogDir);
428
429    // this method can get restarted or called multiple times for archiving
430    // the same log files.
431    for (Path corrupted : corruptedLogs) {
432      Path p = new Path(corruptDir, corrupted.getName());
433      if (walFS.exists(corrupted)) {
434        if (!walFS.rename(corrupted, p)) {
435          LOG.warn("Unable to move corrupted log {} to {}", corrupted, p);
436        } else {
437          LOG.warn("Moved corrupted log {} to {}", corrupted, p);
438        }
439      }
440    }
441
442    for (Path p : processedLogs) {
443      Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
444      if (walFS.exists(p)) {
445        if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
446          LOG.warn("Unable to move {} to {}", p, newPath);
447        } else {
448          LOG.info("Archived processed log {} to {}", p, newPath);
449        }
450      }
451    }
452  }
453
454  /**
455   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
456   * <code>logEntry</code> named for the sequenceid in the passed
457   * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
458   * This method also ensures existence of RECOVERED_EDITS_DIR under the region
459   * creating it if necessary.
460   * @param fs
461   * @param logEntry
462   * @param rootDir HBase root dir.
463   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
464   * @return Path to file into which to dump split log edits.
465   * @throws IOException
466   */
467  @SuppressWarnings("deprecation")
468  @VisibleForTesting
469  static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit,
470      Configuration conf) throws IOException {
471    FileSystem walFS = FSUtils.getWALFileSystem(conf);
472    Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTableName());
473    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
474    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
475    Path dir = getRegionDirRecoveredEditsDir(regionDir);
476
477
478    if (walFS.exists(dir) && walFS.isFile(dir)) {
479      Path tmp = new Path("/tmp");
480      if (!walFS.exists(tmp)) {
481        walFS.mkdirs(tmp);
482      }
483      tmp = new Path(tmp,
484        HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
485      LOG.warn("Found existing old file: {}. It could be some "
486        + "leftover of an old installation. It should be a folder instead. "
487        + "So moving it to {}", dir, tmp);
488      if (!walFS.rename(dir, tmp)) {
489        LOG.warn("Failed to sideline old file {}", dir);
490      }
491    }
492
493    if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
494      LOG.warn("mkdir failed on {}", dir);
495    }
496    // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
497    // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
498    // region's replayRecoveredEdits will not delete it
499    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getSequenceId());
500    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
501    return new Path(dir, fileName);
502  }
503
504  private static String getTmpRecoveredEditsFileName(String fileName) {
505    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
506  }
507
508  /**
509   * Get the completed recovered edits file path, renaming it to be by last edit
510   * in the file from its first edit. Then we could use the name to skip
511   * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
512   * @param srcPath
513   * @param maximumEditLogSeqNum
514   * @return dstPath take file's last edit log seq num as the name
515   */
516  private static Path getCompletedRecoveredEditsFilePath(Path srcPath,
517      long maximumEditLogSeqNum) {
518    String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
519    return new Path(srcPath.getParent(), fileName);
520  }
521
522  @VisibleForTesting
523  static String formatRecoveredEditsFileName(final long seqid) {
524    return String.format("%019d", seqid);
525  }
526
527  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
528  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
529
530  /**
531   * @param regionDir
532   *          This regions directory in the filesystem.
533   * @return The directory that holds recovered edits files for the region
534   *         <code>regionDir</code>
535   */
536  public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
537    return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
538  }
539
540  /**
541   * Check whether there is recovered.edits in the region dir
542   * @param conf conf
543   * @param regionInfo the region to check
544   * @throws IOException IOException
545   * @return true if recovered.edits exist in the region dir
546   */
547  public static boolean hasRecoveredEdits(final Configuration conf,
548    final RegionInfo regionInfo) throws IOException {
549    // No recovered.edits for non default replica regions
550    if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
551      return false;
552    }
553    // Only default replica region can reach here, so we can use regioninfo
554    // directly without converting it to default replica's regioninfo.
555    Path regionWALDir =
556        FSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
557    Path regionDir = FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(conf), regionInfo);
558    Path wrongRegionWALDir =
559        FSUtils.getWrongWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
560    FileSystem walFs = FSUtils.getWALFileSystem(conf);
561    FileSystem rootFs = FSUtils.getRootDirFileSystem(conf);
562    NavigableSet<Path> files = getSplitEditFilesSorted(walFs, regionWALDir);
563    if (!files.isEmpty()) {
564      return true;
565    }
566    files = getSplitEditFilesSorted(rootFs, regionDir);
567    if (!files.isEmpty()) {
568      return true;
569    }
570    files = getSplitEditFilesSorted(walFs, wrongRegionWALDir);
571    return !files.isEmpty();
572  }
573
574
575  /**
576   * Returns sorted set of edit files made by splitter, excluding files
577   * with '.temp' suffix.
578   *
579   * @param walFS WAL FileSystem used to retrieving split edits files.
580   * @param regionDir WAL region dir to look for recovered edits files under.
581   * @return Files in passed <code>regionDir</code> as a sorted set.
582   * @throws IOException
583   */
584  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
585      final Path regionDir) throws IOException {
586    NavigableSet<Path> filesSorted = new TreeSet<>();
587    Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
588    if (!walFS.exists(editsdir)) {
589      return filesSorted;
590    }
591    FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
592      @Override
593      public boolean accept(Path p) {
594        boolean result = false;
595        try {
596          // Return files and only files that match the editfile names pattern.
597          // There can be other files in this directory other than edit files.
598          // In particular, on error, we'll move aside the bad edit file giving
599          // it a timestamp suffix. See moveAsideBadEditsFile.
600          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
601          result = walFS.isFile(p) && m.matches();
602          // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
603          // because it means splitwal thread is writting this file.
604          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
605            result = false;
606          }
607          // Skip SeqId Files
608          if (isSequenceIdFile(p)) {
609            result = false;
610          }
611        } catch (IOException e) {
612          LOG.warn("Failed isFile check on {}", p, e);
613        }
614        return result;
615      }
616    });
617    if (ArrayUtils.isNotEmpty(files)) {
618      Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
619    }
620    return filesSorted;
621  }
622
623  /**
624   * Move aside a bad edits file.
625   *
626   * @param walFS WAL FileSystem used to rename bad edits file.
627   * @param edits
628   *          Edits file to move aside.
629   * @return The name of the moved aside file.
630   * @throws IOException
631   */
632  public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
633      throws IOException {
634    Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
635        + System.currentTimeMillis());
636    if (!walFS.rename(edits, moveAsideName)) {
637      LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
638    }
639    return moveAsideName;
640  }
641
642  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
643  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
644  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
645
646  /**
647   * Is the given file a region open sequence id file.
648   */
649  @VisibleForTesting
650  public static boolean isSequenceIdFile(final Path file) {
651    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
652        || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
653  }
654
655  private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
656      throws IOException {
657    // TODO: Why are we using a method in here as part of our normal region open where
658    // there is no splitting involved? Fix. St.Ack 01/20/2017.
659    Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
660    try {
661      FileStatus[] files = walFS.listStatus(editsDir, WALSplitter::isSequenceIdFile);
662      return files != null ? files : new FileStatus[0];
663    } catch (FileNotFoundException e) {
664      return new FileStatus[0];
665    }
666  }
667
668  private static long getMaxSequenceId(FileStatus[] files) {
669    long maxSeqId = -1L;
670    for (FileStatus file : files) {
671      String fileName = file.getPath().getName();
672      try {
673        maxSeqId = Math.max(maxSeqId, Long
674          .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
675      } catch (NumberFormatException ex) {
676        LOG.warn("Invalid SeqId File Name={}", fileName);
677      }
678    }
679    return maxSeqId;
680  }
681
682  /**
683   * Get the max sequence id which is stored in the region directory. -1 if none.
684   */
685  public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
686    return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
687  }
688
689  /**
690   * Create a file with name as region's max sequence id
691   */
692  public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
693      throws IOException {
694    FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
695    long maxSeqId = getMaxSequenceId(files);
696    if (maxSeqId > newMaxSeqId) {
697      throw new IOException("The new max sequence id " + newMaxSeqId +
698        " is less than the old max sequence id " + maxSeqId);
699    }
700    // write a new seqId file
701    Path newSeqIdFile = new Path(WALSplitter.getRegionDirRecoveredEditsDir(regionDir),
702      newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
703    if (newMaxSeqId != maxSeqId) {
704      try {
705        if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
706          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
707        }
708        LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
709          maxSeqId);
710      } catch (FileAlreadyExistsException ignored) {
711        // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
712      }
713    }
714    // remove old ones
715    for (FileStatus status : files) {
716      if (!newSeqIdFile.equals(status.getPath())) {
717        walFS.delete(status.getPath(), false);
718      }
719    }
720  }
721
722  /**
723   * This method will check 3 places for finding the max sequence id file. One is the expected
724   * place, another is the old place under the region directory, and the last one is the wrong one
725   * we introduced in HBASE-20734. See HBASE-22617 for more details.
726   * <p/>
727   * Notice that, you should always call this method instead of
728   * {@link #getMaxRegionSequenceId(FileSystem, Path)} until 4.0.0 release.
729   * @deprecated Only for compatibility, will be removed in 4.0.0.
730   */
731  @Deprecated
732  public static long getMaxRegionSequenceId(Configuration conf, RegionInfo region,
733      IOExceptionSupplier<FileSystem> rootFsSupplier, IOExceptionSupplier<FileSystem> walFsSupplier)
734      throws IOException {
735    FileSystem rootFs = rootFsSupplier.get();
736    FileSystem walFs = walFsSupplier.get();
737    Path regionWALDir = FSUtils.getWALRegionDir(conf, region.getTable(), region.getEncodedName());
738    // This is the old place where we store max sequence id file
739    Path regionDir = FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(conf), region);
740    // This is for HBASE-20734, where we use a wrong directory, see HBASE-22617 for more details.
741    Path wrongRegionWALDir =
742      FSUtils.getWrongWALRegionDir(conf, region.getTable(), region.getEncodedName());
743    long maxSeqId = getMaxRegionSequenceId(walFs, regionWALDir);
744    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(rootFs, regionDir));
745    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(walFs, wrongRegionWALDir));
746    return maxSeqId;
747  }
748
749  /**
750   * Create a new {@link Reader} for reading logs to split.
751   *
752   * @param file
753   * @return A new Reader instance, caller should close
754   * @throws IOException
755   * @throws CorruptedLogFileException
756   */
757  protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
758      throws IOException, CorruptedLogFileException {
759    Path path = file.getPath();
760    long length = file.getLen();
761    Reader in;
762
763    // Check for possibly empty file. With appends, currently Hadoop reports a
764    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
765    // HDFS-878 is committed.
766    if (length <= 0) {
767      LOG.warn("File {} might be still open, length is 0", path);
768    }
769
770    try {
771      FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter);
772      try {
773        in = getReader(path, reporter);
774      } catch (EOFException e) {
775        if (length <= 0) {
776          // TODO should we ignore an empty, not-last log file if skip.errors
777          // is false? Either way, the caller should decide what to do. E.g.
778          // ignore if this is the last log in sequence.
779          // TODO is this scenario still possible if the log has been
780          // recovered (i.e. closed)
781          LOG.warn("Could not open {} for reading. File is empty", path, e);
782        }
783        // EOFException being ignored
784        return null;
785      }
786    } catch (IOException e) {
787      if (e instanceof FileNotFoundException) {
788        // A wal file may not exist anymore. Nothing can be recovered so move on
789        LOG.warn("File {} does not exist anymore", path, e);
790        return null;
791      }
792      if (!skipErrors || e instanceof InterruptedIOException) {
793        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
794      }
795      CorruptedLogFileException t =
796        new CorruptedLogFileException("skipErrors=true Could not open wal " +
797            path + " ignoring");
798      t.initCause(e);
799      throw t;
800    }
801    return in;
802  }
803
804  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
805  throws CorruptedLogFileException, IOException {
806    try {
807      return in.next();
808    } catch (EOFException eof) {
809      // truncated files are expected if a RS crashes (see HBASE-2643)
810      LOG.info("EOF from wal {}. Continuing.", path);
811      return null;
812    } catch (IOException e) {
813      // If the IOE resulted from bad file format,
814      // then this problem is idempotent and retrying won't help
815      if (e.getCause() != null &&
816          (e.getCause() instanceof ParseException ||
817           e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
818        LOG.warn("Parse exception from wal {}. Continuing", path, e);
819        return null;
820      }
821      if (!skipErrors) {
822        throw e;
823      }
824      CorruptedLogFileException t =
825        new CorruptedLogFileException("skipErrors=true Ignoring exception" +
826            " while parsing wal " + path + ". Marking as corrupted");
827      t.initCause(e);
828      throw t;
829    }
830  }
831
832  /**
833   * Create a new {@link Writer} for writing log splits.
834   * @return a new Writer instance, caller should close
835   */
836  protected Writer createWriter(Path logfile)
837      throws IOException {
838    return walFactory.createRecoveredEditsWriter(walFS, logfile);
839  }
840
841  /**
842   * Create a new {@link Reader} for reading logs to split.
843   * @return new Reader instance, caller should close
844   */
845  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
846    return walFactory.createReader(walFS, curLogFile, reporter);
847  }
848
849  /**
850   * Get current open writers
851   */
852  private int getNumOpenWriters() {
853    int result = 0;
854    if (this.outputSink != null) {
855      result += this.outputSink.getNumOpenWriters();
856    }
857    return result;
858  }
859
860  /**
861   * Contains some methods to control WAL-entries producer / consumer interactions
862   */
863  public static class PipelineController {
864    // If an exception is thrown by one of the other threads, it will be
865    // stored here.
866    AtomicReference<Throwable> thrown = new AtomicReference<>();
867
868    // Wait/notify for when data has been produced by the writer thread,
869    // consumed by the reader thread, or an exception occurred
870    public final Object dataAvailable = new Object();
871
872    void writerThreadError(Throwable t) {
873      thrown.compareAndSet(null, t);
874    }
875
876    /**
877     * Check for errors in the writer threads. If any is found, rethrow it.
878     */
879    void checkForErrors() throws IOException {
880      Throwable thrown = this.thrown.get();
881      if (thrown == null) return;
882      if (thrown instanceof IOException) {
883        throw new IOException(thrown);
884      } else {
885        throw new RuntimeException(thrown);
886      }
887    }
888  }
889
890  /**
891   * Class which accumulates edits and separates them into a buffer per region
892   * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
893   * a predefined threshold.
894   *
895   * Writer threads then pull region-specific buffers from this class.
896   */
897  public static class EntryBuffers {
898    PipelineController controller;
899
900    Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
901
902    /* Track which regions are currently in the middle of writing. We don't allow
903       an IO thread to pick up bytes from a region if we're already writing
904       data for that region in a different IO thread. */
905    Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
906
907    long totalBuffered = 0;
908    long maxHeapUsage;
909    boolean splitWriterCreationBounded;
910
911    public EntryBuffers(PipelineController controller, long maxHeapUsage) {
912      this(controller, maxHeapUsage, false);
913    }
914
915    public EntryBuffers(PipelineController controller, long maxHeapUsage,
916        boolean splitWriterCreationBounded){
917      this.controller = controller;
918      this.maxHeapUsage = maxHeapUsage;
919      this.splitWriterCreationBounded = splitWriterCreationBounded;
920    }
921
922    /**
923     * Append a log entry into the corresponding region buffer.
924     * Blocks if the total heap usage has crossed the specified threshold.
925     *
926     * @throws InterruptedException
927     * @throws IOException
928     */
929    public void appendEntry(Entry entry) throws InterruptedException, IOException {
930      WALKey key = entry.getKey();
931
932      RegionEntryBuffer buffer;
933      long incrHeap;
934      synchronized (this) {
935        buffer = buffers.get(key.getEncodedRegionName());
936        if (buffer == null) {
937          buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
938          buffers.put(key.getEncodedRegionName(), buffer);
939        }
940        incrHeap= buffer.appendEntry(entry);
941      }
942
943      // If we crossed the chunk threshold, wait for more space to be available
944      synchronized (controller.dataAvailable) {
945        totalBuffered += incrHeap;
946        while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
947          LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
948          controller.dataAvailable.wait(2000);
949        }
950        controller.dataAvailable.notifyAll();
951      }
952      controller.checkForErrors();
953    }
954
955    /**
956     * @return RegionEntryBuffer a buffer of edits to be written.
957     */
958    synchronized RegionEntryBuffer getChunkToWrite() {
959      // The core part of limiting opening writers is it doesn't return chunk only if the
960      // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each
961      // region during splitting. It will flush all the logs in the buffer after splitting
962      // through a threadpool, which means the number of writers it created is under control.
963      if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
964        return null;
965      }
966      long biggestSize = 0;
967      byte[] biggestBufferKey = null;
968
969      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
970        long size = entry.getValue().heapSize();
971        if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
972          biggestSize = size;
973          biggestBufferKey = entry.getKey();
974        }
975      }
976      if (biggestBufferKey == null) {
977        return null;
978      }
979
980      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
981      currentlyWriting.add(biggestBufferKey);
982      return buffer;
983    }
984
985    void doneWriting(RegionEntryBuffer buffer) {
986      synchronized (this) {
987        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
988        assert removed;
989      }
990      long size = buffer.heapSize();
991
992      synchronized (controller.dataAvailable) {
993        totalBuffered -= size;
994        // We may unblock writers
995        controller.dataAvailable.notifyAll();
996      }
997    }
998
999    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
1000      return currentlyWriting.contains(region);
1001    }
1002
1003    public void waitUntilDrained() {
1004      synchronized (controller.dataAvailable) {
1005        while (totalBuffered > 0) {
1006          try {
1007            controller.dataAvailable.wait(2000);
1008          } catch (InterruptedException e) {
1009            LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
1010            Thread.interrupted();
1011            break;
1012          }
1013        }
1014      }
1015    }
1016  }
1017
1018  /**
1019   * A buffer of some number of edits for a given region.
1020   * This accumulates edits and also provides a memory optimization in order to
1021   * share a single byte array instance for the table and region name.
1022   * Also tracks memory usage of the accumulated edits.
1023   */
1024  public static class RegionEntryBuffer implements HeapSize {
1025    long heapInBuffer = 0;
1026    List<Entry> entryBuffer;
1027    TableName tableName;
1028    byte[] encodedRegionName;
1029
1030    RegionEntryBuffer(TableName tableName, byte[] region) {
1031      this.tableName = tableName;
1032      this.encodedRegionName = region;
1033      this.entryBuffer = new ArrayList<>();
1034    }
1035
1036    long appendEntry(Entry entry) {
1037      internify(entry);
1038      entryBuffer.add(entry);
1039      long incrHeap = entry.getEdit().heapSize() +
1040        ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers
1041        0; // TODO linkedlist entry
1042      heapInBuffer += incrHeap;
1043      return incrHeap;
1044    }
1045
1046    private void internify(Entry entry) {
1047      WALKeyImpl k = entry.getKey();
1048      k.internTableName(this.tableName);
1049      k.internEncodedRegionName(this.encodedRegionName);
1050    }
1051
1052    @Override
1053    public long heapSize() {
1054      return heapInBuffer;
1055    }
1056
1057    public byte[] getEncodedRegionName() {
1058      return encodedRegionName;
1059    }
1060
1061    public List<Entry> getEntryBuffer() {
1062      return entryBuffer;
1063    }
1064
1065    public TableName getTableName() {
1066      return tableName;
1067    }
1068  }
1069
1070  public static class WriterThread extends Thread {
1071    private volatile boolean shouldStop = false;
1072    private PipelineController controller;
1073    private EntryBuffers entryBuffers;
1074    private OutputSink outputSink = null;
1075
1076    WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
1077      super(Thread.currentThread().getName() + "-Writer-" + i);
1078      this.controller = controller;
1079      this.entryBuffers = entryBuffers;
1080      outputSink = sink;
1081    }
1082
1083    @Override
1084    public void run()  {
1085      try {
1086        doRun();
1087      } catch (Throwable t) {
1088        LOG.error("Exiting thread", t);
1089        controller.writerThreadError(t);
1090      }
1091    }
1092
1093    private void doRun() throws IOException {
1094      LOG.trace("Writer thread starting");
1095      while (true) {
1096        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
1097        if (buffer == null) {
1098          // No data currently available, wait on some more to show up
1099          synchronized (controller.dataAvailable) {
1100            if (shouldStop && !this.outputSink.flush()) {
1101              return;
1102            }
1103            try {
1104              controller.dataAvailable.wait(500);
1105            } catch (InterruptedException ie) {
1106              if (!shouldStop) {
1107                throw new RuntimeException(ie);
1108              }
1109            }
1110          }
1111          continue;
1112        }
1113
1114        assert buffer != null;
1115        try {
1116          writeBuffer(buffer);
1117        } finally {
1118          entryBuffers.doneWriting(buffer);
1119        }
1120      }
1121    }
1122
1123    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
1124      outputSink.append(buffer);
1125    }
1126
1127    void finish() {
1128      synchronized (controller.dataAvailable) {
1129        shouldStop = true;
1130        controller.dataAvailable.notifyAll();
1131      }
1132    }
1133  }
1134
1135  /**
1136   * The following class is an abstraction class to provide a common interface to support
1137   * different ways of consuming recovered edits.
1138   */
1139  public static abstract class OutputSink {
1140
1141    protected PipelineController controller;
1142    protected EntryBuffers entryBuffers;
1143
1144    protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<>();
1145    protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
1146        new ConcurrentHashMap<>();
1147
1148
1149    protected final List<WriterThread> writerThreads = Lists.newArrayList();
1150
1151    /* Set of regions which we've decided should not output edits */
1152    protected final Set<byte[]> blacklistedRegions = Collections
1153        .synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR));
1154
1155    protected boolean closeAndCleanCompleted = false;
1156
1157    protected boolean writersClosed = false;
1158
1159    protected final int numThreads;
1160
1161    protected CancelableProgressable reporter = null;
1162
1163    protected AtomicLong skippedEdits = new AtomicLong();
1164
1165    protected List<Path> splits = null;
1166
1167    public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
1168      numThreads = numWriters;
1169      this.controller = controller;
1170      this.entryBuffers = entryBuffers;
1171    }
1172
1173    void setReporter(CancelableProgressable reporter) {
1174      this.reporter = reporter;
1175    }
1176
1177    /**
1178     * Start the threads that will pump data from the entryBuffers to the output files.
1179     */
1180    public synchronized void startWriterThreads() {
1181      for (int i = 0; i < numThreads; i++) {
1182        WriterThread t = new WriterThread(controller, entryBuffers, this, i);
1183        t.start();
1184        writerThreads.add(t);
1185      }
1186    }
1187
1188    /**
1189     *
1190     * Update region's maximum edit log SeqNum.
1191     */
1192    void updateRegionMaximumEditLogSeqNum(Entry entry) {
1193      synchronized (regionMaximumEditLogSeqNum) {
1194        String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
1195        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
1196        if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
1197          regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
1198        }
1199      }
1200    }
1201
1202    /**
1203     * @return the number of currently opened writers
1204     */
1205    int getNumOpenWriters() {
1206      return this.writers.size();
1207    }
1208
1209    long getSkippedEdits() {
1210      return this.skippedEdits.get();
1211    }
1212
1213    /**
1214     * Wait for writer threads to dump all info to the sink
1215     * @return true when there is no error
1216     * @throws IOException
1217     */
1218    protected boolean finishWriting(boolean interrupt) throws IOException {
1219      LOG.debug("Waiting for split writer threads to finish");
1220      boolean progress_failed = false;
1221      for (WriterThread t : writerThreads) {
1222        t.finish();
1223      }
1224      if (interrupt) {
1225        for (WriterThread t : writerThreads) {
1226          t.interrupt(); // interrupt the writer threads. We are stopping now.
1227        }
1228      }
1229
1230      for (WriterThread t : writerThreads) {
1231        if (!progress_failed && reporter != null && !reporter.progress()) {
1232          progress_failed = true;
1233        }
1234        try {
1235          t.join();
1236        } catch (InterruptedException ie) {
1237          IOException iie = new InterruptedIOException();
1238          iie.initCause(ie);
1239          throw iie;
1240        }
1241      }
1242      controller.checkForErrors();
1243      LOG.info("{} split writers finished; closing.", this.writerThreads.size());
1244      return (!progress_failed);
1245    }
1246
1247    public abstract List<Path> finishWritingAndClose() throws IOException;
1248
1249    /**
1250     * @return a map from encoded region ID to the number of edits written out for that region.
1251     */
1252    public abstract Map<byte[], Long> getOutputCounts();
1253
1254    /**
1255     * @return number of regions we've recovered
1256     */
1257    public abstract int getNumberOfRecoveredRegions();
1258
1259    /**
1260     * @param buffer A WAL Edit Entry
1261     * @throws IOException
1262     */
1263    public abstract void append(RegionEntryBuffer buffer) throws IOException;
1264
1265    /**
1266     * WriterThread call this function to help flush internal remaining edits in buffer before close
1267     * @return true when underlying sink has something to flush
1268     */
1269    public boolean flush() throws IOException {
1270      return false;
1271    }
1272
1273    /**
1274     * Some WALEdit's contain only KV's for account on what happened to a region.
1275     * Not all sinks will want to get all of those edits.
1276     *
1277     * @return Return true if this sink wants to accept this region-level WALEdit.
1278     */
1279    public abstract boolean keepRegionEvent(Entry entry);
1280  }
1281
1282  /**
1283   * Class that manages the output streams from the log splitting process.
1284   */
1285  class LogRecoveredEditsOutputSink extends OutputSink {
1286
1287    public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1288        int numWriters) {
1289      // More threads could potentially write faster at the expense
1290      // of causing more disk seeks as the logs are split.
1291      // 3. After a certain setting (probably around 3) the
1292      // process will be bound on the reader in the current
1293      // implementation anyway.
1294      super(controller, entryBuffers, numWriters);
1295    }
1296
1297    /**
1298     * @return null if failed to report progress
1299     * @throws IOException
1300     */
1301    @Override
1302    public List<Path> finishWritingAndClose() throws IOException {
1303      boolean isSuccessful = false;
1304      List<Path> result = null;
1305      try {
1306        isSuccessful = finishWriting(false);
1307      } finally {
1308        result = close();
1309        List<IOException> thrown = closeLogWriters(null);
1310        if (CollectionUtils.isNotEmpty(thrown)) {
1311          throw MultipleIOException.createIOException(thrown);
1312        }
1313      }
1314      if (isSuccessful) {
1315        splits = result;
1316      }
1317      return splits;
1318    }
1319
1320    // delete the one with fewer wal entries
1321    private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst)
1322        throws IOException {
1323      long dstMinLogSeqNum = -1L;
1324      try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
1325        WAL.Entry entry = reader.next();
1326        if (entry != null) {
1327          dstMinLogSeqNum = entry.getKey().getSequenceId();
1328        }
1329      } catch (EOFException e) {
1330        LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?",
1331            dst, e);
1332      }
1333      if (wap.minLogSeqNum < dstMinLogSeqNum) {
1334        LOG.warn("Found existing old edits file. It could be the result of a previous failed"
1335            + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
1336            + walFS.getFileStatus(dst).getLen());
1337        if (!walFS.delete(dst, false)) {
1338          LOG.warn("Failed deleting of old {}", dst);
1339          throw new IOException("Failed deleting of old " + dst);
1340        }
1341      } else {
1342        LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
1343            + ", length=" + walFS.getFileStatus(wap.p).getLen());
1344        if (!walFS.delete(wap.p, false)) {
1345          LOG.warn("Failed deleting of {}", wap.p);
1346          throw new IOException("Failed deleting of " + wap.p);
1347        }
1348      }
1349    }
1350
1351    /**
1352     * Close all of the output streams.
1353     * @return the list of paths written.
1354     */
1355    List<Path> close() throws IOException {
1356      Preconditions.checkState(!closeAndCleanCompleted);
1357
1358      final List<Path> paths = new ArrayList<>();
1359      final List<IOException> thrown = Lists.newArrayList();
1360      ThreadPoolExecutor closeThreadPool = Threads
1361          .getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
1362            private int count = 1;
1363
1364            @Override public Thread newThread(Runnable r) {
1365              Thread t = new Thread(r, "split-log-closeStream-" + count++);
1366              return t;
1367            }
1368          });
1369      CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
1370      boolean progress_failed;
1371      try {
1372        progress_failed = executeCloseTask(completionService, thrown, paths);
1373      } catch (InterruptedException e) {
1374        IOException iie = new InterruptedIOException();
1375        iie.initCause(e);
1376        throw iie;
1377      } catch (ExecutionException e) {
1378        throw new IOException(e.getCause());
1379      } finally {
1380        closeThreadPool.shutdownNow();
1381      }
1382      if (!thrown.isEmpty()) {
1383        throw MultipleIOException.createIOException(thrown);
1384      }
1385      writersClosed = true;
1386      closeAndCleanCompleted = true;
1387      if (progress_failed) {
1388        return null;
1389      }
1390      return paths;
1391    }
1392
    /**
     * @param completionService thread pool used to execute the closing tasks
     * @param thrown list collecting any IOExceptions thrown while closing
     * @param paths list collecting the paths of the files written
     * @return true if progress reporting failed while waiting for the close tasks to complete
     */
1399    boolean executeCloseTask(CompletionService<Void> completionService,
1400        List<IOException> thrown, List<Path> paths)
1401        throws InterruptedException, ExecutionException {
1402      for (final Map.Entry<String, SinkWriter> writersEntry : writers.entrySet()) {
1403        if (LOG.isTraceEnabled()) {
1404          LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p);
1405        }
1406        completionService.submit(new Callable<Void>() {
1407          @Override public Void call() throws Exception {
1408            WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1409            Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
1410            paths.add(dst);
1411            return null;
1412          }
1413        });
1414      }
1415      boolean progress_failed = false;
1416      for (int i = 0, n = this.writers.size(); i < n; i++) {
1417        Future<Void> future = completionService.take();
1418        future.get();
1419        if (!progress_failed && reporter != null && !reporter.progress()) {
1420          progress_failed = true;
1421        }
1422      }
1423      return progress_failed;
1424    }
1425
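    /**
     * Close the writer for one region. If no edits were written the file is simply deleted;
     * otherwise the file is renamed to the completed recovered.edits path derived from the
     * region's maximum edit sequence id. Any IOException encountered is added to {@code thrown}.
     * @return the final path of the recovered edits file, or null if nothing was written or the
     *         close/rename failed
     */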
1426    Path closeWriter(String encodedRegionName, WriterAndPath wap,
1427        List<IOException> thrown) throws IOException{
1428      LOG.trace("Closing " + wap.p);
1429      try {
1430        wap.w.close();
1431      } catch (IOException ioe) {
1432        LOG.error("Couldn't close log at " + wap.p, ioe);
1433        thrown.add(ioe);
1434        return null;
1435      }
1436      if (LOG.isDebugEnabled()) {
1437        LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
1438            + " edits, skipped " + wap.editsSkipped + " edits in "
            + (wap.nanosSpent / 1000 / 1000) + "ms)");
1440      }
1441      if (wap.editsWritten == 0) {
1442        // just remove the empty recovered.edits file
1443        if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) {
          LOG.warn("Failed to delete empty " + wap.p);
          throw new IOException("Failed to delete empty " + wap.p);
1446        }
1447        return null;
1448      }
1449
1450      Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1451          regionMaximumEditLogSeqNum.get(encodedRegionName));
1452      try {
1453        if (!dst.equals(wap.p) && walFS.exists(dst)) {
1454          deleteOneWithFewerEntries(wap, dst);
1455        }
1456        // Skip the unit tests which create a splitter that reads and
1457        // writes the data without touching disk.
1458        // TestHLogSplit#testThreading is an example.
1459        if (walFS.exists(wap.p)) {
1460          if (!walFS.rename(wap.p, dst)) {
1461            throw new IOException("Failed renaming " + wap.p + " to " + dst);
1462          }
          LOG.info("Renamed " + wap.p + " to " + dst);
1464        }
1465      } catch (IOException ioe) {
1466        LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1467        thrown.add(ioe);
1468        return null;
1469      }
1470      return dst;
1471    }
1472
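    /**
     * Stop the writer threads and close any writers that are still open, collecting
     * IOExceptions raised while closing rather than failing on the first one.
     * @return the collected IOExceptions; may be the list passed in, possibly with new entries
     */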
1473    private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1474      if (writersClosed) {
1475        return thrown;
1476      }
1477      if (thrown == null) {
1478        thrown = Lists.newArrayList();
1479      }
1480      try {
1481        for (WriterThread t : writerThreads) {
1482          while (t.isAlive()) {
1483            t.shouldStop = true;
1484            t.interrupt();
1485            try {
1486              t.join(10);
1487            } catch (InterruptedException e) {
1488              IOException iie = new InterruptedIOException();
1489              iie.initCause(e);
1490              throw iie;
1491            }
1492          }
1493        }
1494      } finally {
1495        WriterAndPath wap = null;
1496        for (SinkWriter tmpWAP : writers.values()) {
1497          try {
1498            wap = (WriterAndPath) tmpWAP;
1499            wap.w.close();
1500          } catch (IOException ioe) {
1501            LOG.error("Couldn't close log at " + wap.p, ioe);
1502            thrown.add(ioe);
1503            continue;
1504          }
1505          LOG.info(
1506              "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
1507                  / 1000 / 1000) + "ms)");
1508        }
1509        writersClosed = true;
1510      }
1511
1512      return thrown;
1513    }
1514
1515    /**
1516     * Get a writer and path for a log starting at the given entry. This function is threadsafe so
1517     * long as multiple threads are always acting on different regions.
1518     * @return null if this region shouldn't output any logs
1519     */
1520    WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException {
      byte[] region = entry.getKey().getEncodedRegionName();
1522      String regionName = Bytes.toString(region);
1523      WriterAndPath ret = (WriterAndPath) writers.get(regionName);
1524      if (ret != null) {
1525        return ret;
1526      }
1527      // If we already decided that this region doesn't get any output
1528      // we don't need to check again.
1529      if (blacklistedRegions.contains(region)) {
1530        return null;
1531      }
1532      ret = createWAP(region, entry);
1533      if (ret == null) {
1534        blacklistedRegions.add(region);
1535        return null;
1536      }
      if (reusable) {
1538        writers.put(regionName, ret);
1539      }
1540      return ret;
1541    }
1542
    /**
     * @return a writer and the path it writes to, or null if this region should not output any
     *         edits. The caller is responsible for closing the writer.
     */
1546    WriterAndPath createWAP(byte[] region, Entry entry) throws IOException {
1547      Path regionedits = getRegionSplitEditsPath(entry,
1548          fileBeingSplit.getPath().getName(), conf);
1549      if (regionedits == null) {
1550        return null;
1551      }
1552      FileSystem walFs = FSUtils.getWALFileSystem(conf);
1553      if (walFs.exists(regionedits)) {
1554        LOG.warn("Found old edits file. It could be the "
1555            + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1556            + walFs.getFileStatus(regionedits).getLen());
1557        if (!walFs.delete(regionedits, false)) {
1558          LOG.warn("Failed delete of old {}", regionedits);
1559        }
1560      }
1561      Writer w = createWriter(regionedits);
1562      LOG.debug("Creating writer path={}", regionedits);
1563      return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
1564    }
1565
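    /**
     * Drop cells from the edit that have already been flushed: a cell is kept only if its store
     * has no recorded max flushed sequence id, or the entry's sequence id is greater than that
     * store's max. Cells in the WAL metadata family are always kept.
     */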
1566    void filterCellByStore(Entry logEntry) {
1567      Map<byte[], Long> maxSeqIdInStores =
1568          regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
1569      if (MapUtils.isEmpty(maxSeqIdInStores)) {
1570        return;
1571      }
1572      // Create the array list for the cells that aren't filtered.
1573      // We make the assumption that most cells will be kept.
1574      ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
1575      for (Cell cell : logEntry.getEdit().getCells()) {
1576        if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
1577          keptCells.add(cell);
1578        } else {
1579          byte[] family = CellUtil.cloneFamily(cell);
1580          Long maxSeqId = maxSeqIdInStores.get(family);
          // Do not skip the cell even if maxSeqId is null. We might be in a rolling upgrade,
          // or the master crashed earlier and we cannot get the information.
1583          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
1584            keptCells.add(cell);
1585          }
1586        }
1587      }
1588
      // Anything in the keptCells array list is still live.
      // Rather than removing cells from the list, which would be an O(n^2) operation,
      // we simply replace the list.
1592      logEntry.getEdit().setCells(keptCells);
1593    }
1594
1595    @Override
1596    public void append(RegionEntryBuffer buffer) throws IOException {
1597      appendBuffer(buffer, true);
1598    }
1599
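    /**
     * Write all entries of the given region buffer through the region's writer, filtering
     * already-flushed cells first. If {@code reusable} is false the writer is not cached in the
     * writers map and the caller is expected to close it.
     * @return the writer used, or null if this region should not produce any output
     */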
1600    WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{
1601      List<Entry> entries = buffer.entryBuffer;
1602      if (entries.isEmpty()) {
1603        LOG.warn("got an empty buffer, skipping");
1604        return null;
1605      }
1606
1607      WriterAndPath wap = null;
1608
1609      long startTime = System.nanoTime();
1610      try {
1611        int editsCount = 0;
1612
1613        for (Entry logEntry : entries) {
1614          if (wap == null) {
1615            wap = getWriterAndPath(logEntry, reusable);
1616            if (wap == null) {
1617              if (LOG.isTraceEnabled()) {
                // This log spews the full edit and can be massive in the log. Enable it only
                // when debugging WAL lost-edit issues.
1620                LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
1621              }
1622              return null;
1623            }
1624          }
1625          filterCellByStore(logEntry);
1626          if (!logEntry.getEdit().isEmpty()) {
1627            wap.w.append(logEntry);
1628            this.updateRegionMaximumEditLogSeqNum(logEntry);
1629            editsCount++;
1630          } else {
1631            wap.incrementSkippedEdits(1);
1632          }
1633        }
1634        // Pass along summary statistics
1635        wap.incrementEdits(editsCount);
1636        wap.incrementNanoTime(System.nanoTime() - startTime);
      } catch (IOException e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error(HBaseMarkers.FATAL, "Got exception while writing log entry to log", e);
1641        throw e;
1642      }
1643      return wap;
1644    }
1645
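    // Only region event entries that carry a compaction marker are considered worth keeping;
    // all other region events are filtered out of the recovered edits.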
1646    @Override
1647    public boolean keepRegionEvent(Entry entry) {
1648      ArrayList<Cell> cells = entry.getEdit().getCells();
1649      for (Cell cell : cells) {
1650        if (WALEdit.isCompactionMarker(cell)) {
1651          return true;
1652        }
1653      }
1654      return false;
1655    }
1656
1657    /**
1658     * @return a map from encoded region ID to the number of edits written out for that region.
1659     */
1660    @Override
1661    public Map<byte[], Long> getOutputCounts() {
1662      TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
1663      for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) {
1664        ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
1665      }
1666      return ret;
1667    }
1668
1669    @Override
1670    public int getNumberOfRecoveredRegions() {
1671      return writers.size();
1672    }
1673  }
1674
  /**
   * An output sink that bounds the number of open recovered-edits writers: each region buffer
   * is written to a fresh writer and the writer is closed as soon as the buffer is drained,
   * instead of keeping one writer open per region for the lifetime of the split.
   */
1678  class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
1679
1680    private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
1681
1682    public BoundedLogWriterCreationOutputSink(PipelineController controller,
1683        EntryBuffers entryBuffers, int numWriters) {
1684      super(controller, entryBuffers, numWriters);
1685    }
1686
1687    @Override
1688    public List<Path> finishWritingAndClose() throws IOException {
1689      boolean isSuccessful;
1690      List<Path> result;
1691      try {
1692        isSuccessful = finishWriting(false);
1693      } finally {
1694        result = close();
1695      }
1696      if (isSuccessful) {
1697        splits = result;
1698      }
1699      return splits;
1700    }
1701
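    /**
     * Unlike the parent implementation, no writers are left open here; each remaining region
     * buffer is written out and closed in its own writeThenClose task.
     */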
1702    @Override
1703    boolean executeCloseTask(CompletionService<Void> completionService,
1704        List<IOException> thrown, List<Path> paths)
1705        throws InterruptedException, ExecutionException {
1706      for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
1707        LOG.info("Submitting writeThenClose of {}",
1708            Arrays.toString(buffer.getValue().encodedRegionName));
1709        completionService.submit(new Callable<Void>() {
1710          @Override
1711          public Void call() throws Exception {
1712            Path dst = writeThenClose(buffer.getValue());
1713            paths.add(dst);
1714            return null;
1715          }
1716        });
1717      }
1718      boolean progress_failed = false;
1719      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
1720        Future<Void> future = completionService.take();
1721        future.get();
1722        if (!progress_failed && reporter != null && !reporter.progress()) {
1723          progress_failed = true;
1724        }
1725      }
1726
1727      return progress_failed;
1728    }
1729
    /**
     * Since the splitting process may create multiple output files per region, we use
     * regionRecoverStatMap to track the total output count of each region.
     * @return a map from encoded region ID to the number of edits written out for that region.
     */
1735    @Override
1736    public Map<byte[], Long> getOutputCounts() {
1737      Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
1738      for(Map.Entry<String, Long> entry: regionRecoverStatMap.entrySet()){
1739        regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
1740      }
1741      return regionRecoverStatMapResult;
1742    }
1743
1744    /**
1745     * @return the number of recovered regions
1746     */
1747    @Override
1748    public int getNumberOfRecoveredRegions() {
1749      return regionRecoverStatMap.size();
1750    }
1751
    /**
     * Append the buffer to a new recovered edits file, then close it once all entries are written.
     * @param buffer buffer containing all WAL entries of a single region
     * @throws IOException if writing the entries or closing the writer fails
     */
1757    @Override
1758    public void append(RegionEntryBuffer buffer) throws IOException {
1759      writeThenClose(buffer);
1760    }
1761
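    /**
     * Write the buffered entries of one region to a fresh writer, add the edit count to
     * regionRecoverStatMap, then close the writer, which also renames the file to its
     * completed recovered.edits name.
     * @return the final path of the recovered edits file, or null if nothing was written
     */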
1762    private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
1763      WriterAndPath wap = appendBuffer(buffer, false);
1764      if(wap != null) {
1765        String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
1766        Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
1767        if (value != null) {
1768          Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
1769          regionRecoverStatMap.put(encodedRegionName, newValue);
1770        }
1771      }
1772
1773      Path dst = null;
1774      List<IOException> thrown = new ArrayList<>();
1775      if(wap != null){
1776        dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
1777      }
1778      if (!thrown.isEmpty()) {
1779        throw MultipleIOException.createIOException(thrown);
1780      }
1781      return dst;
1782    }
1783  }
1784
1785  /**
1786   * Class wraps the actual writer which writes data out and related statistics
1787   */
1788  public abstract static class SinkWriter {
1789    /* Count of edits written to this path */
1790    long editsWritten = 0;
    /* Count of edits skipped for this path */
1792    long editsSkipped = 0;
1793    /* Number of nanos spent writing to this log */
1794    long nanosSpent = 0;
1795
1796    void incrementEdits(int edits) {
1797      editsWritten += edits;
1798    }
1799
1800    void incrementSkippedEdits(int skipped) {
1801      editsSkipped += skipped;
1802    }
1803
1804    void incrementNanoTime(long nanos) {
1805      nanosSpent += nanos;
1806    }
1807  }
1808
1809  /**
1810   * Private data structure that wraps a Writer and its Path, also collecting statistics about the
1811   * data written to this output.
1812   */
1813  private final static class WriterAndPath extends SinkWriter {
1814    final Path p;
1815    final Writer w;
1816    final long minLogSeqNum;
1817
1818    WriterAndPath(final Path p, final Writer w, final long minLogSeqNum) {
1819      this.p = p;
1820      this.w = w;
1821      this.minLogSeqNum = minLogSeqNum;
1822    }
1823  }
1824
1825  static class CorruptedLogFileException extends Exception {
1826    private static final long serialVersionUID = 1L;
1827
1828    CorruptedLogFileException(String s) {
1829      super(s);
1830    }
1831  }
1832
1833  /** A struct used by getMutationsFromWALEntry */
1834  public static class MutationReplay implements Comparable<MutationReplay> {
1835    public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
1836      this.type = type;
1837      this.mutation = mutation;
1838      if(this.mutation.getDurability() != Durability.SKIP_WAL) {
        // using ASYNC_WAL for replay
1840        this.mutation.setDurability(Durability.ASYNC_WAL);
1841      }
1842      this.nonceGroup = nonceGroup;
1843      this.nonce = nonce;
1844    }
1845
1846    public final MutationType type;
1847    public final Mutation mutation;
1848    public final long nonceGroup;
1849    public final long nonce;
1850
1851    @Override
1852    public int compareTo(final MutationReplay d) {
1853      return this.mutation.compareTo(d.mutation);
1854    }
1855
1856    @Override
1857    public boolean equals(Object obj) {
1858      if(!(obj instanceof MutationReplay)) {
1859        return false;
1860      } else {
1861        return this.compareTo((MutationReplay)obj) == 0;
1862      }
1863    }
1864
1865    @Override
1866    public int hashCode() {
1867      return this.mutation.hashCode();
1868    }
1869  }
1870
  /**
   * This function is used to construct mutations from a WALEntry. It also
   * reconstructs the WALKey &amp; WALEdit from the passed in WALEntry.
   * @param entry the WALEntry to construct mutations from
   * @param cells scanner over the Cells associated with the entry
   * @param logEntry pair of WALKey and WALEdit instances that stores the WALKey and WALEdit
   *          extracted from the passed in WALEntry.
   * @param durability the durability to set on the reconstructed mutations
   * @return list of MutationReplay to be replayed
   * @throws IOException if the associated cells cannot be read
   */
1881  public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
1882      Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
1883    if (entry == null) {
      // Return an empty list
1885      return Collections.emptyList();
1886    }
1887
1888    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
1889      entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
1890    int count = entry.getAssociatedCellCount();
1891    List<MutationReplay> mutations = new ArrayList<>();
1892    Cell previousCell = null;
1893    Mutation m = null;
1894    WALKeyImpl key = null;
1895    WALEdit val = null;
1896    if (logEntry != null) {
1897      val = new WALEdit();
1898    }
1899
1900    for (int i = 0; i < count; i++) {
1901      // Throw index out of bounds if our cell count is off
1902      if (!cells.advance()) {
1903        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
1904      }
1905      Cell cell = cells.current();
      if (val != null) {
        val.add(cell);
      }
1907
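      // A new mutation is started whenever the row or the cell type changes; consecutive
      // cells for the same row and type are folded into the current Put or Delete.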
1908      boolean isNewRowOrType =
1909          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
1910              || !CellUtil.matchingRows(previousCell, cell);
1911      if (isNewRowOrType) {
1912        // Create new mutation
1913        if (CellUtil.isDelete(cell)) {
1914          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
1915          // Deletes don't have nonces.
1916          mutations.add(new MutationReplay(
1917              MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
1918        } else {
1919          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
1920          // Puts might come from increment or append, thus we need nonces.
1921          long nonceGroup = entry.getKey().hasNonceGroup()
1922              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1923          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1924          mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
1925        }
1926      }
1927      if (CellUtil.isDelete(cell)) {
1928        ((Delete) m).add(cell);
1929      } else {
1930        ((Put) m).add(cell);
1931      }
1932      m.setDurability(durability);
1933      previousCell = cell;
1934    }
1935
1936    // reconstruct WALKey
1937    if (logEntry != null) {
1938      org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
1939          entry.getKey();
1940      List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
1941      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
1942        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
1943      }
1944      key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
1945              walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
1946              clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
1947      logEntry.setFirst(key);
1948      logEntry.setSecond(val);
1949    }
1950
1951    return mutations;
1952  }
1953}