View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.text.ParseException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.TreeSet;
35  import java.util.UUID;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.CompletionService;
38  import java.util.concurrent.ConcurrentHashMap;
39  import java.util.concurrent.ExecutionException;
40  import java.util.concurrent.ExecutorCompletionService;
41  import java.util.concurrent.Future;
42  import java.util.concurrent.ThreadFactory;
43  import java.util.concurrent.ThreadPoolExecutor;
44  import java.util.concurrent.TimeUnit;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicLong;
47  import java.util.concurrent.atomic.AtomicReference;
48  
49  import org.apache.commons.logging.Log;
50  import org.apache.commons.logging.LogFactory;
51  import org.apache.hadoop.classification.InterfaceAudience;
52  import org.apache.hadoop.conf.Configuration;
53  import org.apache.hadoop.fs.FileStatus;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.hbase.Cell;
57  import org.apache.hadoop.hbase.CellScanner;
58  import org.apache.hadoop.hbase.CellUtil;
59  import org.apache.hadoop.hbase.CoordinatedStateManager;
60  import org.apache.hadoop.hbase.CoordinatedStateException;
61  import org.apache.hadoop.hbase.HBaseConfiguration;
62  import org.apache.hadoop.hbase.HConstants;
63  import org.apache.hadoop.hbase.HRegionInfo;
64  import org.apache.hadoop.hbase.HRegionLocation;
65  import org.apache.hadoop.hbase.KeyValue;
66  import org.apache.hadoop.hbase.KeyValueUtil;
67  import org.apache.hadoop.hbase.RemoteExceptionHandler;
68  import org.apache.hadoop.hbase.ServerName;
69  import org.apache.hadoop.hbase.TableName;
70  import org.apache.hadoop.hbase.TableNotFoundException;
71  import org.apache.hadoop.hbase.TableStateManager;
72  import org.apache.hadoop.hbase.Tag;
73  import org.apache.hadoop.hbase.TagType;
74  import org.apache.hadoop.hbase.client.ConnectionUtils;
75  import org.apache.hadoop.hbase.client.Delete;
76  import org.apache.hadoop.hbase.client.Durability;
77  import org.apache.hadoop.hbase.client.HConnection;
78  import org.apache.hadoop.hbase.client.HConnectionManager;
79  import org.apache.hadoop.hbase.client.Mutation;
80  import org.apache.hadoop.hbase.client.Put;
81  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
82  import org.apache.hadoop.hbase.io.HeapSize;
83  import org.apache.hadoop.hbase.master.SplitLogManager;
84  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
85  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
86  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
87  import org.apache.hadoop.hbase.protobuf.RequestConverter;
88  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
89  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
90  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
91  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
92  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
93  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
94  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
95  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
96  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
97  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
98  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
99  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
100 import org.apache.hadoop.hbase.regionserver.HRegion;
101 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
102 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
103 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
104 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
105 import org.apache.hadoop.hbase.util.Bytes;
106 import org.apache.hadoop.hbase.util.CancelableProgressable;
107 import org.apache.hadoop.hbase.util.ClassSize;
108 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
109 import org.apache.hadoop.hbase.util.FSUtils;
110 import org.apache.hadoop.hbase.util.Pair;
111 import org.apache.hadoop.hbase.util.Threads;
112 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
113 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
114 import org.apache.hadoop.io.MultipleIOException;
115 
116 import com.google.common.base.Preconditions;
117 import com.google.common.collect.Lists;
118 import com.google.protobuf.ServiceException;
119 
/**
 * This class is responsible for splitting up a bunch of regionserver commit log
 * files that are no longer being written to, into new files, one per region,
 * for the region to replay on startup. The old log files are deleted once
 * splitting is finished.
 */
125 @InterfaceAudience.Private
126 public class HLogSplitter {
127   static final Log LOG = LogFactory.getLog(HLogSplitter.class);
128 
129   // Parameters for split process
130   protected final Path rootDir;
131   protected final FileSystem fs;
132   protected final Configuration conf;
133 
134   // Major subcomponents of the split process.
135   // These are separated into inner classes to make testing easier.
136   OutputSink outputSink;
137   EntryBuffers entryBuffers;
138 
139   private Set<TableName> disablingOrDisabledTables =
140       new HashSet<TableName>();
141   private ZooKeeperWatcher watcher;
142   private CoordinatedStateManager csm;
143 
144   // If an exception is thrown by one of the other threads, it will be
145   // stored here.
146   protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
147 
148   // Wait/notify for when data has been produced by the reader thread,
149   // consumed by the reader thread, or an exception occurred
150   final Object dataAvailable = new Object();
151 
152   private MonitoredTask status;
153 
154   // For checking the latest flushed sequence id
155   protected final LastSequenceId sequenceIdChecker;
156 
157   protected boolean distributedLogReplay;
158 
159   // Map encodedRegionName -> lastFlushedSequenceId
160   protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
161 
162   // Map encodedRegionName -> maxSeqIdInStores
163   protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
164       new ConcurrentHashMap<String, Map<byte[], Long>>();
165 
166   // Failed region server that the wal file being split belongs to
167   protected String failedServerName = "";
168 
169   // Number of writer threads
170   private final int numWriterThreads;
171 
172   // Min batch size when replay WAL edits
173   private final int minBatchSize;
174 
175   HLogSplitter(Configuration conf, Path rootDir,
176       FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw,
177       CoordinatedStateManager csm, RecoveryMode mode) {
178     this.conf = HBaseConfiguration.create(conf);
179     String codecClassName = conf
180         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
181     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
182     this.rootDir = rootDir;
183     this.fs = fs;
184     this.sequenceIdChecker = idChecker;
185     this.watcher = zkw;
186     this.csm = csm;
187 
188     entryBuffers = new EntryBuffers(
189         this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
190             128*1024*1024));
191 
192     // a larger minBatchSize may slow down recovery because replay writer has to wait for
193     // enough edits before replaying them
194     this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
195     this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
196 
197     this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
198     if (zkw != null && csm != null && this.distributedLogReplay) {
199       outputSink = new LogReplayOutputSink(numWriterThreads);
200     } else {
201       if (this.distributedLogReplay) {
202         LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
203       }
204       this.distributedLogReplay = false;
205       outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
206     }
207 
208   }
209 
210   /**
211    * Splits a HLog file into region's recovered-edits directory.
212    * This is the main entry point for distributed log splitting from SplitLogWorker.
213    * <p>
214    * If the log file has N regions then N recovered.edits files will be produced.
215    * <p>
216    * @param rootDir
217    * @param logfile
218    * @param fs
219    * @param conf
220    * @param reporter
221    * @param idChecker
222    * @param zkw ZooKeeperWatcher if it's null, we will back to the old-style log splitting where we
223    *          dump out recoved.edits files for regions to replay on.
224    * @return false if it is interrupted by the progress-able.
225    * @throws IOException
226    */
227   public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
228       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
229       ZooKeeperWatcher zkw, CoordinatedStateManager cp, RecoveryMode mode) throws IOException {
230     HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, cp, mode);
231     return s.splitLogFile(logfile, reporter);
232   }
233 
234   // A wrapper to split one log folder using the method used by distributed
235   // log splitting. Used by tools and unit tests. It should be package private.
236   // It is public only because TestWALObserver is in a different package,
237   // which uses this method to to log splitting.
238   public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
239       FileSystem fs, Configuration conf) throws IOException {
240     FileStatus[] logfiles = fs.listStatus(logDir);
241     List<Path> splits = new ArrayList<Path>();
242     if (logfiles != null && logfiles.length > 0) {
243       for (FileStatus logfile: logfiles) {
244         HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null,
245           RecoveryMode.LOG_SPLITTING);
246         if (s.splitLogFile(logfile, null)) {
247           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
248           if (s.outputSink.splits != null) {
249             splits.addAll(s.outputSink.splits);
250           }
251         }
252       }
253     }
254     if (!fs.delete(logDir, true)) {
255       throw new IOException("Unable to delete src dir: " + logDir);
256     }
257     return splits;
258   }
259 
  // The real log splitter. It just splits one log file: reads every entry,
  // drops edits already persisted (per last-flushed sequence ids), and pushes
  // the rest through entryBuffers into the configured output sink.
  boolean splitLogFile(FileStatus logfile,
      CancelableProgressable reporter) throws IOException {
    boolean isCorrupted = false;
    // A splitter instance is single-use; status still null proves that.
    Preconditions.checkState(status == null);
    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
      HLog.SPLIT_SKIP_ERRORS_DEFAULT);
    // Report progress to the coordinator roughly every `interval` edits.
    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
    Path logPath = logfile.getPath();
    boolean outputSinkStarted = false;
    boolean progress_failed = false;
    int editsCount = 0;
    int editsSkipped = 0;

    status =
        TaskMonitor.get().createStatus(
          "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
    try {
      long logLength = logfile.getLen();
      LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
      LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
      status.setStatus("Opening log file");
      // Bail out early if the coordinator already considers us failed.
      if (reporter != null && !reporter.progress()) {
        progress_failed = true;
        return false;
      }
      Reader in = null;
      try {
        in = getReader(fs, logfile, conf, skipErrors, reporter);
      } catch (CorruptedLogFileException e) {
        LOG.warn("Could not get reader, corrupted log file " + logPath, e);
        ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
        isCorrupted = true;
      }
      if (in == null) {
        // Empty or vanished file: nothing to split, count it as success.
        LOG.warn("Nothing to split in log file " + logPath);
        return true;
      }
      if(watcher != null && csm != null) {
        // The log-replay sink routes edits for disabling/disabled tables
        // differently, so fetch those table states once up front.
        try {
          TableStateManager tsm = csm.getTableStateManager();
          disablingOrDisabledTables = tsm.getTablesInStates(
            ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
        } catch (CoordinatedStateException e) {
          throw new IOException("Can't get disabling/disabled tables", e);
        }
      }
      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
      int numOpenedFilesLastCheck = 0;
      outputSink.setReporter(reporter);
      outputSink.startWriterThreads();
      outputSinkStarted = true;
      Entry entry;
      Long lastFlushedSequenceId = -1L;
      ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logPath);
      failedServerName = (serverName == null) ? "" : serverName.getServerName();
      // Main loop: read each WAL entry and hand it to the buffer pipeline.
      while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
        byte[] region = entry.getKey().getEncodedRegionName();
        String key = Bytes.toString(region);
        lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
        if (lastFlushedSequenceId == null) {
          // First edit seen for this region: look up its last flushed
          // sequence id so already-persisted edits can be skipped below.
          if (this.distributedLogReplay) {
            RegionStoreSequenceIds ids =
                SplitLogManager.getRegionFlushedSequenceId(this.watcher, failedServerName, key);
            if (ids != null) {
              lastFlushedSequenceId = ids.getLastFlushedSequenceId();
            }
          } else if (sequenceIdChecker != null) {
            lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
          }
          if (lastFlushedSequenceId == null) {
            lastFlushedSequenceId = -1L;
          }
          lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
        }
        if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
          // Edit was flushed to HFiles before the crash; no need to replay it.
          editsSkipped++;
          continue;
        }
        entryBuffers.appendEntry(entry);
        editsCount++;
        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
        // If sufficient edits have passed, check if we should report progress.
        if (editsCount % interval == 0
            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
          numOpenedFilesLastCheck = this.getNumOpenWriters();
          String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
              + " edits, skipped " + editsSkipped + " edits.";
          status.setStatus("Split " + countsStr);
          if (reporter != null && !reporter.progress()) {
            progress_failed = true;
            return false;
          }
        }
      }
    } catch (InterruptedException ie) {
      // Convert to the IO-flavored interrupt so callers see an IOException.
      IOException iie = new InterruptedIOException();
      iie.initCause(ie);
      throw iie;
    } catch (CorruptedLogFileException e) {
      LOG.warn("Could not parse, corrupted log file " + logPath, e);
      ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
      isCorrupted = true;
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
      throw e;
    } finally {
      LOG.debug("Finishing writing output logs and closing down.");
      try {
        if (outputSinkStarted) {
          // Set progress_failed to true as the immediate following statement will reset its value
          // when finishWritingAndClose() throws exception, progress_failed has the right value
          progress_failed = true;
          progress_failed = outputSink.finishWritingAndClose() == null;
        }
      } finally {
        String msg =
            "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
                + " regions; log file=" + logPath + " is corrupted = " + isCorrupted
                + " progress failed = " + progress_failed;
        LOG.info(msg);
        status.markComplete(msg);
      }
    }
    return !progress_failed;
  }
386 
387   /**
388    * Completes the work done by splitLogFile by archiving logs
389    * <p>
390    * It is invoked by SplitLogManager once it knows that one of the
391    * SplitLogWorkers have completed the splitLogFile() part. If the master
392    * crashes then this function might get called multiple times.
393    * <p>
394    * @param logfile
395    * @param conf
396    * @throws IOException
397    */
398   public static void finishSplitLogFile(String logfile,
399       Configuration conf)  throws IOException {
400     Path rootdir = FSUtils.getRootDir(conf);
401     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
402     Path logPath;
403     if (FSUtils.isStartingWithPath(rootdir, logfile)) {
404       logPath = new Path(logfile);
405     } else {
406       logPath = new Path(rootdir, logfile);
407     }
408     finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
409   }
410 
411   static void finishSplitLogFile(Path rootdir, Path oldLogDir,
412       Path logPath, Configuration conf) throws IOException {
413     List<Path> processedLogs = new ArrayList<Path>();
414     List<Path> corruptedLogs = new ArrayList<Path>();
415     FileSystem fs;
416     fs = rootdir.getFileSystem(conf);
417     if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
418       corruptedLogs.add(logPath);
419     } else {
420       processedLogs.add(logPath);
421     }
422     archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
423     Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
424     fs.delete(stagingDir, true);
425   }
426 
427   /**
428    * Moves processed logs to a oldLogDir after successful processing Moves
429    * corrupted logs (any log that couldn't be successfully parsed to corruptDir
430    * (.corrupt) for later investigation
431    *
432    * @param corruptedLogs
433    * @param processedLogs
434    * @param oldLogDir
435    * @param fs
436    * @param conf
437    * @throws IOException
438    */
439   private static void archiveLogs(
440       final List<Path> corruptedLogs,
441       final List<Path> processedLogs, final Path oldLogDir,
442       final FileSystem fs, final Configuration conf) throws IOException {
443     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
444         "hbase.regionserver.hlog.splitlog.corrupt.dir",  HConstants.CORRUPT_DIR_NAME));
445 
446     if (!fs.mkdirs(corruptDir)) {
447       LOG.info("Unable to mkdir " + corruptDir);
448     }
449     fs.mkdirs(oldLogDir);
450 
451     // this method can get restarted or called multiple times for archiving
452     // the same log files.
453     for (Path corrupted : corruptedLogs) {
454       Path p = new Path(corruptDir, corrupted.getName());
455       if (fs.exists(corrupted)) {
456         if (!fs.rename(corrupted, p)) {
457           LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
458         } else {
459           LOG.warn("Moved corrupted log " + corrupted + " to " + p);
460         }
461       }
462     }
463 
464     for (Path p : processedLogs) {
465       Path newPath = FSHLog.getHLogArchivePath(oldLogDir, p);
466       if (fs.exists(p)) {
467         if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
468           LOG.warn("Unable to move  " + p + " to " + newPath);
469         } else {
470           LOG.info("Archived processed log " + p + " to " + newPath);
471         }
472       }
473     }
474   }
475 
  /**
   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
   * <code>logEntry</code> named for the sequenceid in the passed
   * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
   * This method also ensures existence of RECOVERED_EDITS_DIR under the region
   * creating it if necessary.
   * @param fs filesystem to operate on
   * @param logEntry entry whose region and sequence id name the output file
   * @param rootDir HBase root dir.
   * @param isCreate whether to create the recovered.edits dir if it is missing
   * @return Path to file into which to dump split log edits, or null if the
   *         region directory no longer exists (region was likely already split).
   * @throws IOException on filesystem errors
   */
  @SuppressWarnings("deprecation")
  static Path getRegionSplitEditsPath(final FileSystem fs,
      final Entry logEntry, final Path rootDir, boolean isCreate)
  throws IOException {
    Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
    Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
    Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);

    if (!fs.exists(regiondir)) {
      LOG.info("This region's directory doesn't exist: "
          + regiondir.toString() + ". It is very likely that it was" +
          " already split so it's safe to discard those edits.");
      return null;
    }
    if (fs.exists(dir) && fs.isFile(dir)) {
      // Old installations could leave a plain file where the recovered.edits
      // directory belongs; sideline it out of the way before creating the dir.
      // NOTE(review): sideline target is the filesystem-root "/tmp", not a
      // path under the HBase root dir -- confirm this is intentional.
      Path tmp = new Path("/tmp");
      if (!fs.exists(tmp)) {
        fs.mkdirs(tmp);
      }
      tmp = new Path(tmp,
        HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
      LOG.warn("Found existing old file: " + dir + ". It could be some "
        + "leftover of an old installation. It should be a folder instead. "
        + "So moving it to " + tmp);
      if (!fs.rename(dir, tmp)) {
        LOG.warn("Failed to sideline old file " + dir);
      }
    }

    if (isCreate && !fs.exists(dir)) {
      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
    }
    // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
    // region's replayRecoveredEdits will not delete it
    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
    fileName = getTmpRecoveredEditsFileName(fileName);
    return new Path(dir, fileName);
  }
527 
528   static String getTmpRecoveredEditsFileName(String fileName) {
529     return fileName + HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
530   }
531 
532   /**
533    * Get the completed recovered edits file path, renaming it to be by last edit
534    * in the file from its first edit. Then we could use the name to skip
535    * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
536    * @param srcPath
537    * @param maximumEditLogSeqNum
538    * @return dstPath take file's last edit log seq num as the name
539    */
540   static Path getCompletedRecoveredEditsFilePath(Path srcPath,
541       Long maximumEditLogSeqNum) {
542     String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
543     return new Path(srcPath.getParent(), fileName);
544   }
545 
546   static String formatRecoveredEditsFileName(final long seqid) {
547     return String.format("%019d", seqid);
548   }
549 
  /**
   * Create a new {@link Reader} for reading logs to split.
   * <p>
   * Handles the expected failure modes of crashed-RS logs: recovers the file
   * lease before opening, returns null for empty/truncated/missing files
   * (nothing recoverable), and maps other open failures to
   * {@link CorruptedLogFileException} when skipErrors is set.
   *
   * @param fs filesystem holding the log
   * @param file status of the log file to open
   * @param conf configuration
   * @param skipErrors if true, open failures mark the file corrupted instead
   *          of failing the split
   * @param reporter progress callback passed to lease recovery
   * @return A new Reader instance, or null if the file is empty/absent
   * @throws IOException when skipErrors is false, or on interruption
   * @throws CorruptedLogFileException when skipErrors is true and the log
   *           cannot be opened
   */
  protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
      boolean skipErrors, CancelableProgressable reporter)
      throws IOException, CorruptedLogFileException {
    Path path = file.getPath();
    long length = file.getLen();
    Reader in;

    // Check for possibly empty file. With appends, currently Hadoop reports a
    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
    // HDFS-878 is committed.
    if (length <= 0) {
      LOG.warn("File " + path + " might be still open, length is 0");
    }

    try {
      // Recover the lease so a file left open by a crashed RS can be read.
      FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
      try {
        in = getReader(fs, path, conf, reporter);
      } catch (EOFException e) {
        if (length <= 0) {
          // TODO should we ignore an empty, not-last log file if skip.errors
          // is false? Either way, the caller should decide what to do. E.g.
          // ignore if this is the last log in sequence.
          // TODO is this scenario still possible if the log has been
          // recovered (i.e. closed)
          LOG.warn("Could not open " + path + " for reading. File is empty", e);
          return null;
        } else {
          // EOFException being ignored
          return null;
        }
      }
    } catch (IOException e) {
      if (e instanceof FileNotFoundException) {
        // A wal file may not exist anymore. Nothing can be recovered so move on
        LOG.warn("File " + path + " doesn't exist anymore.", e);
        return null;
      }
      if (!skipErrors || e instanceof InterruptedIOException) {
        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
      }
      CorruptedLogFileException t =
        new CorruptedLogFileException("skipErrors=true Could not open hlog " +
            path + " ignoring");
      t.initCause(e);
      throw t;
    }
    return in;
  }
609 
  /**
   * Reads the next entry from the log, mapping expected failure modes:
   * EOF and known parse/checksum errors yield null (end of usable data);
   * other read errors are rethrown, or converted to
   * {@link CorruptedLogFileException} when skipErrors is set.
   *
   * @param in open reader positioned at the next entry
   * @param path log path, used only for log messages
   * @param skipErrors if true, unparseable data marks the file corrupted
   *          rather than failing the split
   * @return the next entry, or null when the file is exhausted or ends in a
   *         recognized truncation
   * @throws CorruptedLogFileException when skipErrors is true and the entry
   *           cannot be parsed
   * @throws IOException when skipErrors is false and a non-EOF error occurs
   */
  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
  throws CorruptedLogFileException, IOException {
    try {
      return in.next();
    } catch (EOFException eof) {
      // truncated files are expected if a RS crashes (see HBASE-2643)
      LOG.info("EOF from hlog " + path + ".  continuing");
      return null;
    } catch (IOException e) {
      // If the IOE resulted from bad file format,
      // then this problem is idempotent and retrying won't help
      if (e.getCause() != null &&
          (e.getCause() instanceof ParseException ||
           e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
        LOG.warn("Parse exception " + e.getCause().toString() + " from hlog "
           + path + ".  continuing");
        return null;
      }
      if (!skipErrors) {
        throw e;
      }
      CorruptedLogFileException t =
        new CorruptedLogFileException("skipErrors=true Ignoring exception" +
            " while parsing hlog " + path + ". Marking as corrupted");
      t.initCause(e);
      throw t;
    }
  }
638 
639   private void writerThreadError(Throwable t) {
640     thrown.compareAndSet(null, t);
641   }
642 
643   /**
644    * Check for errors in the writer threads. If any is found, rethrow it.
645    */
646   private void checkForErrors() throws IOException {
647     Throwable thrown = this.thrown.get();
648     if (thrown == null) return;
649     if (thrown instanceof IOException) {
650       throw new IOException(thrown);
651     } else {
652       throw new RuntimeException(thrown);
653     }
654   }
655   /**
656    * Create a new {@link Writer} for writing log splits.
657    */
658   protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
659       throws IOException {
660     return HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);
661   }
662 
663   /**
664    * Create a new {@link Reader} for reading logs to split.
665    */
666   protected Reader getReader(FileSystem fs, Path curLogFile,
667       Configuration conf, CancelableProgressable reporter) throws IOException {
668     return HLogFactory.createReader(fs, curLogFile, conf, reporter);
669   }
670 
671   /**
672    * Get current open writers
673    */
674   private int getNumOpenWriters() {
675     int result = 0;
676     if (this.outputSink != null) {
677       result += this.outputSink.getNumOpenWriters();
678     }
679     return result;
680   }
681 
  /**
   * Class which accumulates edits and separates them into a buffer per region
   * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
   * a predefined threshold.
   *
   * Writer threads then pull region-specific buffers from this class.
   */
  class EntryBuffers {
    // Pending edits per region, keyed by encoded region name; guarded by `this`.
    Map<byte[], RegionEntryBuffer> buffers =
      new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);

    /* Track which regions are currently in the middle of writing. We don't allow
       an IO thread to pick up bytes from a region if we're already writing
       data for that region in a different IO thread. */
    Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);

    // Total heap size of buffered entries; guarded by the outer dataAvailable
    // monitor, which is also used to block producers when over budget.
    long totalBuffered = 0;
    // Heap budget; appendEntry blocks while totalBuffered exceeds this.
    long maxHeapUsage;

    EntryBuffers(long maxHeapUsage) {
      this.maxHeapUsage = maxHeapUsage;
    }

    /**
     * Append a log entry into the corresponding region buffer.
     * Blocks if the total heap usage has crossed the specified threshold.
     *
     * @throws InterruptedException if interrupted while waiting for space
     * @throws IOException if a writer thread has already recorded an error
     */
    void appendEntry(Entry entry) throws InterruptedException, IOException {
      HLogKey key = entry.getKey();

      RegionEntryBuffer buffer;
      long incrHeap;
      synchronized (this) {
        // Lazily create the per-region buffer on the first edit for a region.
        buffer = buffers.get(key.getEncodedRegionName());
        if (buffer == null) {
          buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
          buffers.put(key.getEncodedRegionName(), buffer);
        }
        incrHeap= buffer.appendEntry(entry);
      }

      // If we crossed the chunk threshold, wait for more space to be available
      synchronized (dataAvailable) {
        totalBuffered += incrHeap;
        // Also stop waiting if a writer thread failed (thrown set), so the
        // error can be surfaced by checkForErrors() below.
        while (totalBuffered > maxHeapUsage && thrown.get() == null) {
          LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
          dataAvailable.wait(2000);
        }
        dataAvailable.notifyAll();
      }
      checkForErrors();
    }

    /**
     * @return RegionEntryBuffer a buffer of edits to be written or replayed.
     *         Picks the largest buffer whose region is not already being
     *         written by another IO thread; null when none qualifies.
     */
    synchronized RegionEntryBuffer getChunkToWrite() {
      long biggestSize = 0;
      byte[] biggestBufferKey = null;

      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
        long size = entry.getValue().heapSize();
        if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
          biggestSize = size;
          biggestBufferKey = entry.getKey();
        }
      }
      if (biggestBufferKey == null) {
        return null;
      }

      // Claim the region so no other IO thread picks it up concurrently.
      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
      currentlyWriting.add(biggestBufferKey);
      return buffer;
    }

    /**
     * Called by an IO thread once it has drained a chunk: releases the
     * region for other writers and credits the freed heap back, waking any
     * producers blocked in appendEntry.
     */
    void doneWriting(RegionEntryBuffer buffer) {
      synchronized (this) {
        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
        assert removed;
      }
      long size = buffer.heapSize();

      synchronized (dataAvailable) {
        totalBuffered -= size;
        // We may unblock writers
        dataAvailable.notifyAll();
      }
    }

    // True while an IO thread is draining this region's buffer.
    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
      return currentlyWriting.contains(region);
    }
  }
779 
780   /**
781    * A buffer of some number of edits for a given region.
782    * This accumulates edits and also provides a memory optimization in order to
783    * share a single byte array instance for the table and region name.
784    * Also tracks memory usage of the accumulated edits.
785    */
786   static class RegionEntryBuffer implements HeapSize {
787     long heapInBuffer = 0;
788     List<Entry> entryBuffer;
789     TableName tableName;
790     byte[] encodedRegionName;
791 
792     RegionEntryBuffer(TableName tableName, byte[] region) {
793       this.tableName = tableName;
794       this.encodedRegionName = region;
795       this.entryBuffer = new LinkedList<Entry>();
796     }
797 
798     long appendEntry(Entry entry) {
799       internify(entry);
800       entryBuffer.add(entry);
801       long incrHeap = entry.getEdit().heapSize() +
802         ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
803         0; // TODO linkedlist entry
804       heapInBuffer += incrHeap;
805       return incrHeap;
806     }
807 
808     private void internify(Entry entry) {
809       HLogKey k = entry.getKey();
810       k.internTableName(this.tableName);
811       k.internEncodedRegionName(this.encodedRegionName);
812     }
813 
814     @Override
815     public long heapSize() {
816       return heapInBuffer;
817     }
818   }
819 
  class WriterThread extends Thread {
    // Set by finish() (and by closeLogWriters() during forced shutdown) to
    // ask this thread to drain remaining work and exit.
    private volatile boolean shouldStop = false;
    // Destination for the region buffers this thread drains.
    private OutputSink outputSink = null;

    WriterThread(OutputSink sink, int i) {
      super(Thread.currentThread().getName() + "-Writer-" + i);
      outputSink = sink;
    }

    @Override
    public void run()  {
      try {
        doRun();
      } catch (Throwable t) {
        // Record the failure so the splitter can surface it to its caller.
        LOG.error("Exiting thread", t);
        writerThreadError(t);
      }
    }

    /**
     * Main loop: repeatedly pull the largest pending region buffer from
     * entryBuffers and append it to the output sink. When nothing is
     * buffered, park on dataAvailable; exit once shouldStop is set and the
     * sink reports nothing left to flush.
     */
    private void doRun() throws IOException {
      LOG.debug("Writer thread " + this + ": starting");
      while (true) {
        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available, wait on some more to show up
          synchronized (dataAvailable) {
            // flush() returning false means the sink holds no buffered
            // edits, so it is safe to exit now that a stop was requested.
            if (shouldStop && !this.outputSink.flush()) {
              return;
            }
            try {
              dataAvailable.wait(500);
            } catch (InterruptedException ie) {
              // Interruption is expected only as part of shutdown; anything
              // else is surfaced as a failure via run()'s catch block.
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          // Always release the region, even if the write failed, so other
          // threads (and heap accounting) are not wedged.
          entryBuffers.doneWriting(buffer);
        }
      }
    }

    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
      outputSink.append(buffer);
    }

    // Request an orderly stop: the thread drains buffered edits then exits.
    void finish() {
      synchronized (dataAvailable) {
        shouldStop = true;
        dataAvailable.notifyAll();
      }
    }
  }
880 
881   /**
882    * The following class is an abstraction class to provide a common interface to support both
883    * existing recovered edits file sink and region server WAL edits replay sink
884    */
885    abstract class OutputSink {
886 
887     protected Map<byte[], SinkWriter> writers = Collections
888         .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
889 
890     protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
891         .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
892 
893     protected final List<WriterThread> writerThreads = Lists.newArrayList();
894 
895     /* Set of regions which we've decided should not output edits */
896     protected final Set<byte[]> blacklistedRegions = Collections
897         .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
898 
899     protected boolean closeAndCleanCompleted = false;
900 
901     protected boolean writersClosed = false;
902 
903     protected final int numThreads;
904 
905     protected CancelableProgressable reporter = null;
906 
907     protected AtomicLong skippedEdits = new AtomicLong();
908 
909     protected List<Path> splits = null;
910 
911     public OutputSink(int numWriters) {
912       numThreads = numWriters;
913     }
914 
915     void setReporter(CancelableProgressable reporter) {
916       this.reporter = reporter;
917     }
918 
919     /**
920      * Start the threads that will pump data from the entryBuffers to the output files.
921      */
922     synchronized void startWriterThreads() {
923       for (int i = 0; i < numThreads; i++) {
924         WriterThread t = new WriterThread(this, i);
925         t.start();
926         writerThreads.add(t);
927       }
928     }
929 
930     /**
931      *
932      * Update region's maximum edit log SeqNum.
933      */
934     void updateRegionMaximumEditLogSeqNum(Entry entry) {
935       synchronized (regionMaximumEditLogSeqNum) {
936         Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
937             .getEncodedRegionName());
938         if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
939           regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
940               .getLogSeqNum());
941         }
942       }
943     }
944 
945     Long getRegionMaximumEditLogSeqNum(byte[] region) {
946       return regionMaximumEditLogSeqNum.get(region);
947     }
948 
949     /**
950      * @return the number of currently opened writers
951      */
952     int getNumOpenWriters() {
953       return this.writers.size();
954     }
955 
956     long getSkippedEdits() {
957       return this.skippedEdits.get();
958     }
959 
960     /**
961      * Wait for writer threads to dump all info to the sink
962      * @return true when there is no error
963      * @throws IOException
964      */
965     protected boolean finishWriting() throws IOException {
966       LOG.debug("Waiting for split writer threads to finish");
967       boolean progress_failed = false;
968       for (WriterThread t : writerThreads) {
969         t.finish();
970       }
971       for (WriterThread t : writerThreads) {
972         if (!progress_failed && reporter != null && !reporter.progress()) {
973           progress_failed = true;
974         }
975         try {
976           t.join();
977         } catch (InterruptedException ie) {
978           IOException iie = new InterruptedIOException();
979           iie.initCause(ie);
980           throw iie;
981         }
982       }
983       checkForErrors();
984       LOG.info("Split writers finished");
985       return (!progress_failed);
986     }
987 
988     abstract List<Path> finishWritingAndClose() throws IOException;
989 
990     /**
991      * @return a map from encoded region ID to the number of edits written out for that region.
992      */
993     abstract Map<byte[], Long> getOutputCounts();
994 
995     /**
996      * @return number of regions we've recovered
997      */
998     abstract int getNumberOfRecoveredRegions();
999 
1000     /**
1001      * @param buffer A WAL Edit Entry
1002      * @throws IOException
1003      */
1004     abstract void append(RegionEntryBuffer buffer) throws IOException;
1005 
1006     /**
1007      * WriterThread call this function to help flush internal remaining edits in buffer before close
1008      * @return true when underlying sink has something to flush
1009      */
1010     protected boolean flush() throws IOException {
1011       return false;
1012     }
1013   }
1014 
1015   /**
1016    * Class that manages the output streams from the log splitting process.
1017    */
1018   class LogRecoveredEditsOutputSink extends OutputSink {
1019 
1020     public LogRecoveredEditsOutputSink(int numWriters) {
1021       // More threads could potentially write faster at the expense
1022       // of causing more disk seeks as the logs are split.
1023       // 3. After a certain setting (probably around 3) the
1024       // process will be bound on the reader in the current
1025       // implementation anyway.
1026       super(numWriters);
1027     }
1028 
1029     /**
1030      * @return null if failed to report progress
1031      * @throws IOException
1032      */
1033     @Override
1034     List<Path> finishWritingAndClose() throws IOException {
1035       boolean isSuccessful = false;
1036       List<Path> result = null;
1037       try {
1038         isSuccessful = finishWriting();
1039       } finally {
1040         result = close();
1041         List<IOException> thrown = closeLogWriters(null);
1042         if (thrown != null && !thrown.isEmpty()) {
1043           throw MultipleIOException.createIOException(thrown);
1044         }
1045       }
1046       if (isSuccessful) {
1047         splits = result;
1048       }
1049       return splits;
1050     }
1051 
1052     /**
1053      * Close all of the output streams.
1054      * @return the list of paths written.
1055      */
1056     private List<Path> close() throws IOException {
1057       Preconditions.checkState(!closeAndCleanCompleted);
1058 
1059       final List<Path> paths = new ArrayList<Path>();
1060       final List<IOException> thrown = Lists.newArrayList();
1061       ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
1062         TimeUnit.SECONDS, new ThreadFactory() {
1063           private int count = 1;
1064 
1065           @Override
1066           public Thread newThread(Runnable r) {
1067             Thread t = new Thread(r, "split-log-closeStream-" + count++);
1068             return t;
1069           }
1070         });
1071       CompletionService<Void> completionService =
1072         new ExecutorCompletionService<Void>(closeThreadPool);
1073       for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
1074         LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
1075         completionService.submit(new Callable<Void>() {
1076           @Override
1077           public Void call() throws Exception {
1078             WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1079             LOG.debug("Closing " + wap.p);
1080             try {
1081               wap.w.close();
1082             } catch (IOException ioe) {
1083               LOG.error("Couldn't close log at " + wap.p, ioe);
1084               thrown.add(ioe);
1085               return null;
1086             }
1087             LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1088                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1089 
1090             if (wap.editsWritten == 0) {
1091               // just remove the empty recovered.edits file
1092               if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
1093                 LOG.warn("Failed deleting empty " + wap.p);
1094                 throw new IOException("Failed deleting empty  " + wap.p);
1095               }
1096               return null;
1097             }
1098 
1099             Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1100               regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
1101             try {
1102               if (!dst.equals(wap.p) && fs.exists(dst)) {
1103                 LOG.warn("Found existing old edits file. It could be the "
1104                     + "result of a previous failed split attempt. Deleting " + dst + ", length="
1105                     + fs.getFileStatus(dst).getLen());
1106                 if (!fs.delete(dst, false)) {
1107                   LOG.warn("Failed deleting of old " + dst);
1108                   throw new IOException("Failed deleting of old " + dst);
1109                 }
1110               }
1111               // Skip the unit tests which create a splitter that reads and
1112               // writes the data without touching disk.
1113               // TestHLogSplit#testThreading is an example.
1114               if (fs.exists(wap.p)) {
1115                 if (!fs.rename(wap.p, dst)) {
1116                   throw new IOException("Failed renaming " + wap.p + " to " + dst);
1117                 }
1118                 LOG.info("Rename " + wap.p + " to " + dst);
1119               }
1120             } catch (IOException ioe) {
1121               LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1122               thrown.add(ioe);
1123               return null;
1124             }
1125             paths.add(dst);
1126             return null;
1127           }
1128         });
1129       }
1130 
1131       boolean progress_failed = false;
1132       try {
1133         for (int i = 0, n = this.writers.size(); i < n; i++) {
1134           Future<Void> future = completionService.take();
1135           future.get();
1136           if (!progress_failed && reporter != null && !reporter.progress()) {
1137             progress_failed = true;
1138           }
1139         }
1140       } catch (InterruptedException e) {
1141         IOException iie = new InterruptedIOException();
1142         iie.initCause(e);
1143         throw iie;
1144       } catch (ExecutionException e) {
1145         throw new IOException(e.getCause());
1146       } finally {
1147         closeThreadPool.shutdownNow();
1148       }
1149 
1150       if (!thrown.isEmpty()) {
1151         throw MultipleIOException.createIOException(thrown);
1152       }
1153       writersClosed = true;
1154       closeAndCleanCompleted = true;
1155       if (progress_failed) {
1156         return null;
1157       }
1158       return paths;
1159     }
1160 
1161     private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1162       if (writersClosed) {
1163         return thrown;
1164       }
1165 
1166       if (thrown == null) {
1167         thrown = Lists.newArrayList();
1168       }
1169       try {
1170         for (WriterThread t : writerThreads) {
1171           while (t.isAlive()) {
1172             t.shouldStop = true;
1173             t.interrupt();
1174             try {
1175               t.join(10);
1176             } catch (InterruptedException e) {
1177               IOException iie = new InterruptedIOException();
1178               iie.initCause(e);
1179               throw iie;
1180             }
1181           }
1182         }
1183       } finally {
1184         synchronized (writers) {
1185           WriterAndPath wap = null;
1186           for (SinkWriter tmpWAP : writers.values()) {
1187             try {
1188               wap = (WriterAndPath) tmpWAP;
1189               wap.w.close();
1190             } catch (IOException ioe) {
1191               LOG.error("Couldn't close log at " + wap.p, ioe);
1192               thrown.add(ioe);
1193               continue;
1194             }
1195             LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1196                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1197           }
1198         }
1199         writersClosed = true;
1200       }
1201 
1202       return thrown;
1203     }
1204 
1205     /**
1206      * Get a writer and path for a log starting at the given entry. This function is threadsafe so
1207      * long as multiple threads are always acting on different regions.
1208      * @return null if this region shouldn't output any logs
1209      */
1210     private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
1211       byte region[] = entry.getKey().getEncodedRegionName();
1212       WriterAndPath ret = (WriterAndPath) writers.get(region);
1213       if (ret != null) {
1214         return ret;
1215       }
1216       // If we already decided that this region doesn't get any output
1217       // we don't need to check again.
1218       if (blacklistedRegions.contains(region)) {
1219         return null;
1220       }
1221       ret = createWAP(region, entry, rootDir, fs, conf);
1222       if (ret == null) {
1223         blacklistedRegions.add(region);
1224         return null;
1225       }
1226       writers.put(region, ret);
1227       return ret;
1228     }
1229 
1230     private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
1231         Configuration conf) throws IOException {
1232       Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1233       if (regionedits == null) {
1234         return null;
1235       }
1236       if (fs.exists(regionedits)) {
1237         LOG.warn("Found old edits file. It could be the "
1238             + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1239             + fs.getFileStatus(regionedits).getLen());
1240         if (!fs.delete(regionedits, false)) {
1241           LOG.warn("Failed delete of old " + regionedits);
1242         }
1243       }
1244       Writer w = createWriter(fs, regionedits, conf);
1245       LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
1246       return (new WriterAndPath(regionedits, w));
1247     }
1248 
1249     @Override
1250     void append(RegionEntryBuffer buffer) throws IOException {
1251       List<Entry> entries = buffer.entryBuffer;
1252       if (entries.isEmpty()) {
1253         LOG.warn("got an empty buffer, skipping");
1254         return;
1255       }
1256 
1257       WriterAndPath wap = null;
1258 
1259       long startTime = System.nanoTime();
1260       try {
1261         int editsCount = 0;
1262 
1263         for (Entry logEntry : entries) {
1264           if (wap == null) {
1265             wap = getWriterAndPath(logEntry);
1266             if (wap == null) {
1267               // getWriterAndPath decided we don't need to write these edits
1268               return;
1269             }
1270           }
1271           wap.w.append(logEntry);
1272           this.updateRegionMaximumEditLogSeqNum(logEntry);
1273           editsCount++;
1274         }
1275         // Pass along summary statistics
1276         wap.incrementEdits(editsCount);
1277         wap.incrementNanoTime(System.nanoTime() - startTime);
1278       } catch (IOException e) {
1279         e = RemoteExceptionHandler.checkIOException(e);
1280         LOG.fatal(" Got while writing log entry to log", e);
1281         throw e;
1282       }
1283     }
1284 
1285     /**
1286      * @return a map from encoded region ID to the number of edits written out for that region.
1287      */
1288     @Override
1289     Map<byte[], Long> getOutputCounts() {
1290       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1291       synchronized (writers) {
1292         for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
1293           ret.put(entry.getKey(), entry.getValue().editsWritten);
1294         }
1295       }
1296       return ret;
1297     }
1298 
1299     @Override
1300     int getNumberOfRecoveredRegions() {
1301       return writers.size();
1302     }
1303   }
1304 
1305   /**
1306    * Class wraps the actual writer which writes data out and related statistics
1307    */
1308   private abstract static class SinkWriter {
1309     /* Count of edits written to this path */
1310     long editsWritten = 0;
1311     /* Number of nanos spent writing to this log */
1312     long nanosSpent = 0;
1313 
1314     void incrementEdits(int edits) {
1315       editsWritten += edits;
1316     }
1317 
1318     void incrementNanoTime(long nanos) {
1319       nanosSpent += nanos;
1320     }
1321   }
1322 
1323   /**
1324    * Private data structure that wraps a Writer and its Path, also collecting statistics about the
1325    * data written to this output.
1326    */
1327   private final static class WriterAndPath extends SinkWriter {
1328     final Path p;
1329     final Writer w;
1330 
1331     WriterAndPath(final Path p, final Writer w) {
1332       this.p = p;
1333       this.w = w;
1334     }
1335   }
1336 
1337   /**
1338    * Class that manages to replay edits from WAL files directly to assigned fail over region servers
1339    */
1340   class LogReplayOutputSink extends OutputSink {
1341     private static final double BUFFER_THRESHOLD = 0.35;
1342     private static final String KEY_DELIMITER = "#";
1343 
1344     private long waitRegionOnlineTimeOut;
1345     private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
1346     private final Map<String, RegionServerWriter> writers =
1347         new ConcurrentHashMap<String, RegionServerWriter>();
1348     // online encoded region name -> region location map
1349     private final Map<String, HRegionLocation> onlineRegions =
1350         new ConcurrentHashMap<String, HRegionLocation>();
1351 
1352     private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
1353         .synchronizedMap(new TreeMap<TableName, HConnection>());
1354     /**
1355      * Map key -> value layout
1356      * <servername>:<table name> -> Queue<Row>
1357      */
1358     private Map<String, List<Pair<HRegionLocation, HLog.Entry>>> serverToBufferQueueMap =
1359         new ConcurrentHashMap<String, List<Pair<HRegionLocation, HLog.Entry>>>();
1360     private List<Throwable> thrown = new ArrayList<Throwable>();
1361 
1362     // The following sink is used in distrubitedLogReplay mode for entries of regions in a disabling
1363     // table. It's a limitation of distributedLogReplay. Because log replay needs a region is
1364     // assigned and online before it can replay wal edits while regions of disabling/disabled table
1365     // won't be assigned by AM. We can retire this code after HBASE-8234.
1366     private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
1367     private boolean hasEditsInDisablingOrDisabledTables = false;
1368 
1369     public LogReplayOutputSink(int numWriters) {
1370       super(numWriters);
1371       this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout",
1372         SplitLogManager.DEFAULT_TIMEOUT);
1373       this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
1374       this.logRecoveredEditsOutputSink.setReporter(reporter);
1375     }
1376 
1377     @Override
1378     void append(RegionEntryBuffer buffer) throws IOException {
1379       List<Entry> entries = buffer.entryBuffer;
1380       if (entries.isEmpty()) {
1381         LOG.warn("got an empty buffer, skipping");
1382         return;
1383       }
1384 
1385       // check if current region in a disabling or disabled table
1386       if (disablingOrDisabledTables.contains(buffer.tableName)) {
1387         // need fall back to old way
1388         logRecoveredEditsOutputSink.append(buffer);
1389         hasEditsInDisablingOrDisabledTables = true;
1390         // store regions we have recovered so far
1391         addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
1392         return;
1393       }
1394 
1395       // group entries by region servers
1396       groupEditsByServer(entries);
1397 
1398       // process workitems
1399       String maxLocKey = null;
1400       int maxSize = 0;
1401       List<Pair<HRegionLocation, HLog.Entry>> maxQueue = null;
1402       synchronized (this.serverToBufferQueueMap) {
1403         for (String key : this.serverToBufferQueueMap.keySet()) {
1404           List<Pair<HRegionLocation, HLog.Entry>> curQueue = this.serverToBufferQueueMap.get(key);
1405           if (curQueue.size() > maxSize) {
1406             maxSize = curQueue.size();
1407             maxQueue = curQueue;
1408             maxLocKey = key;
1409           }
1410         }
1411         if (maxSize < minBatchSize
1412             && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
1413           // buffer more to process
1414           return;
1415         } else if (maxSize > 0) {
1416           this.serverToBufferQueueMap.remove(maxLocKey);
1417         }
1418       }
1419 
1420       if (maxSize > 0) {
1421         processWorkItems(maxLocKey, maxQueue);
1422       }
1423     }
1424 
1425     private void addToRecoveredRegions(String encodedRegionName) {
1426       if (!recoveredRegions.contains(encodedRegionName)) {
1427         recoveredRegions.add(encodedRegionName);
1428       }
1429     }
1430 
1431     /**
1432      * Helper function to group WALEntries to individual region servers
1433      * @throws IOException
1434      */
1435     private void groupEditsByServer(List<Entry> entries) throws IOException {
1436       Set<TableName> nonExistentTables = null;
1437       Long cachedLastFlushedSequenceId = -1l;
1438       for (HLog.Entry entry : entries) {
1439         WALEdit edit = entry.getEdit();
1440         TableName table = entry.getKey().getTablename();
1441         // clear scopes which isn't needed for recovery
1442         entry.getKey().setScopes(null);
1443         String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
1444         // skip edits of non-existent tables
1445         if (nonExistentTables != null && nonExistentTables.contains(table)) {
1446           this.skippedEdits.incrementAndGet();
1447           continue;
1448         }
1449 
1450         Map<byte[], Long> maxStoreSequenceIds = null;
1451         boolean needSkip = false;
1452         HRegionLocation loc = null;
1453         String locKey = null;
1454         List<KeyValue> kvs = edit.getKeyValues();
1455         List<KeyValue> skippedKVs = new ArrayList<KeyValue>();
1456         HConnection hconn = this.getConnectionByTableName(table);
1457 
1458         for (KeyValue kv : kvs) {
1459           byte[] row = kv.getRow();
1460           byte[] family = kv.getFamily();
1461           boolean isCompactionEntry = false;
1462           if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY)) {
1463             CompactionDescriptor compaction = WALEdit.getCompaction(kv);
1464             if (compaction != null && compaction.hasRegionName()) {
1465               try {
1466                 byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
1467                   .toByteArray());
1468                 row = regionName[1]; // startKey of the region
1469                 family = compaction.getFamilyName().toByteArray();
1470                 isCompactionEntry = true;
1471               } catch (Exception ex) {
1472                 LOG.warn("Unexpected exception received, ignoring " + ex);
1473                 skippedKVs.add(kv);
1474                 continue;
1475               }
1476             } else {
1477               skippedKVs.add(kv);
1478               continue;
1479             }
1480           }
1481 
1482           try {
1483             loc =
1484                 locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
1485                   encodeRegionNameStr);
1486             // skip replaying the compaction if the region is gone
1487             if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
1488               loc.getRegionInfo().getEncodedName())) {
1489               LOG.info("Not replaying a compaction marker for an older region: "
1490                   + encodeRegionNameStr);
1491               needSkip = true;
1492             }
1493           } catch (TableNotFoundException ex) {
1494             // table has been deleted so skip edits of the table
1495             LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
1496                 + encodeRegionNameStr);
1497             lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
1498             if (nonExistentTables == null) {
1499               nonExistentTables = new TreeSet<TableName>();
1500             }
1501             nonExistentTables.add(table);
1502             this.skippedEdits.incrementAndGet();
1503             needSkip = true;
1504             break;
1505           }
1506 
1507           cachedLastFlushedSequenceId =
1508               lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1509           if (cachedLastFlushedSequenceId != null
1510               && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
1511             // skip the whole HLog entry
1512             this.skippedEdits.incrementAndGet();
1513             needSkip = true;
1514             break;
1515           } else {
1516             if (maxStoreSequenceIds == null) {
1517               maxStoreSequenceIds =
1518                   regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
1519             }
1520             if (maxStoreSequenceIds != null) {
1521               Long maxStoreSeqId = maxStoreSequenceIds.get(family);
1522               if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
1523                 // skip current kv if column family doesn't exist anymore or already flushed
1524                 skippedKVs.add(kv);
1525                 continue;
1526               }
1527             }
1528           }
1529         }
1530 
1531         // skip the edit
1532         if (loc == null || needSkip) continue;
1533 
1534         if (!skippedKVs.isEmpty()) {
1535           kvs.removeAll(skippedKVs);
1536         }
1537 
1538         synchronized (serverToBufferQueueMap) {
1539           locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
1540           List<Pair<HRegionLocation, HLog.Entry>> queue = serverToBufferQueueMap.get(locKey);
1541           if (queue == null) {
1542             queue =
1543                 Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, HLog.Entry>>());
1544             serverToBufferQueueMap.put(locKey, queue);
1545           }
1546           queue.add(new Pair<HRegionLocation, HLog.Entry>(loc, entry));
1547         }
1548         // store regions we have recovered so far
1549         addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
1550       }
1551     }
1552 
    /**
     * Locate the destination region for a row based on table name &amp; row, and make sure the
     * destination region is online for replay. Also refreshes the cached last-flushed sequence
     * id (and per-store max sequence ids) for the located region from ZK so that already-flushed
     * edits can be skipped.
     * @param hconn connection used to resolve the region location from hbase:meta
     * @param table table the row belongs to
     * @param row row key used to locate the destination region
     * @param originalEncodedRegionName encoded name of the region the edit was originally
     *          written for (may differ from the current location after a split/merge)
     * @return the online destination location for the row
     * @throws IOException if the location cannot be resolved or the region never comes online
     */
    private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
        TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
      // fetch location from cache
      HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
      if(loc != null) return loc;
      // fetch location from hbase:meta directly without using cache to avoid hit old dead server
      loc = hconn.getRegionLocation(table, row, true);
      if (loc == null) {
        throw new IOException("Can't locate location for row:" + Bytes.toString(row)
            + " of table:" + table);
      }
      // check if current row moves to a different region due to region merge/split
      if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
        // originalEncodedRegionName should have already flushed
        lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
        HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
        if (tmpLoc != null) return tmpLoc;
      }

      Long lastFlushedSequenceId = -1l;
      AtomicBoolean isRecovering = new AtomicBoolean(true);
      loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
      if (!isRecovering.get()) {
        // region isn't in recovering at all because WAL file may contain a region that has
        // been moved to somewhere before hosting RS fails
        lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
        LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
            + " because it's not in recovering.");
      } else {
        Long cachedLastFlushedSequenceId =
            lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());

        // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
        // update the value for the region
        RegionStoreSequenceIds ids =
            SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc
                .getRegionInfo().getEncodedName());
        if (ids != null) {
          lastFlushedSequenceId = ids.getLastFlushedSequenceId();
          // also cache per-column-family max sequence ids so per-store skipping can happen
          Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
          List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
          for (StoreSequenceId id : maxSeqIdInStores) {
            storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
          }
          regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
        }

        // only advance the cached value; never move it backwards
        if (cachedLastFlushedSequenceId == null
            || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
          lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
        }
      }

      onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
      return loc;
    }
1614 
1615     private void processWorkItems(String key, List<Pair<HRegionLocation, HLog.Entry>> actions)
1616         throws IOException {
1617       RegionServerWriter rsw = null;
1618 
1619       long startTime = System.nanoTime();
1620       try {
1621         rsw = getRegionServerWriter(key);
1622         rsw.sink.replayEntries(actions);
1623 
1624         // Pass along summary statistics
1625         rsw.incrementEdits(actions.size());
1626         rsw.incrementNanoTime(System.nanoTime() - startTime);
1627       } catch (IOException e) {
1628         e = RemoteExceptionHandler.checkIOException(e);
1629         LOG.fatal(" Got while writing log entry to log", e);
1630         throw e;
1631       }
1632     }
1633 
1634     /**
1635      * Wait until region is online on the destination region server
1636      * @param loc
1637      * @param row
1638      * @param timeout How long to wait
1639      * @param isRecovering Recovering state of the region interested on destination region server.
1640      * @return True when region is online on the destination region server
1641      * @throws InterruptedException
1642      */
1643     private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
1644         final long timeout, AtomicBoolean isRecovering)
1645         throws IOException {
1646       final long endTime = EnvironmentEdgeManager.currentTimeMillis() + timeout;
1647       final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1648         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1649       boolean reloadLocation = false;
1650       TableName tableName = loc.getRegionInfo().getTable();
1651       int tries = 0;
1652       Throwable cause = null;
1653       while (endTime > EnvironmentEdgeManager.currentTimeMillis()) {
1654         try {
1655           // Try and get regioninfo from the hosting server.
1656           HConnection hconn = getConnectionByTableName(tableName);
1657           if(reloadLocation) {
1658             loc = hconn.getRegionLocation(tableName, row, true);
1659           }
1660           BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
1661           HRegionInfo region = loc.getRegionInfo();
1662           try {
1663             GetRegionInfoRequest request =
1664                 RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
1665             GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
1666             if (HRegionInfo.convert(response.getRegionInfo()) != null) {
1667               isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
1668               return loc;
1669             }
1670           } catch (ServiceException se) {
1671             throw ProtobufUtil.getRemoteException(se);
1672           }
1673         } catch (IOException e) {
1674           cause = e.getCause();
1675           if(!(cause instanceof RegionOpeningException)) {
1676             reloadLocation = true;
1677           }
1678         }
1679         long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
1680         try {
1681           Thread.sleep(expectedSleep);
1682         } catch (InterruptedException e) {
1683           throw new IOException("Interrupted when waiting region " +
1684               loc.getRegionInfo().getEncodedName() + " online.", e);
1685         }
1686         tries++;
1687       }
1688 
1689       throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
1690         " online for " + timeout + " milliseconds.", cause);
1691     }
1692 
1693     @Override
1694     protected boolean flush() throws IOException {
1695       String curLoc = null;
1696       int curSize = 0;
1697       List<Pair<HRegionLocation, HLog.Entry>> curQueue = null;
1698       synchronized (this.serverToBufferQueueMap) {
1699         for (String locationKey : this.serverToBufferQueueMap.keySet()) {
1700           curQueue = this.serverToBufferQueueMap.get(locationKey);
1701           if (!curQueue.isEmpty()) {
1702             curSize = curQueue.size();
1703             curLoc = locationKey;
1704             break;
1705           }
1706         }
1707         if (curSize > 0) {
1708           this.serverToBufferQueueMap.remove(curLoc);
1709         }
1710       }
1711 
1712       if (curSize > 0) {
1713         this.processWorkItems(curLoc, curQueue);
1714         dataAvailable.notifyAll();
1715         return true;
1716       }
1717       return false;
1718     }
1719 
    /** Record a writer-side failure so it can be surfaced when the sink is closed. */
    void addWriterError(Throwable t) {
      thrown.add(t);
    }
1723 
    /**
     * Finish all in-flight writes, close every region-server writer and connection,
     * and return the paths of recovered-edits files written for disabling/disabled
     * tables (an empty list otherwise, to keep the interface of the old code path).
     * Writers and connections are closed in the finally block even when writing fails.
     * @return the list of recovered-edits paths, or null when finishWriting() failed
     * @throws IOException combining any failures from writing or from closing writers
     */
    @Override
    List<Path> finishWritingAndClose() throws IOException {
      try {
        if (!finishWriting()) {
          return null;
        }
        if (hasEditsInDisablingOrDisabledTables) {
          splits = logRecoveredEditsOutputSink.finishWritingAndClose();
        } else {
          splits = new ArrayList<Path>();
        }
        // returns an empty array in order to keep interface same as old way
        return splits;
      } finally {
        // Always release writers/connections; surface their failures as one IOException.
        List<IOException> thrown = closeRegionServerWriters();
        if (thrown != null && !thrown.isEmpty()) {
          throw MultipleIOException.createIOException(thrown);
        }
      }
    }
1744 
    /** Number of open writers: per-server replay writers plus the recovered-edits sink's. */
    @Override
    int getNumOpenWriters() {
      return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
    }
1749 
1750     private List<IOException> closeRegionServerWriters() throws IOException {
1751       List<IOException> result = null;
1752       if (!writersClosed) {
1753         result = Lists.newArrayList();
1754         try {
1755           for (WriterThread t : writerThreads) {
1756             while (t.isAlive()) {
1757               t.shouldStop = true;
1758               t.interrupt();
1759               try {
1760                 t.join(10);
1761               } catch (InterruptedException e) {
1762                 IOException iie = new InterruptedIOException();
1763                 iie.initCause(e);
1764                 throw iie;
1765               }
1766             }
1767           }
1768         } finally {
1769           synchronized (writers) {
1770             for (String locationKey : writers.keySet()) {
1771               RegionServerWriter tmpW = writers.get(locationKey);
1772               try {
1773                 tmpW.close();
1774               } catch (IOException ioe) {
1775                 LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
1776                 result.add(ioe);
1777               }
1778             }
1779           }
1780 
1781           // close connections
1782           synchronized (this.tableNameToHConnectionMap) {
1783             for (TableName tableName : this.tableNameToHConnectionMap.keySet()) {
1784               HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
1785               try {
1786                 hconn.clearRegionCache();
1787                 hconn.close();
1788               } catch (IOException ioe) {
1789                 result.add(ioe);
1790               }
1791             }
1792           }
1793           writersClosed = true;
1794         }
1795       }
1796       return result;
1797     }
1798 
1799     @Override
1800     Map<byte[], Long> getOutputCounts() {
1801       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1802       synchronized (writers) {
1803         for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
1804           ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
1805         }
1806       }
1807       return ret;
1808     }
1809 
    /** Number of distinct regions that have had at least one edit replayed so far. */
    @Override
    int getNumberOfRecoveredRegions() {
      return this.recoveredRegions.size();
    }
1814 
    /**
     * Get (or lazily create) the replay writer for a region-server location key.
     * Creation is double-checked under the writers lock so concurrent callers for
     * the same key share one writer. This function is threadsafe so long as
     * multiple threads are always acting on different regions.
     * @param loc location key in format &lt;server name:port&gt;#&lt;table name&gt;
     * @return the writer for this location
     * @throws IOException when the location key is malformed or the writer can't be created
     */
    private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
      // fast path: writer already exists
      RegionServerWriter ret = writers.get(loc);
      if (ret != null) {
        return ret;
      }

      TableName tableName = getTableFromLocationStr(loc);
      if(tableName == null){
        throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
      }

      HConnection hconn = getConnectionByTableName(tableName);
      synchronized (writers) {
        // re-check under the lock; another thread may have created it meanwhile
        ret = writers.get(loc);
        if (ret == null) {
          ret = new RegionServerWriter(conf, tableName, hconn);
          writers.put(loc, ret);
        }
      }
      return ret;
    }
1841 
    /**
     * Get (or lazily create) the cached HConnection for a table. Uses a
     * double-checked pattern: the unsynchronized first read is only safe if
     * tableNameToHConnectionMap is a concurrent map — NOTE(review): confirm the
     * field's declaration (outside this view) is a ConcurrentHashMap.
     * @param tableName table to get a connection for
     * @return a shared HConnection for the table
     * @throws IOException when a new connection cannot be created
     */
    private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
      HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
      if (hconn == null) {
        synchronized (this.tableNameToHConnectionMap) {
          hconn = this.tableNameToHConnectionMap.get(tableName);
          if (hconn == null) {
            hconn = HConnectionManager.getConnection(conf);
            this.tableNameToHConnectionMap.put(tableName, hconn);
          }
        }
      }
      return hconn;
    }
1855     private TableName getTableFromLocationStr(String loc) {
1856       /**
1857        * location key is in format <server name:port>#<table name>
1858        */
1859       String[] splits = loc.split(KEY_DELIMITER);
1860       if (splits.length != 2) {
1861         return null;
1862       }
1863       return TableName.valueOf(splits[1]);
1864     }
1865   }
1866 
  /**
   * Private data structure that wraps a receiving RS and collecting statistics about the data
   * written to this newly assigned RS.
   */
  private final static class RegionServerWriter extends SinkWriter {
    // sink that replays WAL edits to the destination region server
    final WALEditsReplaySink sink;

    RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
        throws IOException {
      this.sink = new WALEditsReplaySink(conf, tableName, conn);
    }

    void close() throws IOException {
      // nothing to release here; the underlying HConnection is shared and is
      // closed by the owning sink (see closeRegionServerWriters)
    }
  }
1882 
  /** Thrown when a WAL file being split is found to be corrupted. */
  static class CorruptedLogFileException extends Exception {
    private static final long serialVersionUID = 1L;

    CorruptedLogFileException(String s) {
      super(s);
    }
  }
1890 
  /** A struct used by getMutationsFromWALEntry */
  public static class MutationReplay {
    public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
      this.type = type;
      this.mutation = mutation;
      if(this.mutation.getDurability() != Durability.SKIP_WAL) {
        // using ASYNC_WAL for relay
        this.mutation.setDurability(Durability.ASYNC_WAL);
      }
      this.nonceGroup = nonceGroup;
      this.nonce = nonce;
    }

    // put or delete
    public final MutationType type;
    // the mutation to replay (durability downgraded to ASYNC_WAL unless SKIP_WAL)
    public final Mutation mutation;
    // nonce group/nonce carried over from the original WAL entry
    public final long nonceGroup;
    public final long nonce;
  }
1909 
1910  /**
1911   * Tag original sequence number for each edit to be replayed
1912   * @param seqId
1913   * @param cell
1914   */
1915   private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) {
1916     // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
1917     boolean needAddRecoveryTag = true;
1918     if (cell.getTagsLength() > 0) {
1919       Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
1920         TagType.LOG_REPLAY_TAG_TYPE);
1921       if (tmpTag != null) {
1922         // found an existing log replay tag so reuse it
1923         needAddRecoveryTag = false;
1924       }
1925     }
1926     if (needAddRecoveryTag) {
1927       List<Tag> newTags = new ArrayList<Tag>();
1928       Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId));
1929       newTags.add(replayTag);
1930       return KeyValue.cloneAndAddTags(cell, newTags);
1931     }
1932     return cell;
1933   }
1934 
  /**
   * This function is used to construct mutations from a WALEntry. It also reconstructs HLogKey &amp;
   * WALEdit from the passed in WALEntry. Consecutive cells with the same row and type byte are
   * grouped into one Mutation; each row/type change starts a new Put or Delete.
   * @param entry the WALEntry to convert; null yields an empty list
   * @param cells scanner positioned over the cells associated with the entry; exactly
   *          entry.getAssociatedCellCount() cells are consumed
   * @param logEntry pair of HLogKey and WALEdit instance stores HLogKey and WALEdit instances
   *          extracted from the passed in WALEntry; ignored when null
   * @param addLogReplayTag when true, each Put cell is tagged with the replay sequence number
   * @return list of MutationReplay (type + mutation + nonces) to be replayed
   * @throws IOException
   */
  public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
      Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag) throws IOException {

    if (entry == null) {
      // return an empty array
      return new ArrayList<MutationReplay>();
    }

    // prefer the original sequence number when the entry carries one (i.e. it was itself
    // a replayed edit); otherwise use the entry's own log sequence number
    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
      entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
    int count = entry.getAssociatedCellCount();
    List<MutationReplay> mutations = new ArrayList<MutationReplay>();
    Cell previousCell = null;
    Mutation m = null;
    HLogKey key = null;
    WALEdit val = null;
    if (logEntry != null) val = new WALEdit();

    for (int i = 0; i < count; i++) {
      // Throw index out of bounds if our cell count is off
      if (!cells.advance()) {
        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
      }
      Cell cell = cells.current();
      if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell));

      // a new row or a change between put/delete type starts a new Mutation
      boolean isNewRowOrType =
          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
              || !CellUtil.matchingRow(previousCell, cell);
      if (isNewRowOrType) {
        // Create new mutation
        if (CellUtil.isDelete(cell)) {
          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Deletes don't have nonces.
          mutations.add(new MutationReplay(
              MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
        } else {
          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Puts might come from increment or append, thus we need nonces.
          long nonceGroup = entry.getKey().hasNonceGroup()
              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
        }
      }
      // append the current cell to the mutation started above (or on an earlier iteration)
      if (CellUtil.isDelete(cell)) {
        ((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
      } else {
        Cell tmpNewCell = cell;
        if (addLogReplayTag) {
          tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
        }
        ((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
      }
      previousCell = cell;
    }

    // reconstruct HLogKey
    if (logEntry != null) {
      WALKey walKey = entry.getKey();
      List<UUID> clusterIds = new ArrayList<UUID>(walKey.getClusterIdsCount());
      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
      }
      key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
              .getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds,
              walKey.getNonceGroup(), walKey.getNonce());
      logEntry.setFirst(key);
      logEntry.setSecond(val);
    }

    return mutations;
  }
2019 }