View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.wal;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.text.ParseException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.NavigableSet;
33  import java.util.Set;
34  import java.util.TreeMap;
35  import java.util.TreeSet;
36  import java.util.UUID;
37  import java.util.concurrent.Callable;
38  import java.util.concurrent.CompletionService;
39  import java.util.concurrent.ConcurrentHashMap;
40  import java.util.concurrent.ExecutionException;
41  import java.util.concurrent.ExecutorCompletionService;
42  import java.util.concurrent.Future;
43  import java.util.concurrent.ThreadFactory;
44  import java.util.concurrent.ThreadPoolExecutor;
45  import java.util.concurrent.TimeUnit;
46  import java.util.concurrent.atomic.AtomicBoolean;
47  import java.util.concurrent.atomic.AtomicLong;
48  import java.util.concurrent.atomic.AtomicReference;
49  import java.util.regex.Matcher;
50  import java.util.regex.Pattern;
51  
52  import org.apache.commons.logging.Log;
53  import org.apache.commons.logging.LogFactory;
54  import org.apache.hadoop.conf.Configuration;
55  import org.apache.hadoop.fs.FileAlreadyExistsException;
56  import org.apache.hadoop.fs.FileStatus;
57  import org.apache.hadoop.fs.FileSystem;
58  import org.apache.hadoop.fs.Path;
59  import org.apache.hadoop.fs.PathFilter;
60  import org.apache.hadoop.hbase.Cell;
61  import org.apache.hadoop.hbase.CellScanner;
62  import org.apache.hadoop.hbase.CellUtil;
63  import org.apache.hadoop.hbase.CoordinatedStateException;
64  import org.apache.hadoop.hbase.CoordinatedStateManager;
65  import org.apache.hadoop.hbase.HBaseConfiguration;
66  import org.apache.hadoop.hbase.HConstants;
67  import org.apache.hadoop.hbase.HRegionInfo;
68  import org.apache.hadoop.hbase.HRegionLocation;
69  import org.apache.hadoop.hbase.RemoteExceptionHandler;
70  import org.apache.hadoop.hbase.ServerName;
71  import org.apache.hadoop.hbase.TableName;
72  import org.apache.hadoop.hbase.TableNotFoundException;
73  import org.apache.hadoop.hbase.TableStateManager;
74  import org.apache.hadoop.hbase.classification.InterfaceAudience;
75  import org.apache.hadoop.hbase.client.ConnectionUtils;
76  import org.apache.hadoop.hbase.client.Delete;
77  import org.apache.hadoop.hbase.client.Durability;
78  import org.apache.hadoop.hbase.client.HConnection;
79  import org.apache.hadoop.hbase.client.HConnectionManager;
80  import org.apache.hadoop.hbase.client.Mutation;
81  import org.apache.hadoop.hbase.client.Put;
82  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
83  import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
84  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
85  import org.apache.hadoop.hbase.io.HeapSize;
86  import org.apache.hadoop.hbase.master.SplitLogManager;
87  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
88  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
89  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
90  import org.apache.hadoop.hbase.protobuf.RequestConverter;
91  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
92  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
93  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
94  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
95  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
96  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
97  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
98  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
99  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
100 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
101 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
102 import org.apache.hadoop.hbase.regionserver.HRegion;
103 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
104 // imports for things that haven't moved from regionserver.wal yet.
105 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
106 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
107 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
108 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
109 import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
110 import org.apache.hadoop.hbase.util.Bytes;
111 import org.apache.hadoop.hbase.util.CancelableProgressable;
112 import org.apache.hadoop.hbase.util.ClassSize;
113 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
114 import org.apache.hadoop.hbase.util.FSUtils;
115 import org.apache.hadoop.hbase.util.Pair;
116 import org.apache.hadoop.hbase.util.Threads;
117 import org.apache.hadoop.hbase.wal.WAL.Entry;
118 import org.apache.hadoop.hbase.wal.WAL.Reader;
119 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
120 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
121 import org.apache.hadoop.io.MultipleIOException;
122 
123 import com.google.common.annotations.VisibleForTesting;
124 import com.google.common.base.Preconditions;
125 import com.google.common.collect.Lists;
126 import com.google.protobuf.ServiceException;
127 import com.google.protobuf.TextFormat;
128 
129 /**
130  * This class is responsible for splitting up a bunch of regionserver commit log
131  * files that are no longer being written to, into new files, one per region for
132  * region to replay on startup. Delete the old log files when finished.
133  */
134 @InterfaceAudience.Private
135 public class WALSplitter {
  private static final Log LOG = LogFactory.getLog(WALSplitter.class);

  /** By default we retry errors in splitting, rather than skipping. */
  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

  // Parameters for split process
  protected final Path rootDir;
  protected final FileSystem fs;
  protected final Configuration conf;

  // Major subcomponents of the split process.
  // These are separated into inner classes to make testing easier.
  PipelineController controller;
  OutputSink outputSink;
  EntryBuffers entryBuffers;

  // Tables the table state manager reports as DISABLED or DISABLING. Refreshed at the start of
  // splitLogFile() when a coordinated state manager is available; empty otherwise.
  private Set<TableName> disablingOrDisabledTables =
      new HashSet<TableName>();
  // Coordinated state manager handed to the constructor. May be null (split() passes null).
  private BaseCoordinatedStateManager csm;
  private final WALFactory walFactory;

  // Task-monitor status of the one log file this instance splits; splitLogFile() requires it
  // to still be null on entry.
  private MonitoredTask status;

  // For checking the latest flushed sequence id
  protected final LastSequenceId sequenceIdChecker;

  // True only when the recovery mode is LOG_REPLAY and a coordinated state manager was supplied.
  protected boolean distributedLogReplay;

  // Map encodedRegionName -> lastFlushedSequenceId
  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();

  // Map encodedRegionName -> maxSeqIdInStores
  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
      new ConcurrentHashMap<String, Map<byte[], Long>>();

  // Failed region server that the wal file being split belongs to
  protected String failedServerName = "";

  // Number of writer threads
  private final int numWriterThreads;

  // Min batch size when replay WAL edits
  private final int minBatchSize;
177   // Min batch size when replay WAL edits
178   private final int minBatchSize;
179 
180   WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
181       FileSystem fs, LastSequenceId idChecker,
182       CoordinatedStateManager csm, RecoveryMode mode) {
183     this.conf = HBaseConfiguration.create(conf);
184     String codecClassName = conf
185         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
186     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
187     this.rootDir = rootDir;
188     this.fs = fs;
189     this.sequenceIdChecker = idChecker;
190     this.csm = (BaseCoordinatedStateManager)csm;
191     this.walFactory = factory;
192     this.controller = new PipelineController();
193 
194     entryBuffers = new EntryBuffers(controller,
195         this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
196             128*1024*1024));
197 
198     // a larger minBatchSize may slow down recovery because replay writer has to wait for
199     // enough edits before replaying them
200     this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
201     this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
202 
203     this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
204     if (csm != null && this.distributedLogReplay) {
205       outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
206     } else {
207       if (this.distributedLogReplay) {
208         LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
209       }
210       this.distributedLogReplay = false;
211       outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
212     }
213 
214   }
215 
216   /**
217    * Splits a WAL file into region's recovered-edits directory.
218    * This is the main entry point for distributed log splitting from SplitLogWorker.
219    * <p>
220    * If the log file has N regions then N recovered.edits files will be produced.
221    * <p>
222    * @param rootDir
223    * @param logfile
224    * @param fs
225    * @param conf
226    * @param reporter
227    * @param idChecker
228    * @param cp coordination state manager
229    * @return false if it is interrupted by the progress-able.
230    * @throws IOException
231    */
232   public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
233       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
234       CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException {
235     WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, cp, mode);
236     return s.splitLogFile(logfile, reporter);
237   }
238 
239   // A wrapper to split one log folder using the method used by distributed
240   // log splitting. Used by tools and unit tests. It should be package private.
241   // It is public only because UpgradeTo96 and TestWALObserver are in different packages,
242   // which uses this method to do log splitting.
243   public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
244       FileSystem fs, Configuration conf, final WALFactory factory) throws IOException {
245     final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
246         Collections.singletonList(logDir), null);
247     List<Path> splits = new ArrayList<Path>();
248     if (logfiles != null && logfiles.length > 0) {
249       for (FileStatus logfile: logfiles) {
250         WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null,
251             RecoveryMode.LOG_SPLITTING);
252         if (s.splitLogFile(logfile, null)) {
253           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
254           if (s.outputSink.splits != null) {
255             splits.addAll(s.outputSink.splits);
256           }
257         }
258       }
259     }
260     if (!fs.delete(logDir, true)) {
261       throw new IOException("Unable to delete src dir: " + logDir);
262     }
263     return splits;
264   }
265 
266   /**
267    * log splitting implementation, splits one log file.
268    * @param logfile should be an actual log file.
269    */
270   boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
271     Preconditions.checkState(status == null);
272     Preconditions.checkArgument(logfile.isFile(),
273         "passed in file status is for something other than a regular file.");
274     boolean isCorrupted = false;
275     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
276       SPLIT_SKIP_ERRORS_DEFAULT);
277     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
278     Path logPath = logfile.getPath();
279     boolean outputSinkStarted = false;
280     boolean progress_failed = false;
281     int editsCount = 0;
282     int editsSkipped = 0;
283 
284     status =
285         TaskMonitor.get().createStatus(
286           "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
287     Reader in = null;
288     try {
289       long logLength = logfile.getLen();
290       LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
291       LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
292       status.setStatus("Opening log file");
293       if (reporter != null && !reporter.progress()) {
294         progress_failed = true;
295         return false;
296       }
297       try {
298         in = getReader(logfile, skipErrors, reporter);
299       } catch (CorruptedLogFileException e) {
300         LOG.warn("Could not get reader, corrupted log file " + logPath, e);
301         ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
302         isCorrupted = true;
303       }
304       if (in == null) {
305         LOG.warn("Nothing to split in log file " + logPath);
306         return true;
307       }
308       if (csm != null) {
309         try {
310           TableStateManager tsm = csm.getTableStateManager();
311           disablingOrDisabledTables = tsm.getTablesInStates(
312           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
313         } catch (CoordinatedStateException e) {
314           throw new IOException("Can't get disabling/disabled tables", e);
315         }
316       }
317       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
318       int numOpenedFilesLastCheck = 0;
319       outputSink.setReporter(reporter);
320       outputSink.startWriterThreads();
321       outputSinkStarted = true;
322       Entry entry;
323       Long lastFlushedSequenceId = -1L;
324       ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logPath);
325       failedServerName = (serverName == null) ? "" : serverName.getServerName();
326       while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
327         byte[] region = entry.getKey().getEncodedRegionName();
328         String encodedRegionNameAsStr = Bytes.toString(region);
329         lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
330         if (lastFlushedSequenceId == null) {
331           if (this.distributedLogReplay) {
332             RegionStoreSequenceIds ids =
333                 csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
334                   encodedRegionNameAsStr);
335             if (ids != null) {
336               lastFlushedSequenceId = ids.getLastFlushedSequenceId();
337               if (LOG.isDebugEnabled()) {
338                 LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
339                   TextFormat.shortDebugString(ids));
340               }
341             }
342           } else if (sequenceIdChecker != null) {
343             RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
344             Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
345             for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
346               maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
347                 storeSeqId.getSequenceId());
348             }
349             regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
350             lastFlushedSequenceId = ids.getLastFlushedSequenceId();
351             if (LOG.isDebugEnabled()) {
352               LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
353                   TextFormat.shortDebugString(ids));
354             }
355           }
356           if (lastFlushedSequenceId == null) {
357             lastFlushedSequenceId = -1L;
358           }
359           lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
360         }
361         if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
362           editsSkipped++;
363           continue;
364         }
365         // Don't send Compaction/Close/Open region events to recovered edit type sinks.
366         if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
367           editsSkipped++;
368           continue;
369         }
370         entryBuffers.appendEntry(entry);
371         editsCount++;
372         int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
373         // If sufficient edits have passed, check if we should report progress.
374         if (editsCount % interval == 0
375             || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
376           numOpenedFilesLastCheck = this.getNumOpenWriters();
377           String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
378               + " edits, skipped " + editsSkipped + " edits.";
379           status.setStatus("Split " + countsStr);
380           if (reporter != null && !reporter.progress()) {
381             progress_failed = true;
382             return false;
383           }
384         }
385       }
386     } catch (InterruptedException ie) {
387       IOException iie = new InterruptedIOException();
388       iie.initCause(ie);
389       throw iie;
390     } catch (CorruptedLogFileException e) {
391       LOG.warn("Could not parse, corrupted log file " + logPath, e);
392       csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
393         logfile.getPath().getName(), fs);
394       isCorrupted = true;
395     } catch (IOException e) {
396       e = RemoteExceptionHandler.checkIOException(e);
397       throw e;
398     } finally {
399       LOG.debug("Finishing writing output logs and closing down.");
400       try {
401         if (null != in) {
402           in.close();
403         }
404       } catch (IOException exception) {
405         LOG.warn("Could not close wal reader: " + exception.getMessage());
406         LOG.debug("exception details", exception);
407       }
408       try {
409         if (outputSinkStarted) {
410           // Set progress_failed to true as the immediate following statement will reset its value
411           // when finishWritingAndClose() throws exception, progress_failed has the right value
412           progress_failed = true;
413           progress_failed = outputSink.finishWritingAndClose() == null;
414         }
415       } finally {
416         String msg =
417             "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
418                 + " regions; edits skipped=" + editsSkipped + "; log file=" + logPath +
419                 ", length=" + logfile.getLen() + // See if length got updated post lease recovery
420                 ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
421         LOG.info(msg);
422         status.markComplete(msg);
423       }
424     }
425     return !progress_failed;
426   }
427 
428   /**
429    * Completes the work done by splitLogFile by archiving logs
430    * <p>
431    * It is invoked by SplitLogManager once it knows that one of the
432    * SplitLogWorkers have completed the splitLogFile() part. If the master
433    * crashes then this function might get called multiple times.
434    * <p>
435    * @param logfile
436    * @param conf
437    * @throws IOException
438    */
439   public static void finishSplitLogFile(String logfile,
440       Configuration conf)  throws IOException {
441     Path rootdir = FSUtils.getRootDir(conf);
442     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
443     Path logPath;
444     if (FSUtils.isStartingWithPath(rootdir, logfile)) {
445       logPath = new Path(logfile);
446     } else {
447       logPath = new Path(rootdir, logfile);
448     }
449     finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
450   }
451 
452   static void finishSplitLogFile(Path rootdir, Path oldLogDir,
453       Path logPath, Configuration conf) throws IOException {
454     List<Path> processedLogs = new ArrayList<Path>();
455     List<Path> corruptedLogs = new ArrayList<Path>();
456     FileSystem fs;
457     fs = rootdir.getFileSystem(conf);
458     if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
459       corruptedLogs.add(logPath);
460     } else {
461       processedLogs.add(logPath);
462     }
463     archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
464     Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
465     fs.delete(stagingDir, true);
466   }
467 
468   /**
469    * Moves processed logs to a oldLogDir after successful processing Moves
470    * corrupted logs (any log that couldn't be successfully parsed to corruptDir
471    * (.corrupt) for later investigation
472    *
473    * @param corruptedLogs
474    * @param processedLogs
475    * @param oldLogDir
476    * @param fs
477    * @param conf
478    * @throws IOException
479    */
480   private static void archiveLogs(
481       final List<Path> corruptedLogs,
482       final List<Path> processedLogs, final Path oldLogDir,
483       final FileSystem fs, final Configuration conf) throws IOException {
484     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
485         "hbase.regionserver.hlog.splitlog.corrupt.dir",  HConstants.CORRUPT_DIR_NAME));
486 
487     if (!fs.mkdirs(corruptDir)) {
488       LOG.info("Unable to mkdir " + corruptDir);
489     }
490     fs.mkdirs(oldLogDir);
491 
492     // this method can get restarted or called multiple times for archiving
493     // the same log files.
494     for (Path corrupted : corruptedLogs) {
495       Path p = new Path(corruptDir, corrupted.getName());
496       if (fs.exists(corrupted)) {
497         if (!fs.rename(corrupted, p)) {
498           LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
499         } else {
500           LOG.warn("Moved corrupted log " + corrupted + " to " + p);
501         }
502       }
503     }
504 
505     for (Path p : processedLogs) {
506       Path newPath = FSHLog.getWALArchivePath(oldLogDir, p);
507       if (fs.exists(p)) {
508         if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
509           LOG.warn("Unable to move  " + p + " to " + newPath);
510         } else {
511           LOG.info("Archived processed log " + p + " to " + newPath);
512         }
513       }
514     }
515   }
516 
517   /**
518    * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
519    * <code>logEntry</code> named for the sequenceid in the passed
520    * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
521    * This method also ensures existence of RECOVERED_EDITS_DIR under the region
522    * creating it if necessary.
523    * @param fs
524    * @param logEntry
525    * @param rootDir HBase root dir.
526    * @return Path to file into which to dump split log edits.
527    * @throws IOException
528    */
529   @SuppressWarnings("deprecation")
530   static Path getRegionSplitEditsPath(final FileSystem fs,
531       final Entry logEntry, final Path rootDir, boolean isCreate)
532   throws IOException {
533     Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
534     String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
535     Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
536     Path dir = getRegionDirRecoveredEditsDir(regiondir);
537 
538     if (!fs.exists(regiondir)) {
539       LOG.info("This region's directory doesn't exist: "
540           + regiondir.toString() + ". It is very likely that it was" +
541           " already split so it's safe to discard those edits.");
542       return null;
543     }
544     if (fs.exists(dir) && fs.isFile(dir)) {
545       Path tmp = new Path("/tmp");
546       if (!fs.exists(tmp)) {
547         fs.mkdirs(tmp);
548       }
549       tmp = new Path(tmp,
550         HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
551       LOG.warn("Found existing old file: " + dir + ". It could be some "
552         + "leftover of an old installation. It should be a folder instead. "
553         + "So moving it to " + tmp);
554       if (!fs.rename(dir, tmp)) {
555         LOG.warn("Failed to sideline old file " + dir);
556       }
557     }
558 
559     if (isCreate && !fs.exists(dir)) {
560       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
561     }
562     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
563     // region's replayRecoveredEdits will not delete it
564     String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
565     fileName = getTmpRecoveredEditsFileName(fileName);
566     return new Path(dir, fileName);
567   }
568 
569   static String getTmpRecoveredEditsFileName(String fileName) {
570     return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
571   }
572 
573   /**
574    * Get the completed recovered edits file path, renaming it to be by last edit
575    * in the file from its first edit. Then we could use the name to skip
576    * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
577    * @param srcPath
578    * @param maximumEditLogSeqNum
579    * @return dstPath take file's last edit log seq num as the name
580    */
581   static Path getCompletedRecoveredEditsFilePath(Path srcPath,
582       Long maximumEditLogSeqNum) {
583     String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
584     return new Path(srcPath.getParent(), fileName);
585   }
586 
587   static String formatRecoveredEditsFileName(final long seqid) {
588     return String.format("%019d", seqid);
589   }
590 
  // Names made only of ASCII decimal digits, optionally preceded by a minus sign. Used to pick
  // recovered-edits files out of the listing of a region's recovered-edits directory.
  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
  // Suffix of recovered-edits files that are still being written by the splitter.
  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
593 
594   /**
595    * @param regiondir
596    *          This regions directory in the filesystem.
597    * @return The directory that holds recovered edits files for the region
598    *         <code>regiondir</code>
599    */
600   public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
601     return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
602   }
603 
604   /**
605    * Returns sorted set of edit files made by splitter, excluding files
606    * with '.temp' suffix.
607    *
608    * @param fs
609    * @param regiondir
610    * @return Files in passed <code>regiondir</code> as a sorted set.
611    * @throws IOException
612    */
613   public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
614       final Path regiondir) throws IOException {
615     NavigableSet<Path> filesSorted = new TreeSet<Path>();
616     Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
617     if (!fs.exists(editsdir))
618       return filesSorted;
619     FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
620       @Override
621       public boolean accept(Path p) {
622         boolean result = false;
623         try {
624           // Return files and only files that match the editfile names pattern.
625           // There can be other files in this directory other than edit files.
626           // In particular, on error, we'll move aside the bad edit file giving
627           // it a timestamp suffix. See moveAsideBadEditsFile.
628           Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
629           result = fs.isFile(p) && m.matches();
630           // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
631           // because it means splitwal thread is writting this file.
632           if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
633             result = false;
634           }
635           // Skip SeqId Files
636           if (isSequenceIdFile(p)) {
637             result = false;
638           }
639         } catch (IOException e) {
640           LOG.warn("Failed isFile check on " + p);
641         }
642         return result;
643       }
644     });
645     if (files == null) {
646       return filesSorted;
647     }
648     for (FileStatus status : files) {
649       filesSorted.add(status.getPath());
650     }
651     return filesSorted;
652   }
653 
654   /**
655    * Move aside a bad edits file.
656    *
657    * @param fs
658    * @param edits
659    *          Edits file to move aside.
660    * @return The name of the moved aside file.
661    * @throws IOException
662    */
663   public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
664       throws IOException {
665     Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
666         + System.currentTimeMillis());
667     if (!fs.rename(edits, moveAsideName)) {
668       LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
669     }
670     return moveAsideName;
671   }
672 
  // Suffix of region open sequence id files; new files are written with this suffix.
  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
  // Second suffix also accepted by isSequenceIdFile(); presumably an older naming scheme.
  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
  // Number of trailing characters cut from a sequence id file name before it is parsed.
  // Derived from SEQUENCE_ID_FILE_SUFFIX; the old suffix has the same length.
  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
676 
677   /**
678    * Is the given file a region open sequence id file.
679    */
680   @VisibleForTesting
681   public static boolean isSequenceIdFile(final Path file) {
682     return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
683         || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
684   }
685 
686   /**
687    * Create a file with name as region open sequence id
688    * @param fs
689    * @param regiondir
690    * @param newSeqId
691    * @param saftyBumper
692    * @return long new sequence Id value
693    * @throws IOException
694    */
695   public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir,
696       long newSeqId, long saftyBumper) throws IOException {
697 
698     Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
699     long maxSeqId = 0;
700     FileStatus[] files = null;
701     if (fs.exists(editsdir)) {
702       files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
703         @Override
704         public boolean accept(Path p) {
705           return isSequenceIdFile(p);
706         }
707       });
708       if (files != null) {
709         for (FileStatus status : files) {
710           String fileName = status.getPath().getName();
711           try {
712             Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length()
713                 - SEQUENCE_ID_FILE_SUFFIX_LENGTH));
714             maxSeqId = Math.max(tmpSeqId, maxSeqId);
715           } catch (NumberFormatException ex) {
716             LOG.warn("Invalid SeqId File Name=" + fileName);
717           }
718         }
719       }
720     }
721     if (maxSeqId > newSeqId) {
722       newSeqId = maxSeqId;
723     }
724     newSeqId += saftyBumper; // bump up SeqId
725 
726     // write a new seqId file
727     Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
728     if (newSeqId != maxSeqId) {
729       try {
730         if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
731           throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
732         }
733         if (LOG.isDebugEnabled()) {
734           LOG.debug("Wrote region seqId=" + newSeqIdFile + " to file, newSeqId=" + newSeqId
735               + ", maxSeqId=" + maxSeqId);
736         }
737       } catch (FileAlreadyExistsException ignored) {
738         // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
739       }
740     }
741     // remove old ones
742     if (files != null) {
743       for (FileStatus status : files) {
744         if (newSeqIdFile.equals(status.getPath())) {
745           continue;
746         }
747         fs.delete(status.getPath(), false);
748       }
749     }
750     return newSeqId;
751   }
752 
753   /**
754    * Create a new {@link Reader} for reading logs to split.
755    *
756    * @param file
757    * @return A new Reader instance, caller should close
758    * @throws IOException
759    * @throws CorruptedLogFileException
760    */
761   protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
762       throws IOException, CorruptedLogFileException {
763     Path path = file.getPath();
764     long length = file.getLen();
765     Reader in;
766 
767     // Check for possibly empty file. With appends, currently Hadoop reports a
768     // zero length even if the file has been sync'd. Revisit if HDFS-376 or
769     // HDFS-878 is committed.
770     if (length <= 0) {
771       LOG.warn("File " + path + " might be still open, length is 0");
772     }
773 
774     try {
775       FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
776       try {
777         in = getReader(path, reporter);
778       } catch (EOFException e) {
779         if (length <= 0) {
780           // TODO should we ignore an empty, not-last log file if skip.errors
781           // is false? Either way, the caller should decide what to do. E.g.
782           // ignore if this is the last log in sequence.
783           // TODO is this scenario still possible if the log has been
784           // recovered (i.e. closed)
785           LOG.warn("Could not open " + path + " for reading. File is empty", e);
786           return null;
787         } else {
788           // EOFException being ignored
789           return null;
790         }
791       }
792     } catch (IOException e) {
793       if (e instanceof FileNotFoundException) {
794         // A wal file may not exist anymore. Nothing can be recovered so move on
795         LOG.warn("File " + path + " doesn't exist anymore.", e);
796         return null;
797       }
798       if (!skipErrors || e instanceof InterruptedIOException) {
799         throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
800       }
801       CorruptedLogFileException t =
802         new CorruptedLogFileException("skipErrors=true Could not open wal " +
803             path + " ignoring");
804       t.initCause(e);
805       throw t;
806     }
807     return in;
808   }
809 
810   static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
811   throws CorruptedLogFileException, IOException {
812     try {
813       return in.next();
814     } catch (EOFException eof) {
815       // truncated files are expected if a RS crashes (see HBASE-2643)
816       LOG.info("EOF from wal " + path + ".  continuing");
817       return null;
818     } catch (IOException e) {
819       // If the IOE resulted from bad file format,
820       // then this problem is idempotent and retrying won't help
821       if (e.getCause() != null &&
822           (e.getCause() instanceof ParseException ||
823            e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
824         LOG.warn("Parse exception " + e.getCause().toString() + " from wal "
825            + path + ".  continuing");
826         return null;
827       }
828       if (!skipErrors) {
829         throw e;
830       }
831       CorruptedLogFileException t =
832         new CorruptedLogFileException("skipErrors=true Ignoring exception" +
833             " while parsing wal " + path + ". Marking as corrupted");
834       t.initCause(e);
835       throw t;
836     }
837   }
838 
839   /**
840    * Create a new {@link Writer} for writing log splits.
841    * @return a new Writer instance, caller should close
842    */
843   protected Writer createWriter(Path logfile)
844       throws IOException {
845     return walFactory.createRecoveredEditsWriter(fs, logfile);
846   }
847 
848   /**
849    * Create a new {@link Reader} for reading logs to split.
850    * @return new Reader instance, caller should close
851    */
852   protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
853     return walFactory.createReader(fs, curLogFile, reporter);
854   }
855 
856   /**
857    * Get current open writers
858    */
859   private int getNumOpenWriters() {
860     int result = 0;
861     if (this.outputSink != null) {
862       result += this.outputSink.getNumOpenWriters();
863     }
864     return result;
865   }
866 
867   /**
868    * Contains some methods to control WAL-entries producer / consumer interactions
869    */
870   public static class PipelineController {
871     // If an exception is thrown by one of the other threads, it will be
872     // stored here.
873     AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
874 
875     // Wait/notify for when data has been produced by the writer thread,
876     // consumed by the reader thread, or an exception occurred
877     public final Object dataAvailable = new Object();
878 
879     void writerThreadError(Throwable t) {
880       thrown.compareAndSet(null, t);
881     }
882 
883     /**
884      * Check for errors in the writer threads. If any is found, rethrow it.
885      */
886     void checkForErrors() throws IOException {
887       Throwable thrown = this.thrown.get();
888       if (thrown == null) return;
889       if (thrown instanceof IOException) {
890         throw new IOException(thrown);
891       } else {
892         throw new RuntimeException(thrown);
893       }
894     }
895   }
896 
897   /**
898    * Class which accumulates edits and separates them into a buffer per region
899    * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
900    * a predefined threshold.
901    *
902    * Writer threads then pull region-specific buffers from this class.
903    */
904   public static class EntryBuffers {
905     PipelineController controller;
906 
907     Map<byte[], RegionEntryBuffer> buffers =
908       new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
909 
910     /* Track which regions are currently in the middle of writing. We don't allow
911        an IO thread to pick up bytes from a region if we're already writing
912        data for that region in a different IO thread. */
913     Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
914 
915     long totalBuffered = 0;
916     long maxHeapUsage;
917 
918     public EntryBuffers(PipelineController controller, long maxHeapUsage) {
919       this.controller = controller;
920       this.maxHeapUsage = maxHeapUsage;
921     }
922 
923     /**
924      * Append a log entry into the corresponding region buffer.
925      * Blocks if the total heap usage has crossed the specified threshold.
926      *
927      * @throws InterruptedException
928      * @throws IOException
929      */
930     public void appendEntry(Entry entry) throws InterruptedException, IOException {
931       WALKey key = entry.getKey();
932 
933       RegionEntryBuffer buffer;
934       long incrHeap;
935       synchronized (this) {
936         buffer = buffers.get(key.getEncodedRegionName());
937         if (buffer == null) {
938           buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
939           buffers.put(key.getEncodedRegionName(), buffer);
940         }
941         incrHeap= buffer.appendEntry(entry);
942       }
943 
944       // If we crossed the chunk threshold, wait for more space to be available
945       synchronized (controller.dataAvailable) {
946         totalBuffered += incrHeap;
947         while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
948           LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
949           controller.dataAvailable.wait(2000);
950         }
951         controller.dataAvailable.notifyAll();
952       }
953       controller.checkForErrors();
954     }
955 
956     /**
957      * @return RegionEntryBuffer a buffer of edits to be written or replayed.
958      */
959     synchronized RegionEntryBuffer getChunkToWrite() {
960       long biggestSize = 0;
961       byte[] biggestBufferKey = null;
962 
963       for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
964         long size = entry.getValue().heapSize();
965         if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
966           biggestSize = size;
967           biggestBufferKey = entry.getKey();
968         }
969       }
970       if (biggestBufferKey == null) {
971         return null;
972       }
973 
974       RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
975       currentlyWriting.add(biggestBufferKey);
976       return buffer;
977     }
978 
979     void doneWriting(RegionEntryBuffer buffer) {
980       synchronized (this) {
981         boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
982         assert removed;
983       }
984       long size = buffer.heapSize();
985 
986       synchronized (controller.dataAvailable) {
987         totalBuffered -= size;
988         // We may unblock writers
989         controller.dataAvailable.notifyAll();
990       }
991     }
992 
993     synchronized boolean isRegionCurrentlyWriting(byte[] region) {
994       return currentlyWriting.contains(region);
995     }
996 
997     public void waitUntilDrained() {
998       synchronized (controller.dataAvailable) {
999         while (totalBuffered > 0) {
1000           try {
1001             controller.dataAvailable.wait(2000);
1002           } catch (InterruptedException e) {
1003             LOG.warn("Got intrerrupted while waiting for EntryBuffers is drained");
1004             Thread.interrupted();
1005             break;
1006           }
1007         }
1008       }
1009     }
1010   }
1011 
1012   /**
1013    * A buffer of some number of edits for a given region.
1014    * This accumulates edits and also provides a memory optimization in order to
1015    * share a single byte array instance for the table and region name.
1016    * Also tracks memory usage of the accumulated edits.
1017    */
1018   public static class RegionEntryBuffer implements HeapSize {
1019     long heapInBuffer = 0;
1020     List<Entry> entryBuffer;
1021     TableName tableName;
1022     byte[] encodedRegionName;
1023 
1024     RegionEntryBuffer(TableName tableName, byte[] region) {
1025       this.tableName = tableName;
1026       this.encodedRegionName = region;
1027       this.entryBuffer = new LinkedList<Entry>();
1028     }
1029 
1030     long appendEntry(Entry entry) {
1031       internify(entry);
1032       entryBuffer.add(entry);
1033       long incrHeap = entry.getEdit().heapSize() +
1034         ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers
1035         0; // TODO linkedlist entry
1036       heapInBuffer += incrHeap;
1037       return incrHeap;
1038     }
1039 
1040     private void internify(Entry entry) {
1041       WALKey k = entry.getKey();
1042       k.internTableName(this.tableName);
1043       k.internEncodedRegionName(this.encodedRegionName);
1044     }
1045 
1046     @Override
1047     public long heapSize() {
1048       return heapInBuffer;
1049     }
1050 
1051     public byte[] getEncodedRegionName() {
1052       return encodedRegionName;
1053     }
1054 
1055     public List<Entry> getEntryBuffer() {
1056       return entryBuffer;
1057     }
1058 
1059     public TableName getTableName() {
1060       return tableName;
1061     }
1062   }
1063 
1064   public static class WriterThread extends Thread {
1065     private volatile boolean shouldStop = false;
1066     private PipelineController controller;
1067     private EntryBuffers entryBuffers;
1068     private OutputSink outputSink = null;
1069 
1070     WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
1071       super(Thread.currentThread().getName() + "-Writer-" + i);
1072       this.controller = controller;
1073       this.entryBuffers = entryBuffers;
1074       outputSink = sink;
1075     }
1076 
1077     @Override
1078     public void run()  {
1079       try {
1080         doRun();
1081       } catch (Throwable t) {
1082         LOG.error("Exiting thread", t);
1083         controller.writerThreadError(t);
1084       }
1085     }
1086 
1087     private void doRun() throws IOException {
1088       if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting");
1089       while (true) {
1090         RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
1091         if (buffer == null) {
1092           // No data currently available, wait on some more to show up
1093           synchronized (controller.dataAvailable) {
1094             if (shouldStop && !this.outputSink.flush()) {
1095               return;
1096             }
1097             try {
1098               controller.dataAvailable.wait(500);
1099             } catch (InterruptedException ie) {
1100               if (!shouldStop) {
1101                 throw new RuntimeException(ie);
1102               }
1103             }
1104           }
1105           continue;
1106         }
1107 
1108         assert buffer != null;
1109         try {
1110           writeBuffer(buffer);
1111         } finally {
1112           entryBuffers.doneWriting(buffer);
1113         }
1114       }
1115     }
1116 
1117     private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
1118       outputSink.append(buffer);
1119     }
1120 
1121     void finish() {
1122       synchronized (controller.dataAvailable) {
1123         shouldStop = true;
1124         controller.dataAvailable.notifyAll();
1125       }
1126     }
1127   }
1128 
1129   /**
1130    * The following class is an abstraction class to provide a common interface to support both
1131    * existing recovered edits file sink and region server WAL edits replay sink
1132    */
1133   public static abstract class OutputSink {
1134 
1135     protected PipelineController controller;
1136     protected EntryBuffers entryBuffers;
1137 
1138     protected Map<byte[], SinkWriter> writers = Collections
1139         .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
1140 
1141     protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
1142         .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
1143 
1144     protected final List<WriterThread> writerThreads = Lists.newArrayList();
1145 
1146     /* Set of regions which we've decided should not output edits */
1147     protected final Set<byte[]> blacklistedRegions = Collections
1148         .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
1149 
1150     protected boolean closeAndCleanCompleted = false;
1151 
1152     protected boolean writersClosed = false;
1153 
1154     protected final int numThreads;
1155 
1156     protected CancelableProgressable reporter = null;
1157 
1158     protected AtomicLong skippedEdits = new AtomicLong();
1159 
1160     protected List<Path> splits = null;
1161 
1162     public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
1163       numThreads = numWriters;
1164       this.controller = controller;
1165       this.entryBuffers = entryBuffers;
1166     }
1167 
1168     void setReporter(CancelableProgressable reporter) {
1169       this.reporter = reporter;
1170     }
1171 
1172     /**
1173      * Start the threads that will pump data from the entryBuffers to the output files.
1174      */
1175     public synchronized void startWriterThreads() {
1176       for (int i = 0; i < numThreads; i++) {
1177         WriterThread t = new WriterThread(controller, entryBuffers, this, i);
1178         t.start();
1179         writerThreads.add(t);
1180       }
1181     }
1182 
1183     /**
1184      *
1185      * Update region's maximum edit log SeqNum.
1186      */
1187     void updateRegionMaximumEditLogSeqNum(Entry entry) {
1188       synchronized (regionMaximumEditLogSeqNum) {
1189         Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
1190             .getEncodedRegionName());
1191         if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
1192           regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
1193               .getLogSeqNum());
1194         }
1195       }
1196     }
1197 
1198     Long getRegionMaximumEditLogSeqNum(byte[] region) {
1199       return regionMaximumEditLogSeqNum.get(region);
1200     }
1201 
1202     /**
1203      * @return the number of currently opened writers
1204      */
1205     int getNumOpenWriters() {
1206       return this.writers.size();
1207     }
1208 
1209     long getSkippedEdits() {
1210       return this.skippedEdits.get();
1211     }
1212 
1213     /**
1214      * Wait for writer threads to dump all info to the sink
1215      * @return true when there is no error
1216      * @throws IOException
1217      */
1218     protected boolean finishWriting(boolean interrupt) throws IOException {
1219       LOG.debug("Waiting for split writer threads to finish");
1220       boolean progress_failed = false;
1221       for (WriterThread t : writerThreads) {
1222         t.finish();
1223       }
1224       if (interrupt) {
1225         for (WriterThread t : writerThreads) {
1226           t.interrupt(); // interrupt the writer threads. We are stopping now.
1227         }
1228       }
1229 
1230       for (WriterThread t : writerThreads) {
1231         if (!progress_failed && reporter != null && !reporter.progress()) {
1232           progress_failed = true;
1233         }
1234         try {
1235           t.join();
1236         } catch (InterruptedException ie) {
1237           IOException iie = new InterruptedIOException();
1238           iie.initCause(ie);
1239           throw iie;
1240         }
1241       }
1242       controller.checkForErrors();
1243       LOG.info(this.writerThreads.size() + " split writers finished; closing...");
1244       return (!progress_failed);
1245     }
1246 
1247     public abstract List<Path> finishWritingAndClose() throws IOException;
1248 
1249     /**
1250      * @return a map from encoded region ID to the number of edits written out for that region.
1251      */
1252     public abstract Map<byte[], Long> getOutputCounts();
1253 
1254     /**
1255      * @return number of regions we've recovered
1256      */
1257     public abstract int getNumberOfRecoveredRegions();
1258 
1259     /**
1260      * @param buffer A WAL Edit Entry
1261      * @throws IOException
1262      */
1263     public abstract void append(RegionEntryBuffer buffer) throws IOException;
1264 
1265     /**
1266      * WriterThread call this function to help flush internal remaining edits in buffer before close
1267      * @return true when underlying sink has something to flush
1268      */
1269     public boolean flush() throws IOException {
1270       return false;
1271     }
1272 
1273     /**
1274      * Some WALEdit's contain only KV's for account on what happened to a region.
1275      * Not all sinks will want to get all of those edits.
1276      *
1277      * @return Return true if this sink wants to accept this region-level WALEdit.
1278      */
1279     public abstract boolean keepRegionEvent(Entry entry);
1280   }
1281 
1282   /**
1283    * Class that manages the output streams from the log splitting process.
1284    */
1285   class LogRecoveredEditsOutputSink extends OutputSink {
1286 
1287     public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1288         int numWriters) {
1289       // More threads could potentially write faster at the expense
1290       // of causing more disk seeks as the logs are split.
1291       // 3. After a certain setting (probably around 3) the
1292       // process will be bound on the reader in the current
1293       // implementation anyway.
1294       super(controller, entryBuffers, numWriters);
1295     }
1296 
1297     /**
1298      * @return null if failed to report progress
1299      * @throws IOException
1300      */
1301     @Override
1302     public List<Path> finishWritingAndClose() throws IOException {
1303       boolean isSuccessful = false;
1304       List<Path> result = null;
1305       try {
1306         isSuccessful = finishWriting(false);
1307       } finally {
1308         result = close();
1309         List<IOException> thrown = closeLogWriters(null);
1310         if (thrown != null && !thrown.isEmpty()) {
1311           throw MultipleIOException.createIOException(thrown);
1312         }
1313       }
1314       if (isSuccessful) {
1315         splits = result;
1316       }
1317       return splits;
1318     }
1319 
1320     /**
1321      * Close all of the output streams.
1322      * @return the list of paths written.
1323      */
1324     private List<Path> close() throws IOException {
1325       Preconditions.checkState(!closeAndCleanCompleted);
1326 
1327       final List<Path> paths = new ArrayList<Path>();
1328       final List<IOException> thrown = Lists.newArrayList();
1329       ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
1330         TimeUnit.SECONDS, new ThreadFactory() {
1331           private int count = 1;
1332 
1333           @Override
1334           public Thread newThread(Runnable r) {
1335             Thread t = new Thread(r, "split-log-closeStream-" + count++);
1336             return t;
1337           }
1338         });
1339       CompletionService<Void> completionService =
1340         new ExecutorCompletionService<Void>(closeThreadPool);
1341       for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
1342         if (LOG.isTraceEnabled()) {
1343           LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
1344         }
1345         completionService.submit(new Callable<Void>() {
1346           @Override
1347           public Void call() throws Exception {
1348             WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1349             if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p);
1350             try {
1351               wap.w.close();
1352             } catch (IOException ioe) {
1353               LOG.error("Couldn't close log at " + wap.p, ioe);
1354               thrown.add(ioe);
1355               return null;
1356             }
1357             if (LOG.isDebugEnabled()) {
1358               LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
1359                 + " edits, skipped " + wap.editsSkipped + " edits in "
1360                 + (wap.nanosSpent / 1000 / 1000) + "ms");
1361             }
1362             if (wap.editsWritten == 0) {
1363               // just remove the empty recovered.edits file
1364               if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
1365                 LOG.warn("Failed deleting empty " + wap.p);
1366                 throw new IOException("Failed deleting empty  " + wap.p);
1367               }
1368               return null;
1369             }
1370 
1371             Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1372               regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
1373             try {
1374               if (!dst.equals(wap.p) && fs.exists(dst)) {
1375                 LOG.warn("Found existing old edits file. It could be the "
1376                     + "result of a previous failed split attempt. Deleting " + dst + ", length="
1377                     + fs.getFileStatus(dst).getLen());
1378                 if (!fs.delete(dst, false)) {
1379                   LOG.warn("Failed deleting of old " + dst);
1380                   throw new IOException("Failed deleting of old " + dst);
1381                 }
1382               }
1383               // Skip the unit tests which create a splitter that reads and
1384               // writes the data without touching disk.
1385               // TestHLogSplit#testThreading is an example.
1386               if (fs.exists(wap.p)) {
1387                 if (!fs.rename(wap.p, dst)) {
1388                   throw new IOException("Failed renaming " + wap.p + " to " + dst);
1389                 }
1390                 LOG.info("Rename " + wap.p + " to " + dst);
1391               }
1392             } catch (IOException ioe) {
1393               LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1394               thrown.add(ioe);
1395               return null;
1396             }
1397             paths.add(dst);
1398             return null;
1399           }
1400         });
1401       }
1402 
1403       boolean progress_failed = false;
1404       try {
1405         for (int i = 0, n = this.writers.size(); i < n; i++) {
1406           Future<Void> future = completionService.take();
1407           future.get();
1408           if (!progress_failed && reporter != null && !reporter.progress()) {
1409             progress_failed = true;
1410           }
1411         }
1412       } catch (InterruptedException e) {
1413         IOException iie = new InterruptedIOException();
1414         iie.initCause(e);
1415         throw iie;
1416       } catch (ExecutionException e) {
1417         throw new IOException(e.getCause());
1418       } finally {
1419         closeThreadPool.shutdownNow();
1420       }
1421 
1422       if (!thrown.isEmpty()) {
1423         throw MultipleIOException.createIOException(thrown);
1424       }
1425       writersClosed = true;
1426       closeAndCleanCompleted = true;
1427       if (progress_failed) {
1428         return null;
1429       }
1430       return paths;
1431     }
1432 
1433     private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1434       if (writersClosed) {
1435         return thrown;
1436       }
1437 
1438       if (thrown == null) {
1439         thrown = Lists.newArrayList();
1440       }
1441       try {
1442         for (WriterThread t : writerThreads) {
1443           while (t.isAlive()) {
1444             t.shouldStop = true;
1445             t.interrupt();
1446             try {
1447               t.join(10);
1448             } catch (InterruptedException e) {
1449               IOException iie = new InterruptedIOException();
1450               iie.initCause(e);
1451               throw iie;
1452             }
1453           }
1454         }
1455       } finally {
1456         synchronized (writers) {
1457           WriterAndPath wap = null;
1458           for (SinkWriter tmpWAP : writers.values()) {
1459             try {
1460               wap = (WriterAndPath) tmpWAP;
1461               wap.w.close();
1462             } catch (IOException ioe) {
1463               LOG.error("Couldn't close log at " + wap.p, ioe);
1464               thrown.add(ioe);
1465               continue;
1466             }
1467             LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1468                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1469           }
1470         }
1471         writersClosed = true;
1472       }
1473 
1474       return thrown;
1475     }
1476 
1477     /**
1478      * Get a writer and path for a log starting at the given entry. This function is threadsafe so
1479      * long as multiple threads are always acting on different regions.
1480      * @return null if this region shouldn't output any logs
1481      */
1482     private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
1483       byte region[] = entry.getKey().getEncodedRegionName();
1484       WriterAndPath ret = (WriterAndPath) writers.get(region);
1485       if (ret != null) {
1486         return ret;
1487       }
1488       // If we already decided that this region doesn't get any output
1489       // we don't need to check again.
1490       if (blacklistedRegions.contains(region)) {
1491         return null;
1492       }
1493       ret = createWAP(region, entry, rootDir);
1494       if (ret == null) {
1495         blacklistedRegions.add(region);
1496         return null;
1497       }
1498       writers.put(region, ret);
1499       return ret;
1500     }
1501 
1502     /**
1503      * @return a path with a write for that path. caller should close.
1504      */
1505     private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
1506       Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1507       if (regionedits == null) {
1508         return null;
1509       }
1510       if (fs.exists(regionedits)) {
1511         LOG.warn("Found old edits file. It could be the "
1512             + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1513             + fs.getFileStatus(regionedits).getLen());
1514         if (!fs.delete(regionedits, false)) {
1515           LOG.warn("Failed delete of old " + regionedits);
1516         }
1517       }
1518       Writer w = createWriter(regionedits);
1519       LOG.debug("Creating writer path=" + regionedits);
1520       return new WriterAndPath(regionedits, w);
1521     }
1522 
1523     private void filterCellByStore(Entry logEntry) {
1524       Map<byte[], Long> maxSeqIdInStores =
1525           regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
1526       if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
1527         return;
1528       }
1529       // Create the array list for the cells that aren't filtered.
1530       // We make the assumption that most cells will be kept.
1531       ArrayList<Cell> keptCells = new ArrayList<Cell>(logEntry.getEdit().getCells().size());
1532       for (Cell cell : logEntry.getEdit().getCells()) {
1533         if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
1534           keptCells.add(cell);
1535         } else {
1536           byte[] family = CellUtil.cloneFamily(cell);
1537           Long maxSeqId = maxSeqIdInStores.get(family);
1538           // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
1539           // or the master was crashed before and we can not get the information.
1540           if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getLogSeqNum()) {
1541             keptCells.add(cell);
1542           }
1543         }
1544       }
1545 
1546       // Anything in the keptCells array list is still live.
1547       // So rather than removing the cells from the array list
1548       // which would be an O(n^2) operation, we just replace the list
1549       logEntry.getEdit().setCells(keptCells);
1550     }
1551 
1552     @Override
1553     public void append(RegionEntryBuffer buffer) throws IOException {
1554       List<Entry> entries = buffer.entryBuffer;
1555       if (entries.isEmpty()) {
1556         LOG.warn("got an empty buffer, skipping");
1557         return;
1558       }
1559 
1560       WriterAndPath wap = null;
1561 
1562       long startTime = System.nanoTime();
1563       try {
1564         int editsCount = 0;
1565 
1566         for (Entry logEntry : entries) {
1567           if (wap == null) {
1568             wap = getWriterAndPath(logEntry);
1569             if (wap == null) {
1570               if (LOG.isDebugEnabled()) {
1571                 LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry);
1572               }
1573               return;
1574             }
1575           }
1576           filterCellByStore(logEntry);
1577           if (!logEntry.getEdit().isEmpty()) {
1578             wap.w.append(logEntry);
1579             this.updateRegionMaximumEditLogSeqNum(logEntry);
1580             editsCount++;
1581           } else {
1582             wap.incrementSkippedEdits(1);
1583           }
1584         }
1585         // Pass along summary statistics
1586         wap.incrementEdits(editsCount);
1587         wap.incrementNanoTime(System.nanoTime() - startTime);
1588       } catch (IOException e) {
1589         e = RemoteExceptionHandler.checkIOException(e);
1590         LOG.fatal(" Got while writing log entry to log", e);
1591         throw e;
1592       }
1593     }
1594 
1595     @Override
1596     public boolean keepRegionEvent(Entry entry) {
1597       ArrayList<Cell> cells = entry.getEdit().getCells();
1598       for (int i = 0; i < cells.size(); i++) {
1599         if (WALEdit.isCompactionMarker(cells.get(i))) {
1600           return true;
1601         }
1602       }
1603       return false;
1604     }
1605 
1606     /**
1607      * @return a map from encoded region ID to the number of edits written out for that region.
1608      */
1609     @Override
1610     public Map<byte[], Long> getOutputCounts() {
1611       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1612       synchronized (writers) {
1613         for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
1614           ret.put(entry.getKey(), entry.getValue().editsWritten);
1615         }
1616       }
1617       return ret;
1618     }
1619 
1620     @Override
1621     public int getNumberOfRecoveredRegions() {
1622       return writers.size();
1623     }
1624   }
1625 
1626   /**
1627    * Class wraps the actual writer which writes data out and related statistics
1628    */
1629   public abstract static class SinkWriter {
1630     /* Count of edits written to this path */
1631     long editsWritten = 0;
1632     /* Count of edits skipped to this path */
1633     long editsSkipped = 0;
1634     /* Number of nanos spent writing to this log */
1635     long nanosSpent = 0;
1636 
1637     void incrementEdits(int edits) {
1638       editsWritten += edits;
1639     }
1640 
1641     void incrementSkippedEdits(int skipped) {
1642       editsSkipped += skipped;
1643     }
1644 
1645     void incrementNanoTime(long nanos) {
1646       nanosSpent += nanos;
1647     }
1648   }
1649 
1650   /**
1651    * Private data structure that wraps a Writer and its Path, also collecting statistics about the
1652    * data written to this output.
1653    */
1654   private final static class WriterAndPath extends SinkWriter {
1655     final Path p;
1656     final Writer w;
1657 
1658     WriterAndPath(final Path p, final Writer w) {
1659       this.p = p;
1660       this.w = w;
1661     }
1662   }
1663 
1664   /**
1665    * Class that manages to replay edits from WAL files directly to assigned fail over region servers
1666    */
1667   class LogReplayOutputSink extends OutputSink {
1668     private static final double BUFFER_THRESHOLD = 0.35;
1669     private static final String KEY_DELIMITER = "#";
1670 
1671     private long waitRegionOnlineTimeOut;
1672     private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
1673     private final Map<String, RegionServerWriter> writers =
1674         new ConcurrentHashMap<String, RegionServerWriter>();
1675     // online encoded region name -> region location map
1676     private final Map<String, HRegionLocation> onlineRegions =
1677         new ConcurrentHashMap<String, HRegionLocation>();
1678 
1679     private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
1680         .synchronizedMap(new TreeMap<TableName, HConnection>());
1681     /**
1682      * Map key -> value layout
1683      * <server name:port>#<table name> -> List<Pair<HRegionLocation, Entry>>
1684      */
1685     private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
1686         new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>();
1687     private List<Throwable> thrown = new ArrayList<Throwable>();
1688 
1689     // The following sink is used in distributedLogReplay mode for entries of regions in a disabling
1690     // or disabled table. This is a limitation of distributedLogReplay: log replay needs a region to
1691     // be assigned and online before it can replay WAL edits, while regions of disabling/disabled
1692     // tables won't be assigned by AM. We can retire this code after HBASE-8234.
1693     private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
1694     private boolean hasEditsInDisablingOrDisabledTables = false;
1695 
1696     public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1697         int numWriters) {
1698       super(controller, entryBuffers, numWriters);
1699       this.waitRegionOnlineTimeOut = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
1700           ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
1701       this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
1702         entryBuffers, numWriters);
1703       this.logRecoveredEditsOutputSink.setReporter(reporter);
1704     }
1705 
1706     @Override
1707     public void append(RegionEntryBuffer buffer) throws IOException {
1708       List<Entry> entries = buffer.entryBuffer;
1709       if (entries.isEmpty()) {
1710         LOG.warn("got an empty buffer, skipping");
1711         return;
1712       }
1713 
1714       // check if current region in a disabling or disabled table
1715       if (disablingOrDisabledTables.contains(buffer.tableName)) {
1716         // need fall back to old way
1717         logRecoveredEditsOutputSink.append(buffer);
1718         hasEditsInDisablingOrDisabledTables = true;
1719         // store regions we have recovered so far
1720         addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
1721         return;
1722       }
1723 
1724       // group entries by region servers
1725       groupEditsByServer(entries);
1726 
1727       // process workitems
1728       String maxLocKey = null;
1729       int maxSize = 0;
1730       List<Pair<HRegionLocation, Entry>> maxQueue = null;
1731       synchronized (this.serverToBufferQueueMap) {
1732         for (Map.Entry<String, List<Pair<HRegionLocation, Entry>>> entry:
1733               serverToBufferQueueMap.entrySet()) {
1734           List<Pair<HRegionLocation, Entry>> curQueue = entry.getValue();
1735           if (curQueue.size() > maxSize) {
1736             maxSize = curQueue.size();
1737             maxQueue = curQueue;
1738             maxLocKey = entry.getKey();
1739           }
1740         }
1741         if (maxSize < minBatchSize
1742             && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
1743           // buffer more to process
1744           return;
1745         } else if (maxSize > 0) {
1746           this.serverToBufferQueueMap.remove(maxLocKey);
1747         }
1748       }
1749 
1750       if (maxSize > 0) {
1751         processWorkItems(maxLocKey, maxQueue);
1752       }
1753     }
1754 
1755     private void addToRecoveredRegions(String encodedRegionName) {
1756       if (!recoveredRegions.contains(encodedRegionName)) {
1757         recoveredRegions.add(encodedRegionName);
1758       }
1759     }
1760 
1761     /**
1762      * Helper function to group WALEntries to individual region servers
1763      * @throws IOException
1764      */
1765     private void groupEditsByServer(List<Entry> entries) throws IOException {
1766       Set<TableName> nonExistentTables = null;
1767       Long cachedLastFlushedSequenceId = -1l;
1768       for (Entry entry : entries) {
1769         WALEdit edit = entry.getEdit();
1770         TableName table = entry.getKey().getTablename();
1771         // clear scopes which isn't needed for recovery
1772         entry.getKey().setScopes(null);
1773         String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
1774         // skip edits of non-existent tables
1775         if (nonExistentTables != null && nonExistentTables.contains(table)) {
1776           this.skippedEdits.incrementAndGet();
1777           continue;
1778         }
1779 
1780         Map<byte[], Long> maxStoreSequenceIds = null;
1781         boolean needSkip = false;
1782         HRegionLocation loc = null;
1783         String locKey = null;
1784         List<Cell> cells = edit.getCells();
1785         List<Cell> skippedCells = new ArrayList<Cell>();
1786         HConnection hconn = this.getConnectionByTableName(table);
1787 
1788         for (Cell cell : cells) {
1789           byte[] row = CellUtil.cloneRow(cell);
1790           byte[] family = CellUtil.cloneFamily(cell);
1791           boolean isCompactionEntry = false;
1792           if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
1793             CompactionDescriptor compaction = WALEdit.getCompaction(cell);
1794             if (compaction != null && compaction.hasRegionName()) {
1795               try {
1796                 byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
1797                   .toByteArray());
1798                 row = regionName[1]; // startKey of the region
1799                 family = compaction.getFamilyName().toByteArray();
1800                 isCompactionEntry = true;
1801               } catch (Exception ex) {
1802                 LOG.warn("Unexpected exception received, ignoring " + ex);
1803                 skippedCells.add(cell);
1804                 continue;
1805               }
1806             } else {
1807               skippedCells.add(cell);
1808               continue;
1809             }
1810           }
1811 
1812           try {
1813             loc =
1814                 locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
1815                   encodeRegionNameStr);
1816             // skip replaying the compaction if the region is gone
1817             if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
1818               loc.getRegionInfo().getEncodedName())) {
1819               LOG.info("Not replaying a compaction marker for an older region: "
1820                   + encodeRegionNameStr);
1821               needSkip = true;
1822             }
1823           } catch (TableNotFoundException ex) {
1824             // table has been deleted so skip edits of the table
1825             LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
1826                 + encodeRegionNameStr);
1827             lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
1828             if (nonExistentTables == null) {
1829               nonExistentTables = new TreeSet<TableName>();
1830             }
1831             nonExistentTables.add(table);
1832             this.skippedEdits.incrementAndGet();
1833             needSkip = true;
1834             break;
1835           }
1836 
1837           cachedLastFlushedSequenceId =
1838               lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1839           if (cachedLastFlushedSequenceId != null
1840               && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
1841             // skip the whole WAL entry
1842             this.skippedEdits.incrementAndGet();
1843             needSkip = true;
1844             break;
1845           } else {
1846             if (maxStoreSequenceIds == null) {
1847               maxStoreSequenceIds =
1848                   regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
1849             }
1850             if (maxStoreSequenceIds != null) {
1851               Long maxStoreSeqId = maxStoreSequenceIds.get(family);
1852               if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
1853                 // skip current kv if column family doesn't exist anymore or already flushed
1854                 skippedCells.add(cell);
1855                 continue;
1856               }
1857             }
1858           }
1859         }
1860 
1861         // skip the edit
1862         if (loc == null || needSkip) continue;
1863 
1864         if (!skippedCells.isEmpty()) {
1865           cells.removeAll(skippedCells);
1866         }
1867 
1868         synchronized (serverToBufferQueueMap) {
1869           locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
1870           List<Pair<HRegionLocation, Entry>> queue = serverToBufferQueueMap.get(locKey);
1871           if (queue == null) {
1872             queue =
1873                 Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Entry>>());
1874             serverToBufferQueueMap.put(locKey, queue);
1875           }
1876           queue.add(new Pair<HRegionLocation, Entry>(loc, entry));
1877         }
1878         // store regions we have recovered so far
1879         addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
1880       }
1881     }
1882 
1883     /**
1884      * Locate destination region based on table name & row. This function also makes sure the
1885      * destination region is online for replay.
1886      * @throws IOException
1887      */
1888     private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
1889         TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
1890       // fetch location from cache
1891       HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
1892       if(loc != null) return loc;
1893       // fetch location from hbase:meta directly without using cache to avoid hit old dead server
1894       loc = hconn.getRegionLocation(table, row, true);
1895       if (loc == null) {
1896         throw new IOException("Can't locate location for row:" + Bytes.toString(row)
1897             + " of table:" + table);
1898       }
1899       // check if current row moves to a different region due to region merge/split
1900       if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
1901         // originalEncodedRegionName should have already flushed
1902         lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
1903         HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
1904         if (tmpLoc != null) return tmpLoc;
1905       }
1906 
1907       Long lastFlushedSequenceId = -1l;
1908       AtomicBoolean isRecovering = new AtomicBoolean(true);
1909       loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
1910       if (!isRecovering.get()) {
1911         // region isn't in recovering at all because WAL file may contain a region that has
1912         // been moved to somewhere before hosting RS fails
1913         lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
1914         LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
1915             + " because it's not in recovering.");
1916       } else {
1917         Long cachedLastFlushedSequenceId =
1918             lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1919 
1920         // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
1921         // update the value for the region
1922         RegionStoreSequenceIds ids =
1923             csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
1924               loc.getRegionInfo().getEncodedName());
1925         if (ids != null) {
1926           lastFlushedSequenceId = ids.getLastFlushedSequenceId();
1927           Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1928           List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
1929           for (StoreSequenceId id : maxSeqIdInStores) {
1930             storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
1931           }
1932           regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
1933         }
1934 
1935         if (cachedLastFlushedSequenceId == null
1936             || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
1937           lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
1938         }
1939       }
1940 
1941       onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
1942       return loc;
1943     }
1944 
1945     private void processWorkItems(String key, List<Pair<HRegionLocation, Entry>> actions)
1946         throws IOException {
1947       RegionServerWriter rsw = null;
1948 
1949       long startTime = System.nanoTime();
1950       try {
1951         rsw = getRegionServerWriter(key);
1952         rsw.sink.replayEntries(actions);
1953 
1954         // Pass along summary statistics
1955         rsw.incrementEdits(actions.size());
1956         rsw.incrementNanoTime(System.nanoTime() - startTime);
1957       } catch (IOException e) {
1958         e = RemoteExceptionHandler.checkIOException(e);
1959         LOG.fatal(" Got while writing log entry to log", e);
1960         throw e;
1961       }
1962     }
1963 
1964     /**
1965      * Wait until region is online on the destination region server
1966      * @param loc
1967      * @param row
1968      * @param timeout How long to wait
1969      * @param isRecovering Recovering state of the region interested on destination region server.
1970      * @return True when region is online on the destination region server
1971      * @throws InterruptedException
1972      */
1973     private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
1974         final long timeout, AtomicBoolean isRecovering)
1975         throws IOException {
1976       final long endTime = EnvironmentEdgeManager.currentTime() + timeout;
1977       final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1978         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1979       boolean reloadLocation = false;
1980       TableName tableName = loc.getRegionInfo().getTable();
1981       int tries = 0;
1982       Throwable cause = null;
1983       while (endTime > EnvironmentEdgeManager.currentTime()) {
1984         try {
1985           // Try and get regioninfo from the hosting server.
1986           HConnection hconn = getConnectionByTableName(tableName);
1987           if(reloadLocation) {
1988             loc = hconn.getRegionLocation(tableName, row, true);
1989           }
1990           BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
1991           HRegionInfo region = loc.getRegionInfo();
1992           try {
1993             GetRegionInfoRequest request =
1994                 RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
1995             GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
1996             if (HRegionInfo.convert(response.getRegionInfo()) != null) {
1997               isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
1998               return loc;
1999             }
2000           } catch (ServiceException se) {
2001             throw ProtobufUtil.getRemoteException(se);
2002           }
2003         } catch (IOException e) {
2004           cause = e.getCause();
2005           if(!(cause instanceof RegionOpeningException)) {
2006             reloadLocation = true;
2007           }
2008         }
2009         long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
2010         try {
2011           Thread.sleep(expectedSleep);
2012         } catch (InterruptedException e) {
2013           throw new IOException("Interrupted when waiting region " +
2014               loc.getRegionInfo().getEncodedName() + " online.", e);
2015         }
2016         tries++;
2017       }
2018 
2019       throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
2020         " online for " + timeout + " milliseconds.", cause);
2021     }
2022 
2023     @Override
2024     public boolean flush() throws IOException {
2025       String curLoc = null;
2026       int curSize = 0;
2027       List<Pair<HRegionLocation, Entry>> curQueue = null;
2028       synchronized (this.serverToBufferQueueMap) {
2029         for (Map.Entry<String, List<Pair<HRegionLocation, Entry>>> entry :
2030                 serverToBufferQueueMap.entrySet()) {
2031           String locationKey = entry.getKey();
2032           curQueue = entry.getValue();
2033           if (!curQueue.isEmpty()) {
2034             curSize = curQueue.size();
2035             curLoc = locationKey;
2036             break;
2037           }
2038         }
2039         if (curSize > 0) {
2040           this.serverToBufferQueueMap.remove(curLoc);
2041         }
2042       }
2043 
2044       if (curSize > 0) {
2045         this.processWorkItems(curLoc, curQueue);
2046         // We should already have control of the monitor; ensure this is the case.
2047         synchronized(controller.dataAvailable) {
2048           controller.dataAvailable.notifyAll();
2049         }
2050         return true;
2051       }
2052       return false;
2053     }
2054 
2055     @Override
2056     public boolean keepRegionEvent(Entry entry) {
2057       return true;
2058     }
2059 
2060     void addWriterError(Throwable t) {
2061       thrown.add(t);
2062     }
2063 
2064     @Override
2065     public List<Path> finishWritingAndClose() throws IOException {
2066       try {
2067         if (!finishWriting(false)) {
2068           return null;
2069         }
2070         if (hasEditsInDisablingOrDisabledTables) {
2071           splits = logRecoveredEditsOutputSink.finishWritingAndClose();
2072         } else {
2073           splits = new ArrayList<Path>();
2074         }
2075         // returns an empty array in order to keep interface same as old way
2076         return splits;
2077       } finally {
2078         List<IOException> thrown = closeRegionServerWriters();
2079         if (thrown != null && !thrown.isEmpty()) {
2080           throw MultipleIOException.createIOException(thrown);
2081         }
2082       }
2083     }
2084 
2085     @Override
2086     int getNumOpenWriters() {
2087       return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
2088     }
2089 
2090     private List<IOException> closeRegionServerWriters() throws IOException {
2091       List<IOException> result = null;
2092       if (!writersClosed) {
2093         result = Lists.newArrayList();
2094         try {
2095           for (WriterThread t : writerThreads) {
2096             while (t.isAlive()) {
2097               t.shouldStop = true;
2098               t.interrupt();
2099               try {
2100                 t.join(10);
2101               } catch (InterruptedException e) {
2102                 IOException iie = new InterruptedIOException();
2103                 iie.initCause(e);
2104                 throw iie;
2105               }
2106             }
2107           }
2108         } finally {
2109           synchronized (writers) {
2110             for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
2111               String locationKey = entry.getKey();
2112               RegionServerWriter tmpW = entry.getValue();
2113               try {
2114                 tmpW.close();
2115               } catch (IOException ioe) {
2116                 LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
2117                 result.add(ioe);
2118               }
2119             }
2120           }
2121 
2122           // close connections
2123           synchronized (this.tableNameToHConnectionMap) {
2124             for (Map.Entry<TableName, HConnection> entry :
2125                   tableNameToHConnectionMap.entrySet()) {
2126               TableName tableName = entry.getKey();
2127               HConnection hconn = entry.getValue();
2128               try {
2129                 hconn.clearRegionCache();
2130                 hconn.close();
2131               } catch (IOException ioe) {
2132                 result.add(ioe);
2133               }
2134             }
2135           }
2136           writersClosed = true;
2137         }
2138       }
2139       return result;
2140     }
2141 
2142     @Override
2143     public Map<byte[], Long> getOutputCounts() {
2144       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
2145       synchronized (writers) {
2146         for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
2147           ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
2148         }
2149       }
2150       return ret;
2151     }
2152 
2153     @Override
2154     public int getNumberOfRecoveredRegions() {
2155       return this.recoveredRegions.size();
2156     }
2157 
2158     /**
2159      * Get a writer and path for a log starting at the given entry. This function is threadsafe so
2160      * long as multiple threads are always acting on different regions.
2161      * @return null if this region shouldn't output any logs
2162      */
2163     private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
2164       RegionServerWriter ret = writers.get(loc);
2165       if (ret != null) {
2166         return ret;
2167       }
2168 
2169       TableName tableName = getTableFromLocationStr(loc);
2170       if(tableName == null){
2171         throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
2172       }
2173 
2174       HConnection hconn = getConnectionByTableName(tableName);
2175       synchronized (writers) {
2176         ret = writers.get(loc);
2177         if (ret == null) {
2178           ret = new RegionServerWriter(conf, tableName, hconn);
2179           writers.put(loc, ret);
2180         }
2181       }
2182       return ret;
2183     }
2184 
2185     private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
2186       HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
2187       if (hconn == null) {
2188         synchronized (this.tableNameToHConnectionMap) {
2189           hconn = this.tableNameToHConnectionMap.get(tableName);
2190           if (hconn == null) {
2191             hconn = HConnectionManager.getConnection(conf);
2192             this.tableNameToHConnectionMap.put(tableName, hconn);
2193           }
2194         }
2195       }
2196       return hconn;
2197     }
2198     private TableName getTableFromLocationStr(String loc) {
2199       /**
2200        * location key is in format <server name:port>#<table name>
2201        */
2202       String[] splits = loc.split(KEY_DELIMITER);
2203       if (splits.length != 2) {
2204         return null;
2205       }
2206       return TableName.valueOf(splits[1]);
2207     }
2208   }
2209 
2210   /**
2211    * Private data structure that wraps a receiving RS and collecting statistics about the data
2212    * written to this newly assigned RS.
2213    */
2214   private final static class RegionServerWriter extends SinkWriter {
2215     final WALEditsReplaySink sink;
2216 
2217     RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
2218         throws IOException {
2219       this.sink = new WALEditsReplaySink(conf, tableName, conn);
2220     }
2221 
2222     void close() throws IOException {
2223     }
2224   }
2225 
2226   static class CorruptedLogFileException extends Exception {
2227     private static final long serialVersionUID = 1L;
2228 
2229     CorruptedLogFileException(String s) {
2230       super(s);
2231     }
2232   }
2233 
2234   /** A struct used by getMutationsFromWALEntry */
2235   public static class MutationReplay {
2236     public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
2237       this.type = type;
2238       this.mutation = mutation;
2239       if(this.mutation.getDurability() != Durability.SKIP_WAL) {
2240         // using ASYNC_WAL for relay
2241         this.mutation.setDurability(Durability.ASYNC_WAL);
2242       }
2243       this.nonceGroup = nonceGroup;
2244       this.nonce = nonce;
2245     }
2246 
2247     public final MutationType type;
2248     public final Mutation mutation;
2249     public final long nonceGroup;
2250     public final long nonce;
2251   }
2252 
2253   /**
2254    * This function is used to construct mutations from a WALEntry. It also
2255    * reconstructs WALKey &amp; WALEdit from the passed in WALEntry
2256    * @param entry
2257    * @param cells
2258    * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
2259    *          extracted from the passed in WALEntry.
2260    * @param durability
2261    * @return list of Pair&lt;MutationType, Mutation&gt; to be replayed
2262    * @throws IOException
2263    */
2264   public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
2265       Pair<WALKey, WALEdit> logEntry, Durability durability)
2266           throws IOException {
2267     if (entry == null) {
2268       // return an empty array
2269       return new ArrayList<MutationReplay>();
2270     }
2271 
2272     long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
2273       entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
2274     int count = entry.getAssociatedCellCount();
2275     List<MutationReplay> mutations = new ArrayList<MutationReplay>();
2276     Cell previousCell = null;
2277     Mutation m = null;
2278     WALKey key = null;
2279     WALEdit val = null;
2280     if (logEntry != null) val = new WALEdit();
2281 
2282     for (int i = 0; i < count; i++) {
2283       // Throw index out of bounds if our cell count is off
2284       if (!cells.advance()) {
2285         throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
2286       }
2287       Cell cell = cells.current();
2288       if (val != null) val.add(cell);
2289 
2290       boolean isNewRowOrType =
2291           previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
2292               || !CellUtil.matchingRow(previousCell, cell);
2293       if (isNewRowOrType) {
2294         // Create new mutation
2295         if (CellUtil.isDelete(cell)) {
2296           m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
2297           // Deletes don't have nonces.
2298           mutations.add(new MutationReplay(
2299               MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
2300         } else {
2301           m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
2302           // Puts might come from increment or append, thus we need nonces.
2303           long nonceGroup = entry.getKey().hasNonceGroup()
2304               ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2305           long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2306           mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
2307         }
2308       }
2309       if (CellUtil.isDelete(cell)) {
2310         ((Delete) m).addDeleteMarker(cell);
2311       } else {
2312         ((Put) m).add(cell);
2313       }
2314       m.setDurability(durability);
2315       previousCell = cell;
2316     }
2317 
2318     // reconstruct WALKey
2319     if (logEntry != null) {
2320       org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKeyProto = entry.getKey();
2321       List<UUID> clusterIds = new ArrayList<UUID>(walKeyProto.getClusterIdsCount());
2322       for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
2323         clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
2324       }
2325       // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
2326       key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
2327               walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
2328               clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
2329       logEntry.setFirst(key);
2330       logEntry.setSecond(val);
2331     }
2332 
2333     return mutations;
2334   }
2335 }