1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.text.ParseException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.TreeSet;
35  import java.util.UUID;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.CompletionService;
38  import java.util.concurrent.ConcurrentHashMap;
39  import java.util.concurrent.ExecutionException;
40  import java.util.concurrent.ExecutorCompletionService;
41  import java.util.concurrent.Future;
42  import java.util.concurrent.ThreadFactory;
43  import java.util.concurrent.ThreadPoolExecutor;
44  import java.util.concurrent.TimeUnit;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicLong;
47  import java.util.concurrent.atomic.AtomicReference;
48  
49  import org.apache.commons.logging.Log;
50  import org.apache.commons.logging.LogFactory;
51  import org.apache.hadoop.classification.InterfaceAudience;
52  import org.apache.hadoop.conf.Configuration;
53  import org.apache.hadoop.fs.FileStatus;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.hbase.Cell;
57  import org.apache.hadoop.hbase.CellScanner;
58  import org.apache.hadoop.hbase.CellUtil;
59  import org.apache.hadoop.hbase.CoordinatedStateException;
60  import org.apache.hadoop.hbase.CoordinatedStateManager;
61  import org.apache.hadoop.hbase.HBaseConfiguration;
62  import org.apache.hadoop.hbase.HConstants;
63  import org.apache.hadoop.hbase.HRegionInfo;
64  import org.apache.hadoop.hbase.HRegionLocation;
65  import org.apache.hadoop.hbase.KeyValue;
66  import org.apache.hadoop.hbase.KeyValueUtil;
67  import org.apache.hadoop.hbase.ServerName;
68  import org.apache.hadoop.hbase.TableName;
69  import org.apache.hadoop.hbase.TableNotFoundException;
70  import org.apache.hadoop.hbase.TableStateManager;
71  import org.apache.hadoop.hbase.Tag;
72  import org.apache.hadoop.hbase.TagType;
73  import org.apache.hadoop.hbase.client.ConnectionUtils;
74  import org.apache.hadoop.hbase.client.Delete;
75  import org.apache.hadoop.hbase.client.Durability;
76  import org.apache.hadoop.hbase.client.HConnection;
77  import org.apache.hadoop.hbase.client.HConnectionManager;
78  import org.apache.hadoop.hbase.client.Mutation;
79  import org.apache.hadoop.hbase.client.Put;
80  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
81  import org.apache.hadoop.hbase.io.HeapSize;
82  import org.apache.hadoop.hbase.master.SplitLogManager;
83  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
84  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
85  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
86  import org.apache.hadoop.hbase.protobuf.RequestConverter;
87  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
88  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
89  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
90  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
91  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
92  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
93  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
94  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
95  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
96  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
97  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
98  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
99  import org.apache.hadoop.hbase.regionserver.HRegion;
100 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
101 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
102 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
103 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
104 import org.apache.hadoop.hbase.util.Bytes;
105 import org.apache.hadoop.hbase.util.CancelableProgressable;
106 import org.apache.hadoop.hbase.util.ClassSize;
107 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
108 import org.apache.hadoop.hbase.util.FSUtils;
109 import org.apache.hadoop.hbase.util.Pair;
110 import org.apache.hadoop.hbase.util.Threads;
111 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
112 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
113 import org.apache.hadoop.io.MultipleIOException;
114 import org.apache.hadoop.ipc.RemoteException;
115 
116 import com.google.common.base.Preconditions;
117 import com.google.common.collect.Lists;
118 import com.google.protobuf.ServiceException;
119 
120 /**
121  * This class is responsible for splitting up a bunch of regionserver commit log
122  * files that are no longer being written to, into new files, one per region, for
123  * the region to replay on startup. The old log files are deleted when finished.
124  */
125 @InterfaceAudience.Private
126 public class HLogSplitter {
127   static final Log LOG = LogFactory.getLog(HLogSplitter.class);
128 
129   // Parameters for split process
130   protected final Path rootDir;
131   protected final FileSystem fs;
132   protected final Configuration conf;
133 
134   // Major subcomponents of the split process.
135   // These are separated into inner classes to make testing easier.
136   PipelineController controller;
137   OutputSink outputSink;
138   EntryBuffers entryBuffers;
139 
140   private Set<TableName> disablingOrDisabledTables =
141       new HashSet<TableName>();
142   private ZooKeeperWatcher watcher;
143   private CoordinatedStateManager csm;
144 
145   private MonitoredTask status;
146 
147   // For checking the latest flushed sequence id
148   protected final LastSequenceId sequenceIdChecker;
149 
150   protected boolean distributedLogReplay;
151 
152   // Map encodedRegionName -> lastFlushedSequenceId
153   protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
154 
155   // Map encodedRegionName -> maxSeqIdInStores
156   protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
157       new ConcurrentHashMap<String, Map<byte[], Long>>();
158 
159   // Failed region server that the wal file being split belongs to
160   protected String failedServerName = "";
161 
162   // Number of writer threads
163   private final int numWriterThreads;
164 
165   // Min batch size when replaying WAL edits
166   private final int minBatchSize;
167 
168   HLogSplitter(Configuration conf, Path rootDir,
169       FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw,
170       CoordinatedStateManager csm, RecoveryMode mode) {
171     this.conf = HBaseConfiguration.create(conf);
172     String codecClassName = conf
173         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
174     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
175     this.rootDir = rootDir;
176     this.fs = fs;
177     this.sequenceIdChecker = idChecker;
178     this.watcher = zkw;
179     this.csm = csm;
180     this.controller = new PipelineController();
181 
182     entryBuffers = new EntryBuffers(controller,
183         this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
184             128*1024*1024));
185 
186     // a larger minBatchSize may slow down recovery because replay writer has to wait for
187     // enough edits before replaying them
188     this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
189     this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
190 
191     this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
192     if (zkw != null && csm != null && this.distributedLogReplay) {
193       outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
194     } else {
195       if (this.distributedLogReplay) {
196         LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
197       }
198       this.distributedLogReplay = false;
199       outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
200     }
201 
202   }
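  // Tuning knobs read by the constructor above (defaults as coded here):
  //   hbase.regionserver.hlog.splitlog.buffersize     - EntryBuffers heap cap (128 MB)
  //   hbase.regionserver.wal.logreplay.batch.size     - min edits per replay batch (64)
  //   hbase.regionserver.hlog.splitlog.writer.threads - number of writer threads (3)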
203 
204   /**
205    * Splits an HLog file into per-region recovered-edits directories.
206    * This is the main entry point for distributed log splitting from SplitLogWorker.
207    * <p>
208    * If the log file has N regions then N recovered.edits files will be produced.
209    * <p>
210    * @param rootDir
211    * @param logfile
212    * @param fs
213    * @param conf
214    * @param reporter
215    * @param idChecker
216   * @param zkw ZooKeeperWatcher if it's null, we will fall back to the old-style log splitting where we
217   *          dump out recovered.edits files for regions to replay on.
218   * @return false if it is interrupted by the progressable.
219    * @throws IOException
220    */
221   public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
222       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
223       ZooKeeperWatcher zkw, CoordinatedStateManager cp, RecoveryMode mode) throws IOException {
224     HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, cp, mode);
225     return s.splitLogFile(logfile, reporter);
226   }
227 
228   // A wrapper to split one log folder using the method used by distributed
229   // log splitting. Used by tools and unit tests. It should be package private.
230   // It is public only because TestWALObserver is in a different package,
231   // which uses this method to do log splitting.
232   public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
233       FileSystem fs, Configuration conf) throws IOException {
234     FileStatus[] logfiles = fs.listStatus(logDir);
235     List<Path> splits = new ArrayList<Path>();
236     if (logfiles != null && logfiles.length > 0) {
237       for (FileStatus logfile: logfiles) {
238         HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null,
239           RecoveryMode.LOG_SPLITTING);
240         if (s.splitLogFile(logfile, null)) {
241           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
242           if (s.outputSink.splits != null) {
243             splits.addAll(s.outputSink.splits);
244           }
245         }
246       }
247     }
248     if (!fs.delete(logDir, true)) {
249       throw new IOException("Unable to delete src dir: " + logDir);
250     }
251     return splits;
252   }
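  // Illustrative usage sketch of the static split() helper above (the dead-server WAL
  // directory name below is hypothetical; the other paths follow the standard layout):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   FileSystem walFs = FileSystem.get(conf);
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   Path logDir = new Path(new Path(rootDir, HConstants.HREGION_LOGDIR_NAME),
  //       "server1,60020,1400000000000-splitting");
  //   Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
  //   List<Path> recoveredEdits = HLogSplitter.split(rootDir, logDir, oldLogDir, walFs, conf);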
253 
254   // The real log splitter. It just splits one log file.
255   boolean splitLogFile(FileStatus logfile,
256       CancelableProgressable reporter) throws IOException {
257     boolean isCorrupted = false;
258     Preconditions.checkState(status == null);
259     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
260       HLog.SPLIT_SKIP_ERRORS_DEFAULT);
261     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
262     Path logPath = logfile.getPath();
263     boolean outputSinkStarted = false;
264     boolean progress_failed = false;
265     int editsCount = 0;
266     int editsSkipped = 0;
267 
268     status =
269         TaskMonitor.get().createStatus(
270           "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
271     try {
272       long logLength = logfile.getLen();
273       LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
274       LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
275       status.setStatus("Opening log file");
276       if (reporter != null && !reporter.progress()) {
277         progress_failed = true;
278         return false;
279       }
280       Reader in = null;
281       try {
282         in = getReader(fs, logfile, conf, skipErrors, reporter);
283       } catch (CorruptedLogFileException e) {
284         LOG.warn("Could not get reader, corrupted log file " + logPath, e);
285         ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
286         isCorrupted = true;
287       }
288       if (in == null) {
289         LOG.warn("Nothing to split in log file " + logPath);
290         return true;
291       }
292       if(watcher != null && csm != null) {
293         try {
294           TableStateManager tsm = csm.getTableStateManager();
295           disablingOrDisabledTables = tsm.getTablesInStates(
296             ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
297         } catch (CoordinatedStateException e) {
298           throw new IOException("Can't get disabling/disabled tables", e);
299         }
300       }
301       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
302       int numOpenedFilesLastCheck = 0;
303       outputSink.setReporter(reporter);
304       outputSink.startWriterThreads();
305       outputSinkStarted = true;
306       Entry entry;
307       Long lastFlushedSequenceId = -1L;
308       ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logPath);
309       failedServerName = (serverName == null) ? "" : serverName.getServerName();
310       while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
311         byte[] region = entry.getKey().getEncodedRegionName();
312         String key = Bytes.toString(region);
313         lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
314         if (lastFlushedSequenceId == null) {
315           if (this.distributedLogReplay) {
316             RegionStoreSequenceIds ids =
317                 SplitLogManager.getRegionFlushedSequenceId(this.watcher, failedServerName, key);
318             if (ids != null) {
319               lastFlushedSequenceId = ids.getLastFlushedSequenceId();
320             }
321           } else if (sequenceIdChecker != null) {
322             lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
323           }
324           if (lastFlushedSequenceId == null) {
325             lastFlushedSequenceId = -1L;
326           }
327           lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
328         }
329         if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
330           editsSkipped++;
331           continue;
332         }
333         entryBuffers.appendEntry(entry);
334         editsCount++;
335         int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
336         // If sufficient edits have passed, check if we should report progress.
337         if (editsCount % interval == 0
338             || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
339           numOpenedFilesLastCheck = this.getNumOpenWriters();
340           String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
341               + " edits, skipped " + editsSkipped + " edits.";
342           status.setStatus("Split " + countsStr);
343           if (reporter != null && !reporter.progress()) {
344             progress_failed = true;
345             return false;
346           }
347         }
348       }
349     } catch (InterruptedException ie) {
350       IOException iie = new InterruptedIOException();
351       iie.initCause(ie);
352       throw iie;
353     } catch (CorruptedLogFileException e) {
354       LOG.warn("Could not parse, corrupted log file " + logPath, e);
355       ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
356       isCorrupted = true;
357     } catch (IOException e) {
358       e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
359       throw e;
360     } finally {
361       LOG.debug("Finishing writing output logs and closing down.");
362       try {
363         if (outputSinkStarted) {
364           // Set progress_failed to true first; the statement immediately following will reset it.
365           // This way, if finishWritingAndClose() throws an exception, progress_failed keeps the right value.
366           progress_failed = true;
367           progress_failed = outputSink.finishWritingAndClose() == null;
368         }
369       } finally {
370         String msg =
371             "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
372                 + " regions; log file=" + logPath + " is corrupted = " + isCorrupted
373                 + " progress failed = " + progress_failed;
374         LOG.info(msg);
375         status.markComplete(msg);
376       }
377     }
378     return !progress_failed;
379   }
380 
381   /**
382    * Completes the work done by splitLogFile by archiving logs
383    * <p>
384    * It is invoked by SplitLogManager once it knows that one of the
385    * SplitLogWorkers has completed the splitLogFile() part. If the master
386    * crashes then this function might get called multiple times.
387    * <p>
388    * @param logfile
389    * @param conf
390    * @throws IOException
391    */
392   public static void finishSplitLogFile(String logfile,
393       Configuration conf)  throws IOException {
394     Path rootdir = FSUtils.getRootDir(conf);
395     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
396     Path logPath;
397     if (FSUtils.isStartingWithPath(rootdir, logfile)) {
398       logPath = new Path(logfile);
399     } else {
400       logPath = new Path(rootdir, logfile);
401     }
402     finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
403   }
404 
405   static void finishSplitLogFile(Path rootdir, Path oldLogDir,
406       Path logPath, Configuration conf) throws IOException {
407     List<Path> processedLogs = new ArrayList<Path>();
408     List<Path> corruptedLogs = new ArrayList<Path>();
409     FileSystem fs;
410     fs = rootdir.getFileSystem(conf);
411     if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
412       corruptedLogs.add(logPath);
413     } else {
414       processedLogs.add(logPath);
415     }
416     archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
417     Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
418     fs.delete(stagingDir, true);
419   }
420 
421   /**
422    * Moves processed logs to the oldLogDir after successful processing. Moves
423    * corrupted logs (any log that couldn't be successfully parsed) to the corruptDir
424    * (.corrupt) for later investigation.
425    *
426    * @param corruptedLogs
427    * @param processedLogs
428    * @param oldLogDir
429    * @param fs
430    * @param conf
431    * @throws IOException
432    */
433   private static void archiveLogs(
434       final List<Path> corruptedLogs,
435       final List<Path> processedLogs, final Path oldLogDir,
436       final FileSystem fs, final Configuration conf) throws IOException {
437     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
438         "hbase.regionserver.hlog.splitlog.corrupt.dir",  HConstants.CORRUPT_DIR_NAME));
439 
440     if (!fs.mkdirs(corruptDir)) {
441       LOG.info("Unable to mkdir " + corruptDir);
442     }
443     fs.mkdirs(oldLogDir);
444 
445     // this method can get restarted or called multiple times for archiving
446     // the same log files.
447     for (Path corrupted : corruptedLogs) {
448       Path p = new Path(corruptDir, corrupted.getName());
449       if (fs.exists(corrupted)) {
450         if (!fs.rename(corrupted, p)) {
451           LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
452         } else {
453           LOG.warn("Moved corrupted log " + corrupted + " to " + p);
454         }
455       }
456     }
457 
458     for (Path p : processedLogs) {
459       Path newPath = FSHLog.getHLogArchivePath(oldLogDir, p);
460       if (fs.exists(p)) {
461         if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
462           LOG.warn("Unable to move  " + p + " to " + newPath);
463         } else {
464           LOG.info("Archived processed log " + p + " to " + newPath);
465         }
466       }
467     }
468   }
469 
470   /**
471    * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
472    * <code>logEntry</code> named for the sequenceid in the passed
473    * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
474    * This method also ensures existence of RECOVERED_EDITS_DIR under the region
475    * directory, creating it if necessary.
476    * @param fs
477    * @param logEntry
478    * @param rootDir HBase root dir.
479    * @return Path to file into which to dump split log edits.
480    * @throws IOException
481    */
482   @SuppressWarnings("deprecation")
483   static Path getRegionSplitEditsPath(final FileSystem fs,
484       final Entry logEntry, final Path rootDir, boolean isCreate)
485   throws IOException {
486     Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
487     String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
488     Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
489     Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
490 
491     if (!fs.exists(regiondir)) {
492       LOG.info("This region's directory doesn't exist: "
493           + regiondir.toString() + ". It is very likely that it was" +
494           " already split so it's safe to discard those edits.");
495       return null;
496     }
497     if (fs.exists(dir) && fs.isFile(dir)) {
498       Path tmp = new Path("/tmp");
499       if (!fs.exists(tmp)) {
500         fs.mkdirs(tmp);
501       }
502       tmp = new Path(tmp,
503         HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
504       LOG.warn("Found existing old file: " + dir + ". It could be some "
505         + "leftover of an old installation. It should be a folder instead. "
506         + "So moving it to " + tmp);
507       if (!fs.rename(dir, tmp)) {
508         LOG.warn("Failed to sideline old file " + dir);
509       }
510     }
511 
512     if (isCreate && !fs.exists(dir)) {
513       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
514     }
515     // Append a file name that ends with RECOVERED_LOG_TMPFILE_SUFFIX so that the
516     // region's replayRecoveredEdits will not delete it
517     String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
518     fileName = getTmpRecoveredEditsFileName(fileName);
519     return new Path(dir, fileName);
520   }
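  // Illustrative example of the path produced above (table, region and root dir are
  // hypothetical): for an edit of region "2323432434" of table "some_table" with log
  // sequence number 2332, the returned path would look like
  //   <rootDir>/data/default/some_table/2323432434/recovered.edits/<19-digit seqid>.<tmp suffix>
  // where the file name is built by formatRecoveredEditsFileName() and
  // getTmpRecoveredEditsFileName() below.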
521 
522   static String getTmpRecoveredEditsFileName(String fileName) {
523     return fileName + HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
524   }
525 
526   /**
527    * Get the completed recovered edits file path, renaming it so it is named by the last
528    * edit in the file rather than its first. The name can then be used to skip
529    * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
530    * @param srcPath
531    * @param maximumEditLogSeqNum
532    * @return dstPath whose name is the file's last edit log sequence number
533    */
534   static Path getCompletedRecoveredEditsFilePath(Path srcPath,
535       Long maximumEditLogSeqNum) {
536     String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
537     return new Path(srcPath.getParent(), fileName);
538   }
539 
540   static String formatRecoveredEditsFileName(final long seqid) {
541     return String.format("%019d", seqid);
542   }
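  // For example, formatRecoveredEditsFileName(2332L) returns "0000000000000002332":
  // the sequence id is left-padded with zeros to 19 digits so recovered edits files
  // sort lexicographically in sequence-number order.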
543 
544   /**
545    * Create a new {@link Reader} for reading logs to split.
546    *
547    * @param fs
548    * @param file
549    * @param conf
550    * @return A new Reader instance
551    * @throws IOException
552    * @throws CorruptedLogFileException
553    */
554   protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
555       boolean skipErrors, CancelableProgressable reporter)
556       throws IOException, CorruptedLogFileException {
557     Path path = file.getPath();
558     long length = file.getLen();
559     Reader in;
560 
561     // Check for possibly empty file. With appends, currently Hadoop reports a
562     // zero length even if the file has been sync'd. Revisit if HDFS-376 or
563     // HDFS-878 is committed.
564     if (length <= 0) {
565       LOG.warn("File " + path + " might be still open, length is 0");
566     }
567 
568     try {
569       FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
570       try {
571         in = getReader(fs, path, conf, reporter);
572       } catch (EOFException e) {
573         if (length <= 0) {
574           // TODO should we ignore an empty, not-last log file if skip.errors
575           // is false? Either way, the caller should decide what to do. E.g.
576           // ignore if this is the last log in sequence.
577           // TODO is this scenario still possible if the log has been
578           // recovered (i.e. closed)
579           LOG.warn("Could not open " + path + " for reading. File is empty", e);
580           return null;
581         } else {
582           // EOFException being ignored
583           return null;
584         }
585       }
586     } catch (IOException e) {
587       if (e instanceof FileNotFoundException) {
588         // A wal file may not exist anymore. Nothing can be recovered so move on
589         LOG.warn("File " + path + " doesn't exist anymore.", e);
590         return null;
591       }
592       if (!skipErrors || e instanceof InterruptedIOException) {
593         throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
594       }
595       CorruptedLogFileException t =
596         new CorruptedLogFileException("skipErrors=true Could not open hlog " +
597             path + " ignoring");
598       t.initCause(e);
599       throw t;
600     }
601     return in;
602   }
603 
604   static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
605   throws CorruptedLogFileException, IOException {
606     try {
607       return in.next();
608     } catch (EOFException eof) {
609       // truncated files are expected if a RS crashes (see HBASE-2643)
610       LOG.info("EOF from hlog " + path + ".  continuing");
611       return null;
612     } catch (IOException e) {
613       // If the IOE resulted from bad file format,
614       // then this problem is idempotent and retrying won't help
615       if (e.getCause() != null &&
616           (e.getCause() instanceof ParseException ||
617            e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
618         LOG.warn("Parse exception " + e.getCause().toString() + " from hlog "
619            + path + ".  continuing");
620         return null;
621       }
622       if (!skipErrors) {
623         throw e;
624       }
625       CorruptedLogFileException t =
626         new CorruptedLogFileException("skipErrors=true Ignoring exception" +
627             " while parsing hlog " + path + ". Marking as corrupted");
628       t.initCause(e);
629       throw t;
630     }
631   }
632 
633   /**
634    * Create a new {@link Writer} for writing log splits.
635    */
636   protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
637       throws IOException {
638     return HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);
639   }
640 
641   /**
642    * Create a new {@link Reader} for reading logs to split.
643    */
644   protected Reader getReader(FileSystem fs, Path curLogFile,
645       Configuration conf, CancelableProgressable reporter) throws IOException {
646     return HLogFactory.createReader(fs, curLogFile, conf, reporter);
647   }
648 
649   /**
650    * Get the number of currently open writers
651    */
652   private int getNumOpenWriters() {
653     int result = 0;
654     if (this.outputSink != null) {
655       result += this.outputSink.getNumOpenWriters();
656     }
657     return result;
658   }
659 
660   /**
661    * Contains some methods to control WAL-entries producer / consumer interactions
662    */
663   public static class PipelineController {
664     // If an exception is thrown by one of the other threads, it will be
665     // stored here.
666     AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
667 
668     // Wait/notify for when data has been produced by the writer thread,
669     // consumed by the reader thread, or an exception occurred
670     public final Object dataAvailable = new Object();
671 
672     void writerThreadError(Throwable t) {
673       thrown.compareAndSet(null, t);
674     }
675 
676     /**
677      * Check for errors in the writer threads. If any is found, rethrow it.
678      */
679     void checkForErrors() throws IOException {
680       Throwable thrown = this.thrown.get();
681       if (thrown == null) return;
682       if (thrown instanceof IOException) {
683         throw new IOException(thrown);
684       } else {
685         throw new RuntimeException(thrown);
686       }
687     }
688   }
689 
690   /**
691    * Class which accumulates edits and separates them into a buffer per region
692    * while simultaneously accounting for RAM usage. Blocks if the RAM usage crosses
693    * a predefined threshold.
694    *
695    * Writer threads then pull region-specific buffers from this class.
696    */
697   public static class EntryBuffers {
698     PipelineController controller;
699 
700     Map<byte[], RegionEntryBuffer> buffers =
701       new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
702 
703     /* Track which regions are currently in the middle of writing. We don't allow
704        an IO thread to pick up bytes from a region if we're already writing
705        data for that region in a different IO thread. */
706     Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
707 
708     long totalBuffered = 0;
709     long maxHeapUsage;
710 
711     public EntryBuffers(PipelineController controller, long maxHeapUsage) {
712       this.controller = controller;
713       this.maxHeapUsage = maxHeapUsage;
714     }
715 
716     /**
717      * Append a log entry into the corresponding region buffer.
718      * Blocks if the total heap usage has crossed the specified threshold.
719      *
720      * @throws InterruptedException
721      * @throws IOException
722      */
723     public void appendEntry(Entry entry) throws InterruptedException, IOException {
724       HLogKey key = entry.getKey();
725 
726       RegionEntryBuffer buffer;
727       long incrHeap;
728       synchronized (this) {
729         buffer = buffers.get(key.getEncodedRegionName());
730         if (buffer == null) {
731           buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
732           buffers.put(key.getEncodedRegionName(), buffer);
733         }
734         incrHeap = buffer.appendEntry(entry);
735       }
736 
737       // If we crossed the chunk threshold, wait for more space to be available
738       synchronized (controller.dataAvailable) {
739         totalBuffered += incrHeap;
740         while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
741           LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
742           controller.dataAvailable.wait(2000);
743         }
744         controller.dataAvailable.notifyAll();
745       }
746       controller.checkForErrors();
747     }
748 
749     /**
750      * @return RegionEntryBuffer a buffer of edits to be written or replayed.
751      */
752     synchronized RegionEntryBuffer getChunkToWrite() {
753       long biggestSize = 0;
754       byte[] biggestBufferKey = null;
755 
756       for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
757         long size = entry.getValue().heapSize();
758         if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
759           biggestSize = size;
760           biggestBufferKey = entry.getKey();
761         }
762       }
763       if (biggestBufferKey == null) {
764         return null;
765       }
766 
767       RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
768       currentlyWriting.add(biggestBufferKey);
769       return buffer;
770     }
771 
772     void doneWriting(RegionEntryBuffer buffer) {
773       synchronized (this) {
774         boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
775         assert removed;
776       }
777       long size = buffer.heapSize();
778 
779       synchronized (controller.dataAvailable) {
780         totalBuffered -= size;
781         // We may unblock writers
782         controller.dataAvailable.notifyAll();
783       }
784     }
785 
786     synchronized boolean isRegionCurrentlyWriting(byte[] region) {
787       return currentlyWriting.contains(region);
788     }
789 
790     public void waitUntilDrained() {
791       synchronized (controller.dataAvailable) {
792         while (totalBuffered > 0) {
793           try {
794             controller.dataAvailable.wait(2000);
795           } catch (InterruptedException e) {
796             LOG.warn("Got intrerrupted while waiting for EntryBuffers is drained");
797             Thread.interrupted();
798             break;
799           }
800         }
801       }
802     }
803   }
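  // Sketch of the producer/consumer flow around EntryBuffers (as implemented above): the
  // single log-reading thread calls appendEntry() for every WAL entry and blocks once
  // totalBuffered exceeds maxHeapUsage; each WriterThread calls getChunkToWrite() to take
  // the largest per-region buffer not already being written, hands it to the OutputSink,
  // and then calls doneWriting(), which releases the heap accounting and notifies any
  // blocked producer via controller.dataAvailable.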
804 
805   /**
806    * A buffer of some number of edits for a given region.
807    * This accumulates edits and also provides a memory optimization in order to
808    * share a single byte array instance for the table and region name.
809    * Also tracks memory usage of the accumulated edits.
810    */
811   public static class RegionEntryBuffer implements HeapSize {
812     long heapInBuffer = 0;
813     List<Entry> entryBuffer;
814     TableName tableName;
815     byte[] encodedRegionName;
816 
817     RegionEntryBuffer(TableName tableName, byte[] region) {
818       this.tableName = tableName;
819       this.encodedRegionName = region;
820       this.entryBuffer = new LinkedList<Entry>();
821     }
822 
823     long appendEntry(Entry entry) {
824       internify(entry);
825       entryBuffer.add(entry);
826       long incrHeap = entry.getEdit().heapSize() +
827         ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
828         0; // TODO linkedlist entry
829       heapInBuffer += incrHeap;
830       return incrHeap;
831     }
832 
833     private void internify(Entry entry) {
834       HLogKey k = entry.getKey();
835       k.internTableName(this.tableName);
836       k.internEncodedRegionName(this.encodedRegionName);
837     }
838 
839     @Override
840     public long heapSize() {
841       return heapInBuffer;
842     }
843 
844     public byte[] getEncodedRegionName() {
845       return encodedRegionName;
846     }
847 
848     public List<Entry> getEntryBuffer() {
849       return entryBuffer;
850     }
851 
852     public TableName getTableName() {
853       return tableName;
854     }
855   }
856 
857   public static class WriterThread extends Thread {
858     private volatile boolean shouldStop = false;
859     private PipelineController controller;
860     private EntryBuffers entryBuffers;
861     private OutputSink outputSink = null;
862 
863     WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
864       super(Thread.currentThread().getName() + "-Writer-" + i);
865       this.controller = controller;
866       this.entryBuffers = entryBuffers;
867       outputSink = sink;
868     }
869 
870     @Override
871     public void run()  {
872       try {
873         doRun();
874       } catch (Throwable t) {
875         LOG.error("Exiting thread", t);
876         controller.writerThreadError(t);
877       }
878     }
879 
880     private void doRun() throws IOException {
881       LOG.debug("Writer thread " + this + ": starting");
882       while (true) {
883         RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
884         if (buffer == null) {
885           // No data currently available, wait on some more to show up
886           synchronized (controller.dataAvailable) {
887             if (shouldStop && !this.outputSink.flush()) {
888               return;
889             }
890             try {
891               controller.dataAvailable.wait(500);
892             } catch (InterruptedException ie) {
893               if (!shouldStop) {
894                 throw new RuntimeException(ie);
895               }
896             }
897           }
898           continue;
899         }
900 
901         assert buffer != null;
902         try {
903           writeBuffer(buffer);
904         } finally {
905           entryBuffers.doneWriting(buffer);
906         }
907       }
908     }
909 
910     private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
911       outputSink.append(buffer);
912     }
913 
914     void finish() {
915       synchronized (controller.dataAvailable) {
916         shouldStop = true;
917         controller.dataAvailable.notifyAll();
918       }
919     }
920   }
921 
922   /**
923    * The following class is an abstraction that provides a common interface for both the
924    * existing recovered-edits file sink and the region server WAL edits replay sink
925    */
926   public static abstract class OutputSink {
927 
928     protected PipelineController controller;
929     protected EntryBuffers entryBuffers;
930 
931     protected Map<byte[], SinkWriter> writers = Collections
932         .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));
933 
934     protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
935         .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
936 
937     protected final List<WriterThread> writerThreads = Lists.newArrayList();
938 
939     /* Set of regions which we've decided should not output edits */
940     protected final Set<byte[]> blacklistedRegions = Collections
941         .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
942 
943     protected boolean closeAndCleanCompleted = false;
944 
945     protected boolean writersClosed = false;
946 
947     protected final int numThreads;
948 
949     protected CancelableProgressable reporter = null;
950 
951     protected AtomicLong skippedEdits = new AtomicLong();
952 
953     protected List<Path> splits = null;
954 
955     public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
956       numThreads = numWriters;
957       this.controller = controller;
958       this.entryBuffers = entryBuffers;
959     }
960 
961     void setReporter(CancelableProgressable reporter) {
962       this.reporter = reporter;
963     }
964 
965     /**
966      * Start the threads that will pump data from the entryBuffers to the output files.
967      */
968     public synchronized void startWriterThreads() {
969       for (int i = 0; i < numThreads; i++) {
970         WriterThread t = new WriterThread(controller, entryBuffers, this, i);
971         t.start();
972         writerThreads.add(t);
973       }
974     }
975 
976     /**
977      *
978      * Update region's maximum edit log SeqNum.
979      */
980     void updateRegionMaximumEditLogSeqNum(Entry entry) {
981       synchronized (regionMaximumEditLogSeqNum) {
982         Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
983             .getEncodedRegionName());
984         if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
985           regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
986               .getLogSeqNum());
987         }
988       }
989     }
990 
991     Long getRegionMaximumEditLogSeqNum(byte[] region) {
992       return regionMaximumEditLogSeqNum.get(region);
993     }
994 
995     /**
996      * @return the number of currently opened writers
997      */
998     int getNumOpenWriters() {
999       return this.writers.size();
1000     }
1001 
1002     long getSkippedEdits() {
1003       return this.skippedEdits.get();
1004     }
1005 
1006     /**
1007      * Wait for writer threads to dump all info to the sink
1008      * @return true when there is no error
1009      * @throws IOException
1010      */
1011     protected boolean finishWriting() throws IOException {
1012       LOG.debug("Waiting for split writer threads to finish");
1013       boolean progress_failed = false;
1014       for (WriterThread t : writerThreads) {
1015         t.finish();
1016       }
1017       for (WriterThread t : writerThreads) {
1018         if (!progress_failed && reporter != null && !reporter.progress()) {
1019           progress_failed = true;
1020         }
1021         try {
1022           t.join();
1023         } catch (InterruptedException ie) {
1024           IOException iie = new InterruptedIOException();
1025           iie.initCause(ie);
1026           throw iie;
1027         }
1028       }
1029       controller.checkForErrors();
1030       LOG.info("Split writers finished");
1031       return (!progress_failed);
1032     }
1033 
1034     public abstract List<Path> finishWritingAndClose() throws IOException;
1035 
1036     /**
1037      * @return a map from encoded region ID to the number of edits written out for that region.
1038      */
1039     public abstract Map<byte[], Long> getOutputCounts();
1040 
1041     /**
1042      * @return number of regions we've recovered
1043      */
1044     public abstract int getNumberOfRecoveredRegions();
1045 
1046     /**
1047      * @param buffer A WAL Edit Entry
1048      * @throws IOException
1049      */
1050     public abstract void append(RegionEntryBuffer buffer) throws IOException;
1051 
1052     /**
1053     * WriterThreads call this function to flush any remaining edits buffered internally before close
1054      * @return true when underlying sink has something to flush
1055      */
1056     public boolean flush() throws IOException {
1057       return false;
1058     }
1059   }
1060 
1061   /**
1062    * Class that manages the output streams from the log splitting process.
1063    */
1064   class LogRecoveredEditsOutputSink extends OutputSink {
1065 
1066     public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1067         int numWriters) {
1068       // More threads could potentially write faster at the expense
1069       // of causing more disk seeks as the logs are split.
1070       // After a certain point (probably around 3 threads) the
1071       // process will be bound on the reader in the current
1072       // implementation anyway.
1073       super(controller, entryBuffers, numWriters);
1074     }
1075 
1076     /**
1077      * @return null if failed to report progress
1078      * @throws IOException
1079      */
1080     @Override
1081     public List<Path> finishWritingAndClose() throws IOException {
1082       boolean isSuccessful = false;
1083       List<Path> result = null;
1084       try {
1085         isSuccessful = finishWriting();
1086       } finally {
1087         result = close();
1088         List<IOException> thrown = closeLogWriters(null);
1089         if (thrown != null && !thrown.isEmpty()) {
1090           throw MultipleIOException.createIOException(thrown);
1091         }
1092       }
1093       if (isSuccessful) {
1094         splits = result;
1095       }
1096       return splits;
1097     }
1098 
1099     /**
1100      * Close all of the output streams.
1101      * @return the list of paths written.
1102      */
1103     private List<Path> close() throws IOException {
1104       Preconditions.checkState(!closeAndCleanCompleted);
1105 
1106       final List<Path> paths = new ArrayList<Path>();
1107       final List<IOException> thrown = Lists.newArrayList();
1108       ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
1109         TimeUnit.SECONDS, new ThreadFactory() {
1110           private int count = 1;
1111 
1112           @Override
1113           public Thread newThread(Runnable r) {
1114             Thread t = new Thread(r, "split-log-closeStream-" + count++);
1115             return t;
1116           }
1117         });
1118       CompletionService<Void> completionService =
1119         new ExecutorCompletionService<Void>(closeThreadPool);
1120       for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
1121         LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
1122         completionService.submit(new Callable<Void>() {
1123           @Override
1124           public Void call() throws Exception {
1125             WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1126             LOG.debug("Closing " + wap.p);
1127             try {
1128               wap.w.close();
1129             } catch (IOException ioe) {
1130               LOG.error("Couldn't close log at " + wap.p, ioe);
1131               thrown.add(ioe);
1132               return null;
1133             }
1134             LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1135                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1136 
1137             if (wap.editsWritten == 0) {
1138               // just remove the empty recovered.edits file
1139               if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
1140                 LOG.warn("Failed deleting empty " + wap.p);
1141                 throw new IOException("Failed deleting empty  " + wap.p);
1142               }
1143               return null;
1144             }
1145 
1146             Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1147               regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
1148             try {
1149               if (!dst.equals(wap.p) && fs.exists(dst)) {
1150                 LOG.warn("Found existing old edits file. It could be the "
1151                     + "result of a previous failed split attempt. Deleting " + dst + ", length="
1152                     + fs.getFileStatus(dst).getLen());
1153                 if (!fs.delete(dst, false)) {
1154                   LOG.warn("Failed deleting of old " + dst);
1155                   throw new IOException("Failed deleting of old " + dst);
1156                 }
1157               }
1158               // Skip the unit tests which create a splitter that reads and
1159               // writes the data without touching disk.
1160               // TestHLogSplit#testThreading is an example.
1161               if (fs.exists(wap.p)) {
1162                 if (!fs.rename(wap.p, dst)) {
1163                   throw new IOException("Failed renaming " + wap.p + " to " + dst);
1164                 }
1165                 LOG.info("Rename " + wap.p + " to " + dst);
1166               }
1167             } catch (IOException ioe) {
1168               LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1169               thrown.add(ioe);
1170               return null;
1171             }
1172             paths.add(dst);
1173             return null;
1174           }
1175         });
1176       }
1177 
1178       boolean progress_failed = false;
1179       try {
1180         for (int i = 0, n = this.writers.size(); i < n; i++) {
1181           Future<Void> future = completionService.take();
1182           future.get();
1183           if (!progress_failed && reporter != null && !reporter.progress()) {
1184             progress_failed = true;
1185           }
1186         }
1187       } catch (InterruptedException e) {
1188         IOException iie = new InterruptedIOException();
1189         iie.initCause(e);
1190         throw iie;
1191       } catch (ExecutionException e) {
1192         throw new IOException(e.getCause());
1193       } finally {
1194         closeThreadPool.shutdownNow();
1195       }
1196 
1197       if (!thrown.isEmpty()) {
1198         throw MultipleIOException.createIOException(thrown);
1199       }
1200       writersClosed = true;
1201       closeAndCleanCompleted = true;
1202       if (progress_failed) {
1203         return null;
1204       }
1205       return paths;
1206     }
1207 
1208     private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1209       if (writersClosed) {
1210         return thrown;
1211       }
1212 
1213       if (thrown == null) {
1214         thrown = Lists.newArrayList();
1215       }
1216       try {
1217         for (WriterThread t : writerThreads) {
1218           while (t.isAlive()) {
1219             t.shouldStop = true;
1220             t.interrupt();
1221             try {
1222               t.join(10);
1223             } catch (InterruptedException e) {
1224               IOException iie = new InterruptedIOException();
1225               iie.initCause(e);
1226               throw iie;
1227             }
1228           }
1229         }
1230       } finally {
1231         synchronized (writers) {
1232           WriterAndPath wap = null;
1233           for (SinkWriter tmpWAP : writers.values()) {
1234             try {
1235               wap = (WriterAndPath) tmpWAP;
1236               wap.w.close();
1237             } catch (IOException ioe) {
1238               LOG.error("Couldn't close log at " + wap.p, ioe);
1239               thrown.add(ioe);
1240               continue;
1241             }
1242             LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1243                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1244           }
1245         }
1246         writersClosed = true;
1247       }
1248 
1249       return thrown;
1250     }
1251 
1252     /**
1253      * Get a writer and path for a log starting at the given entry. This function is threadsafe so
1254      * long as multiple threads are always acting on different regions.
1255      * @return null if this region shouldn't output any logs
1256      */
1257     private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
1258       byte region[] = entry.getKey().getEncodedRegionName();
1259       WriterAndPath ret = (WriterAndPath) writers.get(region);
1260       if (ret != null) {
1261         return ret;
1262       }
1263       // If we already decided that this region doesn't get any output
1264       // we don't need to check again.
1265       if (blacklistedRegions.contains(region)) {
1266         return null;
1267       }
1268       ret = createWAP(region, entry, rootDir, fs, conf);
1269       if (ret == null) {
1270         blacklistedRegions.add(region);
1271         return null;
1272       }
1273       writers.put(region, ret);
1274       return ret;
1275     }
1276 
1277     private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
1278         Configuration conf) throws IOException {
1279       Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1280       if (regionedits == null) {
1281         return null;
1282       }
1283       if (fs.exists(regionedits)) {
1284         LOG.warn("Found old edits file. It could be the "
1285             + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1286             + fs.getFileStatus(regionedits).getLen());
1287         if (!fs.delete(regionedits, false)) {
1288           LOG.warn("Failed delete of old " + regionedits);
1289         }
1290       }
1291       Writer w = createWriter(fs, regionedits, conf);
1292       LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
1293       return (new WriterAndPath(regionedits, w));
1294     }
1295 
1296     @Override
1297     public void append(RegionEntryBuffer buffer) throws IOException {
1298       List<Entry> entries = buffer.entryBuffer;
1299       if (entries.isEmpty()) {
1300         LOG.warn("got an empty buffer, skipping");
1301         return;
1302       }
1303 
1304       WriterAndPath wap = null;
1305 
1306       long startTime = System.nanoTime();
1307       try {
1308         int editsCount = 0;
1309 
1310         for (Entry logEntry : entries) {
1311           if (wap == null) {
1312             wap = getWriterAndPath(logEntry);
1313             if (wap == null) {
1314               // getWriterAndPath decided we don't need to write these edits
1315               return;
1316             }
1317           }
1318           wap.w.append(logEntry);
1319           this.updateRegionMaximumEditLogSeqNum(logEntry);
1320           editsCount++;
1321         }
1322         // Pass along summary statistics
1323         wap.incrementEdits(editsCount);
1324         wap.incrementNanoTime(System.nanoTime() - startTime);
1325       } catch (IOException e) {
1326           e = e instanceof RemoteException ?
1327                   ((RemoteException)e).unwrapRemoteException() : e;
1328         LOG.fatal(" Got while writing log entry to log", e);
1329         throw e;
1330       }
1331     }
1332 
1333     /**
1334      * @return a map from encoded region ID to the number of edits written out for that region.
1335      */
1336     @Override
1337     public Map<byte[], Long> getOutputCounts() {
1338       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1339       synchronized (writers) {
1340         for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
1341           ret.put(entry.getKey(), entry.getValue().editsWritten);
1342         }
1343       }
1344       return ret;
1345     }
1346 
1347     @Override
1348     public int getNumberOfRecoveredRegions() {
1349       return writers.size();
1350     }
1351   }
1352 
1353   /**
1354    * Class that wraps the actual writer which writes data out, along with related statistics
1355    */
1356   public abstract static class SinkWriter {
1357     /* Count of edits written to this path */
1358     long editsWritten = 0;
1359     /* Number of nanos spent writing to this log */
1360     long nanosSpent = 0;
1361 
1362     void incrementEdits(int edits) {
1363       editsWritten += edits;
1364     }
1365 
1366     void incrementNanoTime(long nanos) {
1367       nanosSpent += nanos;
1368     }
1369   }
1370 
1371   /**
1372    * Private data structure that wraps a Writer and its Path, also collecting statistics about the
1373    * data written to this output.
1374    */
1375   private final static class WriterAndPath extends SinkWriter {
1376     final Path p;
1377     final Writer w;
1378 
1379     WriterAndPath(final Path p, final Writer w) {
1380       this.p = p;
1381       this.w = w;
1382     }
1383   }
1384 
1385   /**
1386    * Class that manages replaying edits from WAL files directly to the assigned failover region servers
1387    */
1388   class LogReplayOutputSink extends OutputSink {
1389     private static final double BUFFER_THRESHOLD = 0.35;
1390     private static final String KEY_DELIMITER = "#";
1391 
1392     private long waitRegionOnlineTimeOut;
1393     private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
1394     private final Map<String, RegionServerWriter> writers =
1395         new ConcurrentHashMap<String, RegionServerWriter>();
1396     // online encoded region name -> region location map
1397     private final Map<String, HRegionLocation> onlineRegions =
1398         new ConcurrentHashMap<String, HRegionLocation>();
1399 
1400     private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
1401         .synchronizedMap(new TreeMap<TableName, HConnection>());
1402     /**
1403      * Map key -> value layout:
1404      * <hostname>:<port>#<table name> -> List of Pair<HRegionLocation, HLog.Entry>
1405      */
1406     private Map<String, List<Pair<HRegionLocation, HLog.Entry>>> serverToBufferQueueMap =
1407         new ConcurrentHashMap<String, List<Pair<HRegionLocation, HLog.Entry>>>();
1408     private List<Throwable> thrown = new ArrayList<Throwable>();
1409 
1410     // The following sink is used in distributedLogReplay mode for entries of regions in a
1411     // disabling or disabled table. This is a limitation of distributedLogReplay: log replay needs a
1412     // region to be assigned and online before it can replay WAL edits, while regions of a
1413     // disabling/disabled table won't be assigned by the AM. We can retire this code after HBASE-8234.
1414     private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
1415     private boolean hasEditsInDisablingOrDisabledTables = false;
1416 
1417     public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1418         int numWriters) {
1419       super(controller, entryBuffers, numWriters);
1420       this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout",
1421         SplitLogManager.DEFAULT_TIMEOUT);
1422       this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
1423         entryBuffers, numWriters);
1424       this.logRecoveredEditsOutputSink.setReporter(reporter);
1425     }
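    // Illustrative sketch (not part of the original source): the online-wait timeout above reuses
    // the split log manager timeout, so both are tuned with the same setting, e.g. in
    // hbase-site.xml (the value shown is hypothetical):
    //
    //   <property>
    //     <name>hbase.splitlog.manager.timeout</name>
    //     <value>300000</value>
    //   </property>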
1426 
1427     @Override
1428     public void append(RegionEntryBuffer buffer) throws IOException {
1429       List<Entry> entries = buffer.entryBuffer;
1430       if (entries.isEmpty()) {
1431         LOG.warn("got an empty buffer, skipping");
1432         return;
1433       }
1434 
1435       // check if the current region is in a disabling or disabled table
1436       if (disablingOrDisabledTables.contains(buffer.tableName)) {
1437         // need to fall back to the old way of writing recovered.edits files
1438         logRecoveredEditsOutputSink.append(buffer);
1439         hasEditsInDisablingOrDisabledTables = true;
1440         // store regions we have recovered so far
1441         addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
1442         return;
1443       }
1444 
1445       // group entries by region server
1446       groupEditsByServer(entries);
1447 
1448       // process work items
1449       String maxLocKey = null;
1450       int maxSize = 0;
1451       List<Pair<HRegionLocation, HLog.Entry>> maxQueue = null;
1452       synchronized (this.serverToBufferQueueMap) {
1453         for (String key : this.serverToBufferQueueMap.keySet()) {
1454           List<Pair<HRegionLocation, HLog.Entry>> curQueue = this.serverToBufferQueueMap.get(key);
1455           if (curQueue.size() > maxSize) {
1456             maxSize = curQueue.size();
1457             maxQueue = curQueue;
1458             maxLocKey = key;
1459           }
1460         }
1461         if (maxSize < minBatchSize
1462             && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
1463           // buffer more to process
1464           return;
1465         } else if (maxSize > 0) {
1466           this.serverToBufferQueueMap.remove(maxLocKey);
1467         }
1468       }
1469 
1470       if (maxSize > 0) {
1471         processWorkItems(maxLocKey, maxQueue);
1472       }
1473     }
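    // Illustrative sketch (not part of the original source): the batching decision above in
    // condensed form. Only the largest per-server queue is drained, and only once it holds at
    // least minBatchSize entries or overall buffering nears BUFFER_THRESHOLD of the heap limit.
    //
    //   boolean bigEnough = maxSize >= minBatchSize
    //       || entryBuffers.totalBuffered >= BUFFER_THRESHOLD * entryBuffers.maxHeapUsage;
    //   if (maxSize > 0 && bigEnough) {
    //     processWorkItems(maxLocKey, maxQueue);  // flush the largest queue now
    //   }                                         // otherwise keep buffering and return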
1474 
1475     private void addToRecoveredRegions(String encodedRegionName) {
1476       if (!recoveredRegions.contains(encodedRegionName)) {
1477         recoveredRegions.add(encodedRegionName);
1478       }
1479     }
1480 
1481     /**
1482      * Helper function that groups WAL entries by their destination region server
1483      * @throws IOException
1484      */
1485     private void groupEditsByServer(List<Entry> entries) throws IOException {
1486       Set<TableName> nonExistentTables = null;
1487       Long cachedLastFlushedSequenceId = -1L;
1488       for (HLog.Entry entry : entries) {
1489         WALEdit edit = entry.getEdit();
1490         TableName table = entry.getKey().getTablename();
1491         // clear scopes, which aren't needed for recovery
1492         entry.getKey().setScopes(null);
1493         String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
1494         // skip edits of non-existent tables
1495         if (nonExistentTables != null && nonExistentTables.contains(table)) {
1496           this.skippedEdits.incrementAndGet();
1497           continue;
1498         }
1499 
1500         Map<byte[], Long> maxStoreSequenceIds = null;
1501         boolean needSkip = false;
1502         HRegionLocation loc = null;
1503         String locKey = null;
1504         List<KeyValue> kvs = edit.getKeyValues();
1505         List<KeyValue> skippedKVs = new ArrayList<KeyValue>();
1506         HConnection hconn = this.getConnectionByTableName(table);
1507 
1508         for (KeyValue kv : kvs) {
1509           byte[] row = kv.getRow();
1510           byte[] family = kv.getFamily();
1511           boolean isCompactionEntry = false;
1512           if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY)) {
1513             CompactionDescriptor compaction = WALEdit.getCompaction(kv);
1514             if (compaction != null && compaction.hasRegionName()) {
1515               try {
1516                 byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
1517                   .toByteArray());
1518                 row = regionName[1]; // startKey of the region
1519                 family = compaction.getFamilyName().toByteArray();
1520                 isCompactionEntry = true;
1521               } catch (Exception ex) {
1522                 LOG.warn("Unexpected exception while parsing compaction descriptor, ignoring: " + ex);
1523                 skippedKVs.add(kv);
1524                 continue;
1525               }
1526             } else {
1527               skippedKVs.add(kv);
1528               continue;
1529             }
1530           }
1531 
1532           try {
1533             loc =
1534                 locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
1535                   encodeRegionNameStr);
1536             // skip replaying the compaction if the region is gone
1537             if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
1538               loc.getRegionInfo().getEncodedName())) {
1539               LOG.info("Not replaying a compaction marker for an older region: "
1540                   + encodeRegionNameStr);
1541               needSkip = true;
1542             }
1543           } catch (TableNotFoundException ex) {
1544             // table has been deleted so skip edits of the table
1545             LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
1546                 + encodeRegionNameStr);
1547             lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
1548             if (nonExistentTables == null) {
1549               nonExistentTables = new TreeSet<TableName>();
1550             }
1551             nonExistentTables.add(table);
1552             this.skippedEdits.incrementAndGet();
1553             needSkip = true;
1554             break;
1555           }
1556 
1557           cachedLastFlushedSequenceId =
1558               lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1559           if (cachedLastFlushedSequenceId != null
1560               && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
1561             // skip the whole HLog entry
1562             this.skippedEdits.incrementAndGet();
1563             needSkip = true;
1564             break;
1565           } else {
1566             if (maxStoreSequenceIds == null) {
1567               maxStoreSequenceIds =
1568                   regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
1569             }
1570             if (maxStoreSequenceIds != null) {
1571               Long maxStoreSeqId = maxStoreSequenceIds.get(family);
1572               if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
1573                 // skip current kv if column family doesn't exist anymore or already flushed
1574                 skippedKVs.add(kv);
1575                 continue;
1576               }
1577             }
1578           }
1579         }
1580 
1581         // skip the edit
1582         if (loc == null || needSkip) continue;
1583 
1584         if (!skippedKVs.isEmpty()) {
1585           kvs.removeAll(skippedKVs);
1586         }
1587 
1588         synchronized (serverToBufferQueueMap) {
1589           locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
1590           List<Pair<HRegionLocation, HLog.Entry>> queue = serverToBufferQueueMap.get(locKey);
1591           if (queue == null) {
1592             queue =
1593                 Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, HLog.Entry>>());
1594             serverToBufferQueueMap.put(locKey, queue);
1595           }
1596           queue.add(new Pair<HRegionLocation, HLog.Entry>(loc, entry));
1597         }
1598         // store regions we have recovered so far
1599         addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
1600       }
1601     }
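    // Illustrative sketch (not part of the original source): the per-server buffer key built
    // above. For an entry destined for a region hosted on "rs1.example.com:60020" (hypothetical
    // host) in table "usertable" (hypothetical table), the entry is queued under:
    //
    //   String locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
    //   // -> "rs1.example.com:60020#usertable"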
1602 
1603     /**
1604      * Locate destination region based on table name & row. This function also makes sure the
1605      * destination region is online for replay.
1606      * @throws IOException
1607      */
1608     private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
1609         TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
1610       // fetch location from cache
1611       HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
1612       if(loc != null) return loc;
1613       // fetch location from hbase:meta directly, bypassing the cache to avoid hitting an old dead server
1614       loc = hconn.getRegionLocation(table, row, true);
1615       if (loc == null) {
1616         throw new IOException("Can't locate location for row:" + Bytes.toString(row)
1617             + " of table:" + table);
1618       }
1619       // check if the current row has moved to a different region due to a region merge/split
1620       if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
1621         // the original region should have already been flushed
1622         lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
1623         HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
1624         if (tmpLoc != null) return tmpLoc;
1625       }
1626 
1627       Long lastFlushedSequenceId = -1L;
1628       AtomicBoolean isRecovering = new AtomicBoolean(true);
1629       loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
1630       if (!isRecovering.get()) {
1631         // region isn't in recovering state at all, because the WAL file may contain edits for a
1632         // region that was moved elsewhere before the hosting RS failed
1633         lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
1634         LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
1635             + " because it's not in recovering.");
1636       } else {
1637         Long cachedLastFlushedSequenceId =
1638             lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1639 
1640         // retrieve the last flushed sequence id from ZK, because the region's postOpenDeployTasks
1641         // will have updated the value for the region
1642         RegionStoreSequenceIds ids =
1643             SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc
1644                 .getRegionInfo().getEncodedName());
1645         if (ids != null) {
1646           lastFlushedSequenceId = ids.getLastFlushedSequenceId();
1647           Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1648           List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
1649           for (StoreSequenceId id : maxSeqIdInStores) {
1650             storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
1651           }
1652           regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
1653         }
1654 
1655         if (cachedLastFlushedSequenceId == null
1656             || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
1657           lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
1658         }
1659       }
1660 
1661       onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
1662       return loc;
1663     }
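    // Illustrative sketch (not part of the original source): how the last flushed sequence ids
    // refreshed above are applied in groupEditsByServer(). An entry is skipped when the
    // destination region has already flushed past its sequence number:
    //
    //   Long lastFlushed = lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
    //   if (lastFlushed != null && lastFlushed >= entry.getKey().getLogSeqNum()) {
    //     skippedEdits.incrementAndGet();  // nothing left to replay for this entry
    //   }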
1664 
1665     private void processWorkItems(String key, List<Pair<HRegionLocation, HLog.Entry>> actions)
1666         throws IOException {
1667       RegionServerWriter rsw = null;
1668 
1669       long startTime = System.nanoTime();
1670       try {
1671         rsw = getRegionServerWriter(key);
1672         rsw.sink.replayEntries(actions);
1673 
1674         // Pass along summary statistics
1675         rsw.incrementEdits(actions.size());
1676         rsw.incrementNanoTime(System.nanoTime() - startTime);
1677       } catch (IOException e) {
1678         e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1679         LOG.fatal("Got IOException while replaying log entries", e);
1680         throw e;
1681       }
1682     }
1683 
1684     /**
1685      * Wait until the region is online on the destination region server.
1686      * @param loc last known location of the region
1687      * @param row a row contained in the region
1688      * @param timeout how long to wait, in milliseconds
1689      * @param isRecovering set to the recovering state of the region on the destination region server
1690      * @return the region location once the region is online on the destination region server
1691      * @throws IOException if the region does not come online within the timeout
1692      */
1693     private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
1694         final long timeout, AtomicBoolean isRecovering)
1695         throws IOException {
1696       final long endTime = EnvironmentEdgeManager.currentTimeMillis() + timeout;
1697       final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1698         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1699       boolean reloadLocation = false;
1700       TableName tableName = loc.getRegionInfo().getTable();
1701       int tries = 0;
1702       Throwable cause = null;
1703       while (endTime > EnvironmentEdgeManager.currentTimeMillis()) {
1704         try {
1705           // Try and get regioninfo from the hosting server.
1706           HConnection hconn = getConnectionByTableName(tableName);
1707           if(reloadLocation) {
1708             loc = hconn.getRegionLocation(tableName, row, true);
1709           }
1710           BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
1711           HRegionInfo region = loc.getRegionInfo();
1712           try {
1713             GetRegionInfoRequest request =
1714                 RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
1715             GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
1716             if (HRegionInfo.convert(response.getRegionInfo()) != null) {
1717               isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
1718               return loc;
1719             }
1720           } catch (ServiceException se) {
1721             throw ProtobufUtil.getRemoteException(se);
1722           }
1723         } catch (IOException e) {
1724           cause = e.getCause();
1725           if(!(cause instanceof RegionOpeningException)) {
1726             reloadLocation = true;
1727           }
1728         }
1729         long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
1730         try {
1731           Thread.sleep(expectedSleep);
1732         } catch (InterruptedException e) {
1733           throw new IOException("Interrupted when waiting region " +
1734               loc.getRegionInfo().getEncodedName() + " online.", e);
1735         }
1736         tries++;
1737       }
1738 
1739       throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
1740         " online for " + timeout + " milliseconds.", cause);
1741     }
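    // Illustrative sketch (not part of the original source): the retry loop above sleeps with the
    // standard client backoff between attempts, so the wait grows with the number of tries:
    //
    //   long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
    //       HConstants.DEFAULT_HBASE_CLIENT_PAUSE);                // client pause, 100 ms by default
    //   long sleepMs = ConnectionUtils.getPauseTime(pause, tries); // scaled by the retry count
    //   Thread.sleep(sleepMs);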
1742 
1743     @Override
1744     public boolean flush() throws IOException {
1745       String curLoc = null;
1746       int curSize = 0;
1747       List<Pair<HRegionLocation, HLog.Entry>> curQueue = null;
1748       synchronized (this.serverToBufferQueueMap) {
1749         for (String locationKey : this.serverToBufferQueueMap.keySet()) {
1750           curQueue = this.serverToBufferQueueMap.get(locationKey);
1751           if (!curQueue.isEmpty()) {
1752             curSize = curQueue.size();
1753             curLoc = locationKey;
1754             break;
1755           }
1756         }
1757         if (curSize > 0) {
1758           this.serverToBufferQueueMap.remove(curLoc);
1759         }
1760       }
1761 
1762       if (curSize > 0) {
1763         this.processWorkItems(curLoc, curQueue);
1764         controller.dataAvailable.notifyAll();
1765         return true;
1766       }
1767       return false;
1768     }
1769 
1770     void addWriterError(Throwable t) {
1771       thrown.add(t);
1772     }
1773 
1774     @Override
1775     public List<Path> finishWritingAndClose() throws IOException {
1776       try {
1777         if (!finishWriting()) {
1778           return null;
1779         }
1780         if (hasEditsInDisablingOrDisabledTables) {
1781           splits = logRecoveredEditsOutputSink.finishWritingAndClose();
1782         } else {
1783           splits = new ArrayList<Path>();
1784         }
1785         // return an empty list (unless recovered.edits were written) to keep the interface the same as the old way
1786         return splits;
1787       } finally {
1788         List<IOException> thrown = closeRegionServerWriters();
1789         if (thrown != null && !thrown.isEmpty()) {
1790           throw MultipleIOException.createIOException(thrown);
1791         }
1792       }
1793     }
1794 
1795     @Override
1796     int getNumOpenWriters() {
1797       return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
1798     }
1799 
1800     private List<IOException> closeRegionServerWriters() throws IOException {
1801       List<IOException> result = null;
1802       if (!writersClosed) {
1803         result = Lists.newArrayList();
1804         try {
1805           for (WriterThread t : writerThreads) {
1806             while (t.isAlive()) {
1807               t.shouldStop = true;
1808               t.interrupt();
1809               try {
1810                 t.join(10);
1811               } catch (InterruptedException e) {
1812                 IOException iie = new InterruptedIOException();
1813                 iie.initCause(e);
1814                 throw iie;
1815               }
1816             }
1817           }
1818         } finally {
1819           synchronized (writers) {
1820             for (String locationKey : writers.keySet()) {
1821               RegionServerWriter tmpW = writers.get(locationKey);
1822               try {
1823                 tmpW.close();
1824               } catch (IOException ioe) {
1825                 LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
1826                 result.add(ioe);
1827               }
1828             }
1829           }
1830 
1831           // close connections
1832           synchronized (this.tableNameToHConnectionMap) {
1833             for (TableName tableName : this.tableNameToHConnectionMap.keySet()) {
1834               HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
1835               try {
1836                 hconn.clearRegionCache();
1837                 hconn.close();
1838               } catch (IOException ioe) {
1839                 result.add(ioe);
1840               }
1841             }
1842           }
1843           writersClosed = true;
1844         }
1845       }
1846       return result;
1847     }
1848 
1849     @Override
1850     public Map<byte[], Long> getOutputCounts() {
1851       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1852       synchronized (writers) {
1853         for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
1854           ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
1855         }
1856       }
1857       return ret;
1858     }
1859 
1860     @Override
1861     public int getNumberOfRecoveredRegions() {
1862       return this.recoveredRegions.size();
1863     }
1864 
1865     /**
1866      * Get (or create) the RegionServerWriter for the given location key. This function is threadsafe
1867      * so long as multiple threads are always acting on different regions.
1868      * @throws IOException if the location string is invalid
1869      */
1870     private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
1871       RegionServerWriter ret = writers.get(loc);
1872       if (ret != null) {
1873         return ret;
1874       }
1875 
1876       TableName tableName = getTableFromLocationStr(loc);
1877       if(tableName == null){
1878         throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
1879       }
1880 
1881       HConnection hconn = getConnectionByTableName(tableName);
1882       synchronized (writers) {
1883         ret = writers.get(loc);
1884         if (ret == null) {
1885           ret = new RegionServerWriter(conf, tableName, hconn);
1886           writers.put(loc, ret);
1887         }
1888       }
1889       return ret;
1890     }
1891 
1892     private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
1893       HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
1894       if (hconn == null) {
1895         synchronized (this.tableNameToHConnectionMap) {
1896           hconn = this.tableNameToHConnectionMap.get(tableName);
1897           if (hconn == null) {
1898             hconn = HConnectionManager.getConnection(conf);
1899             this.tableNameToHConnectionMap.put(tableName, hconn);
1900           }
1901         }
1902       }
1903       return hconn;
1904     }
1905     private TableName getTableFromLocationStr(String loc) {
1906       /**
1907        * location key is in format <server name:port>#<table name>
1908        */
1909       String[] splits = loc.split(KEY_DELIMITER);
1910       if (splits.length != 2) {
1911         return null;
1912       }
1913       return TableName.valueOf(splits[1]);
1914     }
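    // Illustrative sketch (not part of the original source): parsing a location key, using the
    // hypothetical key "rs1.example.com:60020#usertable":
    //
    //   String[] parts = "rs1.example.com:60020#usertable".split(KEY_DELIMITER);
    //   // parts[0] -> "rs1.example.com:60020" (server), parts[1] -> "usertable" (table)
    //   TableName table = TableName.valueOf(parts[1]);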
1915   }
1916 
1917   /**
1918    * Private data structure that wraps a replay sink to a receiving RS and collects statistics
1919    * about the data written to that newly assigned RS.
1920    */
1921   private final static class RegionServerWriter extends SinkWriter {
1922     final WALEditsReplaySink sink;
1923 
1924     RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
1925         throws IOException {
1926       this.sink = new WALEditsReplaySink(conf, tableName, conn);
1927     }
1928 
1929     void close() throws IOException {
1930     }
1931   }
1932 
1933   static class CorruptedLogFileException extends Exception {
1934     private static final long serialVersionUID = 1L;
1935 
1936     CorruptedLogFileException(String s) {
1937       super(s);
1938     }
1939   }
1940 
1941   /** A struct used by getMutationsFromWALEntry */
1942   public static class MutationReplay {
1943     public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
1944       this.type = type;
1945       this.mutation = mutation;
1946       if(this.mutation.getDurability() != Durability.SKIP_WAL) {
1947         // using ASYNC_WAL for relay
1948         this.mutation.setDurability(Durability.ASYNC_WAL);
1949       }
1950       this.nonceGroup = nonceGroup;
1951       this.nonce = nonce;
1952     }
1953 
1954     public final MutationType type;
1955     public final Mutation mutation;
1956     public final long nonceGroup;
1957     public final long nonce;
1958   }
1959 
1960   /**
1961    * Tag the cell with the original sequence number of the edit to be replayed.
1962    * @param seqId original log sequence number of the edit
1963    * @param cell cell to tag
1964    */
1965   private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) {
1966     // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
1967     boolean needAddRecoveryTag = true;
1968     if (cell.getTagsLength() > 0) {
1969       Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
1970         TagType.LOG_REPLAY_TAG_TYPE);
1971       if (tmpTag != null) {
1972         // found an existing log replay tag so reuse it
1973         needAddRecoveryTag = false;
1974       }
1975     }
1976     if (needAddRecoveryTag) {
1977       List<Tag> newTags = new ArrayList<Tag>();
1978       Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId));
1979       newTags.add(replayTag);
1980       return KeyValue.cloneAndAddTags(cell, newTags);
1981     }
1982     return cell;
1983   }
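  // Illustrative sketch (not part of the original source): reading the replay tag back off a
  // cell to recover the original sequence id, assuming Tag#getValue() returns the tag value bytes.
  //
  //   Tag t = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
  //       TagType.LOG_REPLAY_TAG_TYPE);
  //   if (t != null) {
  //     long originalSeqId = Bytes.toLong(t.getValue());
  //   }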
1984 
1985   /**
1986    * Constructs mutations from a WALEntry. It also reconstructs the HLogKey and WALEdit from the
1987    * passed in WALEntry.
1988    * @param entry the WALEntry to convert
1989    * @param cells scanner over the cells associated with the entry
1990    * @param logEntry pair that receives the HLogKey and WALEdit instances extracted from the
1991    *          passed in WALEntry
1992    * @param addLogReplayTag whether to tag each cell with the original log sequence number
1993    * @return list of MutationReplay instances to be replayed
1994    * @throws IOException
1995    */
1996   public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
1997       Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag, Durability durability)
1998           throws IOException {
1999 
2000     if (entry == null) {
2001       // return an empty list
2002       return new ArrayList<MutationReplay>();
2003     }
2004 
2005     long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
2006       entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
2007     int count = entry.getAssociatedCellCount();
2008     List<MutationReplay> mutations = new ArrayList<MutationReplay>();
2009     Cell previousCell = null;
2010     Mutation m = null;
2011     HLogKey key = null;
2012     WALEdit val = null;
2013     if (logEntry != null) val = new WALEdit();
2014 
2015     for (int i = 0; i < count; i++) {
2016       // Throw index out of bounds if our cell count is off
2017       if (!cells.advance()) {
2018         throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
2019       }
2020       Cell cell = cells.current();
2021       if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell));
2022 
2023       boolean isNewRowOrType =
2024           previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
2025               || !CellUtil.matchingRow(previousCell, cell);
2026       if (isNewRowOrType) {
2027         // Create new mutation
2028         if (CellUtil.isDelete(cell)) {
2029           m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
2030           // Deletes don't have nonces.
2031           mutations.add(new MutationReplay(
2032               MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
2033         } else {
2034           m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
2035           // Puts might come from increment or append, thus we need nonces.
2036           long nonceGroup = entry.getKey().hasNonceGroup()
2037               ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2038           long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2039           mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
2040         }
2041       }
2042       if (CellUtil.isDelete(cell)) {
2043         ((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
2044       } else {
2045         Cell tmpNewCell = cell;
2046         if (addLogReplayTag) {
2047           tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
2048         }
2049         ((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
2050       }
2051       if (m != null) {
2052         m.setDurability(durability);
2053       }
2054       previousCell = cell;
2055     }
2056 
2057     // reconstruct HLogKey
2058     if (logEntry != null) {
2059       WALKey walKey = entry.getKey();
2060       List<UUID> clusterIds = new ArrayList<UUID>(walKey.getClusterIdsCount());
2061       for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
2062         clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
2063       }
2064       key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
2065               .getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds,
2066               walKey.getNonceGroup(), walKey.getNonce());
2067       logEntry.setFirst(key);
2068       logEntry.setSecond(val);
2069     }
2070 
2071     return mutations;
2072   }
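  // Illustrative sketch (not part of the original source) of how a caller on the receiving region
  // server side might invoke getMutationsFromWALEntry() and read back the reconstructed key and edit:
  //
  //   Pair<HLogKey, WALEdit> walEntry = new Pair<HLogKey, WALEdit>();
  //   List<MutationReplay> mutations =
  //       getMutationsFromWALEntry(entry, cells, walEntry, false, Durability.USE_DEFAULT);
  //   // walEntry.getFirst() holds the reconstructed HLogKey, walEntry.getSecond() the WALEdit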
2073 }