001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.EOFException;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.text.ParseException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.NavigableSet;
032import java.util.Set;
033import java.util.TreeMap;
034import java.util.TreeSet;
035import java.util.UUID;
036import java.util.concurrent.Callable;
037import java.util.concurrent.CompletionService;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ExecutionException;
040import java.util.concurrent.ExecutorCompletionService;
041import java.util.concurrent.Future;
042import java.util.concurrent.ThreadFactory;
043import java.util.concurrent.ThreadPoolExecutor;
044import java.util.concurrent.TimeUnit;
045import java.util.concurrent.atomic.AtomicLong;
046import java.util.concurrent.atomic.AtomicReference;
047import java.util.regex.Matcher;
048import java.util.regex.Pattern;
049import org.apache.commons.lang3.ArrayUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.fs.FileAlreadyExistsException;
052import org.apache.hadoop.fs.FileStatus;
053import org.apache.hadoop.fs.FileSystem;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.fs.PathFilter;
056import org.apache.hadoop.hbase.Cell;
057import org.apache.hadoop.hbase.CellScanner;
058import org.apache.hadoop.hbase.CellUtil;
059import org.apache.hadoop.hbase.HBaseConfiguration;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.TableName;
062import org.apache.hadoop.hbase.client.Delete;
063import org.apache.hadoop.hbase.client.Durability;
064import org.apache.hadoop.hbase.client.Mutation;
065import org.apache.hadoop.hbase.client.Put;
066import org.apache.hadoop.hbase.client.RegionInfo;
067import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
068import org.apache.hadoop.hbase.io.HeapSize;
069import org.apache.hadoop.hbase.log.HBaseMarkers;
070import org.apache.hadoop.hbase.master.SplitLogManager;
071import org.apache.hadoop.hbase.monitoring.MonitoredTask;
072import org.apache.hadoop.hbase.monitoring.TaskMonitor;
073import org.apache.hadoop.hbase.regionserver.HRegion;
074import org.apache.hadoop.hbase.procedure2.util.StringUtils;
075import org.apache.hadoop.hbase.regionserver.LastSequenceId;
076import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
077import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
078import org.apache.hadoop.hbase.util.Bytes;
079import org.apache.hadoop.hbase.util.CancelableProgressable;
080import org.apache.hadoop.hbase.util.ClassSize;
081import org.apache.hadoop.hbase.util.CollectionUtils.IOExceptionSupplier;
082import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
083import org.apache.hadoop.hbase.util.FSUtils;
084import org.apache.hadoop.hbase.util.Pair;
085import org.apache.hadoop.hbase.util.Threads;
086import org.apache.hadoop.hbase.wal.WAL.Entry;
087import org.apache.hadoop.hbase.wal.WAL.Reader;
088import org.apache.hadoop.hbase.wal.WALProvider.Writer;
089import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
090import org.apache.hadoop.io.MultipleIOException;
091import org.apache.hadoop.ipc.RemoteException;
092import org.apache.yetus.audience.InterfaceAudience;
093import org.slf4j.Logger;
094import org.slf4j.LoggerFactory;
095
096import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
097import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
098import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
099import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
100import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
101import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
102
103import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
108/**
109 * This class is responsible for splitting up a bunch of regionserver commit log
110 * files that are no longer being written to, into new files, one per region, for
111 * recovering data on startup. Delete the old log files when finished.
112 */
113@InterfaceAudience.Private
114public class WALSplitter {
115  private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);
116
117  /** By default we retry errors in splitting, rather than skipping. */
118  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
119
120  // Parameters for split process
121  protected final Path walDir;
122  protected final FileSystem walFS;
123  protected final Configuration conf;
124
125  // Major subcomponents of the split process.
126  // These are separated into inner classes to make testing easier.
127  OutputSink outputSink;
128  private EntryBuffers entryBuffers;
129
130  private SplitLogWorkerCoordination splitLogWorkerCoordination;
131  private final WALFactory walFactory;
132
133  private MonitoredTask status;
134
135  // For checking the latest flushed sequence id
136  protected final LastSequenceId sequenceIdChecker;
137
138  // Map encodedRegionName -> lastFlushedSequenceId
139  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>();
140
141  // Map encodedRegionName -> maxSeqIdInStores
142  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();
143
144  // the file being split currently
145  private FileStatus fileBeingSplit;
146
147  // if we limit the number of writers opened for sinking recovered edits
148  private final boolean splitWriterCreationBounded;
149
150  public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
151
152
153  @VisibleForTesting
154  WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
155      FileSystem walFS, LastSequenceId idChecker,
156      SplitLogWorkerCoordination splitLogWorkerCoordination) {
157    this.conf = HBaseConfiguration.create(conf);
158    String codecClassName = conf
159        .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
160    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
161    this.walDir = walDir;
162    this.walFS = walFS;
163    this.sequenceIdChecker = idChecker;
164    this.splitLogWorkerCoordination = splitLogWorkerCoordination;
165
166    this.walFactory = factory;
167    PipelineController controller = new PipelineController();
168
169    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
170
171    entryBuffers = new EntryBuffers(controller,
172        this.conf.getLong("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
173        splitWriterCreationBounded);
174
175    int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
176    if(splitWriterCreationBounded){
177      outputSink = new BoundedLogWriterCreationOutputSink(
178          controller, entryBuffers, numWriterThreads);
179    }else {
180      outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
181    }
182  }
183
184  /**
185   * Splits a WAL file into region's recovered-edits directory.
186   * This is the main entry point for distributed log splitting from SplitLogWorker.
187   * <p>
188   * If the log file has N regions then N recovered.edits files will be produced.
189   * <p>
190   * @return false if it is interrupted by the progress-able.
191   */
192  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
193      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
194      SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
195      throws IOException {
196    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker,
197        splitLogWorkerCoordination);
198    return s.splitLogFile(logfile, reporter);
199  }
200
201  // A wrapper to split one log folder using the method used by distributed
202  // log splitting. Used by tools and unit tests. It should be package private.
203  // It is public only because TestWALObserver is in a different package,
204  // which uses this method to do log splitting.
205  @VisibleForTesting
206  public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
207      FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException {
208    final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
209        Collections.singletonList(logDir), null);
210    List<Path> splits = new ArrayList<>();
211    if (ArrayUtils.isNotEmpty(logfiles)) {
212      for (FileStatus logfile: logfiles) {
213        WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null);
214        if (s.splitLogFile(logfile, null)) {
215          finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
216          if (s.outputSink.splits != null) {
217            splits.addAll(s.outputSink.splits);
218          }
219        }
220      }
221    }
222    if (!walFS.delete(logDir, true)) {
223      throw new IOException("Unable to delete src dir: " + logDir);
224    }
225    return splits;
226  }
227
228  /**
229   * log splitting implementation, splits one log file.
230   * @param logfile should be an actual log file.
231   */
232  @VisibleForTesting
233  boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
234    Preconditions.checkState(status == null);
235    Preconditions.checkArgument(logfile.isFile(),
236        "passed in file status is for something other than a regular file.");
237    boolean isCorrupted = false;
238    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
239      SPLIT_SKIP_ERRORS_DEFAULT);
240    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
241    Path logPath = logfile.getPath();
242    boolean outputSinkStarted = false;
243    boolean progress_failed = false;
244    int editsCount = 0;
245    int editsSkipped = 0;
246
247    status = TaskMonitor.get().createStatus(
248          "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
249    Reader logFileReader = null;
250    this.fileBeingSplit = logfile;
251    long startTS = EnvironmentEdgeManager.currentTime();
252    try {
253      long logLength = logfile.getLen();
254      LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength),
255          logLength);
256      status.setStatus("Opening log file");
257      if (reporter != null && !reporter.progress()) {
258        progress_failed = true;
259        return false;
260      }
261      logFileReader = getReader(logfile, skipErrors, reporter);
262      if (logFileReader == null) {
263        LOG.warn("Nothing to split in WAL={}", logPath);
264        return true;
265      }
266      long openCost = EnvironmentEdgeManager.currentTime() - startTS;
267      LOG.info("Open WAL={} cost {} ms", logPath, openCost);
268      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
269      int numOpenedFilesLastCheck = 0;
270      outputSink.setReporter(reporter);
271      outputSink.startWriterThreads();
272      outputSinkStarted = true;
273      Entry entry;
274      Long lastFlushedSequenceId = -1L;
275      startTS = EnvironmentEdgeManager.currentTime();
276      while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
277        byte[] region = entry.getKey().getEncodedRegionName();
278        String encodedRegionNameAsStr = Bytes.toString(region);
279        lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
280        if (lastFlushedSequenceId == null) {
281          if (sequenceIdChecker != null) {
282            RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
283            Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
284            for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
285              maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
286                storeSeqId.getSequenceId());
287            }
288            regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
289            lastFlushedSequenceId = ids.getLastFlushedSequenceId();
290            if (LOG.isDebugEnabled()) {
291              LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
292                  TextFormat.shortDebugString(ids));
293            }
294          }
295          if (lastFlushedSequenceId == null) {
296            lastFlushedSequenceId = -1L;
297          }
298          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
299        }
300        if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
301          editsSkipped++;
302          continue;
303        }
304        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
305        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
306          editsSkipped++;
307          continue;
308        }
309        entryBuffers.appendEntry(entry);
310        editsCount++;
311        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
312        // If sufficient edits have passed, check if we should report progress.
313        if (editsCount % interval == 0
314            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
315          numOpenedFilesLastCheck = this.getNumOpenWriters();
316          String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
317              + " edits, skipped " + editsSkipped + " edits.";
318          status.setStatus("Split " + countsStr);
319          if (reporter != null && !reporter.progress()) {
320            progress_failed = true;
321            return false;
322          }
323        }
324      }
325    } catch (InterruptedException ie) {
326      IOException iie = new InterruptedIOException();
327      iie.initCause(ie);
328      throw iie;
329    } catch (CorruptedLogFileException e) {
330      LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
331      if (splitLogWorkerCoordination != null) {
332        // Some tests pass in a csm of null.
333        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
334      } else {
335        // for tests only
336        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
337      }
338      isCorrupted = true;
339    } catch (IOException e) {
340      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
341      throw e;
342    } finally {
343      LOG.debug("Finishing writing output logs and closing down");
344      try {
345        if (null != logFileReader) {
346          logFileReader.close();
347        }
348      } catch (IOException exception) {
349        LOG.warn("Could not close WAL reader", exception);
350      }
351      try {
352        if (outputSinkStarted) {
353          // Set progress_failed to true as the immediate following statement will reset its value
354          // when finishWritingAndClose() throws exception, progress_failed has the right value
355          progress_failed = true;
356          progress_failed = outputSink.finishWritingAndClose() == null;
357        }
358      } finally {
359        long processCost = EnvironmentEdgeManager.currentTime() - startTS;
360        // See if length got updated post lease recovery
361        String msg = "Processed " + editsCount + " edits across " +
362            outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost +
363            " ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" +
364            StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() +
365            ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
366        LOG.info(msg);
367        status.markComplete(msg);
368      }
369    }
370    return !progress_failed;
371  }
372
373  /**
374   * Completes the work done by splitLogFile by archiving logs
375   * <p>
376   * It is invoked by SplitLogManager once it knows that one of the
377   * SplitLogWorkers have completed the splitLogFile() part. If the master
378   * crashes then this function might get called multiple times.
379   * <p>
380   * @param logfile
381   * @param conf
382   * @throws IOException
383   */
384  public static void finishSplitLogFile(String logfile,
385      Configuration conf)  throws IOException {
386    Path walDir = FSUtils.getWALRootDir(conf);
387    Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
388    Path logPath;
389    if (FSUtils.isStartingWithPath(walDir, logfile)) {
390      logPath = new Path(logfile);
391    } else {
392      logPath = new Path(walDir, logfile);
393    }
394    finishSplitLogFile(walDir, oldLogDir, logPath, conf);
395  }
396
397  private static void finishSplitLogFile(Path walDir, Path oldLogDir,
398      Path logPath, Configuration conf) throws IOException {
399    List<Path> processedLogs = new ArrayList<>();
400    List<Path> corruptedLogs = new ArrayList<>();
401    FileSystem walFS = walDir.getFileSystem(conf);
402    if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) {
403      corruptedLogs.add(logPath);
404    } else {
405      processedLogs.add(logPath);
406    }
407    archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf);
408    Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName());
409    walFS.delete(stagingDir, true);
410  }
411
412  /**
413   * Moves processed logs to a oldLogDir after successful processing Moves
414   * corrupted logs (any log that couldn't be successfully parsed to corruptDir
415   * (.corrupt) for later investigation
416   *
417   * @param corruptedLogs
418   * @param processedLogs
419   * @param oldLogDir
420   * @param walFS WAL FileSystem to archive files on.
421   * @param conf
422   * @throws IOException
423   */
424  private static void archiveLogs(
425      final List<Path> corruptedLogs,
426      final List<Path> processedLogs, final Path oldLogDir,
427      final FileSystem walFS, final Configuration conf) throws IOException {
428    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
429    if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
430      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
431          corruptDir);
432    }
433    if (!walFS.mkdirs(corruptDir)) {
434      LOG.info("Unable to mkdir {}", corruptDir);
435    }
436    walFS.mkdirs(oldLogDir);
437
438    // this method can get restarted or called multiple times for archiving
439    // the same log files.
440    for (Path corrupted : corruptedLogs) {
441      Path p = new Path(corruptDir, corrupted.getName());
442      if (walFS.exists(corrupted)) {
443        if (!walFS.rename(corrupted, p)) {
444          LOG.warn("Unable to move corrupted log {} to {}", corrupted, p);
445        } else {
446          LOG.warn("Moved corrupted log {} to {}", corrupted, p);
447        }
448      }
449    }
450
451    for (Path p : processedLogs) {
452      Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
453      if (walFS.exists(p)) {
454        if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
455          LOG.warn("Unable to move {} to {}", p, newPath);
456        } else {
457          LOG.info("Archived processed log {} to {}", p, newPath);
458        }
459      }
460    }
461  }
462
463  /**
464   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
465   * <code>logEntry</code> named for the sequenceid in the passed
466   * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
467   * This method also ensures existence of RECOVERED_EDITS_DIR under the region
468   * creating it if necessary.
469   * @param logEntry
470   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
471   * @param tmpDirName of the directory used to sideline old recovered edits file
472   * @param conf
473   * @return Path to file into which to dump split log edits.
474   * @throws IOException
475   */
476  @SuppressWarnings("deprecation")
477  @VisibleForTesting
478  static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit,
479      String tmpDirName, Configuration conf) throws IOException {
480    FileSystem walFS = FSUtils.getWALFileSystem(conf);
481    Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTableName());
482    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
483    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
484    Path dir = getRegionDirRecoveredEditsDir(regionDir);
485
486
487    if (walFS.exists(dir) && walFS.isFile(dir)) {
488      Path tmp = new Path(tmpDirName);
489      if (!walFS.exists(tmp)) {
490        walFS.mkdirs(tmp);
491      }
492      tmp = new Path(tmp,
493        HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
494      LOG.warn("Found existing old file: {}. It could be some "
495        + "leftover of an old installation. It should be a folder instead. "
496        + "So moving it to {}", dir, tmp);
497      if (!walFS.rename(dir, tmp)) {
498        LOG.warn("Failed to sideline old file {}", dir);
499      }
500    }
501
502    if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
503      LOG.warn("mkdir failed on {}", dir);
504    }
505    // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
506    // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
507    // region's replayRecoveredEdits will not delete it
508    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getSequenceId());
509    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
510    return new Path(dir, fileName);
511  }
512
513  private static String getTmpRecoveredEditsFileName(String fileName) {
514    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
515  }
516
517  /**
518   * Get the completed recovered edits file path, renaming it to be by last edit
519   * in the file from its first edit. Then we could use the name to skip
520   * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
521   * @param srcPath
522   * @param maximumEditLogSeqNum
523   * @return dstPath take file's last edit log seq num as the name
524   */
525  private static Path getCompletedRecoveredEditsFilePath(Path srcPath,
526      long maximumEditLogSeqNum) {
527    String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
528    return new Path(srcPath.getParent(), fileName);
529  }
530
531  @VisibleForTesting
532  static String formatRecoveredEditsFileName(final long seqid) {
533    return String.format("%019d", seqid);
534  }
535
536  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
537  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
538
539  /**
540   * @param regionDir
541   *          This regions directory in the filesystem.
542   * @return The directory that holds recovered edits files for the region
543   *         <code>regionDir</code>
544   */
545  public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
546    return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
547  }
548
549  /**
550   * Check whether there is recovered.edits in the region dir
551   * @param conf conf
552   * @param regionInfo the region to check
553   * @throws IOException IOException
554   * @return true if recovered.edits exist in the region dir
555   */
556  public static boolean hasRecoveredEdits(final Configuration conf,
557    final RegionInfo regionInfo) throws IOException {
558    // No recovered.edits for non default replica regions
559    if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
560      return false;
561    }
562    // Only default replica region can reach here, so we can use regioninfo
563    // directly without converting it to default replica's regioninfo.
564    Path regionWALDir =
565        FSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
566    Path regionDir = FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(conf), regionInfo);
567    Path wrongRegionWALDir =
568        FSUtils.getWrongWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
569    FileSystem walFs = FSUtils.getWALFileSystem(conf);
570    FileSystem rootFs = FSUtils.getRootDirFileSystem(conf);
571    NavigableSet<Path> files = getSplitEditFilesSorted(walFs, regionWALDir);
572    if (!files.isEmpty()) {
573      return true;
574    }
575    files = getSplitEditFilesSorted(rootFs, regionDir);
576    if (!files.isEmpty()) {
577      return true;
578    }
579    files = getSplitEditFilesSorted(walFs, wrongRegionWALDir);
580    return !files.isEmpty();
581  }
582
583
584  /**
585   * Returns sorted set of edit files made by splitter, excluding files
586   * with '.temp' suffix.
587   *
588   * @param walFS WAL FileSystem used to retrieving split edits files.
589   * @param regionDir WAL region dir to look for recovered edits files under.
590   * @return Files in passed <code>regionDir</code> as a sorted set.
591   * @throws IOException
592   */
593  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
594      final Path regionDir) throws IOException {
595    NavigableSet<Path> filesSorted = new TreeSet<>();
596    Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
597    if (!walFS.exists(editsdir)) {
598      return filesSorted;
599    }
600    FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
601      @Override
602      public boolean accept(Path p) {
603        boolean result = false;
604        try {
605          // Return files and only files that match the editfile names pattern.
606          // There can be other files in this directory other than edit files.
607          // In particular, on error, we'll move aside the bad edit file giving
608          // it a timestamp suffix. See moveAsideBadEditsFile.
609          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
610          result = walFS.isFile(p) && m.matches();
611          // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
612          // because it means splitwal thread is writting this file.
613          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
614            result = false;
615          }
616          // Skip SeqId Files
617          if (isSequenceIdFile(p)) {
618            result = false;
619          }
620        } catch (IOException e) {
621          LOG.warn("Failed isFile check on {}", p, e);
622        }
623        return result;
624      }
625    });
626    if (ArrayUtils.isNotEmpty(files)) {
627      Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
628    }
629    return filesSorted;
630  }
631
632  /**
633   * Move aside a bad edits file.
634   *
635   * @param walFS WAL FileSystem used to rename bad edits file.
636   * @param edits
637   *          Edits file to move aside.
638   * @return The name of the moved aside file.
639   * @throws IOException
640   */
641  public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
642      throws IOException {
643    Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
644        + System.currentTimeMillis());
645    if (!walFS.rename(edits, moveAsideName)) {
646      LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
647    }
648    return moveAsideName;
649  }
650
651  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
652  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
653  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
654
655  /**
656   * Is the given file a region open sequence id file.
657   */
658  @VisibleForTesting
659  public static boolean isSequenceIdFile(final Path file) {
660    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
661        || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
662  }
663
664  private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
665      throws IOException {
666    // TODO: Why are we using a method in here as part of our normal region open where
667    // there is no splitting involved? Fix. St.Ack 01/20/2017.
668    Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
669    try {
670      FileStatus[] files = walFS.listStatus(editsDir, WALSplitter::isSequenceIdFile);
671      return files != null ? files : new FileStatus[0];
672    } catch (FileNotFoundException e) {
673      return new FileStatus[0];
674    }
675  }
676
677  private static long getMaxSequenceId(FileStatus[] files) {
678    long maxSeqId = -1L;
679    for (FileStatus file : files) {
680      String fileName = file.getPath().getName();
681      try {
682        maxSeqId = Math.max(maxSeqId, Long
683          .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
684      } catch (NumberFormatException ex) {
685        LOG.warn("Invalid SeqId File Name={}", fileName);
686      }
687    }
688    return maxSeqId;
689  }
690
691  /**
692   * Get the max sequence id which is stored in the region directory. -1 if none.
693   */
694  public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
695    return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
696  }
697
698  /**
699   * Create a file with name as region's max sequence id
700   */
701  public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
702      throws IOException {
703    FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
704    long maxSeqId = getMaxSequenceId(files);
705    if (maxSeqId > newMaxSeqId) {
706      throw new IOException("The new max sequence id " + newMaxSeqId +
707        " is less than the old max sequence id " + maxSeqId);
708    }
709    // write a new seqId file
710    Path newSeqIdFile = new Path(WALSplitter.getRegionDirRecoveredEditsDir(regionDir),
711      newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
712    if (newMaxSeqId != maxSeqId) {
713      try {
714        if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
715          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
716        }
717        LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
718          maxSeqId);
719      } catch (FileAlreadyExistsException ignored) {
720        // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
721      }
722    }
723    // remove old ones
724    for (FileStatus status : files) {
725      if (!newSeqIdFile.equals(status.getPath())) {
726        walFS.delete(status.getPath(), false);
727      }
728    }
729  }
730
731  /**
732   * This method will check 3 places for finding the max sequence id file. One is the expected
733   * place, another is the old place under the region directory, and the last one is the wrong one
734   * we introduced in HBASE-20734. See HBASE-22617 for more details.
735   * <p/>
736   * Notice that, you should always call this method instead of
737   * {@link #getMaxRegionSequenceId(FileSystem, Path)} until 4.0.0 release.
738   * @deprecated Only for compatibility, will be removed in 4.0.0.
739   */
740  @Deprecated
741  public static long getMaxRegionSequenceId(Configuration conf, RegionInfo region,
742      IOExceptionSupplier<FileSystem> rootFsSupplier, IOExceptionSupplier<FileSystem> walFsSupplier)
743      throws IOException {
744    FileSystem rootFs = rootFsSupplier.get();
745    FileSystem walFs = walFsSupplier.get();
746    Path regionWALDir = FSUtils.getWALRegionDir(conf, region.getTable(), region.getEncodedName());
747    // This is the old place where we store max sequence id file
748    Path regionDir = FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(conf), region);
749    // This is for HBASE-20734, where we use a wrong directory, see HBASE-22617 for more details.
750    Path wrongRegionWALDir =
751      FSUtils.getWrongWALRegionDir(conf, region.getTable(), region.getEncodedName());
752    long maxSeqId = getMaxRegionSequenceId(walFs, regionWALDir);
753    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(rootFs, regionDir));
754    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(walFs, wrongRegionWALDir));
755    return maxSeqId;
756  }
757
758  /**
759   * Create a new {@link Reader} for reading logs to split.
760   *
761   * @param file
762   * @return A new Reader instance, caller should close
763   * @throws IOException
764   * @throws CorruptedLogFileException
765   */
766  protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
767      throws IOException, CorruptedLogFileException {
768    Path path = file.getPath();
769    long length = file.getLen();
770    Reader in;
771
772    // Check for possibly empty file. With appends, currently Hadoop reports a
773    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
774    // HDFS-878 is committed.
775    if (length <= 0) {
776      LOG.warn("File {} might be still open, length is 0", path);
777    }
778
779    try {
780      FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter);
781      try {
782        in = getReader(path, reporter);
783      } catch (EOFException e) {
784        if (length <= 0) {
785          // TODO should we ignore an empty, not-last log file if skip.errors
786          // is false? Either way, the caller should decide what to do. E.g.
787          // ignore if this is the last log in sequence.
788          // TODO is this scenario still possible if the log has been
789          // recovered (i.e. closed)
790          LOG.warn("Could not open {} for reading. File is empty", path, e);
791        }
792        // EOFException being ignored
793        return null;
794      }
795    } catch (IOException e) {
796      if (e instanceof FileNotFoundException) {
797        // A wal file may not exist anymore. Nothing can be recovered so move on
798        LOG.warn("File {} does not exist anymore", path, e);
799        return null;
800      }
801      if (!skipErrors || e instanceof InterruptedIOException) {
802        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
803      }
804      CorruptedLogFileException t =
805        new CorruptedLogFileException("skipErrors=true Could not open wal " +
806            path + " ignoring");
807      t.initCause(e);
808      throw t;
809    }
810    return in;
811  }
812
813  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
814  throws CorruptedLogFileException, IOException {
815    try {
816      return in.next();
817    } catch (EOFException eof) {
818      // truncated files are expected if a RS crashes (see HBASE-2643)
819      LOG.info("EOF from wal {}. Continuing.", path);
820      return null;
821    } catch (IOException e) {
822      // If the IOE resulted from bad file format,
823      // then this problem is idempotent and retrying won't help
824      if (e.getCause() != null &&
825          (e.getCause() instanceof ParseException ||
826           e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
827        LOG.warn("Parse exception from wal {}. Continuing", path, e);
828        return null;
829      }
830      if (!skipErrors) {
831        throw e;
832      }
833      CorruptedLogFileException t =
834        new CorruptedLogFileException("skipErrors=true Ignoring exception" +
835            " while parsing wal " + path + ". Marking as corrupted");
836      t.initCause(e);
837      throw t;
838    }
839  }
840
841  /**
842   * Create a new {@link Writer} for writing log splits.
843   * @return a new Writer instance, caller should close
844   */
845  protected Writer createWriter(Path logfile)
846      throws IOException {
847    return walFactory.createRecoveredEditsWriter(walFS, logfile);
848  }
849
850  /**
851   * Create a new {@link Reader} for reading logs to split.
852   * @return new Reader instance, caller should close
853   */
854  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
855    return walFactory.createReader(walFS, curLogFile, reporter);
856  }
857
858  /**
859   * Get current open writers
860   */
861  private int getNumOpenWriters() {
862    int result = 0;
863    if (this.outputSink != null) {
864      result += this.outputSink.getNumOpenWriters();
865    }
866    return result;
867  }
868
869  /**
870   * Contains some methods to control WAL-entries producer / consumer interactions
871   */
872  public static class PipelineController {
873    // If an exception is thrown by one of the other threads, it will be
874    // stored here.
875    AtomicReference<Throwable> thrown = new AtomicReference<>();
876
877    // Wait/notify for when data has been produced by the writer thread,
878    // consumed by the reader thread, or an exception occurred
879    public final Object dataAvailable = new Object();
880
881    void writerThreadError(Throwable t) {
882      thrown.compareAndSet(null, t);
883    }
884
885    /**
886     * Check for errors in the writer threads. If any is found, rethrow it.
887     */
888    void checkForErrors() throws IOException {
889      Throwable thrown = this.thrown.get();
890      if (thrown == null) return;
891      if (thrown instanceof IOException) {
892        throw new IOException(thrown);
893      } else {
894        throw new RuntimeException(thrown);
895      }
896    }
897  }
898
899  /**
900   * Class which accumulates edits and separates them into a buffer per region
901   * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
902   * a predefined threshold.
903   *
904   * Writer threads then pull region-specific buffers from this class.
905   */
906  public static class EntryBuffers {
907    PipelineController controller;
908
909    Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
910
911    /* Track which regions are currently in the middle of writing. We don't allow
912       an IO thread to pick up bytes from a region if we're already writing
913       data for that region in a different IO thread. */
914    Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
915
916    long totalBuffered = 0;
917    long maxHeapUsage;
918    boolean splitWriterCreationBounded;
919
920    public EntryBuffers(PipelineController controller, long maxHeapUsage) {
921      this(controller, maxHeapUsage, false);
922    }
923
924    public EntryBuffers(PipelineController controller, long maxHeapUsage,
925        boolean splitWriterCreationBounded){
926      this.controller = controller;
927      this.maxHeapUsage = maxHeapUsage;
928      this.splitWriterCreationBounded = splitWriterCreationBounded;
929    }
930
931    /**
932     * Append a log entry into the corresponding region buffer.
933     * Blocks if the total heap usage has crossed the specified threshold.
934     *
935     * @throws InterruptedException
936     * @throws IOException
937     */
938    public void appendEntry(Entry entry) throws InterruptedException, IOException {
939      WALKey key = entry.getKey();
940
941      RegionEntryBuffer buffer;
942      long incrHeap;
943      synchronized (this) {
944        buffer = buffers.get(key.getEncodedRegionName());
945        if (buffer == null) {
946          buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
947          buffers.put(key.getEncodedRegionName(), buffer);
948        }
949        incrHeap= buffer.appendEntry(entry);
950      }
951
952      // If we crossed the chunk threshold, wait for more space to be available
953      synchronized (controller.dataAvailable) {
954        totalBuffered += incrHeap;
955        while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
956          LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
957          controller.dataAvailable.wait(2000);
958        }
959        controller.dataAvailable.notifyAll();
960      }
961      controller.checkForErrors();
962    }
963
964    /**
965     * @return RegionEntryBuffer a buffer of edits to be written.
966     */
967    synchronized RegionEntryBuffer getChunkToWrite() {
968      // The core part of limiting opening writers is it doesn't return chunk only if the
969      // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each
970      // region during splitting. It will flush all the logs in the buffer after splitting
971      // through a threadpool, which means the number of writers it created is under control.
972      if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
973        return null;
974      }
975      long biggestSize = 0;
976      byte[] biggestBufferKey = null;
977
978      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
979        long size = entry.getValue().heapSize();
980        if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
981          biggestSize = size;
982          biggestBufferKey = entry.getKey();
983        }
984      }
985      if (biggestBufferKey == null) {
986        return null;
987      }
988
989      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
990      currentlyWriting.add(biggestBufferKey);
991      return buffer;
992    }
993
994    void doneWriting(RegionEntryBuffer buffer) {
995      synchronized (this) {
996        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
997        assert removed;
998      }
999      long size = buffer.heapSize();
1000
1001      synchronized (controller.dataAvailable) {
1002        totalBuffered -= size;
1003        // We may unblock writers
1004        controller.dataAvailable.notifyAll();
1005      }
1006    }
1007
1008    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
1009      return currentlyWriting.contains(region);
1010    }
1011
1012    public void waitUntilDrained() {
1013      synchronized (controller.dataAvailable) {
1014        while (totalBuffered > 0) {
1015          try {
1016            controller.dataAvailable.wait(2000);
1017          } catch (InterruptedException e) {
1018            LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
1019            Thread.interrupted();
1020            break;
1021          }
1022        }
1023      }
1024    }
1025  }
1026
1027  /**
1028   * A buffer of some number of edits for a given region.
1029   * This accumulates edits and also provides a memory optimization in order to
1030   * share a single byte array instance for the table and region name.
1031   * Also tracks memory usage of the accumulated edits.
1032   */
1033  public static class RegionEntryBuffer implements HeapSize {
1034    long heapInBuffer = 0;
1035    List<Entry> entryBuffer;
1036    TableName tableName;
1037    byte[] encodedRegionName;
1038
1039    RegionEntryBuffer(TableName tableName, byte[] region) {
1040      this.tableName = tableName;
1041      this.encodedRegionName = region;
1042      this.entryBuffer = new ArrayList<>();
1043    }
1044
1045    long appendEntry(Entry entry) {
1046      internify(entry);
1047      entryBuffer.add(entry);
1048      long incrHeap = entry.getEdit().heapSize() +
1049        ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers
1050        0; // TODO linkedlist entry
1051      heapInBuffer += incrHeap;
1052      return incrHeap;
1053    }
1054
1055    private void internify(Entry entry) {
1056      WALKeyImpl k = entry.getKey();
1057      k.internTableName(this.tableName);
1058      k.internEncodedRegionName(this.encodedRegionName);
1059    }
1060
1061    @Override
1062    public long heapSize() {
1063      return heapInBuffer;
1064    }
1065
1066    public byte[] getEncodedRegionName() {
1067      return encodedRegionName;
1068    }
1069
1070    public List<Entry> getEntryBuffer() {
1071      return entryBuffer;
1072    }
1073
1074    public TableName getTableName() {
1075      return tableName;
1076    }
1077  }
1078
1079  public static class WriterThread extends Thread {
1080    private volatile boolean shouldStop = false;
1081    private PipelineController controller;
1082    private EntryBuffers entryBuffers;
1083    private OutputSink outputSink = null;
1084
1085    WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
1086      super(Thread.currentThread().getName() + "-Writer-" + i);
1087      this.controller = controller;
1088      this.entryBuffers = entryBuffers;
1089      outputSink = sink;
1090    }
1091
1092    @Override
1093    public void run()  {
1094      try {
1095        doRun();
1096      } catch (Throwable t) {
1097        LOG.error("Exiting thread", t);
1098        controller.writerThreadError(t);
1099      }
1100    }
1101
1102    private void doRun() throws IOException {
1103      LOG.trace("Writer thread starting");
1104      while (true) {
1105        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
1106        if (buffer == null) {
1107          // No data currently available, wait on some more to show up
1108          synchronized (controller.dataAvailable) {
1109            if (shouldStop && !this.outputSink.flush()) {
1110              return;
1111            }
1112            try {
1113              controller.dataAvailable.wait(500);
1114            } catch (InterruptedException ie) {
1115              if (!shouldStop) {
1116                throw new RuntimeException(ie);
1117              }
1118            }
1119          }
1120          continue;
1121        }
1122
1123        assert buffer != null;
1124        try {
1125          writeBuffer(buffer);
1126        } finally {
1127          entryBuffers.doneWriting(buffer);
1128        }
1129      }
1130    }
1131
1132    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
1133      outputSink.append(buffer);
1134    }
1135
1136    void finish() {
1137      synchronized (controller.dataAvailable) {
1138        shouldStop = true;
1139        controller.dataAvailable.notifyAll();
1140      }
1141    }
1142  }
1143
1144  /**
1145   * The following class is an abstraction class to provide a common interface to support
1146   * different ways of consuming recovered edits.
1147   */
1148  public static abstract class OutputSink {
1149
1150    protected PipelineController controller;
1151    protected EntryBuffers entryBuffers;
1152
1153    protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<>();
1154    protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
1155        new ConcurrentHashMap<>();
1156
1157
1158    protected final List<WriterThread> writerThreads = Lists.newArrayList();
1159
1160    /* Set of regions which we've decided should not output edits */
1161    protected final Set<byte[]> blacklistedRegions = Collections
1162        .synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR));
1163
1164    protected boolean closeAndCleanCompleted = false;
1165
1166    protected boolean writersClosed = false;
1167
1168    protected final int numThreads;
1169
1170    protected CancelableProgressable reporter = null;
1171
1172    protected AtomicLong skippedEdits = new AtomicLong();
1173
1174    protected List<Path> splits = null;
1175
1176    public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
1177      numThreads = numWriters;
1178      this.controller = controller;
1179      this.entryBuffers = entryBuffers;
1180    }
1181
1182    void setReporter(CancelableProgressable reporter) {
1183      this.reporter = reporter;
1184    }
1185
1186    /**
1187     * Start the threads that will pump data from the entryBuffers to the output files.
1188     */
1189    public synchronized void startWriterThreads() {
1190      for (int i = 0; i < numThreads; i++) {
1191        WriterThread t = new WriterThread(controller, entryBuffers, this, i);
1192        t.start();
1193        writerThreads.add(t);
1194      }
1195    }
1196
1197    /**
1198     *
1199     * Update region's maximum edit log SeqNum.
1200     */
1201    void updateRegionMaximumEditLogSeqNum(Entry entry) {
1202      synchronized (regionMaximumEditLogSeqNum) {
1203        String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
1204        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
1205        if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
1206          regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
1207        }
1208      }
1209    }
1210
1211    /**
1212     * @return the number of currently opened writers
1213     */
1214    int getNumOpenWriters() {
1215      return this.writers.size();
1216    }
1217
1218    long getSkippedEdits() {
1219      return this.skippedEdits.get();
1220    }
1221
1222    /**
1223     * Wait for writer threads to dump all info to the sink
1224     * @return true when there is no error
1225     * @throws IOException
1226     */
1227    protected boolean finishWriting(boolean interrupt) throws IOException {
1228      LOG.debug("Waiting for split writer threads to finish");
1229      boolean progress_failed = false;
1230      for (WriterThread t : writerThreads) {
1231        t.finish();
1232      }
1233      if (interrupt) {
1234        for (WriterThread t : writerThreads) {
1235          t.interrupt(); // interrupt the writer threads. We are stopping now.
1236        }
1237      }
1238
1239      for (WriterThread t : writerThreads) {
1240        if (!progress_failed && reporter != null && !reporter.progress()) {
1241          progress_failed = true;
1242        }
1243        try {
1244          t.join();
1245        } catch (InterruptedException ie) {
1246          IOException iie = new InterruptedIOException();
1247          iie.initCause(ie);
1248          throw iie;
1249        }
1250      }
1251      controller.checkForErrors();
1252      LOG.info("{} split writers finished; closing.", this.writerThreads.size());
1253      return (!progress_failed);
1254    }
1255
1256    public abstract List<Path> finishWritingAndClose() throws IOException;
1257
1258    /**
1259     * @return a map from encoded region ID to the number of edits written out for that region.
1260     */
1261    public abstract Map<byte[], Long> getOutputCounts();
1262
1263    /**
1264     * @return number of regions we've recovered
1265     */
1266    public abstract int getNumberOfRecoveredRegions();
1267
1268    /**
1269     * @param buffer A WAL Edit Entry
1270     * @throws IOException
1271     */
1272    public abstract void append(RegionEntryBuffer buffer) throws IOException;
1273
1274    /**
1275     * WriterThread call this function to help flush internal remaining edits in buffer before close
1276     * @return true when underlying sink has something to flush
1277     */
1278    public boolean flush() throws IOException {
1279      return false;
1280    }
1281
1282    /**
1283     * Some WALEdit's contain only KV's for account on what happened to a region.
1284     * Not all sinks will want to get all of those edits.
1285     *
1286     * @return Return true if this sink wants to accept this region-level WALEdit.
1287     */
1288    public abstract boolean keepRegionEvent(Entry entry);
1289  }
1290
1291  /**
1292   * Class that manages the output streams from the log splitting process.
1293   */
1294  class LogRecoveredEditsOutputSink extends OutputSink {
1295
1296    public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1297        int numWriters) {
1298      // More threads could potentially write faster at the expense
1299      // of causing more disk seeks as the logs are split.
1300      // 3. After a certain setting (probably around 3) the
1301      // process will be bound on the reader in the current
1302      // implementation anyway.
1303      super(controller, entryBuffers, numWriters);
1304    }
1305
1306    /**
1307     * @return null if failed to report progress
1308     * @throws IOException
1309     */
1310    @Override
1311    public List<Path> finishWritingAndClose() throws IOException {
1312      boolean isSuccessful = false;
1313      List<Path> result = null;
1314      try {
1315        isSuccessful = finishWriting(false);
1316      } finally {
1317        result = close();
1318        List<IOException> thrown = closeLogWriters(null);
1319        if (CollectionUtils.isNotEmpty(thrown)) {
1320          throw MultipleIOException.createIOException(thrown);
1321        }
1322      }
1323      if (isSuccessful) {
1324        splits = result;
1325      }
1326      return splits;
1327    }
1328
1329    // delete the one with fewer wal entries
1330    private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst)
1331        throws IOException {
1332      long dstMinLogSeqNum = -1L;
1333      try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
1334        WAL.Entry entry = reader.next();
1335        if (entry != null) {
1336          dstMinLogSeqNum = entry.getKey().getSequenceId();
1337        }
1338      } catch (EOFException e) {
1339        LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?",
1340            dst, e);
1341      }
1342      if (wap.minLogSeqNum < dstMinLogSeqNum) {
1343        LOG.warn("Found existing old edits file. It could be the result of a previous failed"
1344            + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
1345            + walFS.getFileStatus(dst).getLen());
1346        if (!walFS.delete(dst, false)) {
1347          LOG.warn("Failed deleting of old {}", dst);
1348          throw new IOException("Failed deleting of old " + dst);
1349        }
1350      } else {
1351        LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
1352            + ", length=" + walFS.getFileStatus(wap.p).getLen());
1353        if (!walFS.delete(wap.p, false)) {
1354          LOG.warn("Failed deleting of {}", wap.p);
1355          throw new IOException("Failed deleting of " + wap.p);
1356        }
1357      }
1358    }
1359
1360    /**
1361     * Close all of the output streams.
1362     * @return the list of paths written.
1363     */
1364    List<Path> close() throws IOException {
1365      Preconditions.checkState(!closeAndCleanCompleted);
1366
1367      final List<Path> paths = new ArrayList<>();
1368      final List<IOException> thrown = Lists.newArrayList();
1369      ThreadPoolExecutor closeThreadPool = Threads
1370          .getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
1371            private int count = 1;
1372
1373            @Override public Thread newThread(Runnable r) {
1374              Thread t = new Thread(r, "split-log-closeStream-" + count++);
1375              return t;
1376            }
1377          });
1378      CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
1379      boolean progress_failed;
1380      try {
1381        progress_failed = executeCloseTask(completionService, thrown, paths);
1382      } catch (InterruptedException e) {
1383        IOException iie = new InterruptedIOException();
1384        iie.initCause(e);
1385        throw iie;
1386      } catch (ExecutionException e) {
1387        throw new IOException(e.getCause());
1388      } finally {
1389        closeThreadPool.shutdownNow();
1390      }
1391      if (!thrown.isEmpty()) {
1392        throw MultipleIOException.createIOException(thrown);
1393      }
1394      writersClosed = true;
1395      closeAndCleanCompleted = true;
1396      if (progress_failed) {
1397        return null;
1398      }
1399      return paths;
1400    }
1401
1402    /**
1403     * @param completionService threadPool to execute the closing tasks
1404     * @param thrown store the exceptions
1405     * @param paths arrayList to store the paths written
1406     * @return if close tasks executed successful
1407     */
1408    boolean executeCloseTask(CompletionService<Void> completionService,
1409        List<IOException> thrown, List<Path> paths)
1410        throws InterruptedException, ExecutionException {
1411      for (final Map.Entry<String, SinkWriter> writersEntry : writers.entrySet()) {
1412        if (LOG.isTraceEnabled()) {
1413          LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p);
1414        }
1415        completionService.submit(new Callable<Void>() {
1416          @Override public Void call() throws Exception {
1417            WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1418            Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
1419            paths.add(dst);
1420            return null;
1421          }
1422        });
1423      }
1424      boolean progress_failed = false;
1425      for (int i = 0, n = this.writers.size(); i < n; i++) {
1426        Future<Void> future = completionService.take();
1427        future.get();
1428        if (!progress_failed && reporter != null && !reporter.progress()) {
1429          progress_failed = true;
1430        }
1431      }
1432      return progress_failed;
1433    }
1434
1435    Path closeWriter(String encodedRegionName, WriterAndPath wap,
1436        List<IOException> thrown) throws IOException{
1437      LOG.trace("Closing " + wap.p);
1438      try {
1439        wap.w.close();
1440      } catch (IOException ioe) {
1441        LOG.error("Couldn't close log at " + wap.p, ioe);
1442        thrown.add(ioe);
1443        return null;
1444      }
1445      if (LOG.isDebugEnabled()) {
1446        LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
1447            + " edits, skipped " + wap.editsSkipped + " edits in "
1448            + (wap.nanosSpent / 1000 / 1000) + "ms");
1449      }
1450      if (wap.editsWritten == 0) {
1451        // just remove the empty recovered.edits file
1452        if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) {
1453          LOG.warn("Failed deleting empty " + wap.p);
1454          throw new IOException("Failed deleting empty  " + wap.p);
1455        }
1456        return null;
1457      }
1458
1459      Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1460          regionMaximumEditLogSeqNum.get(encodedRegionName));
1461      try {
1462        if (!dst.equals(wap.p) && walFS.exists(dst)) {
1463          deleteOneWithFewerEntries(wap, dst);
1464        }
1465        // Skip the unit tests which create a splitter that reads and
1466        // writes the data without touching disk.
1467        // TestHLogSplit#testThreading is an example.
1468        if (walFS.exists(wap.p)) {
1469          if (!walFS.rename(wap.p, dst)) {
1470            throw new IOException("Failed renaming " + wap.p + " to " + dst);
1471          }
1472          LOG.info("Rename " + wap.p + " to " + dst);
1473        }
1474      } catch (IOException ioe) {
1475        LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1476        thrown.add(ioe);
1477        return null;
1478      }
1479      return dst;
1480    }
1481
1482    private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1483      if (writersClosed) {
1484        return thrown;
1485      }
1486      if (thrown == null) {
1487        thrown = Lists.newArrayList();
1488      }
1489      try {
1490        for (WriterThread t : writerThreads) {
1491          while (t.isAlive()) {
1492            t.shouldStop = true;
1493            t.interrupt();
1494            try {
1495              t.join(10);
1496            } catch (InterruptedException e) {
1497              IOException iie = new InterruptedIOException();
1498              iie.initCause(e);
1499              throw iie;
1500            }
1501          }
1502        }
1503      } finally {
1504        WriterAndPath wap = null;
1505        for (SinkWriter tmpWAP : writers.values()) {
1506          try {
1507            wap = (WriterAndPath) tmpWAP;
1508            wap.w.close();
1509          } catch (IOException ioe) {
1510            LOG.error("Couldn't close log at " + wap.p, ioe);
1511            thrown.add(ioe);
1512            continue;
1513          }
1514          LOG.info(
1515              "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
1516                  / 1000 / 1000) + "ms)");
1517        }
1518        writersClosed = true;
1519      }
1520
1521      return thrown;
1522    }
1523
1524    /**
1525     * Get a writer and path for a log starting at the given entry. This function is threadsafe so
1526     * long as multiple threads are always acting on different regions.
1527     * @return null if this region shouldn't output any logs
1528     */
1529    WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException {
1530      byte region[] = entry.getKey().getEncodedRegionName();
1531      String regionName = Bytes.toString(region);
1532      WriterAndPath ret = (WriterAndPath) writers.get(regionName);
1533      if (ret != null) {
1534        return ret;
1535      }
1536      // If we already decided that this region doesn't get any output
1537      // we don't need to check again.
1538      if (blacklistedRegions.contains(region)) {
1539        return null;
1540      }
1541      ret = createWAP(region, entry);
1542      if (ret == null) {
1543        blacklistedRegions.add(region);
1544        return null;
1545      }
1546      if(reusable) {
1547        writers.put(regionName, ret);
1548      }
1549      return ret;
1550    }
1551
1552    /**
1553     * @return a path with a write for that path. caller should close.
1554     */
1555    WriterAndPath createWAP(byte[] region, Entry entry) throws IOException {
1556      String tmpDirName = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
1557        HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
1558      Path regionedits = getRegionSplitEditsPath(entry,
1559          fileBeingSplit.getPath().getName(), tmpDirName, conf);
1560      if (regionedits == null) {
1561        return null;
1562      }
1563      FileSystem walFs = FSUtils.getWALFileSystem(conf);
1564      if (walFs.exists(regionedits)) {
1565        LOG.warn("Found old edits file. It could be the "
1566            + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1567            + walFs.getFileStatus(regionedits).getLen());
1568        if (!walFs.delete(regionedits, false)) {
1569          LOG.warn("Failed delete of old {}", regionedits);
1570        }
1571      }
1572      Writer w = createWriter(regionedits);
1573      LOG.debug("Creating writer path={}", regionedits);
1574      return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
1575    }
1576
1577    void filterCellByStore(Entry logEntry) {
1578      Map<byte[], Long> maxSeqIdInStores =
1579          regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
1580      if (MapUtils.isEmpty(maxSeqIdInStores)) {
1581        return;
1582      }
1583      // Create the array list for the cells that aren't filtered.
1584      // We make the assumption that most cells will be kept.
1585      ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
1586      for (Cell cell : logEntry.getEdit().getCells()) {
1587        if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
1588          keptCells.add(cell);
1589        } else {
1590          byte[] family = CellUtil.cloneFamily(cell);
1591          Long maxSeqId = maxSeqIdInStores.get(family);
1592          // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
1593          // or the master was crashed before and we can not get the information.
1594          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
1595            keptCells.add(cell);
1596          }
1597        }
1598      }
1599
1600      // Anything in the keptCells array list is still live.
1601      // So rather than removing the cells from the array list
1602      // which would be an O(n^2) operation, we just replace the list
1603      logEntry.getEdit().setCells(keptCells);
1604    }
1605
1606    @Override
1607    public void append(RegionEntryBuffer buffer) throws IOException {
1608      appendBuffer(buffer, true);
1609    }
1610
1611    WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{
1612      List<Entry> entries = buffer.entryBuffer;
1613      if (entries.isEmpty()) {
1614        LOG.warn("got an empty buffer, skipping");
1615        return null;
1616      }
1617
1618      WriterAndPath wap = null;
1619
1620      long startTime = System.nanoTime();
1621      try {
1622        int editsCount = 0;
1623
1624        for (Entry logEntry : entries) {
1625          if (wap == null) {
1626            wap = getWriterAndPath(logEntry, reusable);
1627            if (wap == null) {
1628              if (LOG.isTraceEnabled()) {
1629                // This log spews the full edit. Can be massive in the log. Enable only debugging
1630                // WAL lost edit issues.
1631                LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
1632              }
1633              return null;
1634            }
1635          }
1636          filterCellByStore(logEntry);
1637          if (!logEntry.getEdit().isEmpty()) {
1638            wap.w.append(logEntry);
1639            this.updateRegionMaximumEditLogSeqNum(logEntry);
1640            editsCount++;
1641          } else {
1642            wap.incrementSkippedEdits(1);
1643          }
1644        }
1645        // Pass along summary statistics
1646        wap.incrementEdits(editsCount);
1647        wap.incrementNanoTime(System.nanoTime() - startTime);
1648      } catch (IOException e) {
1649          e = e instanceof RemoteException ?
1650                  ((RemoteException)e).unwrapRemoteException() : e;
1651        LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
1652        throw e;
1653      }
1654      return wap;
1655    }
1656
1657    @Override
1658    public boolean keepRegionEvent(Entry entry) {
1659      ArrayList<Cell> cells = entry.getEdit().getCells();
1660      for (Cell cell : cells) {
1661        if (WALEdit.isCompactionMarker(cell)) {
1662          return true;
1663        }
1664      }
1665      return false;
1666    }
1667
1668    /**
1669     * @return a map from encoded region ID to the number of edits written out for that region.
1670     */
1671    @Override
1672    public Map<byte[], Long> getOutputCounts() {
1673      TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
1674      for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) {
1675        ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
1676      }
1677      return ret;
1678    }
1679
1680    @Override
1681    public int getNumberOfRecoveredRegions() {
1682      return writers.size();
1683    }
1684  }
1685
1686  /**
1687   *
1688   */
1689  class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
1690
1691    private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
1692
1693    public BoundedLogWriterCreationOutputSink(PipelineController controller,
1694        EntryBuffers entryBuffers, int numWriters) {
1695      super(controller, entryBuffers, numWriters);
1696    }
1697
1698    @Override
1699    public List<Path> finishWritingAndClose() throws IOException {
1700      boolean isSuccessful;
1701      List<Path> result;
1702      try {
1703        isSuccessful = finishWriting(false);
1704      } finally {
1705        result = close();
1706      }
1707      if (isSuccessful) {
1708        splits = result;
1709      }
1710      return splits;
1711    }
1712
1713    @Override
1714    boolean executeCloseTask(CompletionService<Void> completionService,
1715        List<IOException> thrown, List<Path> paths)
1716        throws InterruptedException, ExecutionException {
1717      for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
1718        LOG.info("Submitting writeThenClose of {}",
1719            Bytes.toString(buffer.getValue().encodedRegionName));
1720        completionService.submit(new Callable<Void>() {
1721          @Override
1722          public Void call() throws Exception {
1723            Path dst = writeThenClose(buffer.getValue());
1724            paths.add(dst);
1725            return null;
1726          }
1727        });
1728      }
1729      boolean progress_failed = false;
1730      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
1731        Future<Void> future = completionService.take();
1732        future.get();
1733        if (!progress_failed && reporter != null && !reporter.progress()) {
1734          progress_failed = true;
1735        }
1736      }
1737
1738      return progress_failed;
1739    }
1740
1741    /**
1742     * since the splitting process may create multiple output files, we need a map
1743     * regionRecoverStatMap to track the output count of each region.
1744     * @return a map from encoded region ID to the number of edits written out for that region.
1745     */
1746    @Override
1747    public Map<byte[], Long> getOutputCounts() {
1748      Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
1749      for(Map.Entry<String, Long> entry: regionRecoverStatMap.entrySet()){
1750        regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
1751      }
1752      return regionRecoverStatMapResult;
1753    }
1754
1755    /**
1756     * @return the number of recovered regions
1757     */
1758    @Override
1759    public int getNumberOfRecoveredRegions() {
1760      return regionRecoverStatMap.size();
1761    }
1762
1763    /**
1764     * Append the buffer to a new recovered edits file, then close it after all done
1765     * @param buffer contain all entries of a certain region
1766     * @throws IOException when closeWriter failed
1767     */
1768    @Override
1769    public void append(RegionEntryBuffer buffer) throws IOException {
1770      writeThenClose(buffer);
1771    }
1772
1773    private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
1774      WriterAndPath wap = appendBuffer(buffer, false);
1775      if(wap != null) {
1776        String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
1777        Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
1778        if (value != null) {
1779          Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
1780          regionRecoverStatMap.put(encodedRegionName, newValue);
1781        }
1782      }
1783
1784      Path dst = null;
1785      List<IOException> thrown = new ArrayList<>();
1786      if(wap != null){
1787        dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
1788      }
1789      if (!thrown.isEmpty()) {
1790        throw MultipleIOException.createIOException(thrown);
1791      }
1792      return dst;
1793    }
1794  }
1795
1796  /**
1797   * Class wraps the actual writer which writes data out and related statistics
1798   */
1799  public abstract static class SinkWriter {
1800    /* Count of edits written to this path */
1801    long editsWritten = 0;
1802    /* Count of edits skipped to this path */
1803    long editsSkipped = 0;
1804    /* Number of nanos spent writing to this log */
1805    long nanosSpent = 0;
1806
1807    void incrementEdits(int edits) {
1808      editsWritten += edits;
1809    }
1810
1811    void incrementSkippedEdits(int skipped) {
1812      editsSkipped += skipped;
1813    }
1814
1815    void incrementNanoTime(long nanos) {
1816      nanosSpent += nanos;
1817    }
1818  }
1819
1820  /**
1821   * Private data structure that wraps a Writer and its Path, also collecting statistics about the
1822   * data written to this output.
1823   */
1824  private final static class WriterAndPath extends SinkWriter {
1825    final Path p;
1826    final Writer w;
1827    final long minLogSeqNum;
1828
1829    WriterAndPath(final Path p, final Writer w, final long minLogSeqNum) {
1830      this.p = p;
1831      this.w = w;
1832      this.minLogSeqNum = minLogSeqNum;
1833    }
1834  }
1835
1836  static class CorruptedLogFileException extends Exception {
1837    private static final long serialVersionUID = 1L;
1838
1839    CorruptedLogFileException(String s) {
1840      super(s);
1841    }
1842  }
1843
1844  /** A struct used by getMutationsFromWALEntry */
1845  public static class MutationReplay implements Comparable<MutationReplay> {
1846    public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
1847      this.type = type;
1848      this.mutation = mutation;
1849      if(this.mutation.getDurability() != Durability.SKIP_WAL) {
1850        // using ASYNC_WAL for relay
1851        this.mutation.setDurability(Durability.ASYNC_WAL);
1852      }
1853      this.nonceGroup = nonceGroup;
1854      this.nonce = nonce;
1855    }
1856
1857    public final MutationType type;
1858    public final Mutation mutation;
1859    public final long nonceGroup;
1860    public final long nonce;
1861
1862    @Override
1863    public int compareTo(final MutationReplay d) {
1864      return this.mutation.compareTo(d.mutation);
1865    }
1866
1867    @Override
1868    public boolean equals(Object obj) {
1869      if(!(obj instanceof MutationReplay)) {
1870        return false;
1871      } else {
1872        return this.compareTo((MutationReplay)obj) == 0;
1873      }
1874    }
1875
1876    @Override
1877    public int hashCode() {
1878      return this.mutation.hashCode();
1879    }
1880  }
1881
1882  /**
1883   * This function is used to construct mutations from a WALEntry. It also
1884   * reconstructs WALKey &amp; WALEdit from the passed in WALEntry
1885   * @param entry
1886   * @param cells
1887   * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
1888   *          extracted from the passed in WALEntry.
1889   * @return list of Pair&lt;MutationType, Mutation&gt; to be replayed
1890   * @throws IOException
1891   */
1892  public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
1893      Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
1894    if (entry == null) {
1895      // return an empty array
1896      return Collections.emptyList();
1897    }
1898
1899    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
1900      entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
1901    int count = entry.getAssociatedCellCount();
1902    List<MutationReplay> mutations = new ArrayList<>();
1903    Cell previousCell = null;
1904    Mutation m = null;
1905    WALKeyImpl key = null;
1906    WALEdit val = null;
1907    if (logEntry != null) {
1908      val = new WALEdit();
1909    }
1910
1911    for (int i = 0; i < count; i++) {
1912      // Throw index out of bounds if our cell count is off
1913      if (!cells.advance()) {
1914        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
1915      }
1916      Cell cell = cells.current();
1917      if (val != null) val.add(cell);
1918
1919      boolean isNewRowOrType =
1920          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
1921              || !CellUtil.matchingRows(previousCell, cell);
1922      if (isNewRowOrType) {
1923        // Create new mutation
1924        if (CellUtil.isDelete(cell)) {
1925          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
1926          // Deletes don't have nonces.
1927          mutations.add(new MutationReplay(
1928              MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
1929        } else {
1930          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
1931          // Puts might come from increment or append, thus we need nonces.
1932          long nonceGroup = entry.getKey().hasNonceGroup()
1933              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1934          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1935          mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
1936        }
1937      }
1938      if (CellUtil.isDelete(cell)) {
1939        ((Delete) m).add(cell);
1940      } else {
1941        ((Put) m).add(cell);
1942      }
1943      m.setDurability(durability);
1944      previousCell = cell;
1945    }
1946
1947    // reconstruct WALKey
1948    if (logEntry != null) {
1949      org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
1950          entry.getKey();
1951      List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
1952      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
1953        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
1954      }
1955      key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
1956              walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
1957              clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
1958      logEntry.setFirst(key);
1959      logEntry.setSecond(val);
1960    }
1961
1962    return mutations;
1963  }
1964}