View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.text.ParseException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.TreeSet;
35  import java.util.UUID;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.CompletionService;
38  import java.util.concurrent.ConcurrentHashMap;
39  import java.util.concurrent.ExecutionException;
40  import java.util.concurrent.ExecutorCompletionService;
41  import java.util.concurrent.Future;
42  import java.util.concurrent.ThreadFactory;
43  import java.util.concurrent.ThreadPoolExecutor;
44  import java.util.concurrent.TimeUnit;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicLong;
47  import java.util.concurrent.atomic.AtomicReference;
48  
49  import org.apache.commons.logging.Log;
50  import org.apache.commons.logging.LogFactory;
51  import org.apache.hadoop.classification.InterfaceAudience;
52  import org.apache.hadoop.conf.Configuration;
53  import org.apache.hadoop.fs.FileStatus;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.hbase.Cell;
57  import org.apache.hadoop.hbase.CellScanner;
58  import org.apache.hadoop.hbase.CellUtil;
59  import org.apache.hadoop.hbase.CoordinatedStateManager;
60  import org.apache.hadoop.hbase.CoordinatedStateException;
61  import org.apache.hadoop.hbase.HBaseConfiguration;
62  import org.apache.hadoop.hbase.HConstants;
63  import org.apache.hadoop.hbase.HRegionInfo;
64  import org.apache.hadoop.hbase.HRegionLocation;
65  import org.apache.hadoop.hbase.KeyValue;
66  import org.apache.hadoop.hbase.KeyValueUtil;
67  import org.apache.hadoop.hbase.RemoteExceptionHandler;
68  import org.apache.hadoop.hbase.ServerName;
69  import org.apache.hadoop.hbase.TableName;
70  import org.apache.hadoop.hbase.TableNotFoundException;
71  import org.apache.hadoop.hbase.TableStateManager;
72  import org.apache.hadoop.hbase.Tag;
73  import org.apache.hadoop.hbase.TagType;
74  import org.apache.hadoop.hbase.client.ConnectionUtils;
75  import org.apache.hadoop.hbase.client.Delete;
76  import org.apache.hadoop.hbase.client.HConnection;
77  import org.apache.hadoop.hbase.client.HConnectionManager;
78  import org.apache.hadoop.hbase.client.Mutation;
79  import org.apache.hadoop.hbase.client.Put;
80  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
81  import org.apache.hadoop.hbase.io.HeapSize;
82  import org.apache.hadoop.hbase.io.hfile.HFile;
83  import org.apache.hadoop.hbase.master.SplitLogManager;
84  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
85  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
86  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
87  import org.apache.hadoop.hbase.protobuf.RequestConverter;
88  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
89  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
90  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
91  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
92  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
93  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
94  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
95  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
96  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
97  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
98  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
99  import org.apache.hadoop.hbase.regionserver.HRegion;
100 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
101 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
102 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
103 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
104 import org.apache.hadoop.hbase.util.Bytes;
105 import org.apache.hadoop.hbase.util.CancelableProgressable;
106 import org.apache.hadoop.hbase.util.ClassSize;
107 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
108 import org.apache.hadoop.hbase.util.FSUtils;
109 import org.apache.hadoop.hbase.util.Pair;
110 import org.apache.hadoop.hbase.util.Threads;
111 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
112 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
113 import org.apache.hadoop.io.MultipleIOException;
114 
115 import com.google.common.base.Preconditions;
116 import com.google.common.collect.Lists;
117 import com.google.protobuf.ServiceException;
118 
119 /**
120  * This class is responsible for splitting up a bunch of regionserver commit log
121  * files that are no longer being written to, into new files, one per region for
122  * region to replay on startup. Delete the old log files when finished.
123  */
124 @InterfaceAudience.Private
125 public class HLogSplitter {
126   static final Log LOG = LogFactory.getLog(HLogSplitter.class);
127 
128   // Parameters for split process
129   protected final Path rootDir;
130   protected final FileSystem fs;
131   protected final Configuration conf;
132 
133   // Major subcomponents of the split process.
134   // These are separated into inner classes to make testing easier.
135   OutputSink outputSink;
136   EntryBuffers entryBuffers;
137 
138   private Set<TableName> disablingOrDisabledTables =
139       new HashSet<TableName>();
140   private ZooKeeperWatcher watcher;
141   private CoordinatedStateManager csm;
142 
143   // If an exception is thrown by one of the other threads, it will be
144   // stored here.
145   protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
146 
147   // Wait/notify for when data has been produced by the reader thread,
148   // consumed by the reader thread, or an exception occurred
149   final Object dataAvailable = new Object();
150 
151   private MonitoredTask status;
152 
153   // For checking the latest flushed sequence id
154   protected final LastSequenceId sequenceIdChecker;
155 
156   protected boolean distributedLogReplay;
157 
158   // Map encodedRegionName -> lastFlushedSequenceId
159   protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
160 
161   // Map encodedRegionName -> maxSeqIdInStores
162   protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
163       new ConcurrentHashMap<String, Map<byte[], Long>>();
164 
165   // Failed region server that the wal file being split belongs to
166   protected String failedServerName = "";
167 
168   // Number of writer threads
169   private final int numWriterThreads;
170 
171   // Min batch size when replay WAL edits
172   private final int minBatchSize;
173 
174   HLogSplitter(Configuration conf, Path rootDir,
175       FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw,
176       CoordinatedStateManager csm, RecoveryMode mode) {
177     this.conf = HBaseConfiguration.create(conf);
178     String codecClassName = conf
179         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
180     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
181     this.rootDir = rootDir;
182     this.fs = fs;
183     this.sequenceIdChecker = idChecker;
184     this.watcher = zkw;
185     this.csm = csm;
186 
187     entryBuffers = new EntryBuffers(
188         this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
189             128*1024*1024));
190 
191     // a larger minBatchSize may slow down recovery because replay writer has to wait for
192     // enough edits before replaying them
193     this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
194     this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
195 
196     this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
197     if (zkw != null && csm != null && this.distributedLogReplay) {
198       outputSink = new LogReplayOutputSink(numWriterThreads);
199     } else {
200       if (this.distributedLogReplay) {
201         LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
202       }
203       this.distributedLogReplay = false;
204       outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
205     }
206 
207   }
208 
209   /**
210    * Splits a HLog file into region's recovered-edits directory.
211    * This is the main entry point for distributed log splitting from SplitLogWorker.
212    * <p>
213    * If the log file has N regions then N recovered.edits files will be produced.
214    * <p>
215    * @param rootDir
216    * @param logfile
217    * @param fs
218    * @param conf
219    * @param reporter
220    * @param idChecker
221    * @param zkw ZooKeeperWatcher if it's null, we will back to the old-style log splitting where we
222    *          dump out recoved.edits files for regions to replay on.
223    * @return false if it is interrupted by the progress-able.
224    * @throws IOException
225    */
226   public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
227       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
228       ZooKeeperWatcher zkw, CoordinatedStateManager cp, RecoveryMode mode) throws IOException {
229     HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, cp, mode);
230     return s.splitLogFile(logfile, reporter);
231   }
232 
233   // A wrapper to split one log folder using the method used by distributed
234   // log splitting. Used by tools and unit tests. It should be package private.
235   // It is public only because TestWALObserver is in a different package,
236   // which uses this method to to log splitting.
237   public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
238       FileSystem fs, Configuration conf) throws IOException {
239     FileStatus[] logfiles = fs.listStatus(logDir);
240     List<Path> splits = new ArrayList<Path>();
241     if (logfiles != null && logfiles.length > 0) {
242       for (FileStatus logfile: logfiles) {
243         HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null, 
244           RecoveryMode.LOG_SPLITTING);
245         if (s.splitLogFile(logfile, null)) {
246           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
247           if (s.outputSink.splits != null) {
248             splits.addAll(s.outputSink.splits);
249           }
250         }
251       }
252     }
253     if (!fs.delete(logDir, true)) {
254       throw new IOException("Unable to delete src dir: " + logDir);
255     }
256     return splits;
257   }
258 
  /**
   * The real log splitter: splits one log file, routing each edit through
   * entryBuffers to the output sink chosen in the constructor (recovered.edits
   * files, or replay when distributedLogReplay is on).
   * @param logfile the WAL file to split
   * @param reporter progress callback; when non-null and it returns false, the
   *          split aborts and this method returns false
   * @return true on success (including "nothing to split"); false if progress
   *         reporting failed or the sink could not finish cleanly
   * @throws IOException on unrecoverable read errors or if interrupted
   */
  boolean splitLogFile(FileStatus logfile,
      CancelableProgressable reporter) throws IOException {
    boolean isCorrupted = false;
    // This instance is single-use: a prior run would have set status.
    Preconditions.checkState(status == null);
    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
      HLog.SPLIT_SKIP_ERRORS_DEFAULT);
    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
    Path logPath = logfile.getPath();
    boolean outputSinkStarted = false;
    boolean progress_failed = false;
    int editsCount = 0;
    int editsSkipped = 0;

    // NOTE(review): the status message lacks a space before "into"; left
    // unchanged here since it is runtime output.
    status =
        TaskMonitor.get().createStatus(
          "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
    try {
      long logLength = logfile.getLen();
      LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
      LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
      status.setStatus("Opening log file");
      if (reporter != null && !reporter.progress()) {
        progress_failed = true;
        return false;
      }
      Reader in = null;
      try {
        in = getReader(fs, logfile, conf, skipErrors, reporter);
      } catch (CorruptedLogFileException e) {
        // Mark the file corrupted in ZK and carry on; in stays null below.
        LOG.warn("Could not get reader, corrupted log file " + logPath, e);
        ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
        isCorrupted = true;
      }
      if (in == null) {
        // Empty, missing, or corrupted file: nothing to do, report success.
        LOG.warn("Nothing to split in log file " + logPath);
        return true;
      }
      if(watcher != null && csm != null) {
        // Edits for disabling/disabled tables are treated differently by the
        // replay sink, so snapshot that set up front.
        try {
          TableStateManager tsm = csm.getTableStateManager();
          disablingOrDisabledTables = tsm.getTablesInStates(
            ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
        } catch (CoordinatedStateException e) {
          throw new IOException("Can't get disabling/disabled tables", e);
        }
      }
      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
      int numOpenedFilesLastCheck = 0;
      outputSink.setReporter(reporter);
      outputSink.startWriterThreads();
      outputSinkStarted = true;
      Entry entry;
      Long lastFlushedSequenceId = -1L;
      ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logPath);
      failedServerName = (serverName == null) ? "" : serverName.getServerName();
      while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
        byte[] region = entry.getKey().getEncodedRegionName();
        String key = Bytes.toString(region);
        // Resolve this region's last-flushed sequence id once and cache it;
        // edits at or below it have already been persisted and can be skipped.
        lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
        if (lastFlushedSequenceId == null) {
          if (this.distributedLogReplay) {
            RegionStoreSequenceIds ids =
                SplitLogManager.getRegionFlushedSequenceId(this.watcher, failedServerName, key);
            if (ids != null) {
              lastFlushedSequenceId = ids.getLastFlushedSequenceId();
            }
          } else if (sequenceIdChecker != null) {
            lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
          }
          if (lastFlushedSequenceId == null) {
            lastFlushedSequenceId = -1L;
          }
          lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
        }
        if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
          editsSkipped++;
          continue;
        }
        entryBuffers.appendEntry(entry);
        editsCount++;
        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
        // If sufficient edits have passed, check if we should report progress.
        if (editsCount % interval == 0
            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
          numOpenedFilesLastCheck = this.getNumOpenWriters();
          String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
              + " edits, skipped " + editsSkipped + " edits.";
          status.setStatus("Split " + countsStr);
          if (reporter != null && !reporter.progress()) {
            progress_failed = true;
            return false;
          }
        }
      }
    } catch (InterruptedException ie) {
      // Convert to an IOException the signature declares, keeping the cause.
      IOException iie = new InterruptedIOException();
      iie.initCause(ie);
      throw iie;
    } catch (CorruptedLogFileException e) {
      LOG.warn("Could not parse, corrupted log file " + logPath, e);
      ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
      isCorrupted = true;
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
      throw e;
    } finally {
      LOG.debug("Finishing writing output logs and closing down.");
      try {
        if (outputSinkStarted) {
          // Set progress_failed to true as the immediate following statement will reset its value
          // when finishWritingAndClose() throws exception, progress_failed has the right value
          progress_failed = true;
          progress_failed = outputSink.finishWritingAndClose() == null;
        }
      } finally {
        String msg =
            "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
                + " regions; log file=" + logPath + " is corrupted = " + isCorrupted
                + " progress failed = " + progress_failed;
        LOG.info(msg);
        status.markComplete(msg);
      }
    }
    return !progress_failed;
  }
385 
386   /**
387    * Completes the work done by splitLogFile by archiving logs
388    * <p>
389    * It is invoked by SplitLogManager once it knows that one of the
390    * SplitLogWorkers have completed the splitLogFile() part. If the master
391    * crashes then this function might get called multiple times.
392    * <p>
393    * @param logfile
394    * @param conf
395    * @throws IOException
396    */
397   public static void finishSplitLogFile(String logfile,
398       Configuration conf)  throws IOException {
399     Path rootdir = FSUtils.getRootDir(conf);
400     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
401     Path logPath;
402     if (FSUtils.isStartingWithPath(rootdir, logfile)) {
403       logPath = new Path(logfile);
404     } else {
405       logPath = new Path(rootdir, logfile);
406     }
407     finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
408   }
409 
410   static void finishSplitLogFile(Path rootdir, Path oldLogDir,
411       Path logPath, Configuration conf) throws IOException {
412     List<Path> processedLogs = new ArrayList<Path>();
413     List<Path> corruptedLogs = new ArrayList<Path>();
414     FileSystem fs;
415     fs = rootdir.getFileSystem(conf);
416     if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
417       corruptedLogs.add(logPath);
418     } else {
419       processedLogs.add(logPath);
420     }
421     archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
422     Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
423     fs.delete(stagingDir, true);
424   }
425 
426   /**
427    * Moves processed logs to a oldLogDir after successful processing Moves
428    * corrupted logs (any log that couldn't be successfully parsed to corruptDir
429    * (.corrupt) for later investigation
430    *
431    * @param corruptedLogs
432    * @param processedLogs
433    * @param oldLogDir
434    * @param fs
435    * @param conf
436    * @throws IOException
437    */
438   private static void archiveLogs(
439       final List<Path> corruptedLogs,
440       final List<Path> processedLogs, final Path oldLogDir,
441       final FileSystem fs, final Configuration conf) throws IOException {
442     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
443         "hbase.regionserver.hlog.splitlog.corrupt.dir",  HConstants.CORRUPT_DIR_NAME));
444 
445     if (!fs.mkdirs(corruptDir)) {
446       LOG.info("Unable to mkdir " + corruptDir);
447     }
448     fs.mkdirs(oldLogDir);
449 
450     // this method can get restarted or called multiple times for archiving
451     // the same log files.
452     for (Path corrupted : corruptedLogs) {
453       Path p = new Path(corruptDir, corrupted.getName());
454       if (fs.exists(corrupted)) {
455         if (!fs.rename(corrupted, p)) {
456           LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
457         } else {
458           LOG.warn("Moved corrupted log " + corrupted + " to " + p);
459         }
460       }
461     }
462 
463     for (Path p : processedLogs) {
464       Path newPath = FSHLog.getHLogArchivePath(oldLogDir, p);
465       if (fs.exists(p)) {
466         if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
467           LOG.warn("Unable to move  " + p + " to " + newPath);
468         } else {
469           LOG.info("Archived processed log " + p + " to " + newPath);
470         }
471       }
472     }
473   }
474 
  /**
   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
   * <code>logEntry</code> named for the sequenceid in the passed
   * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
   * This method also ensures existence of RECOVERED_EDITS_DIR under the region,
   * creating it if necessary (when isCreate is true).
   * @param fs
   * @param logEntry
   * @param rootDir HBase root dir.
   * @param isCreate whether to create the recovered.edits dir if missing
   * @return Path to file into which to dump split log edits, or null if the
   *         region directory no longer exists (edits can be discarded).
   * @throws IOException
   */
  @SuppressWarnings("deprecation")
  static Path getRegionSplitEditsPath(final FileSystem fs,
      final Entry logEntry, final Path rootDir, boolean isCreate)
  throws IOException {
    Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
    Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
    Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);

    if (!fs.exists(regiondir)) {
      LOG.info("This region's directory doesn't exist: "
          + regiondir.toString() + ". It is very likely that it was" +
          " already split so it's safe to discard those edits.");
      return null;
    }
    if (fs.exists(dir) && fs.isFile(dir)) {
      // A plain file where the recovered.edits dir should be: sideline it.
      // NOTE(review): "/tmp" is a path on this FileSystem (e.g. HDFS), not the
      // local OS /tmp — confirm that is the intent.
      Path tmp = new Path("/tmp");
      if (!fs.exists(tmp)) {
        fs.mkdirs(tmp);
      }
      tmp = new Path(tmp,
        HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
      LOG.warn("Found existing old file: " + dir + ". It could be some "
        + "leftover of an old installation. It should be a folder instead. "
        + "So moving it to " + tmp);
      if (!fs.rename(dir, tmp)) {
        LOG.warn("Failed to sideline old file " + dir);
      }
    }

    if (isCreate && !fs.exists(dir)) {
      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
    }
    // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
    // region's replayRecoveredEdits will not delete it
    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
    fileName = getTmpRecoveredEditsFileName(fileName);
    return new Path(dir, fileName);
  }
526 
527   static String getTmpRecoveredEditsFileName(String fileName) {
528     return fileName + HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
529   }
530 
531   /**
532    * Get the completed recovered edits file path, renaming it to be by last edit
533    * in the file from its first edit. Then we could use the name to skip
534    * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
535    * @param srcPath
536    * @param maximumEditLogSeqNum
537    * @return dstPath take file's last edit log seq num as the name
538    */
539   static Path getCompletedRecoveredEditsFilePath(Path srcPath,
540       Long maximumEditLogSeqNum) {
541     String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
542     return new Path(srcPath.getParent(), fileName);
543   }
544 
545   static String formatRecoveredEditsFileName(final long seqid) {
546     return String.format("%019d", seqid);
547   }
548 
  /**
   * Create a new {@link Reader} for reading logs to split, recovering the file
   * lease first.
   *
   * @param fs
   * @param file
   * @param conf
   * @param skipErrors when true, open failures (other than interrupts and
   *          missing files) are reported as {@link CorruptedLogFileException}
   *          instead of being rethrown
   * @param reporter progress callback passed to lease recovery and the reader
   * @return A new Reader instance, or null when the file is empty, truncated
   *         to nothing, or no longer exists
   * @throws IOException
   * @throws CorruptedLogFileException
   */
  protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
      boolean skipErrors, CancelableProgressable reporter)
      throws IOException, CorruptedLogFileException {
    Path path = file.getPath();
    long length = file.getLen();
    Reader in;

    // Check for possibly empty file. With appends, currently Hadoop reports a
    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
    // HDFS-878 is committed.
    if (length <= 0) {
      LOG.warn("File " + path + " might be still open, length is 0");
    }

    try {
      FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
      try {
        in = getReader(fs, path, conf, reporter);
      } catch (EOFException e) {
        if (length <= 0) {
          // TODO should we ignore an empty, not-last log file if skip.errors
          // is false? Either way, the caller should decide what to do. E.g.
          // ignore if this is the last log in sequence.
          // TODO is this scenario still possible if the log has been
          // recovered (i.e. closed)
          LOG.warn("Could not open " + path + " for reading. File is empty", e);
          return null;
        } else {
          // EOFException being ignored
          return null;
        }
      }
    } catch (IOException e) {
      if (e instanceof FileNotFoundException) {
        // A wal file may not exist anymore. Nothing can be recovered so move on
        LOG.warn("File " + path + " doesn't exist anymore.", e);
        return null;
      }
      if (!skipErrors || e instanceof InterruptedIOException) {
        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
      }
      // skipErrors: report the file as corrupted rather than failing the split.
      CorruptedLogFileException t =
        new CorruptedLogFileException("skipErrors=true Could not open hlog " +
            path + " ignoring");
      t.initCause(e);
      throw t;
    }
    return in;
  }
608 
  /**
   * Reads the next entry from the log, translating expected failure modes:
   * EOF and known parse/checksum problems return null; other IOExceptions are
   * rethrown, or wrapped as {@link CorruptedLogFileException} when skipErrors
   * is set.
   */
  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
  throws CorruptedLogFileException, IOException {
    try {
      return in.next();
    } catch (EOFException eof) {
      // truncated files are expected if a RS crashes (see HBASE-2643)
      LOG.info("EOF from hlog " + path + ".  continuing");
      return null;
    } catch (IOException e) {
      // If the IOE resulted from bad file format,
      // then this problem is idempotent and retrying won't help
      if (e.getCause() != null &&
          (e.getCause() instanceof ParseException ||
           e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
        LOG.warn("Parse exception " + e.getCause().toString() + " from hlog "
           + path + ".  continuing");
        return null;
      }
      if (!skipErrors) {
        throw e;
      }
      // skipErrors: flag the file as corrupted instead of failing the split.
      CorruptedLogFileException t =
        new CorruptedLogFileException("skipErrors=true Ignoring exception" +
            " while parsing hlog " + path + ". Marking as corrupted");
      t.initCause(e);
      throw t;
    }
  }
637 
638   private void writerThreadError(Throwable t) {
639     thrown.compareAndSet(null, t);
640   }
641 
642   /**
643    * Check for errors in the writer threads. If any is found, rethrow it.
644    */
645   private void checkForErrors() throws IOException {
646     Throwable thrown = this.thrown.get();
647     if (thrown == null) return;
648     if (thrown instanceof IOException) {
649       throw new IOException(thrown);
650     } else {
651       throw new RuntimeException(thrown);
652     }
653   }
654   /**
655    * Create a new {@link Writer} for writing log splits.
656    */
657   protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
658       throws IOException {
659     return HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);
660   }
661 
662   /**
663    * Create a new {@link Reader} for reading logs to split.
664    */
665   protected Reader getReader(FileSystem fs, Path curLogFile,
666       Configuration conf, CancelableProgressable reporter) throws IOException {
667     return HLogFactory.createReader(fs, curLogFile, conf, reporter);
668   }
669 
670   /**
671    * Get current open writers
672    */
673   private int getNumOpenWriters() {
674     int result = 0;
675     if (this.outputSink != null) {
676       result += this.outputSink.getNumOpenWriters();
677     }
678     return result;
679   }
680 
  /**
   * Class which accumulates edits and separates them into a buffer per region
   * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
   * a predefined threshold.
   *
   * Writer threads then pull region-specific buffers from this class.
   */
  class EntryBuffers {
    // Per-region buffers keyed by encoded region name; guarded by this.
    Map<byte[], RegionEntryBuffer> buffers =
      new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);

    /* Track which regions are currently in the middle of writing. We don't allow
       an IO thread to pick up bytes from a region if we're already writing
       data for that region in a different IO thread. */
    Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);

    // Total heap bytes of buffered edits; guarded by dataAvailable's monitor.
    long totalBuffered = 0;
    // Threshold above which appendEntry() blocks until writers drain buffers.
    long maxHeapUsage;

    EntryBuffers(long maxHeapUsage) {
      this.maxHeapUsage = maxHeapUsage;
    }

    /**
     * Append a log entry into the corresponding region buffer.
     * Blocks if the total heap usage has crossed the specified threshold.
     *
     * @throws InterruptedException if interrupted while waiting for buffer space
     * @throws IOException rethrown if a writer thread has recorded an error
     */
    void appendEntry(Entry entry) throws InterruptedException, IOException {
      HLogKey key = entry.getKey();

      RegionEntryBuffer buffer;
      long incrHeap;
      synchronized (this) {
        // Lazily create the per-region buffer on first edit for that region.
        buffer = buffers.get(key.getEncodedRegionName());
        if (buffer == null) {
          buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
          buffers.put(key.getEncodedRegionName(), buffer);
        }
        incrHeap= buffer.appendEntry(entry);
      }

      // If we crossed the chunk threshold, wait for more space to be available
      synchronized (dataAvailable) {
        totalBuffered += incrHeap;
        // Give up waiting as soon as a writer thread has recorded an error.
        while (totalBuffered > maxHeapUsage && thrown.get() == null) {
          LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
          dataAvailable.wait(2000);
        }
        // Wake writer threads that may be waiting for new data.
        dataAvailable.notifyAll();
      }
      checkForErrors();
    }

    /**
     * @return RegionEntryBuffer a buffer of edits to be written or replayed.
     * Picks the largest buffer whose region is not already being written by
     * another thread (null if none); marks the chosen region as in-flight.
     */
    synchronized RegionEntryBuffer getChunkToWrite() {
      long biggestSize = 0;
      byte[] biggestBufferKey = null;

      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
        long size = entry.getValue().heapSize();
        if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
          biggestSize = size;
          biggestBufferKey = entry.getKey();
        }
      }
      if (biggestBufferKey == null) {
        return null;
      }

      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
      currentlyWriting.add(biggestBufferKey);
      return buffer;
    }

    /**
     * Called by a writer thread when done with a chunk obtained from
     * {@link #getChunkToWrite()}: clears the in-flight mark for the region and
     * credits the freed heap back, unblocking appendEntry() waiters.
     */
    void doneWriting(RegionEntryBuffer buffer) {
      synchronized (this) {
        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
        assert removed;
      }
      long size = buffer.heapSize();

      synchronized (dataAvailable) {
        totalBuffered -= size;
        // We may unblock writers
        dataAvailable.notifyAll();
      }
    }

    /**
     * @return true if the given region currently has a chunk checked out for
     * writing by some IO thread.
     */
    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
      return currentlyWriting.contains(region);
    }
  }
778 
779   /**
780    * A buffer of some number of edits for a given region.
781    * This accumulates edits and also provides a memory optimization in order to
782    * share a single byte array instance for the table and region name.
783    * Also tracks memory usage of the accumulated edits.
784    */
785   static class RegionEntryBuffer implements HeapSize {
786     long heapInBuffer = 0;
787     List<Entry> entryBuffer;
788     TableName tableName;
789     byte[] encodedRegionName;
790 
791     RegionEntryBuffer(TableName tableName, byte[] region) {
792       this.tableName = tableName;
793       this.encodedRegionName = region;
794       this.entryBuffer = new LinkedList<Entry>();
795     }
796 
797     long appendEntry(Entry entry) {
798       internify(entry);
799       entryBuffer.add(entry);
800       long incrHeap = entry.getEdit().heapSize() +
801         ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
802         0; // TODO linkedlist entry
803       heapInBuffer += incrHeap;
804       return incrHeap;
805     }
806 
807     private void internify(Entry entry) {
808       HLogKey k = entry.getKey();
809       k.internTableName(this.tableName);
810       k.internEncodedRegionName(this.encodedRegionName);
811     }
812 
813     public long heapSize() {
814       return heapInBuffer;
815     }
816   }
817 
  /**
   * Thread that repeatedly pulls the largest available region buffer from
   * entryBuffers and hands it to the output sink. Runs until finish() has been
   * requested and the sink reports nothing left to flush.
   */
  class WriterThread extends Thread {
    // Set by finish() (from another thread) to request shutdown; volatile so
    // the change is visible to the loop in doRun().
    private volatile boolean shouldStop = false;
    private OutputSink outputSink = null;

    WriterThread(OutputSink sink, int i) {
      super(Thread.currentThread().getName() + "-Writer-" + i);
      outputSink = sink;
    }

    public void run()  {
      try {
        doRun();
      } catch (Throwable t) {
        // Record the failure so the split as a whole is failed by checkForErrors().
        LOG.error("Exiting thread", t);
        writerThreadError(t);
      }
    }

    private void doRun() throws IOException {
      LOG.debug("Writer thread " + this + ": starting");
      while (true) {
        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available, wait on some more to show up
          synchronized (dataAvailable) {
            // Only exit once a stop was requested AND the sink has nothing
            // buffered internally that still needs flushing.
            if (shouldStop && !this.outputSink.flush()) {
              return;
            }
            try {
              dataAvailable.wait(500);
            } catch (InterruptedException ie) {
              // An interrupt is only expected as part of shutdown; otherwise
              // surface it as a failure.
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          // Always release the region, even on failure, so it is not left
          // marked as in-flight forever.
          entryBuffers.doneWriting(buffer);
        }
      }
    }

    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
      outputSink.append(buffer);
    }

    /**
     * Asks this thread to stop and wakes it if it is waiting for data.
     */
    void finish() {
      synchronized (dataAvailable) {
        shouldStop = true;
        dataAvailable.notifyAll();
      }
    }
  }
877 
878   /**
879    * The following class is an abstraction class to provide a common interface to support both
880    * existing recovered edits file sink and region server WAL edits replay sink
881    */
882    abstract class OutputSink {
883 
884     protected Map<byte[], SinkWriter> writers = Collections
885         .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
886 
887     protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
888         .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
889 
890     protected final List<WriterThread> writerThreads = Lists.newArrayList();
891 
892     /* Set of regions which we've decided should not output edits */
893     protected final Set<byte[]> blacklistedRegions = Collections
894         .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
895 
896     protected boolean closeAndCleanCompleted = false;
897 
898     protected boolean writersClosed = false;
899 
900     protected final int numThreads;
901 
902     protected CancelableProgressable reporter = null;
903 
904     protected AtomicLong skippedEdits = new AtomicLong();
905 
906     protected List<Path> splits = null;
907 
908     public OutputSink(int numWriters) {
909       numThreads = numWriters;
910     }
911 
912     void setReporter(CancelableProgressable reporter) {
913       this.reporter = reporter;
914     }
915 
916     /**
917      * Start the threads that will pump data from the entryBuffers to the output files.
918      */
919     synchronized void startWriterThreads() {
920       for (int i = 0; i < numThreads; i++) {
921         WriterThread t = new WriterThread(this, i);
922         t.start();
923         writerThreads.add(t);
924       }
925     }
926 
927     /**
928      *
929      * Update region's maximum edit log SeqNum.
930      */
931     void updateRegionMaximumEditLogSeqNum(Entry entry) {
932       synchronized (regionMaximumEditLogSeqNum) {
933         Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
934             .getEncodedRegionName());
935         if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
936           regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
937               .getLogSeqNum());
938         }
939       }
940     }
941 
942     Long getRegionMaximumEditLogSeqNum(byte[] region) {
943       return regionMaximumEditLogSeqNum.get(region);
944     }
945 
946     /**
947      * @return the number of currently opened writers
948      */
949     int getNumOpenWriters() {
950       return this.writers.size();
951     }
952 
953     long getSkippedEdits() {
954       return this.skippedEdits.get();
955     }
956 
957     /**
958      * Wait for writer threads to dump all info to the sink
959      * @return true when there is no error
960      * @throws IOException
961      */
962     protected boolean finishWriting() throws IOException {
963       LOG.debug("Waiting for split writer threads to finish");
964       boolean progress_failed = false;
965       for (WriterThread t : writerThreads) {
966         t.finish();
967       }
968       for (WriterThread t : writerThreads) {
969         if (!progress_failed && reporter != null && !reporter.progress()) {
970           progress_failed = true;
971         }
972         try {
973           t.join();
974         } catch (InterruptedException ie) {
975           IOException iie = new InterruptedIOException();
976           iie.initCause(ie);
977           throw iie;
978         }
979       }
980       checkForErrors();
981       LOG.info("Split writers finished");
982       return (!progress_failed);
983     }
984 
985     abstract List<Path> finishWritingAndClose() throws IOException;
986 
987     /**
988      * @return a map from encoded region ID to the number of edits written out for that region.
989      */
990     abstract Map<byte[], Long> getOutputCounts();
991 
992     /**
993      * @return number of regions we've recovered
994      */
995     abstract int getNumberOfRecoveredRegions();
996 
997     /**
998      * @param buffer A WAL Edit Entry
999      * @throws IOException
1000      */
1001     abstract void append(RegionEntryBuffer buffer) throws IOException;
1002 
1003     /**
1004      * WriterThread call this function to help flush internal remaining edits in buffer before close
1005      * @return true when underlying sink has something to flush
1006      */
1007     protected boolean flush() throws IOException {
1008       return false;
1009     }
1010   }
1011 
1012   /**
1013    * Class that manages the output streams from the log splitting process.
1014    */
1015   class LogRecoveredEditsOutputSink extends OutputSink {
1016 
1017     public LogRecoveredEditsOutputSink(int numWriters) {
1018       // More threads could potentially write faster at the expense
1019       // of causing more disk seeks as the logs are split.
1020       // 3. After a certain setting (probably around 3) the
1021       // process will be bound on the reader in the current
1022       // implementation anyway.
1023       super(numWriters);
1024     }
1025 
1026     /**
1027      * @return null if failed to report progress
1028      * @throws IOException
1029      */
1030     @Override
1031     List<Path> finishWritingAndClose() throws IOException {
1032       boolean isSuccessful = false;
1033       List<Path> result = null;
1034       try {
1035         isSuccessful = finishWriting();
1036       } finally {
1037         result = close();
1038         List<IOException> thrown = closeLogWriters(null);
1039         if (thrown != null && !thrown.isEmpty()) {
1040           throw MultipleIOException.createIOException(thrown);
1041         }
1042       }
1043       if (isSuccessful) {
1044         splits = result;
1045       }
1046       return splits;
1047     }
1048 
1049     /**
1050      * Close all of the output streams.
1051      * @return the list of paths written.
1052      */
1053     private List<Path> close() throws IOException {
1054       Preconditions.checkState(!closeAndCleanCompleted);
1055 
1056       final List<Path> paths = new ArrayList<Path>();
1057       final List<IOException> thrown = Lists.newArrayList();
1058       ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
1059         TimeUnit.SECONDS, new ThreadFactory() {
1060           private int count = 1;
1061 
1062           public Thread newThread(Runnable r) {
1063             Thread t = new Thread(r, "split-log-closeStream-" + count++);
1064             return t;
1065           }
1066         });
1067       CompletionService<Void> completionService =
1068         new ExecutorCompletionService<Void>(closeThreadPool);
1069       for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
1070         LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
1071         completionService.submit(new Callable<Void>() {
1072           public Void call() throws Exception {
1073             WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1074             LOG.debug("Closing " + wap.p);
1075             try {
1076               wap.w.close();
1077             } catch (IOException ioe) {
1078               LOG.error("Couldn't close log at " + wap.p, ioe);
1079               thrown.add(ioe);
1080               return null;
1081             }
1082             LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1083                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1084 
1085             if (wap.editsWritten == 0) {
1086               // just remove the empty recovered.edits file
1087               if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
1088                 LOG.warn("Failed deleting empty " + wap.p);
1089                 throw new IOException("Failed deleting empty  " + wap.p);
1090               }
1091               return null;
1092             }
1093 
1094             Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1095               regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
1096             try {
1097               if (!dst.equals(wap.p) && fs.exists(dst)) {
1098                 LOG.warn("Found existing old edits file. It could be the "
1099                     + "result of a previous failed split attempt. Deleting " + dst + ", length="
1100                     + fs.getFileStatus(dst).getLen());
1101                 if (!fs.delete(dst, false)) {
1102                   LOG.warn("Failed deleting of old " + dst);
1103                   throw new IOException("Failed deleting of old " + dst);
1104                 }
1105               }
1106               // Skip the unit tests which create a splitter that reads and
1107               // writes the data without touching disk.
1108               // TestHLogSplit#testThreading is an example.
1109               if (fs.exists(wap.p)) {
1110                 if (!fs.rename(wap.p, dst)) {
1111                   throw new IOException("Failed renaming " + wap.p + " to " + dst);
1112                 }
1113                 LOG.info("Rename " + wap.p + " to " + dst);
1114               }
1115             } catch (IOException ioe) {
1116               LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1117               thrown.add(ioe);
1118               return null;
1119             }
1120             paths.add(dst);
1121             return null;
1122           }
1123         });
1124       }
1125 
1126       boolean progress_failed = false;
1127       try {
1128         for (int i = 0, n = this.writers.size(); i < n; i++) {
1129           Future<Void> future = completionService.take();
1130           future.get();
1131           if (!progress_failed && reporter != null && !reporter.progress()) {
1132             progress_failed = true;
1133           }
1134         }
1135       } catch (InterruptedException e) {
1136         IOException iie = new InterruptedIOException();
1137         iie.initCause(e);
1138         throw iie;
1139       } catch (ExecutionException e) {
1140         throw new IOException(e.getCause());
1141       } finally {
1142         closeThreadPool.shutdownNow();
1143       }
1144 
1145       if (!thrown.isEmpty()) {
1146         throw MultipleIOException.createIOException(thrown);
1147       }
1148       writersClosed = true;
1149       closeAndCleanCompleted = true;
1150       if (progress_failed) {
1151         return null;
1152       }
1153       return paths;
1154     }
1155 
1156     private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1157       if (writersClosed) {
1158         return thrown;
1159       }
1160 
1161       if (thrown == null) {
1162         thrown = Lists.newArrayList();
1163       }
1164       try {
1165         for (WriterThread t : writerThreads) {
1166           while (t.isAlive()) {
1167             t.shouldStop = true;
1168             t.interrupt();
1169             try {
1170               t.join(10);
1171             } catch (InterruptedException e) {
1172               IOException iie = new InterruptedIOException();
1173               iie.initCause(e);
1174               throw iie;
1175             }
1176           }
1177         }
1178       } finally {
1179         synchronized (writers) {
1180           WriterAndPath wap = null;
1181           for (SinkWriter tmpWAP : writers.values()) {
1182             try {
1183               wap = (WriterAndPath) tmpWAP;
1184               wap.w.close();
1185             } catch (IOException ioe) {
1186               LOG.error("Couldn't close log at " + wap.p, ioe);
1187               thrown.add(ioe);
1188               continue;
1189             }
1190             LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1191                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1192           }
1193         }
1194         writersClosed = true;
1195       }
1196 
1197       return thrown;
1198     }
1199 
1200     /**
1201      * Get a writer and path for a log starting at the given entry. This function is threadsafe so
1202      * long as multiple threads are always acting on different regions.
1203      * @return null if this region shouldn't output any logs
1204      */
1205     private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
1206       byte region[] = entry.getKey().getEncodedRegionName();
1207       WriterAndPath ret = (WriterAndPath) writers.get(region);
1208       if (ret != null) {
1209         return ret;
1210       }
1211       // If we already decided that this region doesn't get any output
1212       // we don't need to check again.
1213       if (blacklistedRegions.contains(region)) {
1214         return null;
1215       }
1216       ret = createWAP(region, entry, rootDir, fs, conf);
1217       if (ret == null) {
1218         blacklistedRegions.add(region);
1219         return null;
1220       }
1221       writers.put(region, ret);
1222       return ret;
1223     }
1224 
1225     private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
1226         Configuration conf) throws IOException {
1227       Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1228       if (regionedits == null) {
1229         return null;
1230       }
1231       if (fs.exists(regionedits)) {
1232         LOG.warn("Found old edits file. It could be the "
1233             + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1234             + fs.getFileStatus(regionedits).getLen());
1235         if (!fs.delete(regionedits, false)) {
1236           LOG.warn("Failed delete of old " + regionedits);
1237         }
1238       }
1239       Writer w = createWriter(fs, regionedits, conf);
1240       LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
1241       return (new WriterAndPath(regionedits, w));
1242     }
1243 
1244     void append(RegionEntryBuffer buffer) throws IOException {
1245       List<Entry> entries = buffer.entryBuffer;
1246       if (entries.isEmpty()) {
1247         LOG.warn("got an empty buffer, skipping");
1248         return;
1249       }
1250 
1251       WriterAndPath wap = null;
1252 
1253       long startTime = System.nanoTime();
1254       try {
1255         int editsCount = 0;
1256 
1257         for (Entry logEntry : entries) {
1258           if (wap == null) {
1259             wap = getWriterAndPath(logEntry);
1260             if (wap == null) {
1261               // getWriterAndPath decided we don't need to write these edits
1262               return;
1263             }
1264           }
1265           wap.w.append(logEntry);
1266           this.updateRegionMaximumEditLogSeqNum(logEntry);
1267           editsCount++;
1268         }
1269         // Pass along summary statistics
1270         wap.incrementEdits(editsCount);
1271         wap.incrementNanoTime(System.nanoTime() - startTime);
1272       } catch (IOException e) {
1273         e = RemoteExceptionHandler.checkIOException(e);
1274         LOG.fatal(" Got while writing log entry to log", e);
1275         throw e;
1276       }
1277     }
1278 
1279     /**
1280      * @return a map from encoded region ID to the number of edits written out for that region.
1281      */
1282     Map<byte[], Long> getOutputCounts() {
1283       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1284       synchronized (writers) {
1285         for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
1286           ret.put(entry.getKey(), entry.getValue().editsWritten);
1287         }
1288       }
1289       return ret;
1290     }
1291 
1292     @Override
1293     int getNumberOfRecoveredRegions() {
1294       return writers.size();
1295     }
1296   }
1297 
1298   /**
1299    * Class wraps the actual writer which writes data out and related statistics
1300    */
1301   private abstract static class SinkWriter {
1302     /* Count of edits written to this path */
1303     long editsWritten = 0;
1304     /* Number of nanos spent writing to this log */
1305     long nanosSpent = 0;
1306 
1307     void incrementEdits(int edits) {
1308       editsWritten += edits;
1309     }
1310 
1311     void incrementNanoTime(long nanos) {
1312       nanosSpent += nanos;
1313     }
1314   }
1315 
1316   /**
1317    * Private data structure that wraps a Writer and its Path, also collecting statistics about the
1318    * data written to this output.
1319    */
1320   private final static class WriterAndPath extends SinkWriter {
1321     final Path p;
1322     final Writer w;
1323 
1324     WriterAndPath(final Path p, final Writer w) {
1325       this.p = p;
1326       this.w = w;
1327     }
1328   }
1329 
1330   /**
 * Class that replays edits from WAL files directly to the failover region servers
 * to which the recovering regions have been assigned
1332    */
1333   class LogReplayOutputSink extends OutputSink {
    // Fraction of entryBuffers.maxHeapUsage below which append() keeps
    // accumulating edits instead of replaying a batch.
    private static final double BUFFER_THRESHOLD = 0.35;
    private static final String KEY_DELIMITER = "#";

    // How long to wait for a destination region to come online before giving up.
    private long waitRegionOnlineTimeOut;
    // Encoded names of regions we have recovered edits for so far.
    private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
    private final Map<String, RegionServerWriter> writers =
        new ConcurrentHashMap<String, RegionServerWriter>();
    // online encoded region name -> region location map
    private final Map<String, HRegionLocation> onlineRegions =
        new ConcurrentHashMap<String, HRegionLocation>();

    private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
        .synchronizedMap(new TreeMap<TableName, HConnection>());
    /**
     * Map key -> value layout
     * <servername>:<table name> -> Queue<Row>
     */
    private Map<String, List<Pair<HRegionLocation, HLog.Entry>>> serverToBufferQueueMap =
        new ConcurrentHashMap<String, List<Pair<HRegionLocation, HLog.Entry>>>();
    private List<Throwable> thrown = new ArrayList<Throwable>();

    // The following sink is used in distributedLogReplay mode for entries of regions in a
    // disabling table. It's a limitation of distributedLogReplay: log replay needs a region to
    // be assigned and online before it can replay wal edits, while regions of a
    // disabling/disabled table won't be assigned by the AM. We can retire this code after
    // HBASE-8234.
    private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
    private boolean hasEditsInDisablingOrDisabledTables = false;

    public LogReplayOutputSink(int numWriters) {
      super(numWriters);
      // Reuses the split log manager timeout as the region-online wait timeout.
      this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout",
        SplitLogManager.DEFAULT_TIMEOUT);
      this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
      this.logRecoveredEditsOutputSink.setReporter(reporter);
    }
1369 
    /**
     * Buffers the given edits grouped by destination region server, and replays
     * the largest pending per-server batch once it is big enough or once the
     * overall buffered memory is high. Edits for disabling/disabled tables fall
     * back to the recovered-edits file sink, since their regions won't be
     * assigned for replay.
     */
    void append(RegionEntryBuffer buffer) throws IOException {
      List<Entry> entries = buffer.entryBuffer;
      if (entries.isEmpty()) {
        LOG.warn("got an empty buffer, skipping");
        return;
      }

      // check if current region in a disabling or disabled table
      if (disablingOrDisabledTables.contains(buffer.tableName)) {
        // need fall back to old way
        logRecoveredEditsOutputSink.append(buffer);
        hasEditsInDisablingOrDisabledTables = true;
        // store regions we have recovered so far
        addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
        return;
      }

      // group entries by region servers
      groupEditsByServer(entries);

      // process workitems: find the per-server queue with the most entries
      String maxLocKey = null;
      int maxSize = 0;
      List<Pair<HRegionLocation, HLog.Entry>> maxQueue = null;
      synchronized (this.serverToBufferQueueMap) {
        for (String key : this.serverToBufferQueueMap.keySet()) {
          List<Pair<HRegionLocation, HLog.Entry>> curQueue = this.serverToBufferQueueMap.get(key);
          if (curQueue.size() > maxSize) {
            maxSize = curQueue.size();
            maxQueue = curQueue;
            maxLocKey = key;
          }
        }
        if (maxSize < minBatchSize
            && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
          // buffer more to process
          return;
        } else if (maxSize > 0) {
          // claim the queue: remove it from the map while we replay it
          this.serverToBufferQueueMap.remove(maxLocKey);
        }
      }

      if (maxSize > 0) {
        processWorkItems(maxLocKey, maxQueue);
      }
    }
1416 
1417     private void addToRecoveredRegions(String encodedRegionName) {
1418       if (!recoveredRegions.contains(encodedRegionName)) {
1419         recoveredRegions.add(encodedRegionName);
1420       }
1421     }
1422 
    /**
     * Helper function to group WALEntries by the region server currently
     * hosting their destination region, queueing them into
     * serverToBufferQueueMap. Entries that are already flushed, belong to
     * deleted tables, or carry only WAL meta edits are skipped.
     * @throws IOException if a destination region cannot be located
     */
    private void groupEditsByServer(List<Entry> entries) throws IOException {
      // Lazily created set of tables discovered to be deleted during this pass.
      Set<TableName> nonExistentTables = null;
      Long cachedLastFlushedSequenceId = -1l;
      for (HLog.Entry entry : entries) {
        WALEdit edit = entry.getEdit();
        TableName table = entry.getKey().getTablename();
        // clear scopes which isn't needed for recovery
        entry.getKey().setScopes(null);
        String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
        // skip edits of non-existent tables
        if (nonExistentTables != null && nonExistentTables.contains(table)) {
          this.skippedEdits.incrementAndGet();
          continue;
        }

        Map<byte[], Long> maxStoreSequenceIds = null;
        boolean needSkip = false;
        HRegionLocation loc = null;
        String locKey = null;
        List<KeyValue> kvs = edit.getKeyValues();
        List<KeyValue> skippedKVs = new ArrayList<KeyValue>();
        HConnection hconn = this.getConnectionByTableName(table);

        for (KeyValue kv : kvs) {
          // filtering HLog meta entries
          // We don't handle HBASE-2231 because we may or may not replay a compaction event.
          // Details at https://issues.apache.org/jira/browse/HBASE-2231?focusedCommentId=13647143&
          // page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13647143
          if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY)) {
            skippedKVs.add(kv);
            continue;
          }

          try {
            loc =
                locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow(),
                  encodeRegionNameStr);
          } catch (TableNotFoundException ex) {
            // table has been deleted so skip edits of the table
            LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
                + encodeRegionNameStr);
            lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
            if (nonExistentTables == null) {
              nonExistentTables = new TreeSet<TableName>();
            }
            nonExistentTables.add(table);
            this.skippedEdits.incrementAndGet();
            needSkip = true;
            break;
          }

          cachedLastFlushedSequenceId =
              lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
          if (cachedLastFlushedSequenceId != null
              && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
            // skip the whole HLog entry
            this.skippedEdits.incrementAndGet();
            needSkip = true;
            break;
          } else {
            if (maxStoreSequenceIds == null) {
              maxStoreSequenceIds =
                  regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
            }
            if (maxStoreSequenceIds != null) {
              Long maxStoreSeqId = maxStoreSequenceIds.get(kv.getFamily());
              if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
                // skip current kv if column family doesn't exist anymore or already flushed
                skippedKVs.add(kv);
                continue;
              }
            }
          }
        }

        // skip the edit
        if (loc == null || needSkip) continue;

        if (!skippedKVs.isEmpty()) {
          kvs.removeAll(skippedKVs);
        }

        // Queue the surviving entry under "<host:port>#<table>".
        synchronized (serverToBufferQueueMap) {
          locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
          List<Pair<HRegionLocation, HLog.Entry>> queue = serverToBufferQueueMap.get(locKey);
          if (queue == null) {
            queue =
                Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, HLog.Entry>>());
            serverToBufferQueueMap.put(locKey, queue);
          }
          queue.add(new Pair<HRegionLocation, HLog.Entry>(loc, entry));
        }
        // store regions we have recovered so far
        addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
      }
    }
1523 
1524     /**
1525      * Locate destination region based on table name & row. This function also makes sure the
1526      * destination region is online for replay.
1527      * @throws IOException
1528      */
1529     private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
1530         TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
1531       // fetch location from cache
1532       HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
1533       if(loc != null) return loc;
1534       // fetch location from hbase:meta directly without using cache to avoid hit old dead server
1535       loc = hconn.getRegionLocation(table, row, true);
1536       if (loc == null) {
1537         throw new IOException("Can't locate location for row:" + Bytes.toString(row)
1538             + " of table:" + table);
1539       }
1540       // check if current row moves to a different region due to region merge/split
1541       if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
1542         // originalEncodedRegionName should have already flushed
1543         lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
1544         HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
1545         if (tmpLoc != null) return tmpLoc;
1546       }
1547 
1548       Long lastFlushedSequenceId = -1l;
1549       AtomicBoolean isRecovering = new AtomicBoolean(true);
1550       loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
1551       if (!isRecovering.get()) {
1552         // region isn't in recovering at all because WAL file may contain a region that has
1553         // been moved to somewhere before hosting RS fails
1554         lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
1555         LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
1556             + " because it's not in recovering.");
1557       } else {
1558         Long cachedLastFlushedSequenceId =
1559             lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1560 
1561         // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
1562         // update the value for the region
1563         RegionStoreSequenceIds ids =
1564             SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc
1565                 .getRegionInfo().getEncodedName());
1566         if (ids != null) {
1567           lastFlushedSequenceId = ids.getLastFlushedSequenceId();
1568           Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1569           List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
1570           for (StoreSequenceId id : maxSeqIdInStores) {
1571             storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
1572           }
1573           regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
1574         }
1575 
1576         if (cachedLastFlushedSequenceId == null
1577             || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
1578           lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
1579         }
1580       }
1581 
1582       onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
1583       return loc;
1584     }
1585 
1586     private void processWorkItems(String key, List<Pair<HRegionLocation, HLog.Entry>> actions)
1587         throws IOException {
1588       RegionServerWriter rsw = null;
1589 
1590       long startTime = System.nanoTime();
1591       try {
1592         rsw = getRegionServerWriter(key);
1593         rsw.sink.replayEntries(actions);
1594 
1595         // Pass along summary statistics
1596         rsw.incrementEdits(actions.size());
1597         rsw.incrementNanoTime(System.nanoTime() - startTime);
1598       } catch (IOException e) {
1599         e = RemoteExceptionHandler.checkIOException(e);
1600         LOG.fatal(" Got while writing log entry to log", e);
1601         throw e;
1602       }
1603     }
1604 
1605     /**
1606      * Wait until region is online on the destination region server
1607      * @param loc
1608      * @param row
1609      * @param timeout How long to wait
1610      * @param isRecovering Recovering state of the region interested on destination region server.
1611      * @return True when region is online on the destination region server
1612      * @throws InterruptedException
1613      */
1614     private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
1615         final long timeout, AtomicBoolean isRecovering)
1616         throws IOException {
1617       final long endTime = EnvironmentEdgeManager.currentTimeMillis() + timeout;
1618       final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1619         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1620       boolean reloadLocation = false;
1621       TableName tableName = loc.getRegionInfo().getTable();
1622       int tries = 0;
1623       Throwable cause = null;
1624       while (endTime > EnvironmentEdgeManager.currentTimeMillis()) {
1625         try {
1626           // Try and get regioninfo from the hosting server.
1627           HConnection hconn = getConnectionByTableName(tableName);
1628           if(reloadLocation) {
1629             loc = hconn.getRegionLocation(tableName, row, true);
1630           }
1631           BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
1632           HRegionInfo region = loc.getRegionInfo();
1633           try {
1634             GetRegionInfoRequest request =
1635                 RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
1636             GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
1637             if (HRegionInfo.convert(response.getRegionInfo()) != null) {
1638               isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
1639               return loc;
1640             }
1641           } catch (ServiceException se) {
1642             throw ProtobufUtil.getRemoteException(se);
1643           }
1644         } catch (IOException e) {
1645           cause = e.getCause();
1646           if(!(cause instanceof RegionOpeningException)) {
1647             reloadLocation = true;
1648           }
1649         }
1650         long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
1651         try {
1652           Thread.sleep(expectedSleep);
1653         } catch (InterruptedException e) {
1654           throw new IOException("Interrupted when waiting region " +
1655               loc.getRegionInfo().getEncodedName() + " online.", e);
1656         }
1657         tries++;
1658       }
1659 
1660       throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
1661         " online for " + timeout + " milliseconds.", cause);
1662     }
1663 
1664     @Override
1665     protected boolean flush() throws IOException {
1666       String curLoc = null;
1667       int curSize = 0;
1668       List<Pair<HRegionLocation, HLog.Entry>> curQueue = null;
1669       synchronized (this.serverToBufferQueueMap) {
1670         for (String locationKey : this.serverToBufferQueueMap.keySet()) {
1671           curQueue = this.serverToBufferQueueMap.get(locationKey);
1672           if (!curQueue.isEmpty()) {
1673             curSize = curQueue.size();
1674             curLoc = locationKey;
1675             break;
1676           }
1677         }
1678         if (curSize > 0) {
1679           this.serverToBufferQueueMap.remove(curLoc);
1680         }
1681       }
1682 
1683       if (curSize > 0) {
1684         this.processWorkItems(curLoc, curQueue);
1685         dataAvailable.notifyAll();
1686         return true;
1687       }
1688       return false;
1689     }
1690 
1691     void addWriterError(Throwable t) {
1692       thrown.add(t);
1693     }
1694 
    /**
     * Finish all in-flight writes, then close region-server writers and connections.
     * @return recovered-edits paths written for disabling/disabled tables, an empty list
     *         when everything was replayed directly, or null if writing couldn't finish
     * @throws IOException combining any errors hit while closing writers/connections
     */
    @Override
    List<Path> finishWritingAndClose() throws IOException {
      try {
        if (!finishWriting()) {
          return null;
        }
        if (hasEditsInDisablingOrDisabledTables) {
          // Edits for offline tables were spilled into recovered-edits files.
          splits = logRecoveredEditsOutputSink.finishWritingAndClose();
        } else {
          splits = new ArrayList<Path>();
        }
        // returns an empty array in order to keep interface same as old way
        return splits;
      } finally {
        // Always release writers/connections, even if finishing the writes failed.
        List<IOException> thrown = closeRegionServerWriters();
        if (thrown != null && !thrown.isEmpty()) {
          throw MultipleIOException.createIOException(thrown);
        }
      }
    }
1715 
1716     @Override
1717     int getNumOpenWriters() {
1718       return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
1719     }
1720 
1721     private List<IOException> closeRegionServerWriters() throws IOException {
1722       List<IOException> result = null;
1723       if (!writersClosed) {
1724         result = Lists.newArrayList();
1725         try {
1726           for (WriterThread t : writerThreads) {
1727             while (t.isAlive()) {
1728               t.shouldStop = true;
1729               t.interrupt();
1730               try {
1731                 t.join(10);
1732               } catch (InterruptedException e) {
1733                 IOException iie = new InterruptedIOException();
1734                 iie.initCause(e);
1735                 throw iie;
1736               }
1737             }
1738           }
1739         } finally {
1740           synchronized (writers) {
1741             for (String locationKey : writers.keySet()) {
1742               RegionServerWriter tmpW = writers.get(locationKey);
1743               try {
1744                 tmpW.close();
1745               } catch (IOException ioe) {
1746                 LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
1747                 result.add(ioe);
1748               }
1749             }
1750           }
1751 
1752           // close connections
1753           synchronized (this.tableNameToHConnectionMap) {
1754             for (TableName tableName : this.tableNameToHConnectionMap.keySet()) {
1755               HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
1756               try {
1757                 hconn.clearRegionCache();
1758                 hconn.close();
1759               } catch (IOException ioe) {
1760                 result.add(ioe);
1761               }
1762             }
1763           }
1764           writersClosed = true;
1765         }
1766       }
1767       return result;
1768     }
1769 
1770     Map<byte[], Long> getOutputCounts() {
1771       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1772       synchronized (writers) {
1773         for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
1774           ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
1775         }
1776       }
1777       return ret;
1778     }
1779 
1780     @Override
1781     int getNumberOfRecoveredRegions() {
1782       return this.recoveredRegions.size();
1783     }
1784 
1785     /**
1786      * Get a writer and path for a log starting at the given entry. This function is threadsafe so
1787      * long as multiple threads are always acting on different regions.
1788      * @return null if this region shouldn't output any logs
1789      */
1790     private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
1791       RegionServerWriter ret = writers.get(loc);
1792       if (ret != null) {
1793         return ret;
1794       }
1795 
1796       TableName tableName = getTableFromLocationStr(loc);
1797       if(tableName == null){
1798         throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
1799       }
1800 
1801       HConnection hconn = getConnectionByTableName(tableName);
1802       synchronized (writers) {
1803         ret = writers.get(loc);
1804         if (ret == null) {
1805           ret = new RegionServerWriter(conf, tableName, hconn);
1806           writers.put(loc, ret);
1807         }
1808       }
1809       return ret;
1810     }
1811 
1812     private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
1813       HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
1814       if (hconn == null) {
1815         synchronized (this.tableNameToHConnectionMap) {
1816           hconn = this.tableNameToHConnectionMap.get(tableName);
1817           if (hconn == null) {
1818             hconn = HConnectionManager.getConnection(conf);
1819             this.tableNameToHConnectionMap.put(tableName, hconn);
1820           }
1821         }
1822       }
1823       return hconn;
1824     }
1825     private TableName getTableFromLocationStr(String loc) {
1826       /**
1827        * location key is in format <server name:port>#<table name>
1828        */
1829       String[] splits = loc.split(KEY_DELIMITER);
1830       if (splits.length != 2) {
1831         return null;
1832       }
1833       return TableName.valueOf(splits[1]);
1834     }
1835   }
1836 
1837   /**
1838    * Private data structure that wraps a receiving RS and collecting statistics about the data
1839    * written to this newly assigned RS.
1840    */
1841   private final static class RegionServerWriter extends SinkWriter {
1842     final WALEditsReplaySink sink;
1843 
1844     RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
1845         throws IOException {
1846       this.sink = new WALEditsReplaySink(conf, tableName, conn);
1847     }
1848 
1849     void close() throws IOException {
1850     }
1851   }
1852 
  /** Thrown when a WAL file being split is found to be corrupted. */
  static class CorruptedLogFileException extends Exception {
    private static final long serialVersionUID = 1L;

    CorruptedLogFileException(String s) {
      super(s);
    }
  }
1860 
1861   /** A struct used by getMutationsFromWALEntry */
1862   public static class MutationReplay {
1863     public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
1864       this.type = type;
1865       this.mutation = mutation;
1866       this.nonceGroup = nonceGroup;
1867       this.nonce = nonce;
1868     }
1869 
1870     public final MutationType type;
1871     public final Mutation mutation;
1872     public final long nonceGroup;
1873     public final long nonce;
1874   }
1875 
1876  /**
1877   * Tag original sequence number for each edit to be replayed
1878   * @param entry
1879   * @param cell
1880   */
1881   private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) {
1882     // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
1883     boolean needAddRecoveryTag = true;
1884     if (cell.getTagsLength() > 0) {
1885       Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
1886         TagType.LOG_REPLAY_TAG_TYPE);
1887       if (tmpTag != null) {
1888         // found an existing log replay tag so reuse it
1889         needAddRecoveryTag = false;
1890       }
1891     }
1892     if (needAddRecoveryTag) {
1893       List<Tag> newTags = new ArrayList<Tag>();
1894       Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(entry.getKey()
1895           .getLogSequenceNumber()));
1896       newTags.add(replayTag);
1897       return KeyValue.cloneAndAddTags(cell, newTags);
1898     }
1899     return cell;
1900   }
1901 
1902   /**
1903    * This function is used to construct mutations from a WALEntry. It also reconstructs HLogKey &
1904    * WALEdit from the passed in WALEntry
1905    * @param entry
1906    * @param cells
1907    * @param logEntry pair of HLogKey and WALEdit instance stores HLogKey and WALEdit instances
1908    *          extracted from the passed in WALEntry.
1909    * @param addLogReplayTag
1910    * @return list of Pair<MutationType, Mutation> to be replayed
1911    * @throws IOException
1912    */
1913   public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
1914       Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag) throws IOException {
1915 
1916     if (entry == null) {
1917       // return an empty array
1918       return new ArrayList<MutationReplay>();
1919     }
1920 
1921     int count = entry.getAssociatedCellCount();
1922     List<MutationReplay> mutations = new ArrayList<MutationReplay>();
1923     Cell previousCell = null;
1924     Mutation m = null;
1925     HLogKey key = null;
1926     WALEdit val = null;
1927     if (logEntry != null) val = new WALEdit();
1928 
1929     for (int i = 0; i < count; i++) {
1930       // Throw index out of bounds if our cell count is off
1931       if (!cells.advance()) {
1932         throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
1933       }
1934       Cell cell = cells.current();
1935       if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell));
1936 
1937       boolean isNewRowOrType =
1938           previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
1939               || !CellUtil.matchingRow(previousCell, cell);
1940       if (isNewRowOrType) {
1941         // Create new mutation
1942         if (CellUtil.isDelete(cell)) {
1943           m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
1944           // Deletes don't have nonces.
1945           mutations.add(new MutationReplay(
1946               MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
1947         } else {
1948           m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
1949           // Puts might come from increment or append, thus we need nonces.
1950           long nonceGroup = entry.getKey().hasNonceGroup()
1951               ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1952           long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1953           mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
1954         }
1955       }
1956       if (CellUtil.isDelete(cell)) {
1957         ((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
1958       } else {
1959         Cell tmpNewCell = cell;
1960         if (addLogReplayTag) {
1961           tmpNewCell = tagReplayLogSequenceNumber(entry, cell);
1962         }
1963         ((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
1964       }
1965       previousCell = cell;
1966     }
1967 
1968     // reconstruct HLogKey
1969     if (logEntry != null) {
1970       WALKey walKey = entry.getKey();
1971       List<UUID> clusterIds = new ArrayList<UUID>(walKey.getClusterIdsCount());
1972       for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
1973         clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
1974       }
1975       key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
1976               .getTableName().toByteArray()), walKey.getLogSequenceNumber(), 
1977               walKey.getWriteTime(), clusterIds, walKey.getNonceGroup(), walKey.getNonce());
1978       logEntry.setFirst(key);
1979       logEntry.setSecond(val);
1980     }
1981 
1982     return mutations;
1983   }
1984 }