
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.text.ParseException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.TreeSet;
35  import java.util.UUID;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.CompletionService;
38  import java.util.concurrent.ConcurrentHashMap;
39  import java.util.concurrent.ExecutionException;
40  import java.util.concurrent.ExecutorCompletionService;
41  import java.util.concurrent.Future;
42  import java.util.concurrent.ThreadFactory;
43  import java.util.concurrent.ThreadPoolExecutor;
44  import java.util.concurrent.TimeUnit;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicLong;
47  import java.util.concurrent.atomic.AtomicReference;
48  
49  import org.apache.commons.logging.Log;
50  import org.apache.commons.logging.LogFactory;
51  import org.apache.hadoop.classification.InterfaceAudience;
52  import org.apache.hadoop.conf.Configuration;
53  import org.apache.hadoop.fs.FileStatus;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.hbase.Cell;
57  import org.apache.hadoop.hbase.CellScanner;
58  import org.apache.hadoop.hbase.CellUtil;
59  import org.apache.hadoop.hbase.HBaseConfiguration;
60  import org.apache.hadoop.hbase.HConstants;
61  import org.apache.hadoop.hbase.HRegionInfo;
62  import org.apache.hadoop.hbase.HRegionLocation;
63  import org.apache.hadoop.hbase.KeyValue;
64  import org.apache.hadoop.hbase.KeyValueUtil;
65  import org.apache.hadoop.hbase.RemoteExceptionHandler;
66  import org.apache.hadoop.hbase.ServerName;
67  import org.apache.hadoop.hbase.TableName;
68  import org.apache.hadoop.hbase.TableNotFoundException;
69  import org.apache.hadoop.hbase.Tag;
70  import org.apache.hadoop.hbase.TagType;
71  import org.apache.hadoop.hbase.client.ConnectionUtils;
72  import org.apache.hadoop.hbase.client.Delete;
73  import org.apache.hadoop.hbase.client.HConnection;
74  import org.apache.hadoop.hbase.client.HConnectionManager;
75  import org.apache.hadoop.hbase.client.Mutation;
76  import org.apache.hadoop.hbase.client.Put;
77  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
78  import org.apache.hadoop.hbase.io.HeapSize;
79  import org.apache.hadoop.hbase.master.SplitLogManager;
80  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
81  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
82  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
83  import org.apache.hadoop.hbase.protobuf.RequestConverter;
84  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
85  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
86  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
87  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
88  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
89  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
90  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
91  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
92  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
93  import org.apache.hadoop.hbase.regionserver.HRegion;
94  import org.apache.hadoop.hbase.regionserver.LastSequenceId;
95  import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
96  import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
97  import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
98  import org.apache.hadoop.hbase.util.Bytes;
99  import org.apache.hadoop.hbase.util.CancelableProgressable;
100 import org.apache.hadoop.hbase.util.ClassSize;
101 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
102 import org.apache.hadoop.hbase.util.FSUtils;
103 import org.apache.hadoop.hbase.util.Pair;
104 import org.apache.hadoop.hbase.util.Threads;
105 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
106 import org.apache.hadoop.hbase.zookeeper.ZKTable;
107 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
108 import org.apache.hadoop.io.MultipleIOException;
109 import org.apache.zookeeper.KeeperException;
110 
111 import com.google.common.base.Preconditions;
112 import com.google.common.collect.Lists;
113 import com.google.protobuf.ServiceException;
114 
115 /**
116  * This class is responsible for splitting up a bunch of regionserver commit log
117  * files that are no longer being written to, into new files, one per region, for
118  * the region to replay on startup. It deletes the old log files when finished.
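     * <p>
     * A minimal usage sketch (paths are illustrative; assumes {@code fs} and
     * {@code conf} are already set up):
     * <pre>
     *   Path rootDir = FSUtils.getRootDir(conf);
     *   Path logDir = new Path(rootDir, "logs-of-dead-server");  // hypothetical name
     *   Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
     *   List&lt;Path&gt; splits = HLogSplitter.split(rootDir, logDir, oldLogDir, fs, conf);
     * </pre>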
119  */
120 @InterfaceAudience.Private
121 public class HLogSplitter {
122   static final Log LOG = LogFactory.getLog(HLogSplitter.class);
123 
124   // Parameters for split process
125   protected final Path rootDir;
126   protected final FileSystem fs;
127   protected final Configuration conf;
128 
129   // Major subcomponents of the split process.
130   // These are separated into inner classes to make testing easier.
131   OutputSink outputSink;
132   EntryBuffers entryBuffers;
133 
134   private Set<TableName> disablingOrDisabledTables =
135       new HashSet<TableName>();
136   private ZooKeeperWatcher watcher;
137 
138   // If an exception is thrown by one of the other threads, it will be
139   // stored here.
140   protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
141 
142   // Wait/notify for when data has been produced by the reader thread,
143   // consumed by the writer threads, or an exception occurred
144   final Object dataAvailable = new Object();
145 
146   private MonitoredTask status;
147 
148   // For checking the latest flushed sequence id
149   protected final LastSequenceId sequenceIdChecker;
150 
151   protected boolean distributedLogReplay;
152 
153   // Map encodedRegionName -> lastFlushedSequenceId
154   protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
155 
156   // Map encodedRegionName -> maxSeqIdInStores
157   protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
158       new ConcurrentHashMap<String, Map<byte[], Long>>();
159 
160   // The failed region server that the WAL file being split belongs to
161   protected String failedServerName = "";
162 
163   // Number of writer threads
164   private final int numWriterThreads;
165 
166   // Min batch size when replaying WAL edits
167   private final int minBatchSize;
168 
169   HLogSplitter(Configuration conf, Path rootDir,
170       FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw) {
171     this.conf = HBaseConfiguration.create(conf);
172     String codecClassName = conf
173         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
174     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
175     this.rootDir = rootDir;
176     this.fs = fs;
177     this.sequenceIdChecker = idChecker;
178     this.watcher = zkw;
179 
180     entryBuffers = new EntryBuffers(
181         this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
182             128*1024*1024));
183 
184     // a larger minBatchSize may slow down recovery because the replay writer has to wait for
185     // enough edits before replaying them
186     this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
187     this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
188 
189     this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
190     if (zkw != null && this.distributedLogReplay) {
191       outputSink = new LogReplayOutputSink(numWriterThreads);
192     } else {
193       if (this.distributedLogReplay) {
194         LOG.info("ZooKeeperWatcher is passed in as NULL so disabling distributedLogReplay.");
195       }
196       this.distributedLogReplay = false;
197       outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
198     }
199 
200   }
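      // The sizing knobs above are read from the Configuration; a sketch of tuning
      // them before constructing a splitter (the values shown are the defaults used here):
      //   conf.setInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
      //   conf.setInt("hbase.regionserver.wal.logreplay.batch.size", 64);
      //   conf.setInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024);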
201 
202   /**
203    * Splits an HLog file into each region's recovered-edits directory.
204    * This is the main entry point for distributed log splitting from SplitLogWorker.
205    * <p>
206    * If the log file has N regions then N recovered.edits files will be produced.
207    * <p>
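       * A sketch of an invocation ({@code reporter}, {@code idChecker} and the
       * {@code FileStatus} come from the caller's environment; {@code walPath} is
       * hypothetical):
       * <pre>
       *   FileStatus logfile = fs.getFileStatus(walPath);
       *   boolean done = HLogSplitter.splitLogFile(rootDir, logfile, fs, conf,
       *       reporter, idChecker, zkw);
       * </pre>
       * <p>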
208    * @param rootDir
209    * @param logfile
210    * @param fs
211    * @param conf
212    * @param reporter
213    * @param idChecker
214    * @param zkw ZooKeeperWatcher; if it is null, we fall back to the old-style log splitting where we
215    *          dump out recovered.edits files for regions to replay on.
216    * @return false if it is interrupted by the progress-able.
217    * @throws IOException
218    */
219   public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
220       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
221       ZooKeeperWatcher zkw) throws IOException {
222     HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw);
223     return s.splitLogFile(logfile, reporter);
224   }
225 
226   // A wrapper to split one log folder using the method used by distributed
227   // log splitting. Used by tools and unit tests. It should be package private.
228   // It is public only because TestWALObserver is in a different package,
229   // which uses this method to do log splitting.
230   public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
231       FileSystem fs, Configuration conf) throws IOException {
232     FileStatus[] logfiles = fs.listStatus(logDir);
233     List<Path> splits = new ArrayList<Path>();
234     if (logfiles != null && logfiles.length > 0) {
235       for (FileStatus logfile: logfiles) {
236         HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null);
237         if (s.splitLogFile(logfile, null)) {
238           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
239           if (s.outputSink.splits != null) {
240             splits.addAll(s.outputSink.splits);
241           }
242         }
243       }
244     }
245     if (!fs.delete(logDir, true)) {
246       throw new IOException("Unable to delete src dir: " + logDir);
247     }
248     return splits;
249   }
250 
251   // The real log splitter. It just splits one log file.
252   boolean splitLogFile(FileStatus logfile,
253       CancelableProgressable reporter) throws IOException {
254     boolean isCorrupted = false;
255     Preconditions.checkState(status == null);
256     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
257       HLog.SPLIT_SKIP_ERRORS_DEFAULT);
258     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
259     Path logPath = logfile.getPath();
260     boolean outputSinkStarted = false;
261     boolean progress_failed = false;
262     int editsCount = 0;
263     int editsSkipped = 0;
264 
265     try {
266       status = TaskMonitor.get().createStatus(
267         "Splitting log file " + logfile.getPath() +
268         " into a temporary staging area.");
269       long logLength = logfile.getLen();
270       LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
271       LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
272       status.setStatus("Opening log file");
273       if (reporter != null && !reporter.progress()) {
274         progress_failed = true;
275         return false;
276       }
277       Reader in = null;
278       try {
279         in = getReader(fs, logfile, conf, skipErrors, reporter);
280       } catch (CorruptedLogFileException e) {
281         LOG.warn("Could not get reader, corrupted log file " + logPath, e);
282         ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
283         isCorrupted = true;
284       }
285       if (in == null) {
286         status.markComplete("Nothing to split in log file");
287         LOG.warn("Nothing to split in log file " + logPath);
288         return true;
289       }
290       if (watcher != null) {
291         try {
292           disablingOrDisabledTables = ZKTable.getDisabledOrDisablingTables(watcher);
293         } catch (KeeperException e) {
294           throw new IOException("Can't get disabling/disabled tables", e);
295         }
296       }
297       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
298       int numOpenedFilesLastCheck = 0;
299       outputSink.setReporter(reporter);
300       outputSink.startWriterThreads();
301       outputSinkStarted = true;
302       Entry entry;
303       Long lastFlushedSequenceId = -1L;
304       ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logPath);
305       failedServerName = (serverName == null) ? "" : serverName.getServerName();
306       while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
307         byte[] region = entry.getKey().getEncodedRegionName();
308         String key = Bytes.toString(region);
309         lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
310         if (lastFlushedSequenceId == null) {
311           if (this.distributedLogReplay) {
312             RegionStoreSequenceIds ids =
313                 SplitLogManager.getRegionFlushedSequenceId(this.watcher, failedServerName, key);
314             if (ids != null) {
315               lastFlushedSequenceId = ids.getLastFlushedSequenceId();
316             }
317           } else if (sequenceIdChecker != null) {
318             lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
319           }
320           if (lastFlushedSequenceId == null) {
321             lastFlushedSequenceId = -1L;
322           }
323           lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
324         }
325         if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
326           editsSkipped++;
327           continue;
328         }
329         entryBuffers.appendEntry(entry);
330         editsCount++;
331         int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
332         // If sufficient edits have passed, check if we should report progress.
333         if (editsCount % interval == 0
334             || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
335           numOpenedFilesLastCheck = this.getNumOpenWriters();
336           String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
337               + " edits, skipped " + editsSkipped + " edits.";
338           status.setStatus("Split " + countsStr);
339           if (reporter != null && !reporter.progress()) {
340             progress_failed = true;
341             return false;
342           }
343         }
344       }
345     } catch (InterruptedException ie) {
346       IOException iie = new InterruptedIOException();
347       iie.initCause(ie);
348       throw iie;
349     } catch (CorruptedLogFileException e) {
350       LOG.warn("Could not parse, corrupted log file " + logPath, e);
351       ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
352       isCorrupted = true;
353     } catch (IOException e) {
354       e = RemoteExceptionHandler.checkIOException(e);
355       throw e;
356     } finally {
357       LOG.info("Finishing writing output logs and closing down.");
358       if (outputSinkStarted) {
359         progress_failed = outputSink.finishWritingAndClose() == null;
360       }
361       String msg = "Processed " + editsCount + " edits across "
362           + outputSink.getNumberOfRecoveredRegions() + " regions; log file=" + logPath
363           + " is corrupted = " + isCorrupted + " progress failed = " + progress_failed;
364       LOG.info(msg);
365       status.markComplete(msg);
366     }
367     return !progress_failed;
368   }
369 
370   /**
371    * Completes the work done by splitLogFile by archiving logs
372    * <p>
373    * It is invoked by SplitLogManager once it knows that one of the
374    * SplitLogWorkers has completed the splitLogFile() part. If the master
375    * crashes then this function might get called multiple times.
376    * <p>
377    * @param logfile
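       * A sketch of an invocation ({@code logfileName} is a caller-supplied string,
       * either absolute or relative to the HBase root dir; see the resolution below):
       * <pre>
       *   HLogSplitter.finishSplitLogFile(logfileName, conf);
       * </pre>
       * <p>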
378    * @param conf
379    * @throws IOException
380    */
381   public static void finishSplitLogFile(String logfile,
382       Configuration conf)  throws IOException {
383     Path rootdir = FSUtils.getRootDir(conf);
384     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
385     Path logPath;
386     if (FSUtils.isStartingWithPath(rootdir, logfile)) {
387       logPath = new Path(logfile);
388     } else {
389       logPath = new Path(rootdir, logfile);
390     }
391     finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
392   }
393 
394   static void finishSplitLogFile(Path rootdir, Path oldLogDir,
395       Path logPath, Configuration conf) throws IOException {
396     List<Path> processedLogs = new ArrayList<Path>();
397     List<Path> corruptedLogs = new ArrayList<Path>();
398     FileSystem fs;
399     fs = rootdir.getFileSystem(conf);
400     if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
401       corruptedLogs.add(logPath);
402     } else {
403       processedLogs.add(logPath);
404     }
405     archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
406     Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
407     fs.delete(stagingDir, true);
408   }
409 
410   /**
411    * Moves processed logs to the oldLogDir after successful processing. Moves
412    * corrupted logs (any log that couldn't be successfully parsed) to corruptDir
413    * (.corrupt) for later investigation.
414    *
415    * @param corruptedLogs
416    * @param processedLogs
417    * @param oldLogDir
418    * @param fs
419    * @param conf
420    * @throws IOException
421    */
422   private static void archiveLogs(
423       final List<Path> corruptedLogs,
424       final List<Path> processedLogs, final Path oldLogDir,
425       final FileSystem fs, final Configuration conf) throws IOException {
426     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
427         "hbase.regionserver.hlog.splitlog.corrupt.dir",  HConstants.CORRUPT_DIR_NAME));
428 
429     if (!fs.mkdirs(corruptDir)) {
430       LOG.info("Unable to mkdir " + corruptDir);
431     }
432     fs.mkdirs(oldLogDir);
433 
434     // this method can get restarted or called multiple times for archiving
435     // the same log files.
436     for (Path corrupted : corruptedLogs) {
437       Path p = new Path(corruptDir, corrupted.getName());
438       if (fs.exists(corrupted)) {
439         if (!fs.rename(corrupted, p)) {
440           LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
441         } else {
442           LOG.warn("Moved corrupted log " + corrupted + " to " + p);
443         }
444       }
445     }
446 
447     for (Path p : processedLogs) {
448       Path newPath = FSHLog.getHLogArchivePath(oldLogDir, p);
449       if (fs.exists(p)) {
450         if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
451           LOG.warn("Unable to move " + p + " to " + newPath);
452         } else {
453           LOG.debug("Archived processed log " + p + " to " + newPath);
454         }
455       }
456     }
457   }
458 
459   /**
460    * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
461    * <code>logEntry</code> named for the sequenceid in the passed
462    * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
463    * This method also ensures the existence of RECOVERED_EDITS_DIR under the
464    * region directory, creating it if necessary.
465    * @param fs
466    * @param logEntry
467    * @param rootDir HBase root dir.
468    * @return Path to file into which to dump split log edits.
469    * @throws IOException
470    */
471   @SuppressWarnings("deprecation")
472   static Path getRegionSplitEditsPath(final FileSystem fs,
473       final Entry logEntry, final Path rootDir, boolean isCreate)
474   throws IOException {
475     Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
476     String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
477     Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
478     Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
479 
480     if (!fs.exists(regiondir)) {
481       LOG.info("This region's directory doesn't exist: "
482           + regiondir.toString() + ". It is very likely that it was" +
483           " already split so it's safe to discard those edits.");
484       return null;
485     }
486     if (fs.exists(dir) && fs.isFile(dir)) {
487       Path tmp = new Path("/tmp");
488       if (!fs.exists(tmp)) {
489         fs.mkdirs(tmp);
490       }
491       tmp = new Path(tmp,
492         HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
493       LOG.warn("Found existing old file: " + dir + ". It could be some "
494         + "leftover of an old installation. It should be a folder instead. "
495         + "So moving it to " + tmp);
496       if (!fs.rename(dir, tmp)) {
497         LOG.warn("Failed to sideline old file " + dir);
498       }
499     }
500 
501     if (isCreate && !fs.exists(dir)) {
502       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
503     }
504     // The appended file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
505     // that the region's replayRecoveredEdits will not delete it
506     String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
507     fileName = getTmpRecoveredEditsFileName(fileName);
508     return new Path(dir, fileName);
509   }
510 
511   static String getTmpRecoveredEditsFileName(String fileName) {
512     return fileName + HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
513   }
514 
515   /**
516    * Get the completed recovered edits file path, renaming the file so that it is
517    * named for the last edit it contains rather than the first. The name can then
518    * be used to skip recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
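       * For example (sequence ids are illustrative; the suffix placeholder stands
       * for {@code RECOVERED_LOG_TMPFILE_SUFFIX}):
       * <pre>
       *   .../recovered.edits/0000000000000002332&lt;suffix&gt;
       *     -&gt; .../recovered.edits/0000000000000005423
       * </pre>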
519    * @param srcPath
520    * @param maximumEditLogSeqNum
521    * @return dstPath whose name is the file's last edit log sequence number
522    */
523   static Path getCompletedRecoveredEditsFilePath(Path srcPath,
524       Long maximumEditLogSeqNum) {
525     String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
526     return new Path(srcPath.getParent(), fileName);
527   }
528 
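      // For example, formatRecoveredEditsFileName(2332) below returns
      // "0000000000000002332"; the zero padding keeps recovered.edits file names
      // sortable by sequence id.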
529   static String formatRecoveredEditsFileName(final long seqid) {
530     return String.format("%019d", seqid);
531   }
532 
533   /**
534    * Create a new {@link Reader} for reading logs to split.
535    *
536    * @param fs
537    * @param file
538    * @param conf
539    * @return A new Reader instance
540    * @throws IOException
541    * @throws CorruptedLogFileException
542    */
543   protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
544       boolean skipErrors, CancelableProgressable reporter)
545       throws IOException, CorruptedLogFileException {
546     Path path = file.getPath();
547     long length = file.getLen();
548     Reader in;
549 
550     // Check for possibly empty file. With appends, currently Hadoop reports a
551     // zero length even if the file has been sync'd. Revisit if HDFS-376 or
552     // HDFS-878 is committed.
553     if (length <= 0) {
554       LOG.warn("File " + path + " might still be open, length is 0");
555     }
556 
557     try {
558       FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
559       try {
560         in = getReader(fs, path, conf, reporter);
561       } catch (EOFException e) {
562         if (length <= 0) {
563           // TODO should we ignore an empty, not-last log file if skip.errors
564           // is false? Either way, the caller should decide what to do. E.g.
565           // ignore if this is the last log in sequence.
566           // TODO is this scenario still possible if the log has been
567           // recovered (i.e. closed)?
568           LOG.warn("Could not open " + path + " for reading. File is empty", e);
569           return null;
570         } else {
571           // EOFException being ignored
572           return null;
573         }
574       }
575     } catch (IOException e) {
576       if (e instanceof FileNotFoundException) {
577         // A wal file may not exist anymore. Nothing can be recovered so move on
578         LOG.warn("File " + path + " doesn't exist anymore.", e);
579         return null;
580       }
581       if (!skipErrors || e instanceof InterruptedIOException) {
582         throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
583       }
584       CorruptedLogFileException t =
585         new CorruptedLogFileException("skipErrors=true Could not open hlog " +
586             path + " ignoring");
587       t.initCause(e);
588       throw t;
589     }
590     return in;
591   }
592 
593   static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
594   throws CorruptedLogFileException, IOException {
595     try {
596       return in.next();
597     } catch (EOFException eof) {
598       // truncated files are expected if a RS crashes (see HBASE-2643)
599       LOG.info("EOF from hlog " + path + ".  continuing");
600       return null;
601     } catch (IOException e) {
602       // If the IOE resulted from bad file format,
603       // then this problem is idempotent and retrying won't help
604       if (e.getCause() != null &&
605           (e.getCause() instanceof ParseException ||
606            e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
607         LOG.warn("Parse exception " + e.getCause().toString() + " from hlog "
608            + path + ".  continuing");
609         return null;
610       }
611       if (!skipErrors) {
612         throw e;
613       }
614       CorruptedLogFileException t =
615         new CorruptedLogFileException("skipErrors=true Ignoring exception" +
616             " while parsing hlog " + path + ". Marking as corrupted");
617       t.initCause(e);
618       throw t;
619     }
620   }
621 
622   private void writerThreadError(Throwable t) {
623     thrown.compareAndSet(null, t);
624   }
625 
626   /**
627    * Check for errors in the writer threads. If any is found, rethrow it.
628    */
629   private void checkForErrors() throws IOException {
630     Throwable thrown = this.thrown.get();
631     if (thrown == null) return;
632     if (thrown instanceof IOException) {
633       throw new IOException(thrown);
634     } else {
635       throw new RuntimeException(thrown);
636     }
637   }
638   /**
639    * Create a new {@link Writer} for writing log splits.
640    */
641   protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
642       throws IOException {
643     return HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);
644   }
645 
646   /**
647    * Create a new {@link Reader} for reading logs to split.
648    */
649   protected Reader getReader(FileSystem fs, Path curLogFile,
650       Configuration conf, CancelableProgressable reporter) throws IOException {
651     return HLogFactory.createReader(fs, curLogFile, conf, reporter);
652   }
653 
654   /**
655    * Get the number of currently open writers
656    */
657   private int getNumOpenWriters() {
658     int result = 0;
659     if (this.outputSink != null) {
660       result += this.outputSink.getNumOpenWriters();
661     }
662     return result;
663   }
664 
665   /**
666    * Class which accumulates edits and separates them into a buffer per region
667    * while simultaneously accounting for RAM usage. Blocks if the RAM usage crosses
668    * a predefined threshold.
669    *
670    * Writer threads then pull region-specific buffers from this class.
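       * <p>
       * The writer-side protocol, roughly (this mirrors WriterThread.doRun below):
       * <pre>
       *   RegionEntryBuffer chunk = entryBuffers.getChunkToWrite();
       *   if (chunk != null) {
       *     try {
       *       outputSink.append(chunk);
       *     } finally {
       *       entryBuffers.doneWriting(chunk);  // releases heap quota, wakes producer
       *     }
       *   }
       * </pre>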
671    */
672   class EntryBuffers {
673     Map<byte[], RegionEntryBuffer> buffers =
674       new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
675 
676     /* Track which regions are currently in the middle of writing. We don't allow
677        an IO thread to pick up bytes from a region if we're already writing
678        data for that region in a different IO thread. */
679     Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
680 
681     long totalBuffered = 0;
682     long maxHeapUsage;
683 
684     EntryBuffers(long maxHeapUsage) {
685       this.maxHeapUsage = maxHeapUsage;
686     }
687 
688     /**
689      * Append a log entry into the corresponding region buffer.
690      * Blocks if the total heap usage has crossed the specified threshold.
691      *
692      * @throws InterruptedException
693      * @throws IOException
694      */
695     void appendEntry(Entry entry) throws InterruptedException, IOException {
696       HLogKey key = entry.getKey();
697 
698       RegionEntryBuffer buffer;
699       long incrHeap;
700       synchronized (this) {
701         buffer = buffers.get(key.getEncodedRegionName());
702         if (buffer == null) {
703           buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
704           buffers.put(key.getEncodedRegionName(), buffer);
705         }
706         incrHeap = buffer.appendEntry(entry);
707       }
708 
709       // If we crossed the chunk threshold, wait for more space to be available
710       synchronized (dataAvailable) {
711         totalBuffered += incrHeap;
712         while (totalBuffered > maxHeapUsage && thrown.get() == null) {
713           LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
714           dataAvailable.wait(2000);
715         }
716         dataAvailable.notifyAll();
717       }
718       checkForErrors();
719     }
720 
721     /**
722      * @return RegionEntryBuffer a buffer of edits to be written or replayed.
723      */
724     synchronized RegionEntryBuffer getChunkToWrite() {
725       long biggestSize = 0;
726       byte[] biggestBufferKey = null;
727 
728       for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
729         long size = entry.getValue().heapSize();
730         if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
731           biggestSize = size;
732           biggestBufferKey = entry.getKey();
733         }
734       }
735       if (biggestBufferKey == null) {
736         return null;
737       }
738 
739       RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
740       currentlyWriting.add(biggestBufferKey);
741       return buffer;
742     }
743 
744     void doneWriting(RegionEntryBuffer buffer) {
745       synchronized (this) {
746         boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
747         assert removed;
748       }
749       long size = buffer.heapSize();
750 
751       synchronized (dataAvailable) {
752         totalBuffered -= size;
753         // We may unblock writers
754         dataAvailable.notifyAll();
755       }
756     }
757 
758     synchronized boolean isRegionCurrentlyWriting(byte[] region) {
759       return currentlyWriting.contains(region);
760     }
761   }
762 
763   /**
764    * A buffer of some number of edits for a given region.
765    * This accumulates edits and also provides a memory optimization in order to
766    * share a single byte array instance for the table and region name.
767    * Also tracks memory usage of the accumulated edits.
768    */
769   static class RegionEntryBuffer implements HeapSize {
770     long heapInBuffer = 0;
771     List<Entry> entryBuffer;
772     TableName tableName;
773     byte[] encodedRegionName;
774 
775     RegionEntryBuffer(TableName tableName, byte[] region) {
776       this.tableName = tableName;
777       this.encodedRegionName = region;
778       this.entryBuffer = new LinkedList<Entry>();
779     }
780 
781     long appendEntry(Entry entry) {
782       internify(entry);
783       entryBuffer.add(entry);
784       long incrHeap = entry.getEdit().heapSize() +
785         ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
786         0; // TODO linkedlist entry
787       heapInBuffer += incrHeap;
788       return incrHeap;
789     }
790 
791     private void internify(Entry entry) {
792       HLogKey k = entry.getKey();
793       k.internTableName(this.tableName);
794       k.internEncodedRegionName(this.encodedRegionName);
795     }
796 
797     public long heapSize() {
798       return heapInBuffer;
799     }
800   }
801 
802   class WriterThread extends Thread {
803     private volatile boolean shouldStop = false;
804     private OutputSink outputSink = null;
805 
806     WriterThread(OutputSink sink, int i) {
807       super(Thread.currentThread().getName() + "-Writer-" + i);
808       outputSink = sink;
809     }
810 
811     public void run()  {
812       try {
813         doRun();
814       } catch (Throwable t) {
815         LOG.error("Exiting thread", t);
816         writerThreadError(t);
817       }
818     }
819 
820     private void doRun() throws IOException {
821       LOG.debug("Writer thread " + this + ": starting");
822       while (true) {
823         RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
824         if (buffer == null) {
825           // No data currently available, wait on some more to show up
826           synchronized (dataAvailable) {
827             if (shouldStop && !this.outputSink.flush()) {
828               return;
829             }
830             try {
831               dataAvailable.wait(500);
832             } catch (InterruptedException ie) {
833               if (!shouldStop) {
834                 throw new RuntimeException(ie);
835               }
836             }
837           }
838           continue;
839         }
840 
841         assert buffer != null;
842         try {
843           writeBuffer(buffer);
844         } finally {
845           entryBuffers.doneWriting(buffer);
846         }
847       }
848     }
849 
850     private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
851       outputSink.append(buffer);
852     }
853 
854     void finish() {
855       synchronized (dataAvailable) {
856         shouldStop = true;
857         dataAvailable.notifyAll();
858       }
859     }
860   }
861 
862   /**
863    * An abstract class providing a common interface to support both the existing
864    * recovered edits file sink and the region server WAL edits replay sink
865    */
866   abstract class OutputSink {
867 
868     protected Map<byte[], SinkWriter> writers = Collections
869         .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));
870 
871     protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
872         .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
873 
874     protected final List<WriterThread> writerThreads = Lists.newArrayList();
875 
876     /* Set of regions which we've decided should not output edits */
877     protected final Set<byte[]> blacklistedRegions = Collections
878         .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
879 
880     protected boolean closeAndCleanCompleted = false;
881 
882     protected boolean writersClosed = false;
883 
884     protected final int numThreads;
885 
886     protected CancelableProgressable reporter = null;
887 
888     protected AtomicLong skippedEdits = new AtomicLong();
889 
890     protected List<Path> splits = null;
891 
892     public OutputSink(int numWriters) {
893       numThreads = numWriters;
894     }
895 
896     void setReporter(CancelableProgressable reporter) {
897       this.reporter = reporter;
898     }
899 
900     /**
901      * Start the threads that will pump data from the entryBuffers to the output files.
902      */
903     synchronized void startWriterThreads() {
904       for (int i = 0; i < numThreads; i++) {
905         WriterThread t = new WriterThread(this, i);
906         t.start();
907         writerThreads.add(t);
908       }
909     }
910 
911     /**
912      *
913      * Update the region's maximum edit log sequence number.
914      */
915     void updateRegionMaximumEditLogSeqNum(Entry entry) {
916       synchronized (regionMaximumEditLogSeqNum) {
917         Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
918             .getEncodedRegionName());
919         if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
920           regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
921               .getLogSeqNum());
922         }
923       }
924     }
925 
926     Long getRegionMaximumEditLogSeqNum(byte[] region) {
927       return regionMaximumEditLogSeqNum.get(region);
928     }
929 
930     /**
931      * @return the number of currently opened writers
932      */
933     int getNumOpenWriters() {
934       return this.writers.size();
935     }
936 
937     long getSkippedEdits() {
938       return this.skippedEdits.get();
939     }
940 
941     /**
942      * Wait for writer threads to dump all info to the sink
943      * @return true when there is no error
944      * @throws IOException
945      */
946     protected boolean finishWriting() throws IOException {
947       LOG.info("Waiting for split writer threads to finish");
948       boolean progress_failed = false;
949       for (WriterThread t : writerThreads) {
950         t.finish();
951       }
952       for (WriterThread t : writerThreads) {
953         if (!progress_failed && reporter != null && !reporter.progress()) {
954           progress_failed = true;
955         }
956         try {
957           t.join();
958         } catch (InterruptedException ie) {
959           IOException iie = new InterruptedIOException();
960           iie.initCause(ie);
961           throw iie;
962         }
963       }
964       checkForErrors();
965       LOG.info("Split writers finished");
966       return (!progress_failed);
967     }
968 
969     abstract List<Path> finishWritingAndClose() throws IOException;
970 
971     /**
972      * @return a map from encoded region ID to the number of edits written out for that region.
973      */
974     abstract Map<byte[], Long> getOutputCounts();
975 
976     /**
977      * @return number of regions we've recovered
978      */
979     abstract int getNumberOfRecoveredRegions();
980 
981     /**
982      * @param buffer A WAL Edit Entry
983      * @throws IOException
984      */
985     abstract void append(RegionEntryBuffer buffer) throws IOException;
986 
987     /**
988      * WriterThread calls this function to flush any remaining edits buffered internally before close
989      * @return true when underlying sink has something to flush
990      */
991     protected boolean flush() throws IOException {
992       return false;
993     }
994   }
995 
996   /**
997    * Class that manages the output streams from the log splitting process.
998    */
999   class LogRecoveredEditsOutputSink extends OutputSink {
1000 
1001     public LogRecoveredEditsOutputSink(int numWriters) {
1002       // More threads could potentially write faster at the expense
1003       // of causing more disk seeks as the logs are split.
1004       // Beyond a certain count (probably around 3) the
1005       // process will be bound on the reader in the current
1006       // implementation anyway.
1007       super(numWriters);
1008     }
1009 
1010     /**
1011      * @return null if failed to report progress
1012      * @throws IOException
1013      */
1014     @Override
1015     List<Path> finishWritingAndClose() throws IOException {
1016       boolean isSuccessful = false;
1017       List<Path> result = null;
1018       try {
1019         isSuccessful = finishWriting();
1020       } finally {
1021         result = close();
1022         List<IOException> thrown = closeLogWriters(null);
1023         if (thrown != null && !thrown.isEmpty()) {
1024           throw MultipleIOException.createIOException(thrown);
1025         }
1026       }
1027       if (isSuccessful) {
1028         splits = result;
1029       }
1030       return splits;
1031     }
1032 
1033     /**
1034      * Close all of the output streams.
1035      * @return the list of paths written.
1036      */
1037     private List<Path> close() throws IOException {
1038       Preconditions.checkState(!closeAndCleanCompleted);
1039 
1040       final List<Path> paths = new ArrayList<Path>();
1041       final List<IOException> thrown = Lists.newArrayList();
1042       ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
1043         TimeUnit.SECONDS, new ThreadFactory() {
1044           private int count = 1;
1045 
1046           public Thread newThread(Runnable r) {
1047             Thread t = new Thread(r, "split-log-closeStream-" + count++);
1048             return t;
1049           }
1050         });
1051       CompletionService<Void> completionService =
1052         new ExecutorCompletionService<Void>(closeThreadPool);
1053       for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
1054         LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
1055         completionService.submit(new Callable<Void>() {
1056           public Void call() throws Exception {
1057             WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1058             LOG.debug("Closing " + wap.p);
1059             try {
1060               wap.w.close();
1061             } catch (IOException ioe) {
1062               LOG.error("Couldn't close log at " + wap.p, ioe);
1063               thrown.add(ioe);
1064               return null;
1065             }
1066             LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1067                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1068 
1069             if (wap.editsWritten == 0) {
1070               // just remove the empty recovered.edits file
1071               if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
1072                 LOG.warn("Failed deleting empty " + wap.p);
1073                 throw new IOException("Failed deleting empty " + wap.p);
1074               }
1075               return null;
1076             }
1077 
1078             Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1079               regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
1080             try {
1081               if (!dst.equals(wap.p) && fs.exists(dst)) {
1082                 LOG.warn("Found existing old edits file. It could be the "
1083                     + "result of a previous failed split attempt. Deleting " + dst + ", length="
1084                     + fs.getFileStatus(dst).getLen());
1085                 if (!fs.delete(dst, false)) {
1086                   LOG.warn("Failed deleting old " + dst);
1087                   throw new IOException("Failed deleting old " + dst);
1088                 }
1089               }
1090               // Skip the unit tests which create a splitter that reads and
1091               // writes the data without touching disk.
1092               // TestHLogSplit#testThreading is an example.
1093               if (fs.exists(wap.p)) {
1094                 if (!fs.rename(wap.p, dst)) {
1095                   throw new IOException("Failed renaming " + wap.p + " to " + dst);
1096                 }
1097                 LOG.debug("Renamed " + wap.p + " to " + dst);
1098               }
1099             } catch (IOException ioe) {
1100               LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1101               thrown.add(ioe);
1102               return null;
1103             }
1104             paths.add(dst);
1105             return null;
1106           }
1107         });
1108       }
1109 
1110       boolean progress_failed = false;
1111       try {
1112         for (int i = 0, n = this.writers.size(); i < n; i++) {
1113           Future<Void> future = completionService.take();
1114           future.get();
1115           if (!progress_failed && reporter != null && !reporter.progress()) {
1116             progress_failed = true;
1117           }
1118         }
1119       } catch (InterruptedException e) {
1120         IOException iie = new InterruptedIOException();
1121         iie.initCause(e);
1122         throw iie;
1123       } catch (ExecutionException e) {
1124         throw new IOException(e.getCause());
1125       } finally {
1126         closeThreadPool.shutdownNow();
1127       }
1128 
1129       if (!thrown.isEmpty()) {
1130         throw MultipleIOException.createIOException(thrown);
1131       }
1132       writersClosed = true;
1133       closeAndCleanCompleted = true;
1134       if (progress_failed) {
1135         return null;
1136       }
1137       return paths;
1138     }
1139 
1140     private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1141       if (writersClosed) {
1142         return thrown;
1143       }
1144 
1145       if (thrown == null) {
1146         thrown = Lists.newArrayList();
1147       }
1148       try {
1149         for (WriterThread t : writerThreads) {
1150           while (t.isAlive()) {
1151             t.shouldStop = true;
1152             t.interrupt();
1153             try {
1154               t.join(10);
1155             } catch (InterruptedException e) {
1156               IOException iie = new InterruptedIOException();
1157               iie.initCause(e);
1158               throw iie;
1159             }
1160           }
1161         }
1162       } finally {
1163         synchronized (writers) {
1164           WriterAndPath wap = null;
1165           for (SinkWriter tmpWAP : writers.values()) {
1166             try {
1167               wap = (WriterAndPath) tmpWAP;
1168               wap.w.close();
1169             } catch (IOException ioe) {
1170               LOG.error("Couldn't close log at " + wap.p, ioe);
1171               thrown.add(ioe);
1172               continue;
1173             }
1174             LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1175                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1176           }
1177         }
1178         writersClosed = true;
1179       }
1180 
1181       return thrown;
1182     }
1183 
1184     /**
1185      * Get a writer and path for a log starting at the given entry. This function is thread-safe so
1186      * long as multiple threads are always acting on different regions.
1187      * @return null if this region shouldn't output any logs
1188      */
1189     private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
1190       byte[] region = entry.getKey().getEncodedRegionName();
1191       WriterAndPath ret = (WriterAndPath) writers.get(region);
1192       if (ret != null) {
1193         return ret;
1194       }
1195       // If we already decided that this region doesn't get any output
1196       // we don't need to check again.
1197       if (blacklistedRegions.contains(region)) {
1198         return null;
1199       }
1200       ret = createWAP(region, entry, rootDir, fs, conf);
1201       if (ret == null) {
1202         blacklistedRegions.add(region);
1203         return null;
1204       }
1205       writers.put(region, ret);
1206       return ret;
1207     }
1208 
1209     private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
1210         Configuration conf) throws IOException {
1211       Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1212       if (regionedits == null) {
1213         return null;
1214       }
1215       if (fs.exists(regionedits)) {
1216         LOG.warn("Found old edits file. It could be the "
1217             + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1218             + fs.getFileStatus(regionedits).getLen());
1219         if (!fs.delete(regionedits, false)) {
1220           LOG.warn("Failed delete of old " + regionedits);
1221         }
1222       }
1223       Writer w = createWriter(fs, regionedits, conf);
1224       LOG.debug("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
1225       return (new WriterAndPath(regionedits, w));
1226     }
1227 
1228     void append(RegionEntryBuffer buffer) throws IOException {
1229       List<Entry> entries = buffer.entryBuffer;
1230       if (entries.isEmpty()) {
1231         LOG.warn("got an empty buffer, skipping");
1232         return;
1233       }
1234 
1235       WriterAndPath wap = null;
1236 
1237       long startTime = System.nanoTime();
1238       try {
1239         int editsCount = 0;
1240 
1241         for (Entry logEntry : entries) {
1242           if (wap == null) {
1243             wap = getWriterAndPath(logEntry);
1244             if (wap == null) {
1245               // getWriterAndPath decided we don't need to write these edits
1246               return;
1247             }
1248           }
1249           wap.w.append(logEntry);
1250           this.updateRegionMaximumEditLogSeqNum(logEntry);
1251           editsCount++;
1252         }
1253         // Pass along summary statistics
1254         wap.incrementEdits(editsCount);
1255         wap.incrementNanoTime(System.nanoTime() - startTime);
1256       } catch (IOException e) {
1257         e = RemoteExceptionHandler.checkIOException(e);
1258         LOG.fatal("Got IOException while writing log entry to log", e);
1259         throw e;
1260       }
1261     }
1262 
1263     /**
1264      * @return a map from encoded region ID to the number of edits written out for that region.
1265      */
1266     Map<byte[], Long> getOutputCounts() {
1267       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1268       synchronized (writers) {
1269         for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
1270           ret.put(entry.getKey(), entry.getValue().editsWritten);
1271         }
1272       }
1273       return ret;
1274     }
1275 
1276     @Override
1277     int getNumberOfRecoveredRegions() {
1278       return writers.size();
1279     }
1280   }
1281 
1282   /**
1283    * Class wraps the actual writer which writes data out and related statistics
1284    */
1285   private abstract static class SinkWriter {
1286     /* Count of edits written to this path */
1287     long editsWritten = 0;
1288     /* Number of nanos spent writing to this log */
1289     long nanosSpent = 0;
1290 
1291     void incrementEdits(int edits) {
1292       editsWritten += edits;
1293     }
1294 
1295     void incrementNanoTime(long nanos) {
1296       nanosSpent += nanos;
1297     }
1298   }
1299 
1300   /**
1301    * Private data structure that wraps a Writer and its Path, also collecting statistics about the
1302    * data written to this output.
1303    */
1304   private static final class WriterAndPath extends SinkWriter {
1305     final Path p;
1306     final Writer w;
1307 
1308     WriterAndPath(final Path p, final Writer w) {
1309       this.p = p;
1310       this.w = w;
1311     }
1312   }
1313 
1314   /**
1315    * Class that manages replaying edits from WAL files directly to the newly assigned failover region servers
1316    */
1317   class LogReplayOutputSink extends OutputSink {
1318     private static final double BUFFER_THRESHOLD = 0.35;
1319     private static final String KEY_DELIMITER = "#";
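         // Batching heuristic used by append() below: the largest per-server queue is
         // replayed once it holds at least minBatchSize edits, or earlier once the
         // total buffered bytes exceed BUFFER_THRESHOLD of the EntryBuffers heap
         // quota (roughly 45MB with the default 128MB buffer).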
1320 
1321     private long waitRegionOnlineTimeOut;
1322     private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
1323     private final Map<String, RegionServerWriter> writers =
1324         new ConcurrentHashMap<String, RegionServerWriter>();
1325     // online encoded region name -> region location map
1326     private final Map<String, HRegionLocation> onlineRegions =
1327         new ConcurrentHashMap<String, HRegionLocation>();
1328 
1329     private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
1330         .synchronizedMap(new TreeMap<TableName, HConnection>());
1331     /**
1332      * Map key -> value layout, with keys joined by KEY_DELIMITER:
1333      * <servername>#<table name> -> queued edits (pairs of HRegionLocation and HLog.Entry)
1334      */
1335     private Map<String, List<Pair<HRegionLocation, HLog.Entry>>> serverToBufferQueueMap =
1336         new ConcurrentHashMap<String, List<Pair<HRegionLocation, HLog.Entry>>>();
1337     private List<Throwable> thrown = new ArrayList<Throwable>();
1338 
1339     // The following sink is used in distributedLogReplay mode for entries of regions in a disabling
1340     // table. It's a limitation of distributedLogReplay: log replay needs a region to be assigned
1341     // and online before it can replay WAL edits, while regions of a disabling/disabled table
1342     // won't be assigned by the AM. We can retire this code after HBASE-8234.
1343     private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
1344     private boolean hasEditsInDisablingOrDisabledTables = false;
1345 
1346     public LogReplayOutputSink(int numWriters) {
1347       super(numWriters);
1348       this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout",
1349         SplitLogManager.DEFAULT_TIMEOUT);
1350       this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
1351       this.logRecoveredEditsOutputSink.setReporter(reporter);
1352     }
1353 
1354     void append(RegionEntryBuffer buffer) throws IOException {
1355       List<Entry> entries = buffer.entryBuffer;
1356       if (entries.isEmpty()) {
1357         LOG.warn("got an empty buffer, skipping");
1358         return;
1359       }
1360 
1361       // check if the current region is in a disabling or disabled table
1362       if (disablingOrDisabledTables.contains(buffer.tableName)) {
1363         // need to fall back to the old way
1364         logRecoveredEditsOutputSink.append(buffer);
1365         hasEditsInDisablingOrDisabledTables = true;
1366         // store regions we have recovered so far
1367         addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
1368         return;
1369       }
1370 
1371       // group entries by region servers
1372       groupEditsByServer(entries);
1373 
1374       // process workitems
1375       String maxLocKey = null;
1376       int maxSize = 0;
1377       List<Pair<HRegionLocation, HLog.Entry>> maxQueue = null;
1378       synchronized (this.serverToBufferQueueMap) {
1379         for (String key : this.serverToBufferQueueMap.keySet()) {
1380           List<Pair<HRegionLocation, HLog.Entry>> curQueue = this.serverToBufferQueueMap.get(key);
1381           if (curQueue.size() > maxSize) {
1382             maxSize = curQueue.size();
1383             maxQueue = curQueue;
1384             maxLocKey = key;
1385           }
1386         }
1387         if (maxSize < minBatchSize
1388             && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
1389           // buffer more to process
1390           return;
1391         } else if (maxSize > 0) {
1392           this.serverToBufferQueueMap.remove(maxLocKey);
1393         }
1394       }
1395 
1396       if (maxSize > 0) {
1397         processWorkItems(maxLocKey, maxQueue);
1398       }
1399     }
1400 
1401     private void addToRecoveredRegions(String encodedRegionName) {
1402       if (!recoveredRegions.contains(encodedRegionName)) {
1403         recoveredRegions.add(encodedRegionName);
1404       }
1405     }
1406 
    /**
     * Helper function that groups WAL entries by destination region server
     * @throws IOException
     */
1411     private void groupEditsByServer(List<Entry> entries) throws IOException {
1412       Set<TableName> nonExistentTables = null;
      Long cachedLastFlushedSequenceId = -1L;
1414       for (HLog.Entry entry : entries) {
1415         WALEdit edit = entry.getEdit();
1416         TableName table = entry.getKey().getTablename();
        // clear scopes, which aren't needed for recovery
1418         entry.getKey().setScopes(null);
1419         String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
1420         // skip edits of non-existent tables
1421         if (nonExistentTables != null && nonExistentTables.contains(table)) {
1422           this.skippedEdits.incrementAndGet();
1423           continue;
1424         }
1425 
1426         Map<byte[], Long> maxStoreSequenceIds = null;
1427         boolean needSkip = false;
1428         HRegionLocation loc = null;
1429         String locKey = null;
1430         List<KeyValue> kvs = edit.getKeyValues();
1431         List<KeyValue> skippedKVs = new ArrayList<KeyValue>();
1432         HConnection hconn = this.getConnectionByTableName(table);
1433 
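        // Walk the edit's KeyValues: drop WAL metadata entries, locate the destination region
        // for each row, and skip anything already covered by a flushed sequence number.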
1434         for (KeyValue kv : kvs) {
1435           // filtering HLog meta entries
1436           // We don't handle HBASE-2231 because we may or may not replay a compaction event.
1437           // Details at https://issues.apache.org/jira/browse/HBASE-2231?focusedCommentId=13647143&
1438           // page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13647143
1439           if (kv.matchingFamily(WALEdit.METAFAMILY)) {
1440             skippedKVs.add(kv);
1441             continue;
1442           }
1443 
1444           try {
1445             loc =
1446                 locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow(),
1447                   encodeRegionNameStr);
1448           } catch (TableNotFoundException ex) {
1449             // table has been deleted so skip edits of the table
1450             LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
1451                 + encodeRegionNameStr);
1452             lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
1453             if (nonExistentTables == null) {
1454               nonExistentTables = new TreeSet<TableName>();
1455             }
1456             nonExistentTables.add(table);
1457             this.skippedEdits.incrementAndGet();
1458             needSkip = true;
1459             break;
1460           }
1461 
1462           cachedLastFlushedSequenceId =
1463               lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1464           if (cachedLastFlushedSequenceId != null
1465               && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
1466             // skip the whole HLog entry
1467             this.skippedEdits.incrementAndGet();
1468             needSkip = true;
1469             break;
1470           } else {
1471             if (maxStoreSequenceIds == null) {
1472               maxStoreSequenceIds =
1473                   regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
1474             }
1475             if (maxStoreSequenceIds != null) {
1476               Long maxStoreSeqId = maxStoreSequenceIds.get(kv.getFamily());
1477               if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
                // skip current kv if the column family doesn't exist anymore or was already flushed
1479                 skippedKVs.add(kv);
1480                 continue;
1481               }
1482             }
1483           }
1484         }
1485 
1486         // skip the edit
1487         if (loc == null || needSkip) continue;
1488 
1489         if (!skippedKVs.isEmpty()) {
1490           kvs.removeAll(skippedKVs);
1491         }
1492 
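        // enqueue the entry under its <server>#<table> key for later replay against that server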
1493         synchronized (serverToBufferQueueMap) {
1494           locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
1495           List<Pair<HRegionLocation, HLog.Entry>> queue = serverToBufferQueueMap.get(locKey);
1496           if (queue == null) {
1497             queue =
1498                 Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, HLog.Entry>>());
1499             serverToBufferQueueMap.put(locKey, queue);
1500           }
1501           queue.add(new Pair<HRegionLocation, HLog.Entry>(loc, entry));
1502         }
1503         // store regions we have recovered so far
1504         addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
1505       }
1506     }
1507 
1508     /**
1509      * Locate destination region based on table name & row. This function also makes sure the
1510      * destination region is online for replay.
1511      * @throws IOException
1512      */
1513     private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
1514         TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
1515       // fetch location from cache
1516       HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
      if (loc != null) return loc;
      // fetch the location from hbase:meta directly, bypassing the cache to avoid hitting a dead server
      loc = hconn.getRegionLocation(table, row, true);
1520       if (loc == null) {
1521         throw new IOException("Can't locate location for row:" + Bytes.toString(row)
1522             + " of table:" + table);
1523       }
      // check if the current row has moved to a different region due to a region merge/split
      if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
        // the original region must have been flushed already, so its edits can be skipped
1527         lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
1528         HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
1529         if (tmpLoc != null) return tmpLoc;
1530       }
1531 
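      // block until the destination region is online; if it is no longer in recovering state,
      // all of its edits can be skipped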
      Long lastFlushedSequenceId = -1L;
1533       AtomicBoolean isRecovering = new AtomicBoolean(true);
1534       loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
1535       if (!isRecovering.get()) {
        // the region isn't in recovering state at all; the WAL file may contain edits for a
        // region that was moved elsewhere before the hosting RS failed
1538         lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
1539         LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
1540             + " because it's not in recovering.");
1541       } else {
1542         Long cachedLastFlushedSequenceId =
1543             lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1544 
        // retrieve the last flushed sequence id from ZK, where the region's postOpenDeployTasks
        // keeps the value up to date
1547         RegionStoreSequenceIds ids =
1548             SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc
1549                 .getRegionInfo().getEncodedName());
1550         if (ids != null) {
1551           lastFlushedSequenceId = ids.getLastFlushedSequenceId();
1552           Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1553           List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
1554           for (StoreSequenceId id : maxSeqIdInStores) {
1555             storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
1556           }
1557           regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
1558         }
1559 
1560         if (cachedLastFlushedSequenceId == null
1561             || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
1562           lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
1563         }
1564       }
1565 
1566       onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
1567       return loc;
1568     }
1569 
1570     private void processWorkItems(String key, List<Pair<HRegionLocation, HLog.Entry>> actions)
1571         throws IOException {
1572       RegionServerWriter rsw = null;
1573 
1574       long startTime = System.nanoTime();
1575       try {
1576         rsw = getRegionServerWriter(key);
1577         rsw.sink.replayEntries(actions);
1578 
1579         // Pass along summary statistics
1580         rsw.incrementEdits(actions.size());
1581         rsw.incrementNanoTime(System.nanoTime() - startTime);
1582       } catch (IOException e) {
1583         e = RemoteExceptionHandler.checkIOException(e);
1584         LOG.fatal(" Got while writing log entry to log", e);
1585         throw e;
1586       }
1587     }
1588 
    /**
     * Wait until the region is online on the destination region server
     * @param loc last known location of the region
     * @param row a row belonging to the region
     * @param timeout How long to wait
     * @param isRecovering set to the recovering state of the region on the destination server
     * @return the region location once the region is online on the destination region server
     * @throws IOException when the region doesn't come online before the timeout
     */
1598     private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
1599         final long timeout, AtomicBoolean isRecovering)
1600         throws IOException {
1601       final long endTime = EnvironmentEdgeManager.currentTimeMillis() + timeout;
1602       final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1603         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1604       boolean reloadLocation = false;
1605       TableName tableName = loc.getRegionInfo().getTable();
1606       int tries = 0;
1607       Throwable cause = null;
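      // retry until the deadline with exponential backoff; refresh the cached location whenever
      // the failure isn't simply the region still opening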
1608       while (endTime > EnvironmentEdgeManager.currentTimeMillis()) {
1609         try {
1610           // Try and get regioninfo from the hosting server.
1611           HConnection hconn = getConnectionByTableName(tableName);
          if (reloadLocation) {
1613             loc = hconn.getRegionLocation(tableName, row, true);
1614           }
1615           BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
1616           HRegionInfo region = loc.getRegionInfo();
1617           try {
1618             GetRegionInfoRequest request =
1619                 RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
1620             GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
1621             if (HRegionInfo.convert(response.getRegionInfo()) != null) {
1622               isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
1623               return loc;
1624             }
1625           } catch (ServiceException se) {
1626             throw ProtobufUtil.getRemoteException(se);
1627           }
1628         } catch (IOException e) {
1629           cause = e.getCause();
          if (!(cause instanceof RegionOpeningException)) {
1631             reloadLocation = true;
1632           }
1633         }
1634         long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
1635         try {
1636           Thread.sleep(expectedSleep);
1637         } catch (InterruptedException e) {
1638           throw new IOException("Interrupted when waiting region " +
1639               loc.getRegionInfo().getEncodedName() + " online.", e);
1640         }
1641         tries++;
1642       }
1643 
1644       throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
1645         " online for " + timeout + " milliseconds.", cause);
1646     }
1647 
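    /**
     * Flush one pending per-server buffer queue, if any.
     * @return true when a non-empty queue was found and replayed
     */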
1648     @Override
1649     protected boolean flush() throws IOException {
1650       String curLoc = null;
1651       int curSize = 0;
1652       List<Pair<HRegionLocation, HLog.Entry>> curQueue = null;
1653       synchronized (this.serverToBufferQueueMap) {
1654         for (String locationKey : this.serverToBufferQueueMap.keySet()) {
1655           curQueue = this.serverToBufferQueueMap.get(locationKey);
1656           if (!curQueue.isEmpty()) {
1657             curSize = curQueue.size();
1658             curLoc = locationKey;
1659             break;
1660           }
1661         }
1662         if (curSize > 0) {
1663           this.serverToBufferQueueMap.remove(curLoc);
1664         }
1665       }
1666 
      if (curSize > 0) {
        this.processWorkItems(curLoc, curQueue);
        // We should already have control of the monitor; take it explicitly so that
        // notifyAll() can't throw IllegalMonitorStateException.
        synchronized (dataAvailable) {
          dataAvailable.notifyAll();
        }
        return true;
      }
1672       return false;
1673     }
1674 
1675     void addWriterError(Throwable t) {
1676       thrown.add(t);
1677     }
1678 
1679     @Override
1680     List<Path> finishWritingAndClose() throws IOException {
1681       try {
1682         if (!finishWriting()) {
1683           return null;
1684         }
1685         if (hasEditsInDisablingOrDisabledTables) {
1686           splits = logRecoveredEditsOutputSink.finishWritingAndClose();
1687         } else {
1688           splits = new ArrayList<Path>();
1689         }
        // return an empty list in order to keep the interface the same as the old way
1691         return splits;
1692       } finally {
1693         List<IOException> thrown = closeRegionServerWriters();
1694         if (thrown != null && !thrown.isEmpty()) {
1695           throw MultipleIOException.createIOException(thrown);
1696         }
1697       }
1698     }
1699 
1700     @Override
1701     int getNumOpenWriters() {
1702       return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
1703     }
1704 
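    /**
     * Stop all writer threads, close every cached region server writer, and close the per-table
     * connections, collecting any IOExceptions raised while closing.
     */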
1705     private List<IOException> closeRegionServerWriters() throws IOException {
1706       List<IOException> result = null;
1707       if (!writersClosed) {
1708         result = Lists.newArrayList();
1709         try {
1710           for (WriterThread t : writerThreads) {
1711             while (t.isAlive()) {
1712               t.shouldStop = true;
1713               t.interrupt();
1714               try {
1715                 t.join(10);
1716               } catch (InterruptedException e) {
1717                 IOException iie = new InterruptedIOException();
1718                 iie.initCause(e);
1719                 throw iie;
1720               }
1721             }
1722           }
1723         } finally {
1724           synchronized (writers) {
1725             for (String locationKey : writers.keySet()) {
1726               RegionServerWriter tmpW = writers.get(locationKey);
1727               try {
1728                 tmpW.close();
1729               } catch (IOException ioe) {
1730                 LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
1731                 result.add(ioe);
1732               }
1733             }
1734           }
1735 
1736           // close connections
1737           synchronized (this.tableNameToHConnectionMap) {
1738             for (TableName tableName : this.tableNameToHConnectionMap.keySet()) {
1739               HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
1740               try {
1741                 hconn.clearRegionCache();
1742                 hconn.close();
1743               } catch (IOException ioe) {
1744                 result.add(ioe);
1745               }
1746             }
1747           }
1748           writersClosed = true;
1749         }
1750       }
1751       return result;
1752     }
1753 
1754     Map<byte[], Long> getOutputCounts() {
1755       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1756       synchronized (writers) {
1757         for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
1758           ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
1759         }
1760       }
1761       return ret;
1762     }
1763 
1764     @Override
1765     int getNumberOfRecoveredRegions() {
1766       return this.recoveredRegions.size();
1767     }
1768 
    /**
     * Get a writer for the given region server location, creating and caching it if necessary.
     * This function is thread safe: writer creation happens under the writers lock.
     * @throws IOException when the location string doesn't contain a valid table name
     */
1774     private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
1775       RegionServerWriter ret = writers.get(loc);
1776       if (ret != null) {
1777         return ret;
1778       }
1779 
1780       TableName tableName = getTableFromLocationStr(loc);
      if (tableName == null) {
1782         throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
1783       }
1784 
1785       HConnection hconn = getConnectionByTableName(tableName);
1786       synchronized (writers) {
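        // re-check under the lock in case another thread created the writer first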
1787         ret = writers.get(loc);
1788         if (ret == null) {
1789           ret = new RegionServerWriter(conf, tableName, hconn);
1790           writers.put(loc, ret);
1791         }
1792       }
1793       return ret;
1794     }
1795 
1796     private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
1797       HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
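      // double-checked locking: synchronize only on a cache miss, then re-check before creating
      // a new connection so each table shares at most one connection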
1798       if (hconn == null) {
1799         synchronized (this.tableNameToHConnectionMap) {
1800           hconn = this.tableNameToHConnectionMap.get(tableName);
1801           if (hconn == null) {
1802             hconn = HConnectionManager.getConnection(conf);
1803             this.tableNameToHConnectionMap.put(tableName, hconn);
1804           }
1805         }
1806       }
1807       return hconn;
1808     }

    private TableName getTableFromLocationStr(String loc) {
      // location key is in the format <server name:port>#<table name>
1813       String[] splits = loc.split(KEY_DELIMITER);
1814       if (splits.length != 2) {
1815         return null;
1816       }
1817       return TableName.valueOf(splits[1]);
1818     }
1819   }
1820 
  /**
   * Private data structure that wraps a receiving RS and collects statistics about the data
   * written to this newly assigned RS.
   */
1825   private final static class RegionServerWriter extends SinkWriter {
1826     final WALEditsReplaySink sink;
1827 
1828     RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
1829         throws IOException {
1830       this.sink = new WALEditsReplaySink(conf, tableName, conn);
1831     }
1832 
1833     void close() throws IOException {
1834     }
1835   }
1836 
1837   static class CorruptedLogFileException extends Exception {
1838     private static final long serialVersionUID = 1L;
1839 
1840     CorruptedLogFileException(String s) {
1841       super(s);
1842     }
1843   }
1844 
1845   /** A struct used by getMutationsFromWALEntry */
1846   public static class MutationReplay {
1847     public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
1848       this.type = type;
1849       this.mutation = mutation;
1850       this.nonceGroup = nonceGroup;
1851       this.nonce = nonce;
1852     }
1853 
1854     public final MutationType type;
1855     public final Mutation mutation;
1856     public final long nonceGroup;
1857     public final long nonce;
1858   }
1859 
  /**
   * Tag each edit to be replayed with its original log sequence number
   * @param entry the WALEntry carrying the original log sequence number
   * @param cell the cell to tag
   */
1865   private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) {
1866     // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
1867     boolean needAddRecoveryTag = true;
1868     if (cell.getTagsLength() > 0) {
1869       Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
1870         TagType.LOG_REPLAY_TAG_TYPE);
1871       if (tmpTag != null) {
1872         // found an existing log replay tag so reuse it
1873         needAddRecoveryTag = false;
1874       }
1875     }
1876     if (needAddRecoveryTag) {
1877       List<Tag> newTags = new ArrayList<Tag>();
1878       Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(entry.getKey()
1879           .getLogSequenceNumber()));
1880       newTags.add(replayTag);
1881       return KeyValue.cloneAndAddTags(cell, newTags);
1882     }
1883     return cell;
1884   }
1885 
  /**
   * This function is used to construct mutations from a WALEntry. It also reconstructs the
   * HLogKey and WALEdit from the passed in WALEntry.
   * @param entry the WALEntry to convert
   * @param cells scanner over the cells associated with the entry
   * @param logEntry pair that receives the HLogKey and WALEdit instances extracted from the
   *          passed in WALEntry
   * @param addLogReplayTag whether to tag each cell with its original log sequence number
   * @return list of MutationReplay instances to be replayed
   * @throws IOException
   */
1897   public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
1898       Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag) throws IOException {
1899 
1900     if (entry == null) {
      // return an empty list
1902       return new ArrayList<MutationReplay>();
1903     }
1904 
1905     int count = entry.getAssociatedCellCount();
1906     List<MutationReplay> mutations = new ArrayList<MutationReplay>();
1907     Cell previousCell = null;
1908     Mutation m = null;
1909     HLogKey key = null;
1910     WALEdit val = null;
1911     if (logEntry != null) val = new WALEdit();
1912 
1913     for (int i = 0; i < count; i++) {
1914       // Throw index out of bounds if our cell count is off
1915       if (!cells.advance()) {
1916         throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
1917       }
1918       Cell cell = cells.current();
1919       if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell));
1920 
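      // consecutive cells with the same row and type are folded into a single mutation; a change
      // in either starts a new Put or Delete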
1921       boolean isNewRowOrType =
1922           previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
1923               || !CellUtil.matchingRow(previousCell, cell);
1924       if (isNewRowOrType) {
1925         // Create new mutation
1926         if (CellUtil.isDelete(cell)) {
1927           m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
1928           // Deletes don't have nonces.
1929           mutations.add(new MutationReplay(
1930               MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
1931         } else {
1932           m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
1933           // Puts might come from increment or append, thus we need nonces.
1934           long nonceGroup = entry.getKey().hasNonceGroup()
1935               ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1936           long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1937           mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
1938         }
1939       }
1940       if (CellUtil.isDelete(cell)) {
1941         ((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
1942       } else {
1943         Cell tmpNewCell = cell;
1944         if (addLogReplayTag) {
1945           tmpNewCell = tagReplayLogSequenceNumber(entry, cell);
1946         }
1947         ((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
1948       }
1949       previousCell = cell;
1950     }
1951 
1952     // reconstruct HLogKey
1953     if (logEntry != null) {
1954       WALKey walKey = entry.getKey();
1955       List<UUID> clusterIds = new ArrayList<UUID>(walKey.getClusterIdsCount());
1956       for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
1957         clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
1958       }
1959       key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
1960               .getTableName().toByteArray()), walKey.getLogSequenceNumber(), walKey.getWriteTime(),
1961               clusterIds, walKey.getNonceGroup(), walKey.getNonce());
1962       logEntry.setFirst(key);
1963       logEntry.setSecond(val);
1964     }
1965 
1966     return mutations;
1967   }
1968 
  /**
   * Returns whether distributed log replay is turned on
   * @param conf the configuration to check
   * @return true when distributed log replay is turned on
   */
1974   public static boolean isDistributedLogReplay(Configuration conf) {
1975     return conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
1976       HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
1977   }
1978 }