
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.text.ParseException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.TreeSet;
35  import java.util.UUID;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.CompletionService;
38  import java.util.concurrent.ConcurrentHashMap;
39  import java.util.concurrent.ExecutionException;
40  import java.util.concurrent.ExecutorCompletionService;
41  import java.util.concurrent.Future;
42  import java.util.concurrent.ThreadFactory;
43  import java.util.concurrent.ThreadPoolExecutor;
44  import java.util.concurrent.TimeUnit;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicLong;
47  import java.util.concurrent.atomic.AtomicReference;
48  
49  import com.google.common.base.Preconditions;
50  import com.google.common.collect.Lists;
51  import com.google.protobuf.ServiceException;
52  import org.apache.commons.logging.Log;
53  import org.apache.commons.logging.LogFactory;
54  import org.apache.hadoop.classification.InterfaceAudience;
55  import org.apache.hadoop.conf.Configuration;
56  import org.apache.hadoop.fs.FileStatus;
57  import org.apache.hadoop.fs.FileSystem;
58  import org.apache.hadoop.fs.Path;
59  import org.apache.hadoop.hbase.Cell;
60  import org.apache.hadoop.hbase.CellScanner;
61  import org.apache.hadoop.hbase.CellUtil;
62  import org.apache.hadoop.hbase.CoordinatedStateManager;
63  import org.apache.hadoop.hbase.HBaseConfiguration;
64  import org.apache.hadoop.hbase.HConstants;
65  import org.apache.hadoop.hbase.HRegionInfo;
66  import org.apache.hadoop.hbase.HRegionLocation;
67  import org.apache.hadoop.hbase.KeyValue;
68  import org.apache.hadoop.hbase.ServerName;
69  import org.apache.hadoop.hbase.TableName;
70  import org.apache.hadoop.hbase.TableNotFoundException;
71  import org.apache.hadoop.hbase.Tag;
72  import org.apache.hadoop.hbase.TagType;
73  import org.apache.hadoop.hbase.client.ConnectionUtils;
74  import org.apache.hadoop.hbase.client.Delete;
75  import org.apache.hadoop.hbase.client.Durability;
76  import org.apache.hadoop.hbase.client.HConnection;
77  import org.apache.hadoop.hbase.client.HConnectionManager;
78  import org.apache.hadoop.hbase.client.Mutation;
79  import org.apache.hadoop.hbase.client.Put;
80  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
81  import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
82  import org.apache.hadoop.hbase.client.TableState;
83  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
84  import org.apache.hadoop.hbase.io.HeapSize;
85  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
86  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
87  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
88  import org.apache.hadoop.hbase.protobuf.RequestConverter;
89  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
90  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
91  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
92  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
93  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
94  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
95  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
96  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
97  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
98  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
99  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
100 import org.apache.hadoop.hbase.regionserver.HRegion;
101 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
102 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
103 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
104 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
105 import org.apache.hadoop.hbase.util.Bytes;
106 import org.apache.hadoop.hbase.util.CancelableProgressable;
107 import org.apache.hadoop.hbase.util.ClassSize;
108 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
109 import org.apache.hadoop.hbase.util.FSUtils;
110 import org.apache.hadoop.hbase.util.Pair;
111 import org.apache.hadoop.hbase.util.Threads;
112 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
113 import org.apache.hadoop.io.MultipleIOException;
114 import org.apache.hadoop.ipc.RemoteException;
115 
116 /**
117  * This class is responsible for splitting up a bunch of regionserver commit log
118  * files that are no longer being written to, into new files, one per region,
119  * for the region to replay on startup. Delete the old log files when finished.
120  */
121 @InterfaceAudience.Private
122 public class HLogSplitter {
123   static final Log LOG = LogFactory.getLog(HLogSplitter.class);
124 
125   // Parameters for split process
126   protected final Path rootDir;
127   protected final FileSystem fs;
128   protected final Configuration conf;
129 
130   // Major subcomponents of the split process.
131   // These are separated into inner classes to make testing easier.
132   PipelineController controller;
133   OutputSink outputSink;
134   EntryBuffers entryBuffers;
135 
136   private Set<TableName> disablingOrDisabledTables =
137       new HashSet<TableName>();
138   private BaseCoordinatedStateManager csm;
139 
140   private MonitoredTask status;
141 
142   // For checking the latest flushed sequence id
143   protected final LastSequenceId sequenceIdChecker;
144 
145   protected boolean distributedLogReplay;
146 
147   // Map encodedRegionName -> lastFlushedSequenceId
148   protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
149 
150   // Map encodedRegionName -> maxSeqIdInStores
151   protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
152       new ConcurrentHashMap<String, Map<byte[], Long>>();
153 
154   // Failed region server that the wal file being split belongs to
155   protected String failedServerName = "";
156 
157   // Number of writer threads
158   private final int numWriterThreads;
159 
160   // Min batch size when replaying WAL edits
161   private final int minBatchSize;
162 
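      // The constructor below works on a copy of the passed-in Configuration: it sets the
      // configured WAL cell codec class as the RPC codec, then wires up the split pipeline:
      // a PipelineController, the shared EntryBuffers, and an OutputSink (LogReplayOutputSink
      // when distributedLogReplay is enabled and a coordination manager was supplied,
      // otherwise LogRecoveredEditsOutputSink).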
163   HLogSplitter(Configuration conf, Path rootDir,
164       FileSystem fs, LastSequenceId idChecker,
165       CoordinatedStateManager csm, RecoveryMode mode) {
166     this.conf = HBaseConfiguration.create(conf);
167     String codecClassName = conf
168         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
169     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
170     this.rootDir = rootDir;
171     this.fs = fs;
172     this.sequenceIdChecker = idChecker;
173     this.csm = (BaseCoordinatedStateManager)csm;
174     this.controller = new PipelineController();
175 
176     entryBuffers = new EntryBuffers(controller,
177         this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
178             128*1024*1024));
179 
180     // a larger minBatchSize may slow down recovery because replay writer has to wait for
181     // enough edits before replaying them
182     this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
183     this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
184 
185     this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
186     if (csm != null && this.distributedLogReplay) {
187       outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
188     } else {
189       if (this.distributedLogReplay) {
190         LOG.info("ZooKeeperWatcher is passed in as NULL so disabling distributedLogReplay.");
191       }
192       this.distributedLogReplay = false;
193       outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
194     }
195 
196   }
197 
198   /**
199    * Splits a HLog file into region's recovered-edits directory.
200    * This is the main entry point for distributed log splitting from SplitLogWorker.
201    * <p>
202    * If the log file has N regions then N recovered.edits files will be produced.
203    * <p>
204    * @param rootDir
205    * @param logfile
206    * @param fs
207    * @param conf
208    * @param reporter
209    * @param idChecker
210    * @param cp coordination state manager
211    * @return false if it is interrupted by the progressable reporter.
212    * @throws IOException
213    */
214   public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
215       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
216       CoordinatedStateManager cp, RecoveryMode mode) throws IOException {
217     HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, cp, mode);
218     return s.splitLogFile(logfile, reporter);
219   }
220 
221   // A wrapper to split one log folder using the method used by distributed
222   // log splitting. Used by tools and unit tests. It should be package private.
223   // It is public only because TestWALObserver is in a different package,
224   // which uses this method to do log splitting.
225   public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
226       FileSystem fs, Configuration conf) throws IOException {
227     FileStatus[] logfiles = fs.listStatus(logDir);
228     List<Path> splits = new ArrayList<Path>();
229     if (logfiles != null && logfiles.length > 0) {
230       for (FileStatus logfile: logfiles) {
231         HLogSplitter s =
232             new HLogSplitter(conf, rootDir, fs, null, null, RecoveryMode.LOG_SPLITTING);
233         if (s.splitLogFile(logfile, null)) {
234           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
235           if (s.outputSink.splits != null) {
236             splits.addAll(s.outputSink.splits);
237           }
238         }
239       }
240     }
241     if (!fs.delete(logDir, true)) {
242       throw new IOException("Unable to delete src dir: " + logDir);
243     }
244     return splits;
245   }
246 
247   // The real log splitter. It just splits one log file.
248   boolean splitLogFile(FileStatus logfile,
249       CancelableProgressable reporter) throws IOException {
250     boolean isCorrupted = false;
251     Preconditions.checkState(status == null);
252     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
253       HLog.SPLIT_SKIP_ERRORS_DEFAULT);
254     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
255     Path logPath = logfile.getPath();
256     boolean outputSinkStarted = false;
257     boolean progress_failed = false;
258     int editsCount = 0;
259     int editsSkipped = 0;
260 
261     status =
262         TaskMonitor.get().createStatus(
263           "Splitting log file " + logfile.getPath() + " into a temporary staging area.");
264     try {
265       long logLength = logfile.getLen();
266       LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
267       LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
268       status.setStatus("Opening log file");
269       if (reporter != null && !reporter.progress()) {
270         progress_failed = true;
271         return false;
272       }
273       Reader in = null;
274       try {
275         in = getReader(fs, logfile, conf, skipErrors, reporter);
276       } catch (CorruptedLogFileException e) {
277         LOG.warn("Could not get reader, corrupted log file " + logPath, e);
278         ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
279         isCorrupted = true;
280       }
281       if (in == null) {
282         LOG.warn("Nothing to split in log file " + logPath);
283         return true;
284       }
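      // Collect tables that are disabled or being disabled; in distributedLogReplay mode their
      // regions will not be re-assigned, so LogReplayOutputSink falls back to writing
      // recovered.edits for them (see LogReplayOutputSink#append).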
285       if (csm != null) {
286         HConnection scc = csm.getServer().getShortCircuitConnection();
287         TableName[] tables = scc.listTableNames();
288         for (TableName table : tables) {
289           if (scc.getTableState(table)
290               .inStates(TableState.State.DISABLED, TableState.State.DISABLING)) {
291             disablingOrDisabledTables.add(table);
292           }
293         }
294       }
295       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
296       int numOpenedFilesLastCheck = 0;
297       outputSink.setReporter(reporter);
298       outputSink.startWriterThreads();
299       outputSinkStarted = true;
300       Entry entry;
301       Long lastFlushedSequenceId = -1L;
302       ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logPath);
303       failedServerName = (serverName == null) ? "" : serverName.getServerName();
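      // Main split loop: read WAL entries one by one, look up the region's last flushed sequence
      // id (from the split-log coordination in distributedLogReplay mode, otherwise from the
      // LastSequenceId checker), skip edits that are already flushed, and hand the remainder to
      // entryBuffers for the writer threads, reporting progress periodically.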
304       while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
305         byte[] region = entry.getKey().getEncodedRegionName();
306         String key = Bytes.toString(region);
307         lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
308         if (lastFlushedSequenceId == null) {
309           if (this.distributedLogReplay) {
310             RegionStoreSequenceIds ids =
311                 csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
312                   key);
313             if (ids != null) {
314               lastFlushedSequenceId = ids.getLastFlushedSequenceId();
315             }
316           } else if (sequenceIdChecker != null) {
317             lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
318           }
319           if (lastFlushedSequenceId == null) {
320             lastFlushedSequenceId = -1L;
321           }
322           lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
323         }
324         if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
325           editsSkipped++;
326           continue;
327         }
328         entryBuffers.appendEntry(entry);
329         editsCount++;
330         int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
331         // If sufficient edits have passed, check if we should report progress.
332         if (editsCount % interval == 0
333             || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
334           numOpenedFilesLastCheck = this.getNumOpenWriters();
335           String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
336               + " edits, skipped " + editsSkipped + " edits.";
337           status.setStatus("Split " + countsStr);
338           if (reporter != null && !reporter.progress()) {
339             progress_failed = true;
340             return false;
341           }
342         }
343       }
344     } catch (InterruptedException ie) {
345       IOException iie = new InterruptedIOException();
346       iie.initCause(ie);
347       throw iie;
348     } catch (CorruptedLogFileException e) {
349       LOG.warn("Could not parse, corrupted log file " + logPath, e);
350       csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
351         logfile.getPath().getName(), fs);
352       isCorrupted = true;
353     } catch (IOException e) {
354       e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
355       throw e;
356     } finally {
357       LOG.debug("Finishing writing output logs and closing down.");
358       try {
359         if (outputSinkStarted) {
360           // Set progress_failed to true before calling finishWritingAndClose(): if it throws an
361           // exception, progress_failed keeps the right value; otherwise the next statement resets it.
362           progress_failed = true;
363           progress_failed = outputSink.finishWritingAndClose() == null;
364         }
365       } finally {
366         String msg =
367             "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
368                 + " regions; log file=" + logPath + " is corrupted = " + isCorrupted
369                 + " progress failed = " + progress_failed;
370         LOG.info(msg);
371         status.markComplete(msg);
372       }
373     }
374     return !progress_failed;
375   }
376 
377   /**
378    * Completes the work done by splitLogFile by archiving logs
379    * <p>
380    * It is invoked by SplitLogManager once it knows that one of the
381    * SplitLogWorkers has completed the splitLogFile() part. If the master
382    * crashes then this function might get called multiple times.
383    * <p>
384    * @param logfile
385    * @param conf
386    * @throws IOException
387    */
388   public static void finishSplitLogFile(String logfile,
389       Configuration conf)  throws IOException {
390     Path rootdir = FSUtils.getRootDir(conf);
391     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
392     Path logPath;
393     if (FSUtils.isStartingWithPath(rootdir, logfile)) {
394       logPath = new Path(logfile);
395     } else {
396       logPath = new Path(rootdir, logfile);
397     }
398     finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
399   }
400 
401   static void finishSplitLogFile(Path rootdir, Path oldLogDir,
402       Path logPath, Configuration conf) throws IOException {
403     List<Path> processedLogs = new ArrayList<Path>();
404     List<Path> corruptedLogs = new ArrayList<Path>();
405     FileSystem fs;
406     fs = rootdir.getFileSystem(conf);
407     if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
408       corruptedLogs.add(logPath);
409     } else {
410       processedLogs.add(logPath);
411     }
412     archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
413     Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
414     fs.delete(stagingDir, true);
415   }
416 
417   /**
418    * Moves processed logs to the oldLogDir after successful processing. Moves
419    * corrupted logs (any log that couldn't be successfully parsed) to the corruptDir
420    * (.corrupt) for later investigation.
421    *
422    * @param corruptedLogs
423    * @param processedLogs
424    * @param oldLogDir
425    * @param fs
426    * @param conf
427    * @throws IOException
428    */
429   private static void archiveLogs(
430       final List<Path> corruptedLogs,
431       final List<Path> processedLogs, final Path oldLogDir,
432       final FileSystem fs, final Configuration conf) throws IOException {
433     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
434         "hbase.regionserver.hlog.splitlog.corrupt.dir",  HConstants.CORRUPT_DIR_NAME));
435 
436     if (!fs.mkdirs(corruptDir)) {
437       LOG.info("Unable to mkdir " + corruptDir);
438     }
439     fs.mkdirs(oldLogDir);
440 
441     // this method can get restarted or called multiple times for archiving
442     // the same log files.
443     for (Path corrupted : corruptedLogs) {
444       Path p = new Path(corruptDir, corrupted.getName());
445       if (fs.exists(corrupted)) {
446         if (!fs.rename(corrupted, p)) {
447           LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
448         } else {
449           LOG.warn("Moved corrupted log " + corrupted + " to " + p);
450         }
451       }
452     }
453 
454     for (Path p : processedLogs) {
455       Path newPath = FSHLog.getHLogArchivePath(oldLogDir, p);
456       if (fs.exists(p)) {
457         if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
458           LOG.warn("Unable to move " + p + " to " + newPath);
459         } else {
460           LOG.info("Archived processed log " + p + " to " + newPath);
461         }
462       }
463     }
464   }
465 
466   /**
467    * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
468    * <code>logEntry</code> named for the sequenceid in the passed
469    * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
470    * This method also ensures existence of RECOVERED_EDITS_DIR under the region
471    * creating it if necessary.
472    * @param fs
473    * @param logEntry
474    * @param rootDir HBase root dir.
475    * @return Path to file into which to dump split log edits.
476    * @throws IOException
477    */
478   @SuppressWarnings("deprecation")
479   static Path getRegionSplitEditsPath(final FileSystem fs,
480       final Entry logEntry, final Path rootDir, boolean isCreate)
481   throws IOException {
482     Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
483     String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
484     Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
485     Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
486 
487     if (!fs.exists(regiondir)) {
488       LOG.info("This region's directory doesn't exist: "
489           + regiondir.toString() + ". It is very likely that it was" +
490           " already split so it's safe to discard those edits.");
491       return null;
492     }
493     if (fs.exists(dir) && fs.isFile(dir)) {
494       Path tmp = new Path("/tmp");
495       if (!fs.exists(tmp)) {
496         fs.mkdirs(tmp);
497       }
498       tmp = new Path(tmp,
499         HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
500       LOG.warn("Found existing old file: " + dir + ". It could be some "
501         + "leftover of an old installation. It should be a folder instead. "
502         + "So moving it to " + tmp);
503       if (!fs.rename(dir, tmp)) {
504         LOG.warn("Failed to sideline old file " + dir);
505       }
506     }
507 
508     if (isCreate && !fs.exists(dir)) {
509       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
510     }
511     // The appended file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
512     // the region's replayRecoveredEdits will not delete it
513     String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
514     fileName = getTmpRecoveredEditsFileName(fileName);
515     return new Path(dir, fileName);
516   }
517 
518   static String getTmpRecoveredEditsFileName(String fileName) {
519     return fileName + HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
520   }
521 
522   /**
523    * Get the completed recovered edits file path, renaming it so it is named by the
524    * last edit in the file rather than its first edit. Then we can use the name to skip
525    * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
526    * @param srcPath
527    * @param maximumEditLogSeqNum
528    * @return dstPath, named for the file's last edit log sequence number
529    */
530   static Path getCompletedRecoveredEditsFilePath(Path srcPath,
531       Long maximumEditLogSeqNum) {
532     String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
533     return new Path(srcPath.getParent(), fileName);
534   }
535 
536   static String formatRecoveredEditsFileName(final long seqid) {
537     return String.format("%019d", seqid);
538   }
539 
540   /**
541    * Create a new {@link Reader} for reading logs to split.
542    *
543    * @param fs
544    * @param file
545    * @param conf
546    * @return A new Reader instance
547    * @throws IOException
548    * @throws CorruptedLogFileException
549    */
550   protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
551       boolean skipErrors, CancelableProgressable reporter)
552       throws IOException, CorruptedLogFileException {
553     Path path = file.getPath();
554     long length = file.getLen();
555     Reader in;
556 
557     // Check for possibly empty file. With appends, currently Hadoop reports a
558     // zero length even if the file has been sync'd. Revisit if HDFS-376 or
559     // HDFS-878 is committed.
560     if (length <= 0) {
561       LOG.warn("File " + path + " might be still open, length is 0");
562     }
563 
564     try {
565       FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
566       try {
567         in = getReader(fs, path, conf, reporter);
568       } catch (EOFException e) {
569         if (length <= 0) {
570           // TODO should we ignore an empty, not-last log file if skip.errors
571           // is false? Either way, the caller should decide what to do. E.g.
572           // ignore if this is the last log in sequence.
573           // TODO is this scenario still possible if the log has been
574           // recovered (i.e. closed)
575           LOG.warn("Could not open " + path + " for reading. File is empty", e);
576           return null;
577         } else {
578           // EOFException being ignored
579           return null;
580         }
581       }
582     } catch (IOException e) {
583       if (e instanceof FileNotFoundException) {
584         // A wal file may not exist anymore. Nothing can be recovered so move on
585         LOG.warn("File " + path + " doesn't exist anymore.", e);
586         return null;
587       }
588       if (!skipErrors || e instanceof InterruptedIOException) {
589         throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
590       }
591       CorruptedLogFileException t =
592         new CorruptedLogFileException("skipErrors=true Could not open hlog " +
593             path + " ignoring");
594       t.initCause(e);
595       throw t;
596     }
597     return in;
598   }
599 
600   static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
601   throws CorruptedLogFileException, IOException {
602     try {
603       return in.next();
604     } catch (EOFException eof) {
605       // truncated files are expected if a RS crashes (see HBASE-2643)
606       LOG.info("EOF from hlog " + path + ".  continuing");
607       return null;
608     } catch (IOException e) {
609       // If the IOE resulted from bad file format,
610       // then this problem is idempotent and retrying won't help
611       if (e.getCause() != null &&
612           (e.getCause() instanceof ParseException ||
613            e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
614         LOG.warn("Parse exception " + e.getCause().toString() + " from hlog "
615            + path + ".  continuing");
616         return null;
617       }
618       if (!skipErrors) {
619         throw e;
620       }
621       CorruptedLogFileException t =
622         new CorruptedLogFileException("skipErrors=true Ignoring exception" +
623             " while parsing hlog " + path + ". Marking as corrupted");
624       t.initCause(e);
625       throw t;
626     }
627   }
628 
629   /**
630    * Create a new {@link Writer} for writing log splits.
631    */
632   protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
633       throws IOException {
634     return HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);
635   }
636 
637   /**
638    * Create a new {@link Reader} for reading logs to split.
639    */
640   protected Reader getReader(FileSystem fs, Path curLogFile,
641       Configuration conf, CancelableProgressable reporter) throws IOException {
642     return HLogFactory.createReader(fs, curLogFile, conf, reporter);
643   }
644 
645   /**
646    * Get the number of currently open writers
647    */
648   private int getNumOpenWriters() {
649     int result = 0;
650     if (this.outputSink != null) {
651       result += this.outputSink.getNumOpenWriters();
652     }
653     return result;
654   }
655 
656   /**
657    * Contains some methods to control WAL-entries producer / consumer interactions
658    */
659   public static class PipelineController {
660     // If an exception is thrown by one of the other threads, it will be
661     // stored here.
662     AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
663 
664     // Wait/notify for when data has been produced by the writer thread,
665     // consumed by the reader thread, or an exception occurred
666     public final Object dataAvailable = new Object();
667 
668     void writerThreadError(Throwable t) {
669       thrown.compareAndSet(null, t);
670     }
671 
672     /**
673      * Check for errors in the writer threads. If any is found, rethrow it.
674      */
675     void checkForErrors() throws IOException {
676       Throwable thrown = this.thrown.get();
677       if (thrown == null) return;
678       if (thrown instanceof IOException) {
679         throw new IOException(thrown);
680       } else {
681         throw new RuntimeException(thrown);
682       }
683     }
684   }
685 
686   /**
687    * Class which accumulates edits and separates them into a buffer per region
688    * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
689    * a predefined threshold.
690    *
691    * Writer threads then pull region-specific buffers from this class.
692    */
693   public static class EntryBuffers {
694     PipelineController controller;
695 
696     Map<byte[], RegionEntryBuffer> buffers =
697       new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
698 
699     /* Track which regions are currently in the middle of writing. We don't allow
700        an IO thread to pick up bytes from a region if we're already writing
701        data for that region in a different IO thread. */
702     Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
703 
704     long totalBuffered = 0;
705     long maxHeapUsage;
706 
707     public EntryBuffers(PipelineController controller, long maxHeapUsage) {
708       this.controller = controller;
709       this.maxHeapUsage = maxHeapUsage;
710     }
711 
712     /**
713      * Append a log entry into the corresponding region buffer.
714      * Blocks if the total heap usage has crossed the specified threshold.
715      *
716      * @throws InterruptedException
717      * @throws IOException
718      */
719     public void appendEntry(Entry entry) throws InterruptedException, IOException {
720       HLogKey key = entry.getKey();
721 
722       RegionEntryBuffer buffer;
723       long incrHeap;
724       synchronized (this) {
725         buffer = buffers.get(key.getEncodedRegionName());
726         if (buffer == null) {
727           buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
728           buffers.put(key.getEncodedRegionName(), buffer);
729         }
730         incrHeap= buffer.appendEntry(entry);
731       }
732 
733       // If we crossed the chunk threshold, wait for more space to be available
734       synchronized (controller.dataAvailable) {
735         totalBuffered += incrHeap;
736         while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
737           LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
738           controller.dataAvailable.wait(2000);
739         }
740         controller.dataAvailable.notifyAll();
741       }
742       controller.checkForErrors();
743     }
744 
745     /**
746      * @return RegionEntryBuffer a buffer of edits to be written or replayed.
747      */
748     synchronized RegionEntryBuffer getChunkToWrite() {
749       long biggestSize = 0;
750       byte[] biggestBufferKey = null;
751 
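      // Pick the buffer using the most heap among regions that no other IO thread is currently
      // writing; return null if every buffered region is already being drained.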
752       for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
753         long size = entry.getValue().heapSize();
754         if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
755           biggestSize = size;
756           biggestBufferKey = entry.getKey();
757         }
758       }
759       if (biggestBufferKey == null) {
760         return null;
761       }
762 
763       RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
764       currentlyWriting.add(biggestBufferKey);
765       return buffer;
766     }
767 
768     void doneWriting(RegionEntryBuffer buffer) {
769       synchronized (this) {
770         boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
771         assert removed;
772       }
773       long size = buffer.heapSize();
774 
775       synchronized (controller.dataAvailable) {
776         totalBuffered -= size;
777         // We may unblock writers
778         controller.dataAvailable.notifyAll();
779       }
780     }
781 
782     synchronized boolean isRegionCurrentlyWriting(byte[] region) {
783       return currentlyWriting.contains(region);
784     }
785 
786     public void waitUntilDrained() {
787       synchronized (controller.dataAvailable) {
788         while (totalBuffered > 0) {
789           try {
790             controller.dataAvailable.wait(2000);
791           } catch (InterruptedException e) {
792             LOG.warn("Got interrupted while waiting for EntryBuffers to drain");
793             Thread.interrupted();
794             break;
795           }
796         }
797       }
798     }
799   }
800 
801   /**
802    * A buffer of some number of edits for a given region.
803    * This accumulates edits and also provides a memory optimization in order to
804    * share a single byte array instance for the table and region name.
805    * Also tracks memory usage of the accumulated edits.
806    */
807   public static class RegionEntryBuffer implements HeapSize {
808     long heapInBuffer = 0;
809     List<Entry> entryBuffer;
810     TableName tableName;
811     byte[] encodedRegionName;
812 
813     RegionEntryBuffer(TableName tableName, byte[] region) {
814       this.tableName = tableName;
815       this.encodedRegionName = region;
816       this.entryBuffer = new LinkedList<Entry>();
817     }
818 
819     long appendEntry(Entry entry) {
820       internify(entry);
821       entryBuffer.add(entry);
822       long incrHeap = entry.getEdit().heapSize() +
823         ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
824         0; // TODO linkedlist entry
825       heapInBuffer += incrHeap;
826       return incrHeap;
827     }
828 
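    // internify() below points the entry's key at this buffer's shared TableName and
    // encodedRegionName instances, so all buffered entries reference the same objects
    // (the memory optimization mentioned in the class javadoc).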
829     private void internify(Entry entry) {
830       HLogKey k = entry.getKey();
831       k.internTableName(this.tableName);
832       k.internEncodedRegionName(this.encodedRegionName);
833     }
834 
835     @Override
836     public long heapSize() {
837       return heapInBuffer;
838     }
839 
840     public byte[] getEncodedRegionName() {
841       return encodedRegionName;
842     }
843 
844     public List<Entry> getEntryBuffer() {
845       return entryBuffer;
846     }
847 
848     public TableName getTableName() {
849       return tableName;
850     }
851   }
852 
853   public static class WriterThread extends Thread {
854     private volatile boolean shouldStop = false;
855     private PipelineController controller;
856     private EntryBuffers entryBuffers;
857     private OutputSink outputSink = null;
858 
859     WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
860       super(Thread.currentThread().getName() + "-Writer-" + i);
861       this.controller = controller;
862       this.entryBuffers = entryBuffers;
863       outputSink = sink;
864     }
865 
866     @Override
867     public void run()  {
868       try {
869         doRun();
870       } catch (Throwable t) {
871         LOG.error("Exiting thread", t);
872         controller.writerThreadError(t);
873       }
874     }
875 
876     private void doRun() throws IOException {
877       LOG.debug("Writer thread " + this + ": starting");
878       while (true) {
879         RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
880         if (buffer == null) {
881           // No data currently available, wait on some more to show up
882           synchronized (controller.dataAvailable) {
883             if (shouldStop && !this.outputSink.flush()) {
884               return;
885             }
886             try {
887               controller.dataAvailable.wait(500);
888             } catch (InterruptedException ie) {
889               if (!shouldStop) {
890                 throw new RuntimeException(ie);
891               }
892             }
893           }
894           continue;
895         }
896 
897         assert buffer != null;
898         try {
899           writeBuffer(buffer);
900         } finally {
901           entryBuffers.doneWriting(buffer);
902         }
903       }
904     }
905 
906     private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
907       outputSink.append(buffer);
908     }
909 
910     void finish() {
911       synchronized (controller.dataAvailable) {
912         shouldStop = true;
913         controller.dataAvailable.notifyAll();
914       }
915     }
916   }
917 
918   /**
919    * Abstract class that provides a common interface to support both the existing
920    * recovered-edits file sink and the region server WAL edits replay sink
921    */
922   public static abstract class OutputSink {
923 
924     protected PipelineController controller;
925     protected EntryBuffers entryBuffers;
926 
927     protected Map<byte[], SinkWriter> writers = Collections
928         .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));
929 
930     protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
931         .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
932 
933     protected final List<WriterThread> writerThreads = Lists.newArrayList();
934 
935     /* Set of regions which we've decided should not output edits */
936     protected final Set<byte[]> blacklistedRegions = Collections
937         .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
938 
939     protected boolean closeAndCleanCompleted = false;
940 
941     protected boolean writersClosed = false;
942 
943     protected final int numThreads;
944 
945     protected CancelableProgressable reporter = null;
946 
947     protected AtomicLong skippedEdits = new AtomicLong();
948 
949     protected List<Path> splits = null;
950 
951     public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
952       numThreads = numWriters;
953       this.controller = controller;
954       this.entryBuffers = entryBuffers;
955     }
956 
957     void setReporter(CancelableProgressable reporter) {
958       this.reporter = reporter;
959     }
960 
961     /**
962      * Start the threads that will pump data from the entryBuffers to the output files.
963      */
964     public synchronized void startWriterThreads() {
965       for (int i = 0; i < numThreads; i++) {
966         WriterThread t = new WriterThread(controller, entryBuffers, this, i);
967         t.start();
968         writerThreads.add(t);
969       }
970     }
971 
972     /**
973      *
974      * Update region's maximum edit log SeqNum.
975      */
976     void updateRegionMaximumEditLogSeqNum(Entry entry) {
977       synchronized (regionMaximumEditLogSeqNum) {
978         Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
979             .getEncodedRegionName());
980         if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
981           regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
982               .getLogSeqNum());
983         }
984       }
985     }
986 
987     Long getRegionMaximumEditLogSeqNum(byte[] region) {
988       return regionMaximumEditLogSeqNum.get(region);
989     }
990 
991     /**
992      * @return the number of currently opened writers
993      */
994     int getNumOpenWriters() {
995       return this.writers.size();
996     }
997 
998     long getSkippedEdits() {
999       return this.skippedEdits.get();
1000     }
1001 
1002     /**
1003      * Wait for writer threads to dump all info to the sink
1004      * @return true when there is no error
1005      * @throws IOException
1006      */
1007     protected boolean finishWriting() throws IOException {
1008       LOG.debug("Waiting for split writer threads to finish");
1009       boolean progress_failed = false;
1010       for (WriterThread t : writerThreads) {
1011         t.finish();
1012       }
1013       for (WriterThread t : writerThreads) {
1014         if (!progress_failed && reporter != null && !reporter.progress()) {
1015           progress_failed = true;
1016         }
1017         try {
1018           t.join();
1019         } catch (InterruptedException ie) {
1020           IOException iie = new InterruptedIOException();
1021           iie.initCause(ie);
1022           throw iie;
1023         }
1024       }
1025       controller.checkForErrors();
1026       LOG.info("Split writers finished");
1027       return (!progress_failed);
1028     }
1029 
1030     public abstract List<Path> finishWritingAndClose() throws IOException;
1031 
1032     /**
1033      * @return a map from encoded region ID to the number of edits written out for that region.
1034      */
1035     public abstract Map<byte[], Long> getOutputCounts();
1036 
1037     /**
1038      * @return number of regions we've recovered
1039      */
1040     public abstract int getNumberOfRecoveredRegions();
1041 
1042     /**
1043      * @param buffer A WAL Edit Entry
1044      * @throws IOException
1045      */
1046     public abstract void append(RegionEntryBuffer buffer) throws IOException;
1047 
1048     /**
1049      * WriterThread calls this function to flush any remaining edits buffered internally before close
1050      * @return true when underlying sink has something to flush
1051      */
1052     public boolean flush() throws IOException {
1053       return false;
1054     }
1055   }
1056 
1057   /**
1058    * Class that manages the output streams from the log splitting process.
1059    */
1060   class LogRecoveredEditsOutputSink extends OutputSink {
1061 
1062     public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1063         int numWriters) {
1064       // More threads could potentially write faster at the expense
1065       // of causing more disk seeks as the logs are split.
1066       // After a certain setting (probably around 3 threads) the
1067       // process will be bound on the reader in the current
1068       // implementation anyway.
1069       super(controller, entryBuffers, numWriters);
1070     }
1071 
1072     /**
1073      * @return null if failed to report progress
1074      * @throws IOException
1075      */
1076     @Override
1077     public List<Path> finishWritingAndClose() throws IOException {
1078       boolean isSuccessful = false;
1079       List<Path> result = null;
1080       try {
1081         isSuccessful = finishWriting();
1082       } finally {
1083         result = close();
1084         List<IOException> thrown = closeLogWriters(null);
1085         if (thrown != null && !thrown.isEmpty()) {
1086           throw MultipleIOException.createIOException(thrown);
1087         }
1088       }
1089       if (isSuccessful) {
1090         splits = result;
1091       }
1092       return splits;
1093     }
1094 
1095     /**
1096      * Close all of the output streams.
1097      * @return the list of paths written.
1098      */
1099     private List<Path> close() throws IOException {
1100       Preconditions.checkState(!closeAndCleanCompleted);
1101 
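      // Close each open writer on a bounded thread pool: empty recovered.edits files are simply
      // deleted, while non-empty ones are renamed from their temporary name to the final name
      // derived from the region's maximum edit sequence number
      // (see getCompletedRecoveredEditsFilePath).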
1102       final List<Path> paths = new ArrayList<Path>();
1103       final List<IOException> thrown = Lists.newArrayList();
1104       ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
1105         TimeUnit.SECONDS, new ThreadFactory() {
1106           private int count = 1;
1107 
1108           @Override
1109           public Thread newThread(Runnable r) {
1110             Thread t = new Thread(r, "split-log-closeStream-" + count++);
1111             return t;
1112           }
1113         });
1114       CompletionService<Void> completionService =
1115         new ExecutorCompletionService<Void>(closeThreadPool);
1116       for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
1117         LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
1118         completionService.submit(new Callable<Void>() {
1119           @Override
1120           public Void call() throws Exception {
1121             WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1122             LOG.debug("Closing " + wap.p);
1123             try {
1124               wap.w.close();
1125             } catch (IOException ioe) {
1126               LOG.error("Couldn't close log at " + wap.p, ioe);
1127               thrown.add(ioe);
1128               return null;
1129             }
1130             LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1131                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1132 
1133             if (wap.editsWritten == 0) {
1134               // just remove the empty recovered.edits file
1135               if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
1136                 LOG.warn("Failed deleting empty " + wap.p);
1137                 throw new IOException("Failed deleting empty " + wap.p);
1138               }
1139               return null;
1140             }
1141 
1142             Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1143               regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
1144             try {
1145               if (!dst.equals(wap.p) && fs.exists(dst)) {
1146                 LOG.warn("Found existing old edits file. It could be the "
1147                     + "result of a previous failed split attempt. Deleting " + dst + ", length="
1148                     + fs.getFileStatus(dst).getLen());
1149                 if (!fs.delete(dst, false)) {
1150                   LOG.warn("Failed deleting of old " + dst);
1151                   throw new IOException("Failed deleting of old " + dst);
1152                 }
1153               }
1154               // Skip the unit tests which create a splitter that reads and
1155               // writes the data without touching disk.
1156               // TestHLogSplit#testThreading is an example.
1157               if (fs.exists(wap.p)) {
1158                 if (!fs.rename(wap.p, dst)) {
1159                   throw new IOException("Failed renaming " + wap.p + " to " + dst);
1160                 }
1161                 LOG.info("Rename " + wap.p + " to " + dst);
1162               }
1163             } catch (IOException ioe) {
1164               LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1165               thrown.add(ioe);
1166               return null;
1167             }
1168             paths.add(dst);
1169             return null;
1170           }
1171         });
1172       }
1173 
1174       boolean progress_failed = false;
1175       try {
1176         for (int i = 0, n = this.writers.size(); i < n; i++) {
1177           Future<Void> future = completionService.take();
1178           future.get();
1179           if (!progress_failed && reporter != null && !reporter.progress()) {
1180             progress_failed = true;
1181           }
1182         }
1183       } catch (InterruptedException e) {
1184         IOException iie = new InterruptedIOException();
1185         iie.initCause(e);
1186         throw iie;
1187       } catch (ExecutionException e) {
1188         throw new IOException(e.getCause());
1189       } finally {
1190         closeThreadPool.shutdownNow();
1191       }
1192 
1193       if (!thrown.isEmpty()) {
1194         throw MultipleIOException.createIOException(thrown);
1195       }
1196       writersClosed = true;
1197       closeAndCleanCompleted = true;
1198       if (progress_failed) {
1199         return null;
1200       }
1201       return paths;
1202     }
1203 
1204     private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1205       if (writersClosed) {
1206         return thrown;
1207       }
1208 
1209       if (thrown == null) {
1210         thrown = Lists.newArrayList();
1211       }
1212       try {
1213         for (WriterThread t : writerThreads) {
1214           while (t.isAlive()) {
1215             t.shouldStop = true;
1216             t.interrupt();
1217             try {
1218               t.join(10);
1219             } catch (InterruptedException e) {
1220               IOException iie = new InterruptedIOException();
1221               iie.initCause(e);
1222               throw iie;
1223             }
1224           }
1225         }
1226       } finally {
1227         synchronized (writers) {
1228           WriterAndPath wap = null;
1229           for (SinkWriter tmpWAP : writers.values()) {
1230             try {
1231               wap = (WriterAndPath) tmpWAP;
1232               wap.w.close();
1233             } catch (IOException ioe) {
1234               LOG.error("Couldn't close log at " + wap.p, ioe);
1235               thrown.add(ioe);
1236               continue;
1237             }
1238             LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1239                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1240           }
1241         }
1242         writersClosed = true;
1243       }
1244 
1245       return thrown;
1246     }
1247 
1248     /**
1249      * Get a writer and path for a log starting at the given entry. This function is threadsafe so
1250      * long as multiple threads are always acting on different regions.
1251      * @return null if this region shouldn't output any logs
1252      */
1253     private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
1254       byte region[] = entry.getKey().getEncodedRegionName();
1255       WriterAndPath ret = (WriterAndPath) writers.get(region);
1256       if (ret != null) {
1257         return ret;
1258       }
1259       // If we already decided that this region doesn't get any output
1260       // we don't need to check again.
1261       if (blacklistedRegions.contains(region)) {
1262         return null;
1263       }
1264       ret = createWAP(region, entry, rootDir, fs, conf);
1265       if (ret == null) {
1266         blacklistedRegions.add(region);
1267         return null;
1268       }
1269       writers.put(region, ret);
1270       return ret;
1271     }
1272 
1273     private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
1274         Configuration conf) throws IOException {
1275       Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1276       if (regionedits == null) {
1277         return null;
1278       }
1279       if (fs.exists(regionedits)) {
1280         LOG.warn("Found old edits file. It could be the "
1281             + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1282             + fs.getFileStatus(regionedits).getLen());
1283         if (!fs.delete(regionedits, false)) {
1284           LOG.warn("Failed delete of old " + regionedits);
1285         }
1286       }
1287       Writer w = createWriter(fs, regionedits, conf);
1288       LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
1289       return (new WriterAndPath(regionedits, w));
1290     }
1291 
1292     @Override
1293     public void append(RegionEntryBuffer buffer) throws IOException {
1294       List<Entry> entries = buffer.entryBuffer;
1295       if (entries.isEmpty()) {
1296         LOG.warn("got an empty buffer, skipping");
1297         return;
1298       }
1299 
1300       WriterAndPath wap = null;
1301 
1302       long startTime = System.nanoTime();
1303       try {
1304         int editsCount = 0;
1305 
1306         for (Entry logEntry : entries) {
1307           if (wap == null) {
1308             wap = getWriterAndPath(logEntry);
1309             if (wap == null) {
1310               // getWriterAndPath decided we don't need to write these edits
1311               return;
1312             }
1313           }
1314           wap.w.append(logEntry);
1315           this.updateRegionMaximumEditLogSeqNum(logEntry);
1316           editsCount++;
1317         }
1318         // Pass along summary statistics
1319         wap.incrementEdits(editsCount);
1320         wap.incrementNanoTime(System.nanoTime() - startTime);
1321       } catch (IOException e) {
1322           e = e instanceof RemoteException ?
1323                   ((RemoteException)e).unwrapRemoteException() : e;
1324         LOG.fatal("Got IOException while writing log entry to log", e);
1325         throw e;
1326       }
1327     }
1328 
1329     /**
1330      * @return a map from encoded region ID to the number of edits written out for that region.
1331      */
1332     @Override
1333     public Map<byte[], Long> getOutputCounts() {
1334       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1335       synchronized (writers) {
1336         for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
1337           ret.put(entry.getKey(), entry.getValue().editsWritten);
1338         }
1339       }
1340       return ret;
1341     }
1342 
1343     @Override
1344     public int getNumberOfRecoveredRegions() {
1345       return writers.size();
1346     }
1347   }
1348 
1349   /**
1350    * Class wraps the actual writer which writes data out and related statistics
1351    */
1352   public abstract static class SinkWriter {
1353     /* Count of edits written to this path */
1354     long editsWritten = 0;
1355     /* Number of nanos spent writing to this log */
1356     long nanosSpent = 0;
1357 
1358     void incrementEdits(int edits) {
1359       editsWritten += edits;
1360     }
1361 
1362     void incrementNanoTime(long nanos) {
1363       nanosSpent += nanos;
1364     }
1365   }
1366 
1367   /**
1368    * Private data structure that wraps a Writer and its Path, also collecting statistics about the
1369    * data written to this output.
1370    */
1371   private final static class WriterAndPath extends SinkWriter {
1372     final Path p;
1373     final Writer w;
1374 
1375     WriterAndPath(final Path p, final Writer w) {
1376       this.p = p;
1377       this.w = w;
1378     }
1379   }
1380 
1381   /**
1382    * Class that manages replaying edits from WAL files directly to the assigned failover region servers
1383    */
1384   class LogReplayOutputSink extends OutputSink {
1385     private static final double BUFFER_THRESHOLD = 0.35;
1386     private static final String KEY_DELIMITER = "#";
1387 
1388     private long waitRegionOnlineTimeOut;
1389     private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
1390     private final Map<String, RegionServerWriter> writers =
1391         new ConcurrentHashMap<String, RegionServerWriter>();
1392     // online encoded region name -> region location map
1393     private final Map<String, HRegionLocation> onlineRegions =
1394         new ConcurrentHashMap<String, HRegionLocation>();
1395 
1396     private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
1397         .synchronizedMap(new TreeMap<TableName, HConnection>());
1398     /**
1399      * Map key -> value layout
1400      * <servername>:<table name> -> Queue<Row>
1401      */
1402     private Map<String, List<Pair<HRegionLocation, HLog.Entry>>> serverToBufferQueueMap =
1403         new ConcurrentHashMap<String, List<Pair<HRegionLocation, HLog.Entry>>>();
1404     private List<Throwable> thrown = new ArrayList<Throwable>();
1405 
1406     // The following sink is used in distributedLogReplay mode for entries of regions in a
1407     // disabling or disabled table. This is a limitation of distributedLogReplay: log replay
1408     // needs a region to be assigned and online before it can replay WAL edits, but regions of
1409     // disabling/disabled tables won't be assigned by the AM. We can retire this code after HBASE-8234.
1410     private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
1411     private boolean hasEditsInDisablingOrDisabledTables = false;
1412 
1413     public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1414         int numWriters) {
1415       super(controller, entryBuffers, numWriters);
1416       this.waitRegionOnlineTimeOut =
1417           conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
1418             ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
1419       this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
1420         entryBuffers, numWriters);
1421       this.logRecoveredEditsOutputSink.setReporter(reporter);
1422     }
1423 
1424     @Override
1425     public void append(RegionEntryBuffer buffer) throws IOException {
1426       List<Entry> entries = buffer.entryBuffer;
1427       if (entries.isEmpty()) {
1428         LOG.warn("got an empty buffer, skipping");
1429         return;
1430       }
1431 
1432       // check if the current region is in a disabling or disabled table
1433       if (disablingOrDisabledTables.contains(buffer.tableName)) {
1434         // need to fall back to the old way
1435         logRecoveredEditsOutputSink.append(buffer);
1436         hasEditsInDisablingOrDisabledTables = true;
1437         // store regions we have recovered so far
1438         addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
1439         return;
1440       }
1441 
1442       // group entries by region servers
1443       groupEditsByServer(entries);
1444 
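           // Flush only the largest per-server buffer, and only once it holds at least minBatchSize
           // entries or the entry buffers are more than BUFFER_THRESHOLD of the max heap usage;
           // otherwise keep buffering so replay RPCs stay well batched.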
1445       // process workitems
1446       String maxLocKey = null;
1447       int maxSize = 0;
1448       List<Pair<HRegionLocation, HLog.Entry>> maxQueue = null;
1449       synchronized (this.serverToBufferQueueMap) {
1450         for (String key : this.serverToBufferQueueMap.keySet()) {
1451           List<Pair<HRegionLocation, HLog.Entry>> curQueue = this.serverToBufferQueueMap.get(key);
1452           if (curQueue.size() > maxSize) {
1453             maxSize = curQueue.size();
1454             maxQueue = curQueue;
1455             maxLocKey = key;
1456           }
1457         }
1458         if (maxSize < minBatchSize
1459             && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
1460           // buffer more to process
1461           return;
1462         } else if (maxSize > 0) {
1463           this.serverToBufferQueueMap.remove(maxLocKey);
1464         }
1465       }
1466 
1467       if (maxSize > 0) {
1468         processWorkItems(maxLocKey, maxQueue);
1469       }
1470     }
1471 
1472     private void addToRecoveredRegions(String encodedRegionName) {
1473       if (!recoveredRegions.contains(encodedRegionName)) {
1474         recoveredRegions.add(encodedRegionName);
1475       }
1476     }
1477 
1478     /**
1479      * Helper function to group WAL entries by their destination region server
1480      * @throws IOException
1481      */
1482     private void groupEditsByServer(List<Entry> entries) throws IOException {
1483       Set<TableName> nonExistentTables = null;
1484       Long cachedLastFlushedSequenceId = -1L;
1485       for (HLog.Entry entry : entries) {
1486         WALEdit edit = entry.getEdit();
1487         TableName table = entry.getKey().getTablename();
1488         // clear scopes, which aren't needed for recovery
1489         entry.getKey().setScopes(null);
1490         String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
1491         // skip edits of non-existent tables
1492         if (nonExistentTables != null && nonExistentTables.contains(table)) {
1493           this.skippedEdits.incrementAndGet();
1494           continue;
1495         }
1496 
1497         Map<byte[], Long> maxStoreSequenceIds = null;
1498         boolean needSkip = false;
1499         HRegionLocation loc = null;
1500         String locKey = null;
1501         List<Cell> cells = edit.getCells();
1502         List<Cell> skippedCells = new ArrayList<Cell>();
1503         HConnection hconn = this.getConnectionByTableName(table);
1504 
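             // Walk each cell of the edit: compaction markers in METAFAMILY are redirected to the
             // region's start key and the compacted family, cells whose region/store was already
             // flushed past this entry's sequence id are dropped, and edits for deleted tables
             // cause the whole entry to be skipped.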
1505         for (Cell cell : cells) {
1506           byte[] row = cell.getRow();
1507           byte[] family = cell.getFamily();
1508           boolean isCompactionEntry = false;
1509           if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
1510             CompactionDescriptor compaction = WALEdit.getCompaction(cell);
1511             if (compaction != null && compaction.hasRegionName()) {
1512               try {
1513                 byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
1514                   .toByteArray());
1515                 row = regionName[1]; // startKey of the region
1516                 family = compaction.getFamilyName().toByteArray();
1517                 isCompactionEntry = true;
1518               } catch (Exception ex) {
1519                 LOG.warn("Unexpected exception received, ignoring", ex);
1520                 skippedCells.add(cell);
1521                 continue;
1522               }
1523             } else {
1524               skippedCells.add(cell);
1525               continue;
1526             }
1527           }
1528 
1529           try {
1530             loc =
1531                 locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
1532                   encodeRegionNameStr);
1533             // skip replaying the compaction if the region is gone
1534             if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
1535               loc.getRegionInfo().getEncodedName())) {
1536               LOG.info("Not replaying a compaction marker for an older region: "
1537                   + encodeRegionNameStr);
1538               needSkip = true;
1539             }
1540           } catch (TableNotFoundException ex) {
1541             // table has been deleted so skip edits of the table
1542             LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
1543                 + encodeRegionNameStr);
1544             lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
1545             if (nonExistentTables == null) {
1546               nonExistentTables = new TreeSet<TableName>();
1547             }
1548             nonExistentTables.add(table);
1549             this.skippedEdits.incrementAndGet();
1550             needSkip = true;
1551             break;
1552           }
1553 
1554           cachedLastFlushedSequenceId =
1555               lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1556           if (cachedLastFlushedSequenceId != null
1557               && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
1558             // skip the whole HLog entry
1559             this.skippedEdits.incrementAndGet();
1560             needSkip = true;
1561             break;
1562           } else {
1563             if (maxStoreSequenceIds == null) {
1564               maxStoreSequenceIds =
1565                   regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
1566             }
1567             if (maxStoreSequenceIds != null) {
1568               Long maxStoreSeqId = maxStoreSequenceIds.get(family);
1569               if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
1570                 // skip current kv if column family doesn't exist anymore or already flushed
1571                 skippedCells.add(cell);
1572                 continue;
1573               }
1574             }
1575           }
1576         }
1577 
1578         // skip the edit
1579         if (loc == null || needSkip) continue;
1580 
1581         if (!skippedCells.isEmpty()) {
1582           cells.removeAll(skippedCells);
1583         }
1584 
1585         synchronized (serverToBufferQueueMap) {
1586           locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
1587           List<Pair<HRegionLocation, HLog.Entry>> queue = serverToBufferQueueMap.get(locKey);
1588           if (queue == null) {
1589             queue =
1590                 Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, HLog.Entry>>());
1591             serverToBufferQueueMap.put(locKey, queue);
1592           }
1593           queue.add(new Pair<HRegionLocation, HLog.Entry>(loc, entry));
1594         }
1595         // store regions we have recovered so far
1596         addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
1597       }
1598     }
1599 
1600     /**
1601      * Locate destination region based on table name & row. This function also makes sure the
1602      * destination region is online for replay.
1603      * @throws IOException
1604      */
1605     private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
1606         TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
1607       // fetch location from cache
1608       HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
1609       if(loc != null) return loc;
1610       // fetch location from hbase:meta directly, without using the cache, to avoid hitting the old dead server
1611       loc = hconn.getRegionLocation(table, row, true);
1612       if (loc == null) {
1613         throw new IOException("Can't locate location for row:" + Bytes.toString(row)
1614             + " of table:" + table);
1615       }
1616       // check if the current row has moved to a different region due to a region merge/split
1617       if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
1618         // the original region (originalEncodedRegionName) should already have been flushed
1619         lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
1620         HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
1621         if (tmpLoc != null) return tmpLoc;
1622       }
1623 
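           // Block until the destination region is online (or the timeout elapses) and learn
           // whether it is still marked as recovering on the hosting region server.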
1624       Long lastFlushedSequenceId = -1L;
1625       AtomicBoolean isRecovering = new AtomicBoolean(true);
1626       loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
1627       if (!isRecovering.get()) {
1628         // the region isn't in recovering state at all, because the WAL file may contain edits for
1629         // a region that had been moved elsewhere before the hosting RS failed
1630         lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
1631         LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
1632             + " because it's not in recovering.");
1633       } else {
1634         Long cachedLastFlushedSequenceId =
1635             lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1636 
1637         // retrieve the last flushed sequence id from ZK, because the region's postOpenDeployTasks
1638         // will have updated the value for the region
1639         RegionStoreSequenceIds ids =
1640             csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
1641               loc.getRegionInfo().getEncodedName());
1642         if (ids != null) {
1643           lastFlushedSequenceId = ids.getLastFlushedSequenceId();
1644           Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1645           List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
1646           for (StoreSequenceId id : maxSeqIdInStores) {
1647             storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
1648           }
1649           regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
1650         }
1651 
1652         if (cachedLastFlushedSequenceId == null
1653             || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
1654           lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
1655         }
1656       }
1657 
1658       onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
1659       return loc;
1660     }
1661 
1662     private void processWorkItems(String key, List<Pair<HRegionLocation, HLog.Entry>> actions)
1663         throws IOException {
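           // Replay this batch of buffered entries to the destination region server through the
           // WALEditsReplaySink held by the per-server RegionServerWriter.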
1664       RegionServerWriter rsw = null;
1665 
1666       long startTime = System.nanoTime();
1667       try {
1668         rsw = getRegionServerWriter(key);
1669         rsw.sink.replayEntries(actions);
1670 
1671         // Pass along summary statistics
1672         rsw.incrementEdits(actions.size());
1673         rsw.incrementNanoTime(System.nanoTime() - startTime);
1674       } catch (IOException e) {
1675         e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1676         LOG.fatal("Got IOException while replaying edits to the region server", e);
1677         throw e;
1678       }
1679     }
1680 
1681     /**
1682      * Wait until the region is online on the destination region server
1683      * @param loc last known location of the region
1684      * @param row a row belonging to the region, used to re-resolve its location
1685      * @param timeout How long to wait, in milliseconds
1686      * @param isRecovering set to the recovering state of the region on the destination region server
1687      * @return the location once the region is online on the destination region server
1688      * @throws IOException if the region does not come online before the timeout
1689      */
1690     private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
1691         final long timeout, AtomicBoolean isRecovering)
1692         throws IOException {
1693       final long endTime = EnvironmentEdgeManager.currentTime() + timeout;
1694       final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1695         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1696       boolean reloadLocation = false;
1697       TableName tableName = loc.getRegionInfo().getTable();
1698       int tries = 0;
1699       Throwable cause = null;
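           // Poll the hosting server for the region's info with increasing backoff between attempts;
           // refresh the location from hbase:meta on failures other than RegionOpeningException.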
1700       while (endTime > EnvironmentEdgeManager.currentTime()) {
1701         try {
1702           // Try and get regioninfo from the hosting server.
1703           HConnection hconn = getConnectionByTableName(tableName);
1704           if(reloadLocation) {
1705             loc = hconn.getRegionLocation(tableName, row, true);
1706           }
1707           BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
1708           HRegionInfo region = loc.getRegionInfo();
1709           try {
1710             GetRegionInfoRequest request =
1711                 RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
1712             GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
1713             if (HRegionInfo.convert(response.getRegionInfo()) != null) {
1714               isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
1715               return loc;
1716             }
1717           } catch (ServiceException se) {
1718             throw ProtobufUtil.getRemoteException(se);
1719           }
1720         } catch (IOException e) {
1721           cause = e.getCause();
1722           if(!(cause instanceof RegionOpeningException)) {
1723             reloadLocation = true;
1724           }
1725         }
1726         long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
1727         try {
1728           Thread.sleep(expectedSleep);
1729         } catch (InterruptedException e) {
1730           throw new IOException("Interrupted while waiting for region " +
1731               loc.getRegionInfo().getEncodedName() + " to come online.", e);
1732         }
1733         tries++;
1734       }
1735 
1736       throw new IOException("Timed out waiting for region " + loc.getRegionInfo().getEncodedName() +
1737         " to come online after " + timeout + " milliseconds.", cause);
1738     }
1739 
1740     @Override
1741     public boolean flush() throws IOException {
1742       String curLoc = null;
1743       int curSize = 0;
1744       List<Pair<HRegionLocation, HLog.Entry>> curQueue = null;
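           // Drain the first non-empty per-server queue, replay it, and notify any threads
           // waiting on the controller's dataAvailable monitor.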
1745       synchronized (this.serverToBufferQueueMap) {
1746         for (String locationKey : this.serverToBufferQueueMap.keySet()) {
1747           curQueue = this.serverToBufferQueueMap.get(locationKey);
1748           if (!curQueue.isEmpty()) {
1749             curSize = curQueue.size();
1750             curLoc = locationKey;
1751             break;
1752           }
1753         }
1754         if (curSize > 0) {
1755           this.serverToBufferQueueMap.remove(curLoc);
1756         }
1757       }
1758 
1759       if (curSize > 0) {
1760         this.processWorkItems(curLoc, curQueue);
1761         controller.dataAvailable.notifyAll();
1762         return true;
1763       }
1764       return false;
1765     }
1766 
1767     void addWriterError(Throwable t) {
1768       thrown.add(t);
1769     }
1770 
1771     @Override
1772     public List<Path> finishWritingAndClose() throws IOException {
1773       try {
1774         if (!finishWriting()) {
1775           return null;
1776         }
1777         if (hasEditsInDisablingOrDisabledTables) {
1778           splits = logRecoveredEditsOutputSink.finishWritingAndClose();
1779         } else {
1780           splits = new ArrayList<Path>();
1781         }
1782         // return an empty list in order to keep the interface the same as the old way
1783         return splits;
1784       } finally {
1785         List<IOException> thrown = closeRegionServerWriters();
1786         if (thrown != null && !thrown.isEmpty()) {
1787           throw MultipleIOException.createIOException(thrown);
1788         }
1789       }
1790     }
1791 
1792     @Override
1793     int getNumOpenWriters() {
1794       return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
1795     }
1796 
1797     private List<IOException> closeRegionServerWriters() throws IOException {
1798       List<IOException> result = null;
1799       if (!writersClosed) {
1800         result = Lists.newArrayList();
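             // Stop the writer threads first, then close every per-server writer and the cached
             // per-table connections, collecting IOExceptions so the caller can rethrow them together.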
1801         try {
1802           for (WriterThread t : writerThreads) {
1803             while (t.isAlive()) {
1804               t.shouldStop = true;
1805               t.interrupt();
1806               try {
1807                 t.join(10);
1808               } catch (InterruptedException e) {
1809                 IOException iie = new InterruptedIOException();
1810                 iie.initCause(e);
1811                 throw iie;
1812               }
1813             }
1814           }
1815         } finally {
1816           synchronized (writers) {
1817             for (String locationKey : writers.keySet()) {
1818               RegionServerWriter tmpW = writers.get(locationKey);
1819               try {
1820                 tmpW.close();
1821               } catch (IOException ioe) {
1822                 LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
1823                 result.add(ioe);
1824               }
1825             }
1826           }
1827 
1828           // close connections
1829           synchronized (this.tableNameToHConnectionMap) {
1830             for (TableName tableName : this.tableNameToHConnectionMap.keySet()) {
1831               HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
1832               try {
1833                 hconn.clearRegionCache();
1834                 hconn.close();
1835               } catch (IOException ioe) {
1836                 result.add(ioe);
1837               }
1838             }
1839           }
1840           writersClosed = true;
1841         }
1842       }
1843       return result;
1844     }
1845 
1846     @Override
1847     public Map<byte[], Long> getOutputCounts() {
1848       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1849       synchronized (writers) {
1850         for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
1851           ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
1852         }
1853       }
1854       return ret;
1855     }
1856 
1857     @Override
1858     public int getNumberOfRecoveredRegions() {
1859       return this.recoveredRegions.size();
1860     }
1861 
1862     /**
1863      * Get (or lazily create) the writer used to replay edits to the given region server location.
1864      * This function is threadsafe; concurrent callers for the same location share one writer.
1865      * @return the RegionServerWriter for the given location
1866      */
1867     private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
1868       RegionServerWriter ret = writers.get(loc);
1869       if (ret != null) {
1870         return ret;
1871       }
1872 
1873       TableName tableName = getTableFromLocationStr(loc);
1874       if(tableName == null){
1875         throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
1876       }
1877 
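           // Lazily create the writer for this location, double-checking under the writers lock
           // so concurrent callers end up sharing a single sink per region server/table.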
1878       HConnection hconn = getConnectionByTableName(tableName);
1879       synchronized (writers) {
1880         ret = writers.get(loc);
1881         if (ret == null) {
1882           ret = new RegionServerWriter(conf, tableName, hconn);
1883           writers.put(loc, ret);
1884         }
1885       }
1886       return ret;
1887     }
1888 
1889     private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
1890       HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
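           // Double-checked creation: only take the map lock when no cached connection exists yet.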
1891       if (hconn == null) {
1892         synchronized (this.tableNameToHConnectionMap) {
1893           hconn = this.tableNameToHConnectionMap.get(tableName);
1894           if (hconn == null) {
1895             hconn = HConnectionManager.getConnection(conf);
1896             this.tableNameToHConnectionMap.put(tableName, hconn);
1897           }
1898         }
1899       }
1900       return hconn;
1901     }
1902     private TableName getTableFromLocationStr(String loc) {
1903       /**
1904        * location key is in format <server name:port>#<table name>
1905        */
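           // e.g. a hypothetical key "rs1.example.org:60020#t1" yields the table name "t1"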
1906       String[] splits = loc.split(KEY_DELIMITER);
1907       if (splits.length != 2) {
1908         return null;
1909       }
1910       return TableName.valueOf(splits[1]);
1911     }
1912   }
1913 
1914   /**
1915    * Private data structure that wraps a replay sink for a receiving RS and collects statistics
1916    * about the data written to this newly assigned RS.
1917    */
1918   private final static class RegionServerWriter extends SinkWriter {
1919     final WALEditsReplaySink sink;
1920 
1921     RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
1922         throws IOException {
1923       this.sink = new WALEditsReplaySink(conf, tableName, conn);
1924     }
1925 
1926     void close() throws IOException {
1927     }
1928   }
1929 
1930   static class CorruptedLogFileException extends Exception {
1931     private static final long serialVersionUID = 1L;
1932 
1933     CorruptedLogFileException(String s) {
1934       super(s);
1935     }
1936   }
1937 
1938   /** A struct used by getMutationsFromWALEntry */
1939   public static class MutationReplay {
1940     public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
1941       this.type = type;
1942       this.mutation = mutation;
1943       if(this.mutation.getDurability() != Durability.SKIP_WAL) {
1944         // use ASYNC_WAL for replay
1945         this.mutation.setDurability(Durability.ASYNC_WAL);
1946       }
1947       this.nonceGroup = nonceGroup;
1948       this.nonce = nonce;
1949     }
1950 
1951     public final MutationType type;
1952     public final Mutation mutation;
1953     public final long nonceGroup;
1954     public final long nonce;
1955   }
1956 
1957   /**
1958    * Tag each edit to be replayed with its original sequence number
1959    * @param seqId original sequence id of the WAL entry being replayed
1960    * @param cell the cell to tag
1961    */
1962   private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) {
1963     // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
1964     boolean needAddRecoveryTag = true;
1965     if (cell.getTagsLength() > 0) {
1966       Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
1967         TagType.LOG_REPLAY_TAG_TYPE);
1968       if (tmpTag != null) {
1969         // found an existing log replay tag so reuse it
1970         needAddRecoveryTag = false;
1971       }
1972     }
1973     if (needAddRecoveryTag) {
1974       List<Tag> newTags = new ArrayList<Tag>();
1975       Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId));
1976       newTags.add(replayTag);
1977       return KeyValue.cloneAndAddTags(cell, newTags);
1978     }
1979     return cell;
1980   }
1981 
1982   /**
1983    * This function is used to construct mutations from a WALEntry. If logEntry is non-null, it also
1984    * reconstructs the HLogKey and WALEdit from the passed in WALEntry
1985    * @param entry the WALEntry to convert
1986    * @param cells scanner over the cells associated with the entry
1987    * @param logEntry pair of HLogKey and WALEdit instance stores HLogKey and WALEdit instances
1988    *          extracted from the passed in WALEntry.
1989    * @param addLogReplayTag whether to tag each cell with the original log sequence number
1990    * @return list of MutationReplay (mutation plus its type and nonces) to be replayed
1991    * @throws IOException
1992    */
1993   public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
1994       Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag, Durability durability)
1995           throws IOException {
1996 
1997     if (entry == null) {
1998       // return an empty list
1999       return new ArrayList<MutationReplay>();
2000     }
2001 
2002     long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
2003       entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
2004     int count = entry.getAssociatedCellCount();
2005     List<MutationReplay> mutations = new ArrayList<MutationReplay>();
2006     Cell previousCell = null;
2007     Mutation m = null;
2008     HLogKey key = null;
2009     WALEdit val = null;
2010     if (logEntry != null) val = new WALEdit();
2011 
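         // Walk the associated cells in order; a new Put or Delete is started whenever the row or
         // cell type changes, so consecutive cells for the same row collapse into one mutation.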
2012     for (int i = 0; i < count; i++) {
2013       // Throw index out of bounds if our cell count is off
2014       if (!cells.advance()) {
2015         throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
2016       }
2017       Cell cell = cells.current();
2018       if (val != null) val.add(cell);
2019 
2020       boolean isNewRowOrType =
2021           previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
2022               || !CellUtil.matchingRow(previousCell, cell);
2023       if (isNewRowOrType) {
2024         // Create new mutation
2025         if (CellUtil.isDelete(cell)) {
2026           m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
2027           // Deletes don't have nonces.
2028           mutations.add(new MutationReplay(
2029               MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
2030         } else {
2031           m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
2032           // Puts might come from increment or append, thus we need nonces.
2033           long nonceGroup = entry.getKey().hasNonceGroup()
2034               ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2035           long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2036           mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
2037         }
2038       }
2039       if (CellUtil.isDelete(cell)) {
2040         ((Delete) m).addDeleteMarker(cell);
2041       } else {
2042         Cell tmpNewCell = cell;
2043         if (addLogReplayTag) {
2044           tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
2045         }
2046         ((Put) m).add(tmpNewCell);
2047       }
2048       if (m != null) {
2049         m.setDurability(durability);
2050       }
2051       previousCell = cell;
2052     }
2053 
2054     // reconstruct HLogKey
2055     if (logEntry != null) {
2056       WALKey walKey = entry.getKey();
2057       List<UUID> clusterIds = new ArrayList<UUID>(walKey.getClusterIdsCount());
2058       for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
2059         clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
2060       }
2061       key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
2062               .getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds,
2063               walKey.getNonceGroup(), walKey.getNonce());
2064       logEntry.setFirst(key);
2065       logEntry.setSecond(val);
2066     }
2067 
2068     return mutations;
2069   }
2070 }