1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.lang.reflect.Constructor;
25  import java.lang.reflect.InvocationTargetException;
26  import java.net.ConnectException;
27  import java.text.ParseException;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.HashSet;
31  import java.util.LinkedList;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.TreeMap;
36  import java.util.TreeSet;
37  import java.util.concurrent.Callable;
38  import java.util.concurrent.CompletionService;
39  import java.util.concurrent.ConcurrentHashMap;
40  import java.util.concurrent.ConcurrentLinkedQueue;
41  import java.util.concurrent.CountDownLatch;
42  import java.util.concurrent.ExecutionException;
43  import java.util.concurrent.ExecutorCompletionService;
44  import java.util.concurrent.Future;
45  import java.util.concurrent.ThreadFactory;
46  import java.util.concurrent.ThreadPoolExecutor;
47  import java.util.concurrent.TimeUnit;
48  import java.util.concurrent.atomic.AtomicInteger;
49  import java.util.concurrent.atomic.AtomicLong;
50  import java.util.concurrent.atomic.AtomicReference;
51  
52  import org.apache.commons.logging.Log;
53  import org.apache.commons.logging.LogFactory;
54  import org.apache.hadoop.classification.InterfaceAudience;
55  import org.apache.hadoop.conf.Configuration;
56  import org.apache.hadoop.fs.FileStatus;
57  import org.apache.hadoop.fs.FileSystem;
58  import org.apache.hadoop.fs.Path;
59  import org.apache.hadoop.hbase.HConstants;
60  import org.apache.hadoop.hbase.HRegionInfo;
61  import org.apache.hadoop.hbase.HRegionLocation;
62  import org.apache.hadoop.hbase.HTableDescriptor;
63  import org.apache.hadoop.hbase.KeyValue;
64  import org.apache.hadoop.hbase.RemoteExceptionHandler;
65  import org.apache.hadoop.hbase.ServerName;
66  import org.apache.hadoop.hbase.client.ConnectionUtils;
67  import org.apache.hadoop.hbase.client.Delete;
68  import org.apache.hadoop.hbase.client.HConnection;
69  import org.apache.hadoop.hbase.client.HConnectionManager;
70  import org.apache.hadoop.hbase.client.Put;
71  import org.apache.hadoop.hbase.client.RetriesExhaustedException;
72  import org.apache.hadoop.hbase.client.Row;
73  import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
74  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
75  import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
76  import org.apache.hadoop.hbase.io.HeapSize;
77  import org.apache.hadoop.hbase.master.RegionState;
78  import org.apache.hadoop.hbase.master.SplitLogManager;
79  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
80  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
81  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
82  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
83  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
84  import org.apache.hadoop.hbase.regionserver.HRegion;
85  import org.apache.hadoop.hbase.regionserver.LastSequenceId;
86  import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
87  import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
88  import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
89  import org.apache.hadoop.hbase.util.Bytes;
90  import org.apache.hadoop.hbase.util.CancelableProgressable;
91  import org.apache.hadoop.hbase.util.ClassSize;
92  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
93  import org.apache.hadoop.hbase.util.FSUtils;
94  import org.apache.hadoop.hbase.util.Pair;
95  import org.apache.hadoop.hbase.util.Threads;
96  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
97  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
98  import org.apache.hadoop.hbase.zookeeper.ZKTable;
99  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
100 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
101 import org.apache.hadoop.io.MultipleIOException;
102 import org.apache.hadoop.ipc.RemoteException;
103 import org.apache.zookeeper.KeeperException;
104 import org.apache.zookeeper.data.Stat;
105 
106 import com.google.common.base.Preconditions;
107 import com.google.common.collect.Lists;
108 
109 /**
110  * This class is responsible for splitting up a bunch of regionserver commit log
111  * files that are no longer being written to, into new files, one per region for
112  * region to replay on startup. Delete the old log files when finished.
113  */
114 @InterfaceAudience.Private
115 public class HLogSplitter {
      // Configuration key naming the HLogSplitter (sub)class that createLogSplitter instantiates.
116   private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
117 
118   static final Log LOG = LogFactory.getLog(HLogSplitter.class);
119 
      // Set by splitLog(CountDownLatch); an instance may only run that split once.
120   private boolean hasSplit = false;
      // Wall-clock duration of the last splitLog run, in milliseconds (see getTime()).
121   private long splitTime = 0;
      // Sum of the lengths of the log files handed to the split loop (see getSize()).
122   private long splitSize = 0;
123 
124 
125   // Parameters for split process
126   protected final Path rootDir;
127   protected final Path srcDir;
128   protected final Path oldLogDir;
129   protected final FileSystem fs;
130   protected final Configuration conf;
131 
132   // Major subcomponents of the split process.
133   // These are separated into inner classes to make testing easier.
134   OutputSink outputSink;
135   EntryBuffers entryBuffers;
136 
      // Filled from ZKTable.getDisabledOrDisablingTables when a watcher is available
      // (see splitLogFile); empty otherwise.
137   private Set<String> disablingOrDisabledTables = new HashSet<String>();
      // ZooKeeperWatcher passed to the constructor; may be null.
138   private ZooKeeperWatcher watcher;
139 
140   // If an exception is thrown by one of the other threads, it will be
141   // stored here.
142   protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
143 
144   // Wait/notify for when data has been produced by the reader thread,
145   // consumed by a writer thread, or an exception occurred.
      // NOTE(review): the original comment said "reader thread" for both roles;
      // presumably the consumers are the writer threads -- confirm against the usages.
146   final Object dataAvailable = new Object();
147 
      // Task-monitor entry describing the split in progress; null until a split starts.
148   private MonitoredTask status;
149 
150   // For checking the latest flushed sequence id
151   protected final LastSequenceId sequenceIdChecker;
152 
      // True only when enabled in the configuration AND a ZooKeeperWatcher was supplied
      // to the constructor; otherwise forced to false there.
153   protected boolean distributedLogReplay;
154 
155   // Map encodedRegionName -> lastFlushedSequenceId
156   Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
157 
158   // Number of writer threads
      // (read from "hbase.regionserver.hlog.splitlog.writer.threads", default 3)
159   private final int numWriterThreads;
160 
161   // Min batch size when replay WAL edits
      // (read from "hbase.regionserver.wal.logreplay.batch.size", default 512)
162   private final int minBatchSize;
163   
164   /**
165    * Create a new HLogSplitter using the given {@link Configuration} and the
166    * <code>hbase.hlog.splitter.impl</code> property to derived the instance class to use.
167    * distributedLogReplay won't be enabled by this constructor.
168    * <p>
169    * @param conf
170    * @param rootDir hbase directory
171    * @param srcDir logs directory
172    * @param oldLogDir directory where processed logs are archived to
173    * @param fs FileSystem
174    * @return New HLogSplitter instance
175    */
176   public static HLogSplitter createLogSplitter(Configuration conf,
177       final Path rootDir, final Path srcDir,
178       Path oldLogDir, final FileSystem fs)  {
179 
180     @SuppressWarnings("unchecked")
181     Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
182         .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
183     try {
184        Constructor<? extends HLogSplitter> constructor =
185          splitterClass.getConstructor(
186           Configuration.class, // conf
187           Path.class, // rootDir
188           Path.class, // srcDir
189           Path.class, // oldLogDir
190           FileSystem.class, // fs
191           LastSequenceId.class);
192       return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs, null);
193     } catch (IllegalArgumentException e) {
194       throw new RuntimeException(e);
195     } catch (InstantiationException e) {
196       throw new RuntimeException(e);
197     } catch (IllegalAccessException e) {
198       throw new RuntimeException(e);
199     } catch (InvocationTargetException e) {
200       throw new RuntimeException(e);
201     } catch (SecurityException e) {
202       throw new RuntimeException(e);
203     } catch (NoSuchMethodException e) {
204       throw new RuntimeException(e);
205     }
206   }
207 
208   public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
209       Path oldLogDir, FileSystem fs, LastSequenceId idChecker) {
210       this(conf, rootDir, srcDir, oldLogDir, fs, idChecker, null);
211   }
212 
213   public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
214       Path oldLogDir, FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw) {
215     this.conf = conf;
216     this.rootDir = rootDir;
217     this.srcDir = srcDir;
218     this.oldLogDir = oldLogDir;
219     this.fs = fs;
220     this.sequenceIdChecker = idChecker;
221     this.watcher = zkw;
222 
223     entryBuffers = new EntryBuffers(
224         conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
225             128*1024*1024));
226 
227     this.minBatchSize = conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 512);
228     this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, 
229       HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
230 
231     this.numWriterThreads = conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
232     if (zkw != null && this.distributedLogReplay) {
233       outputSink = new LogReplayOutputSink(numWriterThreads);
234     } else {
235       if (this.distributedLogReplay) {
236         LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
237       }
238       this.distributedLogReplay = false;
239       outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
240     }
241   }
242 
243   /**
244    * Split up a bunch of regionserver commit log files that are no longer being
245    * written to, into new files, one per region for region to replay on startup.
246    * Delete the old log files when finished.
247    *
248    * @throws IOException will throw if corrupted hlogs aren't tolerated
249    * @return the list of splits
250    */
251   public List<Path> splitLog()
252       throws IOException {
253     return splitLog((CountDownLatch) null);
254   }
255   
256   /**
257    * Split up a bunch of regionserver commit log files that are no longer being
258    * written to, into new files, one per region for region to replay on startup.
259    * Delete the old log files when finished.
260    *
261    * @param latch
262    * @throws IOException will throw if corrupted hlogs aren't tolerated
263    * @return the list of splits
264    */
265   public List<Path> splitLog(CountDownLatch latch)
266       throws IOException {
267     Preconditions.checkState(!hasSplit,
268         "An HLogSplitter instance may only be used once");
269     hasSplit = true;
270 
271     status = TaskMonitor.get().createStatus(
272         "Splitting logs in " + srcDir);
273 
274     long startTime = EnvironmentEdgeManager.currentTimeMillis();
275 
276     status.setStatus("Determining files to split...");
277     List<Path> splits = null;
278     if (!fs.exists(srcDir)) {
279       // Nothing to do
280       status.markComplete("No log directory existed to split.");
281       return splits;
282     }
283     FileStatus[] logfiles = fs.listStatus(srcDir);
284     if (logfiles == null || logfiles.length == 0) {
285       // Nothing to do
286       return splits;
287     }
288     logAndReport("Splitting " + logfiles.length + " hlog(s) in "
289     + srcDir.toString());
290     splits = splitLog(logfiles, latch);
291 
292     splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
293     String msg = "hlog file splitting completed in " + splitTime +
294         " ms for " + srcDir.toString();
295     status.markComplete(msg);
296     LOG.info(msg);
297     return splits;
298   }
299 
300   private void logAndReport(String msg) {
301     status.setStatus(msg);
302     LOG.info(msg);
303   }
304 
305   /**
306    * @return time that this split took
307    */
308   public long getTime() {
309     return this.splitTime;
310   }
311 
312   /**
313    * @return aggregate size of hlogs that were split
314    */
315   public long getSize() {
316     return this.splitSize;
317   }
318 
319   /**
320    * @return a map from encoded region ID to the number of edits written out
321    * for that region.
322    */
323   Map<byte[], Long> getOutputCounts() {
324     Preconditions.checkState(hasSplit);
325     return outputSink.getOutputCounts();
326   }
327 
328   /**
329    * Splits or Replays the HLog edits in the given list of logfiles (that are a mix of edits on
330    * multiple regions) by region and then splits(or replay when distributedLogReplay is true) them
331    * per region directories, in batches.
332    * <p>
333    * This process is split into multiple threads. In the main thread, we loop through the logs to be
334    * split. For each log, we:
335    * <ul>
336    * <li>Recover it (take and drop HDFS lease) to ensure no other process can write</li>
337    * <li>Read each edit (see {@link #parseHLog}</li>
338    * <li>Mark as "processed" or "corrupt" depending on outcome</li>
339    * </ul>
340    * <p>
341    * Each edit is passed into the EntryBuffers instance, which takes care of memory accounting and
342    * splitting the edits by region.
343    * <p>
344    * The OutputSink object then manages N other WriterThreads which pull chunks of edits from
345    * EntryBuffers and write them to either recovered.edits files or replay them to newly assigned
346    * region servers directly
347    * <p>
348    * After the process is complete, the log files are archived to a separate directory.
349    */
350   private List<Path> splitLog(final FileStatus[] logfiles, CountDownLatch latch)
351       throws IOException {
352     List<Path> processedLogs = new ArrayList<Path>(logfiles.length);
353     List<Path> corruptedLogs = new ArrayList<Path>(logfiles.length);
354     List<Path> splits;
355 
356     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
357 
358     countTotalBytes(logfiles);
359     splitSize = 0;
360 
361     outputSink.startWriterThreads();
362 
363     try {
364       int i = 0;
365       for (FileStatus log : logfiles) {
366        Path logPath = log.getPath();
367         long logLength = log.getLen();
368         splitSize += logLength;
369         logAndReport("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
370             + ": " + logPath + ", length=" + logLength);
371         Reader in = null;
372         try {
373           //actually, for meta-only hlogs, we don't need to go thru the process
374           //of parsing and segregating by regions since all the logs are for
375           //meta only. However, there is a sequence number that can be obtained
376           //only by parsing.. so we parse for all files currently
377           //TODO: optimize this part somehow
378           in = getReader(fs, log, conf, skipErrors, null);
379           if (in != null) {
380             parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);
381           }
382           processedLogs.add(logPath);
383         } catch (CorruptedLogFileException e) {
384           LOG.info("Got while parsing hlog " + logPath +
385               ". Marking as corrupted", e);
386           corruptedLogs.add(logPath);
387         } finally {
388           if (in != null) {
389             try {
390               in.close();
391             } catch (IOException e) {
392               LOG.warn("Close log reader threw exception -- continuing", e);
393             }
394           }
395         }
396       }
397       status.setStatus("Log splits complete. Checking for orphaned logs.");
398 
399       if (latch != null) {
400         try {
401           latch.await();
402         } catch (InterruptedException ie) {
403           LOG.warn("wait for latch interrupted");
404           Thread.currentThread().interrupt();
405         }
406       }
407       FileStatus[] currFiles = fs.listStatus(srcDir);
408       if (currFiles.length > processedLogs.size()
409           + corruptedLogs.size()) {
410         throw new OrphanHLogAfterSplitException(
411           "Discovered orphan hlog after split. Maybe the "
412             + "HRegionServer was not dead when we started");
413       }
414     } finally {
415       status.setStatus("Finishing writing output logs and closing down.");
416       splits = outputSink.finishWritingAndClose();
417     }
418     status.setStatus("Archiving logs after completed split");
419     archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
420     return splits;
421   }
422 
423   /**
424    * @return the total size of the passed list of files.
425    */
426   private static long countTotalBytes(FileStatus[] logfiles) {
427     long ret = 0;
428     for (FileStatus stat : logfiles) {
429       ret += stat.getLen();
430     }
431     return ret;
432   }
433 
434   /**
435    * Splits a HLog file into region's recovered-edits directory
436    * <p>
437    * If the log file has N regions then N recovered.edits files will be produced.
438    * <p>
439    * @param rootDir
440    * @param logfile
441    * @param fs
442    * @param conf
443    * @param reporter
444    * @param idChecker
445    * @param zkw ZooKeeperWatcher if it's null, we will back to the old-style log splitting where we
446    *          dump out recoved.edits files for regions to replay on.
447    * @return false if it is interrupted by the progress-able.
448    * @throws IOException
449    */
450   static public boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
451       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
452       ZooKeeperWatcher zkw)
453       throws IOException {
454     HLogSplitter s = new HLogSplitter(conf, rootDir, null, null/* oldLogDir */, fs, idChecker, zkw);
455     return s.splitLogFile(logfile, reporter);
456   }
457 
458   /**
459    * Splits a HLog file into region's recovered-edits directory
460    * <p>
461    * If the log file has N regions then N recovered.edits files will be produced.
462    * <p>
463    * @param rootDir
464    * @param logfile
465    * @param fs
466    * @param conf
467    * @param reporter
468    * @return false if it is interrupted by the progress-able.
469    * @throws IOException
470    */
471   static public boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
472       Configuration conf, CancelableProgressable reporter)
473       throws IOException {
474     return HLogSplitter.splitLogFile(rootDir, logfile, fs, conf, reporter, null, null);
475   }
476 
477   public boolean splitLogFile(FileStatus logfile,
478       CancelableProgressable reporter) throws IOException {
479     boolean isCorrupted = false;
480     Preconditions.checkState(status == null);
481     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
482       HLog.SPLIT_SKIP_ERRORS_DEFAULT);
483     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
484     Path logPath = logfile.getPath();
485     boolean outputSinkStarted = false;
486     boolean progress_failed = false;
487     int editsCount = 0;
488     int editsSkipped = 0;
489 
490     try {
491       status = TaskMonitor.get().createStatus(
492         "Splitting log file " + logfile.getPath() +
493         "into a temporary staging area.");
494       long logLength = logfile.getLen();
495       LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
496       LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
497       status.setStatus("Opening log file");
498       if (reporter != null && !reporter.progress()) {
499         progress_failed = true;
500         return false;
501       }
502       Reader in = null;
503       try {
504         in = getReader(fs, logfile, conf, skipErrors, reporter);
505       } catch (CorruptedLogFileException e) {
506         LOG.warn("Could not get reader, corrupted log file " + logPath, e);
507         ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
508         isCorrupted = true;
509       }
510       if (in == null) {
511         status.markComplete("Was nothing to split in log file");
512         LOG.warn("Nothing to split in log file " + logPath);
513         return true;
514       }
515       if(watcher != null) {
516         try {
517           disablingOrDisabledTables = ZKTable.getDisabledOrDisablingTables(watcher);
518         } catch (KeeperException e) {
519           throw new IOException("Can't get disabling/disabled tables", e);
520         }
521       }
522       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
523       int numOpenedFilesLastCheck = 0;
524       outputSink.setReporter(reporter);
525       outputSink.startWriterThreads();
526       outputSinkStarted = true;
527       Entry entry;
528       Long lastFlushedSequenceId = -1L;
529       ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logPath);
530       String serverNameStr = (serverName == null) ? "" : serverName.getServerName(); 
531       while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
532         byte[] region = entry.getKey().getEncodedRegionName();
533         String key = Bytes.toString(region);
534         lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
535         if (lastFlushedSequenceId == null) {
536           if (this.distributedLogReplay) {
537             lastFlushedSequenceId = SplitLogManager.getLastFlushedSequenceId(this.watcher,
538               serverNameStr, key);
539           } else if (sequenceIdChecker != null) {
540             lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
541           }
542           if (lastFlushedSequenceId != null) {
543             lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
544           } else {
545             lastFlushedSequenceId = -1L;
546           }
547         }
548         if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
549           editsSkipped++;
550           continue;
551         }
552         entryBuffers.appendEntry(entry);
553         editsCount++;
554         int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
555         // If sufficient edits have passed, check if we should report progress.
556         if (editsCount % interval == 0
557             || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
558           numOpenedFilesLastCheck = this.getNumOpenWriters();
559           String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
560               + " edits, skipped " + editsSkipped + " edits.";
561           status.setStatus("Split " + countsStr);
562           if (reporter != null && !reporter.progress()) {
563             progress_failed = true;
564             return false;
565           }
566         }
567       }
568     } catch (InterruptedException ie) {
569       IOException iie = new InterruptedIOException();
570       iie.initCause(ie);
571       throw iie;
572     } catch (CorruptedLogFileException e) {
573       LOG.warn("Could not parse, corrupted log file " + logPath, e);
574       ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
575       isCorrupted = true;
576     } catch (IOException e) {
577       e = RemoteExceptionHandler.checkIOException(e);
578       throw e;
579     } finally {
580       LOG.info("Finishing writing output logs and closing down.");
581       if (outputSinkStarted) {
582         progress_failed = outputSink.finishWritingAndClose() == null;
583       }
584       String msg = "Processed " + editsCount + " edits across "
585           + outputSink.getNumberOfRecoveredRegions() + " regions; log file=" + logPath
586           + " is corrupted = " + isCorrupted + " progress failed = " + progress_failed;
587       LOG.info(msg);
588       status.markComplete(msg);
589     }
590     return !progress_failed;
591   }
592 
593   /**
594    * Completes the work done by splitLogFile by archiving logs
595    * <p>
596    * It is invoked by SplitLogManager once it knows that one of the
597    * SplitLogWorkers have completed the splitLogFile() part. If the master
598    * crashes then this function might get called multiple times.
599    * <p>
600    * @param logfile
601    * @param conf
602    * @throws IOException
603    */
604   public static void finishSplitLogFile(String logfile, Configuration conf)
605       throws IOException {
606     Path rootdir = FSUtils.getRootDir(conf);
607     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
608     finishSplitLogFile(rootdir, oldLogDir, logfile, conf);
609   }
610 
611   public static void finishSplitLogFile(Path rootdir, Path oldLogDir,
612       String logfile, Configuration conf) throws IOException {
613     List<Path> processedLogs = new ArrayList<Path>();
614     List<Path> corruptedLogs = new ArrayList<Path>();
615     FileSystem fs;
616     fs = rootdir.getFileSystem(conf);
617     Path logPath = null;
618     if (FSUtils.isStartingWithPath(rootdir, logfile)) {
619       logPath = new Path(logfile);
620     } else {
621       logPath = new Path(rootdir, logfile);
622     }
623     if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
624       corruptedLogs.add(logPath);
625     } else {
626       processedLogs.add(logPath);
627     }
628     archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf);
629     Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
630     fs.delete(stagingDir, true);
631   }
632 
633   /**
634    * Moves processed logs to a oldLogDir after successful processing Moves
635    * corrupted logs (any log that couldn't be successfully parsed to corruptDir
636    * (.corrupt) for later investigation
637    *
638    * @param corruptedLogs
639    * @param processedLogs
640    * @param oldLogDir
641    * @param fs
642    * @param conf
643    * @throws IOException
644    */
645   private static void archiveLogs(
646       final Path srcDir,
647       final List<Path> corruptedLogs,
648       final List<Path> processedLogs, final Path oldLogDir,
649       final FileSystem fs, final Configuration conf) throws IOException {
650     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
651         "hbase.regionserver.hlog.splitlog.corrupt.dir",  HConstants.CORRUPT_DIR_NAME));
652 
653     if (!fs.mkdirs(corruptDir)) {
654       LOG.info("Unable to mkdir " + corruptDir);
655     }
656     fs.mkdirs(oldLogDir);
657 
658     // this method can get restarted or called multiple times for archiving
659     // the same log files.
660     for (Path corrupted : corruptedLogs) {
661       Path p = new Path(corruptDir, corrupted.getName());
662       if (fs.exists(corrupted)) {
663         if (!fs.rename(corrupted, p)) {
664           LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
665         } else {
666           LOG.warn("Moved corrupted log " + corrupted + " to " + p);
667         }
668       }
669     }
670 
671     for (Path p : processedLogs) {
672       Path newPath = FSHLog.getHLogArchivePath(oldLogDir, p);
673       if (fs.exists(p)) {
674         if (!fs.rename(p, newPath)) {
675           LOG.warn("Unable to move  " + p + " to " + newPath);
676         } else {
677           LOG.debug("Archived processed log " + p + " to " + newPath);
678         }
679       }
680     }
681 
682     // distributed log splitting removes the srcDir (region's log dir) later
683     // when all the log files in that srcDir have been successfully processed
684     if (srcDir != null && !fs.delete(srcDir, true)) {
685       throw new IOException("Unable to delete src dir: " + srcDir);
686     }
687   }
688 
689   /**
690    * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
691    * <code>logEntry</code> named for the sequenceid in the passed
692    * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
693    * This method also ensures existence of RECOVERED_EDITS_DIR under the region
694    * creating it if necessary.
695    * @param fs
696    * @param logEntry
697    * @param rootDir HBase root dir.
698    * @return Path to file into which to dump split log edits.
699    * @throws IOException
700    */
701   @SuppressWarnings("deprecation")
702   static Path getRegionSplitEditsPath(final FileSystem fs,
703       final Entry logEntry, final Path rootDir, boolean isCreate)
704   throws IOException {
705     Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
706     String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
707     Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
708     Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
709 
710     if (!fs.exists(regiondir)) {
711       LOG.info("This region's directory doesn't exist: "
712           + regiondir.toString() + ". It is very likely that it was" +
713           " already split so it's safe to discard those edits.");
714       return null;
715     }
716     if (fs.exists(dir) && fs.isFile(dir)) {
717       Path tmp = new Path("/tmp");
718       if (!fs.exists(tmp)) {
719         fs.mkdirs(tmp);
720       }
721       tmp = new Path(tmp,
722         HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
723       LOG.warn("Found existing old file: " + dir + ". It could be some "
724         + "leftover of an old installation. It should be a folder instead. "
725         + "So moving it to " + tmp);
726       if (!fs.rename(dir, tmp)) {
727         LOG.warn("Failed to sideline old file " + dir);
728       }
729     }
730 
731     if (isCreate && !fs.exists(dir)) {
732       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
733     }
734     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
735     // region's replayRecoveredEdits will not delete it
736     String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
737     fileName = getTmpRecoveredEditsFileName(fileName);
738     return new Path(dir, fileName);
739   }
740 
741   static String getTmpRecoveredEditsFileName(String fileName) {
742     return fileName + HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
743   }
744 
745   /**
746    * Get the completed recovered edits file path, renaming it to be by last edit
747    * in the file from its first edit. Then we could use the name to skip
748    * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
749    * @param srcPath
750    * @param maximumEditLogSeqNum
751    * @return dstPath take file's last edit log seq num as the name
752    */
753   static Path getCompletedRecoveredEditsFilePath(Path srcPath,
754       Long maximumEditLogSeqNum) {
755     String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
756     return new Path(srcPath.getParent(), fileName);
757   }
758 
759   static String formatRecoveredEditsFileName(final long seqid) {
760     return String.format("%019d", seqid);
761   }
762 
763   /**
764    * Parse a single hlog and put the edits in entryBuffers
765    *
766    * @param in the hlog reader
767    * @param path the path of the log file
768    * @param entryBuffers the buffer to hold the parsed edits
769    * @param fs the file system
770    * @param conf the configuration
771    * @param skipErrors indicator if CorruptedLogFileException should be thrown instead of IOException
772    * @throws IOException
773    * @throws CorruptedLogFileException if hlog is corrupted
774    */
775   private void parseHLog(final Reader in, Path path,
776 		EntryBuffers entryBuffers, final FileSystem fs,
777     final Configuration conf, boolean skipErrors)
778 	throws IOException, CorruptedLogFileException {
779     int editsCount = 0;
780     try {
781       Entry entry;
782       while ((entry = getNextLogLine(in, path, skipErrors)) != null) {
783         entryBuffers.appendEntry(entry);
784         editsCount++;
785       }
786     } catch (InterruptedException ie) {
787       IOException t = new InterruptedIOException();
788       t.initCause(ie);
789       throw t;
790     } finally {
791       LOG.debug("Pushed=" + editsCount + " entries from " + path);
792     }
793   }
794 
795   /**
796    * Create a new {@link Reader} for reading logs to split.
797    *
798    * @param fs
799    * @param file
800    * @param conf
801    * @return A new Reader instance
802    * @throws IOException
803    * @throws CorruptedLogFileException
804    */
805   protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
806       boolean skipErrors, CancelableProgressable reporter)
807       throws IOException, CorruptedLogFileException {
808     Path path = file.getPath();
809     long length = file.getLen();
810     Reader in;
811 
812 
813     // Check for possibly empty file. With appends, currently Hadoop reports a
814     // zero length even if the file has been sync'd. Revisit if HDFS-376 or
815     // HDFS-878 is committed.
816     if (length <= 0) {
817       LOG.warn("File " + path + " might be still open, length is 0");
818     }
819 
820     try {
821       FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
822       try {
823         in = getReader(fs, path, conf, reporter);
824       } catch (EOFException e) {
825         if (length <= 0) {
826           // TODO should we ignore an empty, not-last log file if skip.errors
827           // is false? Either way, the caller should decide what to do. E.g.
828           // ignore if this is the last log in sequence.
829           // TODO is this scenario still possible if the log has been
830           // recovered (i.e. closed)
831           LOG.warn("Could not open " + path + " for reading. File is empty", e);
832           return null;
833         } else {
834           // EOFException being ignored
835           return null;
836         }
837       }
838     } catch (IOException e) {
839       if (!skipErrors || e instanceof InterruptedIOException) {
840         throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
841       }
842       CorruptedLogFileException t =
843         new CorruptedLogFileException("skipErrors=true Could not open hlog " +
844             path + " ignoring");
845       t.initCause(e);
846       throw t;
847     }
848     return in;
849   }
850 
851   static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
852   throws CorruptedLogFileException, IOException {
853     try {
854       return in.next();
855     } catch (EOFException eof) {
856       // truncated files are expected if a RS crashes (see HBASE-2643)
857       LOG.info("EOF from hlog " + path + ".  continuing");
858       return null;
859     } catch (IOException e) {
860       // If the IOE resulted from bad file format,
861       // then this problem is idempotent and retrying won't help
862       if (e.getCause() != null &&
863           (e.getCause() instanceof ParseException ||
864            e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
865         LOG.warn("Parse exception " + e.getCause().toString() + " from hlog "
866            + path + ".  continuing");
867         return null;
868       }
869       if (!skipErrors) {
870         throw e;
871       }
872       CorruptedLogFileException t =
873         new CorruptedLogFileException("skipErrors=true Ignoring exception" +
874             " while parsing hlog " + path + ". Marking as corrupted");
875       t.initCause(e);
876       throw t;
877     }
878   }
879 
880 
881   private void writerThreadError(Throwable t) {
882     thrown.compareAndSet(null, t);
883   }
884 
885   /**
886    * Check for errors in the writer threads. If any is found, rethrow it.
887    */
888   private void checkForErrors() throws IOException {
889     Throwable thrown = this.thrown.get();
890     if (thrown == null) return;
891     if (thrown instanceof IOException) {
892       throw (IOException)thrown;
893     } else {
894       throw new RuntimeException(thrown);
895     }
896   }
897   /**
898    * Create a new {@link Writer} for writing log splits.
899    */
900   protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
901       throws IOException {
902     return HLogFactory.createWriter(fs, logfile, conf);
903   }
904 
905   /**
906    * Create a new {@link Reader} for reading logs to split.
907    */
908   protected Reader getReader(FileSystem fs, Path curLogFile,
909       Configuration conf, CancelableProgressable reporter) throws IOException {
910     return HLogFactory.createReader(fs, curLogFile, conf, reporter);
911   }
912 
913   /**
914    * Get current open writers
915    * @return
916    */
917   private int getNumOpenWriters() {
918     int result = 0;
919     if (this.outputSink != null) {
920       result += this.outputSink.getNumOpenWriters();
921     }
922     return result;
923   }
924 
925   /**
926    * Class which accumulates edits and separates them into a buffer per region
927    * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
928    * a predefined threshold.
929    *
930    * Writer threads then pull region-specific buffers from this class.
931    */
932   class EntryBuffers {
933     Map<byte[], RegionEntryBuffer> buffers =
934       new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
935 
936     /* Track which regions are currently in the middle of writing. We don't allow
937        an IO thread to pick up bytes from a region if we're already writing
938        data for that region in a different IO thread. */
939     Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
940 
941     long totalBuffered = 0;
942     long maxHeapUsage;
943 
944     EntryBuffers(long maxHeapUsage) {
945       this.maxHeapUsage = maxHeapUsage;
946     }
947 
948     /**
949      * Append a log entry into the corresponding region buffer.
950      * Blocks if the total heap usage has crossed the specified threshold.
951      *
952      * @throws InterruptedException
953      * @throws IOException
954      */
955     void appendEntry(Entry entry) throws InterruptedException, IOException {
956       HLogKey key = entry.getKey();
957 
958       RegionEntryBuffer buffer;
959       long incrHeap;
960       synchronized (this) {
961         buffer = buffers.get(key.getEncodedRegionName());
962         if (buffer == null) {
963           buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
964           buffers.put(key.getEncodedRegionName(), buffer);
965         }
966         incrHeap= buffer.appendEntry(entry);
967       }
968 
969       // If we crossed the chunk threshold, wait for more space to be available
970       synchronized (dataAvailable) {
971         totalBuffered += incrHeap;
972         while (totalBuffered > maxHeapUsage && thrown.get() == null) {
973           LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
974           dataAvailable.wait(2000);
975         }
976         dataAvailable.notifyAll();
977       }
978       checkForErrors();
979     }
980 
981     /**
982      * @return RegionEntryBuffer a buffer of edits to be written or replayed.
983      */
984     synchronized RegionEntryBuffer getChunkToWrite() {
985       long biggestSize = 0;
986       byte[] biggestBufferKey = null;
987 
988       for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
989         long size = entry.getValue().heapSize();
990         if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
991           biggestSize = size;
992           biggestBufferKey = entry.getKey();
993         }
994       }
995       if (biggestBufferKey == null) {
996         return null;
997       }
998 
999       RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
1000       currentlyWriting.add(biggestBufferKey);
1001       return buffer;
1002     }
1003 
1004     void doneWriting(RegionEntryBuffer buffer) {
1005       synchronized (this) {
1006         boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
1007         assert removed;
1008       }
1009       long size = buffer.heapSize();
1010 
1011       synchronized (dataAvailable) {
1012         totalBuffered -= size;
1013         // We may unblock writers
1014         dataAvailable.notifyAll();
1015       }
1016     }
1017 
1018     synchronized boolean isRegionCurrentlyWriting(byte[] region) {
1019       return currentlyWriting.contains(region);
1020     }
1021   }
1022 
1023   /**
1024    * A buffer of some number of edits for a given region.
1025    * This accumulates edits and also provides a memory optimization in order to
1026    * share a single byte array instance for the table and region name.
1027    * Also tracks memory usage of the accumulated edits.
1028    */
1029   static class RegionEntryBuffer implements HeapSize {
1030     long heapInBuffer = 0;
1031     List<Entry> entryBuffer;
1032     byte[] tableName;
1033     byte[] encodedRegionName;
1034 
1035     RegionEntryBuffer(byte[] table, byte[] region) {
1036       this.tableName = table;
1037       this.encodedRegionName = region;
1038       this.entryBuffer = new LinkedList<Entry>();
1039     }
1040 
1041     long appendEntry(Entry entry) {
1042       internify(entry);
1043       entryBuffer.add(entry);
1044       long incrHeap = entry.getEdit().heapSize() +
1045         ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
1046         0; // TODO linkedlist entry
1047       heapInBuffer += incrHeap;
1048       return incrHeap;
1049     }
1050 
1051     private void internify(Entry entry) {
1052       HLogKey k = entry.getKey();
1053       k.internTableName(this.tableName);
1054       k.internEncodedRegionName(this.encodedRegionName);
1055     }
1056 
1057     public long heapSize() {
1058       return heapInBuffer;
1059     }
1060   }
1061 
1062 
1063   class WriterThread extends Thread {
1064     private volatile boolean shouldStop = false;
1065     private OutputSink outputSink = null;
1066 
1067     WriterThread(OutputSink sink, int i) {
1068       super("WriterThread-" + i);
1069       outputSink = sink;
1070     }
1071 
1072     public void run()  {
1073       try {
1074         doRun();
1075       } catch (Throwable t) {
1076         LOG.error("Error in log splitting write thread", t);
1077         writerThreadError(t);
1078       }
1079     }
1080 
1081     private void doRun() throws IOException {
1082       LOG.debug("Writer thread " + this + ": starting");
1083       while (true) {
1084         RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
1085         if (buffer == null) {
1086           // No data currently available, wait on some more to show up
1087           synchronized (dataAvailable) {
1088             if (shouldStop && !this.outputSink.flush()) {
1089               return;
1090             }
1091             try {
1092               dataAvailable.wait(500);
1093             } catch (InterruptedException ie) {
1094               if (!shouldStop) {
1095                 throw new RuntimeException(ie);
1096               }
1097             }
1098           }
1099           continue;
1100         }
1101 
1102         assert buffer != null;
1103         try {
1104           writeBuffer(buffer);
1105         } finally {
1106           entryBuffers.doneWriting(buffer);
1107         }
1108       }
1109     }
1110 
1111 
1112     private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
1113       outputSink.append(buffer);
1114     }
1115 
1116     void finish() {
1117       synchronized (dataAvailable) {
1118         shouldStop = true;
1119         dataAvailable.notifyAll();
1120       }
1121     }
1122   }
1123 
1124   Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
1125     List<String> components = new ArrayList<String>(10);
1126     do {
1127       components.add(edits.getName());
1128       edits = edits.getParent();
1129     } while (edits.depth() > rootdir.depth());
1130     Path ret = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
1131     for (int i = components.size() - 1; i >= 0; i--) {
1132       ret = new Path(ret, components.get(i));
1133     }
1134     try {
1135       if (fs.exists(ret)) {
1136         LOG.warn("Found existing old temporary edits file. It could be the "
1137             + "result of a previous failed split attempt. Deleting "
1138             + ret + ", length="
1139             + fs.getFileStatus(ret).getLen());
1140         if (!fs.delete(ret, false)) {
1141           LOG.warn("Failed delete of old " + ret);
1142         }
1143       }
1144       Path dir = ret.getParent();
1145       if (!fs.exists(dir)) {
1146         if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
1147       }
1148     } catch (IOException e) {
1149       LOG.warn("Could not prepare temp staging area ", e);
1150       // ignore, exceptions will be thrown elsewhere
1151     }
1152     return ret;
1153   }
1154 
1155   /**
1156    * The following class is an abstraction class to provide a common interface to support both
1157    * existing recovered edits file sink and region server WAL edits replay sink
1158    */
1159    abstract class OutputSink {
1160 
1161     protected Map<byte[], SinkWriter> writers = Collections
1162         .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
1163 
1164     protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
1165         .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
1166 
1167     protected final List<WriterThread> writerThreads = Lists.newArrayList();
1168 
1169     /* Set of regions which we've decided should not output edits */
1170     protected final Set<byte[]> blacklistedRegions = Collections
1171         .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
1172 
1173     protected boolean closeAndCleanCompleted = false;
1174 
1175     protected boolean writersClosed = false;
1176 
1177     protected final int numThreads;
1178 
1179     protected CancelableProgressable reporter = null;
1180 
1181     protected AtomicLong skippedEdits = new AtomicLong();
1182 
1183     public OutputSink(int numWriters) {
1184       numThreads = numWriters;
1185     }
1186 
1187     void setReporter(CancelableProgressable reporter) {
1188       this.reporter = reporter;
1189     }
1190 
1191     /**
1192      * Start the threads that will pump data from the entryBuffers to the output files.
1193      */
1194     synchronized void startWriterThreads() {
1195       for (int i = 0; i < numThreads; i++) {
1196         WriterThread t = new WriterThread(this, i);
1197         t.start();
1198         writerThreads.add(t);
1199       }
1200     }
1201 
1202     /**
1203      *
1204      * Update region's maximum edit log SeqNum.
1205      */
1206     void updateRegionMaximumEditLogSeqNum(Entry entry) {
1207       synchronized (regionMaximumEditLogSeqNum) {
1208         Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
1209             .getEncodedRegionName());
1210         if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
1211           regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
1212               .getLogSeqNum());
1213         }
1214       }
1215     }
1216 
1217     Long getRegionMaximumEditLogSeqNum(byte[] region) {
1218       return regionMaximumEditLogSeqNum.get(region);
1219     }
1220 
1221     /**
1222      * @return the number of currently opened writers
1223      */
1224     int getNumOpenWriters() {
1225       return this.writers.size();
1226     }
1227 
1228     long getSkippedEdits() {
1229       return this.skippedEdits.get();
1230     }
1231 
1232     /**
1233      * Wait for writer threads to dump all info to the sink
1234      * @return true when there is no error
1235      * @throws IOException
1236      */
1237     protected boolean finishWriting() throws IOException {
1238       LOG.info("Waiting for split writer threads to finish");
1239       boolean progress_failed = false;
1240       for (WriterThread t : writerThreads) {
1241         t.finish();
1242       }
1243       for (WriterThread t : writerThreads) {
1244         if (!progress_failed && reporter != null && !reporter.progress()) {
1245           progress_failed = true;
1246         }
1247         try {
1248           t.join();
1249         } catch (InterruptedException ie) {
1250           IOException iie = new InterruptedIOException();
1251           iie.initCause(ie);
1252           throw iie;
1253         }
1254         checkForErrors();
1255       }
1256       LOG.info("Split writers finished");
1257       return (!progress_failed);
1258     }
1259 
1260     abstract List<Path> finishWritingAndClose() throws IOException;
1261 
1262     /**
1263      * @return a map from encoded region ID to the number of edits written out for that region.
1264      */
1265     abstract Map<byte[], Long> getOutputCounts();
1266 
1267     /**
1268      * @return number of regions we've recovered
1269      */
1270     abstract int getNumberOfRecoveredRegions();
1271 
1272     /**
1273      * @param entry A WAL Edit Entry
1274      * @throws IOException
1275      */
1276     abstract void append(RegionEntryBuffer buffer) throws IOException;
1277 
1278     /**
1279      * WriterThread call this function to help flush internal remaining edits in buffer before close
1280      * @return true when underlying sink has something to flush
1281      */
1282     protected boolean flush() throws IOException {
1283       return false;
1284     }
1285   }
1286 
1287   /**
1288    * Class that manages the output streams from the log splitting process.
1289    */
1290   class LogRecoveredEditsOutputSink extends OutputSink {
1291 
1292     public LogRecoveredEditsOutputSink(int numWriters) {
1293       // More threads could potentially write faster at the expense
1294       // of causing more disk seeks as the logs are split.
1295       // 3. After a certain setting (probably around 3) the
1296       // process will be bound on the reader in the current
1297       // implementation anyway.
1298       super(numWriters);
1299     }
1300 
1301     /**
1302      * @return null if failed to report progress
1303      * @throws IOException
1304      */
1305     @Override
1306     List<Path> finishWritingAndClose() throws IOException {
1307       boolean isSuccessful = false;
1308       List<Path> result = null;
1309       try {
1310         isSuccessful = finishWriting();
1311       } finally {
1312         result = close();
1313         List<IOException> thrown = closeLogWriters(null);
1314         if (thrown != null && !thrown.isEmpty()) {
1315           throw MultipleIOException.createIOException(thrown);
1316         }
1317       }
1318       return (isSuccessful) ? result : null;
1319     }
1320 
1321     /**
1322      * Close all of the output streams.
1323      * @return the list of paths written.
1324      */
1325     private List<Path> close() throws IOException {
1326       Preconditions.checkState(!closeAndCleanCompleted);
1327 
1328       final List<Path> paths = new ArrayList<Path>();
1329       final List<IOException> thrown = Lists.newArrayList();
1330       ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
1331         TimeUnit.SECONDS, new ThreadFactory() {
1332           private int count = 1;
1333 
1334           public Thread newThread(Runnable r) {
1335             Thread t = new Thread(r, "split-log-closeStream-" + count++);
1336             return t;
1337           }
1338         });
1339       CompletionService<Void> completionService = new ExecutorCompletionService<Void>(
1340           closeThreadPool);
1341       for (final Map.Entry<byte[], ? extends SinkWriter> writersEntry : writers.entrySet()) {
1342         completionService.submit(new Callable<Void>() {
1343           public Void call() throws Exception {
1344             WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1345             try {
1346               wap.w.close();
1347             } catch (IOException ioe) {
1348               LOG.error("Couldn't close log at " + wap.p, ioe);
1349               thrown.add(ioe);
1350               return null;
1351             }
1352             LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1353                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1354 
1355             if (wap.editsWritten == 0) {
1356               // just remove the empty recovered.edits file
1357               if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
1358                 LOG.warn("Failed deleting empty " + wap.p);
1359                 throw new IOException("Failed deleting empty  " + wap.p);
1360               }
1361               return null;
1362             }
1363 
1364             Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1365               regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
1366             try {
1367               if (!dst.equals(wap.p) && fs.exists(dst)) {
1368                 LOG.warn("Found existing old edits file. It could be the "
1369                     + "result of a previous failed split attempt. Deleting " + dst + ", length="
1370                     + fs.getFileStatus(dst).getLen());
1371                 if (!fs.delete(dst, false)) {
1372                   LOG.warn("Failed deleting of old " + dst);
1373                   throw new IOException("Failed deleting of old " + dst);
1374                 }
1375               }
1376               // Skip the unit tests which create a splitter that reads and
1377               // writes the data without touching disk.
1378               // TestHLogSplit#testThreading is an example.
1379               if (fs.exists(wap.p)) {
1380                 if (!fs.rename(wap.p, dst)) {
1381                   throw new IOException("Failed renaming " + wap.p + " to " + dst);
1382                 }
1383                 LOG.debug("Rename " + wap.p + " to " + dst);
1384               }
1385             } catch (IOException ioe) {
1386               LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1387               thrown.add(ioe);
1388               return null;
1389             }
1390             paths.add(dst);
1391             return null;
1392           }
1393         });
1394       }
1395 
1396       boolean progress_failed = false;
1397       try {
1398         for (int i = 0, n = this.writers.size(); i < n; i++) {
1399           Future<Void> future = completionService.take();
1400           future.get();
1401           if (!progress_failed && reporter != null && !reporter.progress()) {
1402             progress_failed = true;
1403           }
1404         }
1405       } catch (InterruptedException e) {
1406         IOException iie = new InterruptedIOException();
1407         iie.initCause(e);
1408         throw iie;
1409       } catch (ExecutionException e) {
1410         throw new IOException(e.getCause());
1411       } finally {
1412         closeThreadPool.shutdownNow();
1413       }
1414 
1415       if (!thrown.isEmpty()) {
1416         throw MultipleIOException.createIOException(thrown);
1417       }
1418       writersClosed = true;
1419       closeAndCleanCompleted = true;
1420       if (progress_failed) {
1421         return null;
1422       }
1423       return paths;
1424     }
1425 
1426     private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1427       if (writersClosed) {
1428         return thrown;
1429       }
1430 
1431       if (thrown == null) {
1432         thrown = Lists.newArrayList();
1433       }
1434       try {
1435         for (WriterThread t : writerThreads) {
1436           while (t.isAlive()) {
1437             t.shouldStop = true;
1438             t.interrupt();
1439             try {
1440               t.join(10);
1441             } catch (InterruptedException e) {
1442               IOException iie = new InterruptedIOException();
1443               iie.initCause(e);
1444               throw iie;
1445             }
1446           }
1447         }
1448       } finally {
1449         synchronized (writers) {
1450           WriterAndPath wap = null;
1451           for (SinkWriter tmpWAP : writers.values()) {
1452             try {
1453               wap = (WriterAndPath) tmpWAP;
1454               wap.w.close();
1455             } catch (IOException ioe) {
1456               LOG.error("Couldn't close log at " + wap.p, ioe);
1457               thrown.add(ioe);
1458               continue;
1459             }
1460             LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1461                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1462           }
1463         }
1464         writersClosed = true;
1465       }
1466 
1467       return thrown;
1468     }
1469 
1470     /**
1471      * Get a writer and path for a log starting at the given entry. This function is threadsafe so
1472      * long as multiple threads are always acting on different regions.
1473      * @return null if this region shouldn't output any logs
1474      */
1475     private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
1476       byte region[] = entry.getKey().getEncodedRegionName();
1477       WriterAndPath ret = (WriterAndPath) writers.get(region);
1478       if (ret != null) {
1479         return ret;
1480       }
1481       // If we already decided that this region doesn't get any output
1482       // we don't need to check again.
1483       if (blacklistedRegions.contains(region)) {
1484         return null;
1485       }
1486       ret = createWAP(region, entry, rootDir, fs, conf);
1487       if (ret == null) {
1488         blacklistedRegions.add(region);
1489         return null;
1490       }
1491       writers.put(region, ret);
1492       return ret;
1493     }
1494 
1495     private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
1496         Configuration conf) throws IOException {
1497       Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1498       if (regionedits == null) {
1499         return null;
1500       }
1501       if (fs.exists(regionedits)) {
1502         LOG.warn("Found old edits file. It could be the "
1503             + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1504             + fs.getFileStatus(regionedits).getLen());
1505         if (!fs.delete(regionedits, false)) {
1506           LOG.warn("Failed delete of old " + regionedits);
1507         }
1508       }
1509       Writer w = createWriter(fs, regionedits, conf);
1510       LOG.debug("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
1511       return (new WriterAndPath(regionedits, w));
1512     }
1513 
1514     void append(RegionEntryBuffer buffer) throws IOException {
1515       List<Entry> entries = buffer.entryBuffer;
1516       if (entries.isEmpty()) {
1517         LOG.warn("got an empty buffer, skipping");
1518         return;
1519       }
1520 
1521       WriterAndPath wap = null;
1522 
1523       long startTime = System.nanoTime();
1524       try {
1525         int editsCount = 0;
1526 
1527         for (Entry logEntry : entries) {
1528           if (wap == null) {
1529             wap = getWriterAndPath(logEntry);
1530             if (wap == null) {
1531               // getWriterAndPath decided we don't need to write these edits
1532               return;
1533             }
1534           }
1535           wap.w.append(logEntry);
1536           this.updateRegionMaximumEditLogSeqNum(logEntry);
1537           editsCount++;
1538         }
1539         // Pass along summary statistics
1540         wap.incrementEdits(editsCount);
1541         wap.incrementNanoTime(System.nanoTime() - startTime);
1542       } catch (IOException e) {
1543         e = RemoteExceptionHandler.checkIOException(e);
1544         LOG.fatal(" Got while writing log entry to log", e);
1545         throw e;
1546       }
1547     }
1548 
1549     /**
1550      * @return a map from encoded region ID to the number of edits written out for that region.
1551      */
1552     Map<byte[], Long> getOutputCounts() {
1553       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1554       synchronized (writers) {
1555         for (Map.Entry<byte[], ? extends SinkWriter> entry : writers.entrySet()) {
1556           ret.put(entry.getKey(), entry.getValue().editsWritten);
1557         }
1558       }
1559       return ret;
1560     }
1561 
1562     @Override
1563     int getNumberOfRecoveredRegions() {
1564       return writers.size();
1565     }
1566   }
1567 
1568   /**
1569    * Class wraps the actual writer which writes data out and related statistics
1570    */
1571   private abstract static class SinkWriter {
1572     /* Count of edits written to this path */
1573     long editsWritten = 0;
1574     /* Number of nanos spent writing to this log */
1575     long nanosSpent = 0;
1576 
1577     void incrementEdits(int edits) {
1578       editsWritten += edits;
1579     }
1580 
1581     void incrementNanoTime(long nanos) {
1582       nanosSpent += nanos;
1583     }
1584   }
1585 
1586   /**
1587    * Private data structure that wraps a Writer and its Path, also collecting statistics about the
1588    * data written to this output.
1589    */
1590   private final static class WriterAndPath extends SinkWriter {
1591     final Path p;
1592     final Writer w;
1593 
1594     WriterAndPath(final Path p, final Writer w) {
1595       this.p = p;
1596       this.w = w;
1597     }
1598   }
1599 
1600   /**
1601    * Class that manages to replay edits from WAL files directly to assigned fail over region servers
1602    */
1603   class LogReplayOutputSink extends OutputSink {
    // Fraction of entryBuffers.maxHeapUsage below which append() keeps buffering while the
    // largest per-server queue is still smaller than minBatchSize.
    private static final double BUFFER_THRESHOLD = 0.35;
    // Separates "<hostname:port>" from "<table name>" in location keys.
    private static final String KEY_DELIMITER = "#";

    // Upper bound, in milliseconds, on how long to wait for a region to come online; read from
    // "hbase.splitlog.manager.timeout" in the constructor.
    private long waitRegionOnlineTimeOut;
    // Encoded names of the regions that had edits handed to this sink.
    private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
    // Location key -> writer used to replay edits for that key.
    private final Map<String, RegionServerWriter> writers = 
        new ConcurrentHashMap<String, RegionServerWriter>();
    // Encoded names of the regions already confirmed online, so they are not checked again.
    private final Set<String> onlineRegions = Collections.synchronizedSet(new HashSet<String>());

    // Table name -> connection, created lazily by getConnectionByTableName.
    private Map<byte[], HConnection> tableNameToHConnectionMap = Collections
        .synchronizedMap(new TreeMap<byte[], HConnection>(Bytes.BYTES_COMPARATOR));
    /**
     * Map key -> value layout:
     * {@code <hostname:port>#<table name>} -> list of (region location, row mutation) pairs
     * waiting to be replayed.
     */
    private Map<String, List<Pair<HRegionLocation, Row>>> serverToBufferQueueMap = 
        new ConcurrentHashMap<String, List<Pair<HRegionLocation, Row>>>();
    // Errors recorded through addWriterError.
    private List<Throwable> thrown = new ArrayList<Throwable>();

    // The following sink is used in distributedLogReplay mode for entries of regions in a
    // disabling or disabled table. It's a limitation of distributedLogReplay: log replay needs a
    // region to be assigned and online before it can replay WAL edits, while regions of
    // disabling/disabled tables won't be assigned by AM. We can retire this code after HBASE-8234.
    private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
    // Set once append() has routed at least one buffer to logRecoveredEditsOutputSink.
    private boolean hasEditsInDisablingOrDisabledTables = false;
1630 
1631     public LogReplayOutputSink(int numWriters) {
1632       super(numWriters);
1633 
1634       this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout", 
1635         SplitLogManager.DEFAULT_TIMEOUT);
1636       this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
1637       this.logRecoveredEditsOutputSink.setReporter(reporter);
1638     }
1639 
1640     void append(RegionEntryBuffer buffer) throws IOException {
1641       List<Entry> entries = buffer.entryBuffer;
1642       if (entries.isEmpty()) {
1643         LOG.warn("got an empty buffer, skipping");
1644         return;
1645       }
1646       
1647       // check if current region in a disabling or disabled table
1648       if (disablingOrDisabledTables.contains(Bytes.toString(buffer.tableName))) {
1649         // need fall back to old way
1650         logRecoveredEditsOutputSink.append(buffer);
1651         hasEditsInDisablingOrDisabledTables = true;
1652         // store regions we have recovered so far
1653         addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
1654         return;
1655       }
1656 
1657       // group entries by region servers
1658       groupEditsByServer(entries);
1659 
1660       // process workitems
1661       String maxLocKey = null;
1662       int maxSize = 0;
1663       List<Pair<HRegionLocation, Row>> maxQueue = null;
1664       synchronized (this.serverToBufferQueueMap) {
1665         for (String key : this.serverToBufferQueueMap.keySet()) {
1666           List<Pair<HRegionLocation, Row>> curQueue = this.serverToBufferQueueMap.get(key);
1667           if (curQueue.size() > maxSize) {
1668             maxSize = curQueue.size();
1669             maxQueue = curQueue;
1670             maxLocKey = key;
1671           }
1672         }
1673         if (maxSize < minBatchSize
1674             && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
1675           // buffer more to process
1676           return;
1677         } else if (maxSize > 0) {
1678           this.serverToBufferQueueMap.remove(maxLocKey);
1679         }
1680       }
1681 
1682       if (maxSize > 0) {
1683         processWorkItems(maxLocKey, maxQueue);
1684       }
1685     }
1686 
1687     private void addToRecoveredRegions(String encodedRegionName) {
1688       if (!recoveredRegions.contains(encodedRegionName)) {
1689         recoveredRegions.add(encodedRegionName);
1690       }
1691     }
1692 
1693     /**
1694      * Helper function to group WALEntries to individual region servers
1695      * @throws IOException
1696      */
1697     private void groupEditsByServer(List<Entry> entries) throws IOException {
1698       Set<byte[]> nonExistentTables = null;
1699       Long cachedLastFlushedSequenceId = -1l;
1700       for (HLog.Entry entry : entries) {
1701         WALEdit edit = entry.getEdit();
1702         byte[] table = entry.getKey().getTablename();
1703         String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
1704         // skip edits of non-existent tables
1705         if (nonExistentTables != null && nonExistentTables.contains(table)) {
1706           this.skippedEdits.incrementAndGet();
1707           continue;
1708         }
1709         boolean needSkip = false;
1710         Put put = null;
1711         Delete del = null;
1712         KeyValue lastKV = null;
1713         HRegionLocation loc = null;
1714         Row preRow = null;
1715         HRegionLocation preLoc = null;
1716         Row lastAddedRow = null; // it is not really needed here just be conservative
1717         String preKey = null;
1718         List<KeyValue> kvs = edit.getKeyValues();
1719         HConnection hconn = this.getConnectionByTableName(table);
1720 
1721         for (KeyValue kv : kvs) {
1722           // filtering HLog meta entries
1723           // We don't handle HBASE-2231 because we may or may not replay a compaction event.
1724           // Details at https://issues.apache.org/jira/browse/HBASE-2231?focusedCommentId=13647143&
1725           // page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13647143
1726           if (kv.matchingFamily(WALEdit.METAFAMILY)) continue;
1727 
1728           if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
1729             if (preRow != null) {
1730               synchronized (serverToBufferQueueMap) {
1731                 List<Pair<HRegionLocation, Row>> queue = serverToBufferQueueMap.get(preKey);
1732                 if (queue == null) {
1733                   queue = Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Row>>());
1734                   serverToBufferQueueMap.put(preKey, queue);
1735                 }
1736                 queue.add(new Pair<HRegionLocation, Row>(preLoc, preRow));
1737                 lastAddedRow = preRow;
1738               }
1739               // store regions we have recovered so far
1740               addToRecoveredRegions(preLoc.getRegionInfo().getEncodedName());
1741             }
1742 
1743             try {
1744               loc = locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow());
1745             } catch (TableNotFoundException ex) {
1746               // table has been deleted so skip edits of the table
1747               LOG.info("Table " + Bytes.toString(table)
1748                   + " doesn't exist. Skip log replay for region " + encodeRegionNameStr);
1749               lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
1750               if (nonExistentTables == null) {
1751                 nonExistentTables = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
1752               }
1753               nonExistentTables.add(table);
1754               this.skippedEdits.incrementAndGet();
1755               needSkip = true;
1756               break;
1757             }
1758             cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo()
1759                 .getEncodedName());
1760             if (cachedLastFlushedSequenceId != null
1761                 && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
1762               // skip the whole HLog entry
1763               this.skippedEdits.incrementAndGet();
1764               needSkip = true;
1765               break;
1766             }
1767             
1768             if (kv.isDelete()) {
1769               del = new Delete(kv.getRow());
1770               del.setClusterId(entry.getKey().getClusterId());
1771               preRow = del;
1772             } else {
1773               put = new Put(kv.getRow());
1774               put.setClusterId(entry.getKey().getClusterId());
1775               preRow = put;
1776             }
1777             preKey = loc.getHostnamePort() + KEY_DELIMITER + Bytes.toString(table);
1778             preLoc = loc;
1779           }
1780           if (kv.isDelete()) {
1781             del.addDeleteMarker(kv);
1782           } else {
1783             put.add(kv);
1784           }
1785           lastKV = kv;
1786         }
1787 
1788         // skip the edit
1789         if(needSkip) continue;
1790         
1791         // add the last row
1792         if (preRow != null && lastAddedRow != preRow) {
1793           synchronized (serverToBufferQueueMap) {
1794             List<Pair<HRegionLocation, Row>> queue = serverToBufferQueueMap.get(preKey);
1795             if (queue == null) {
1796               queue = Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Row>>());
1797               serverToBufferQueueMap.put(preKey, queue);
1798             }
1799             queue.add(new Pair<HRegionLocation, Row>(preLoc, preRow));
1800           }
1801           // store regions we have recovered so far
1802           addToRecoveredRegions(preLoc.getRegionInfo().getEncodedName());
1803         }
1804       }
1805     }
1806 
1807     /**
1808      * Locate destination region based on table name & row. This function also makes sure the
1809      * destination region is online for replay.
1810      * @throws IOException
1811      */
1812     private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
1813         byte[] table, byte[] row) throws IOException {
1814       HRegionLocation loc = hconn.getRegionLocation(table, row, false);
1815       if (loc == null) {
1816         throw new IOException("Can't locate location for row:" + Bytes.toString(row)
1817             + " of table:" + Bytes.toString(table));
1818       }
1819       if (onlineRegions.contains(loc.getRegionInfo().getEncodedName())) {
1820         return loc;
1821       }
1822 
1823       Long lastFlushedSequenceId = -1l;
1824       loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut);
1825       Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo()
1826           .getEncodedName());
1827 
1828       onlineRegions.add(loc.getRegionInfo().getEncodedName());
1829       // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
1830       // update the value for the region
1831       lastFlushedSequenceId = SplitLogManager.getLastFlushedSequenceId(watcher, loc
1832           .getServerName().getServerName(), loc.getRegionInfo().getEncodedName());
1833       if (cachedLastFlushedSequenceId == null
1834           || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
1835         lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
1836       } else if (loc.getRegionInfo().isRecovering() == false) {
1837         // region isn't in recovering at all because WAL file may contain a region that has
1838         // been moved to somewhere before hosting RS fails
1839         lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
1840         LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
1841             + " because it's not in recovering.");
1842       }
1843       
1844       return loc;
1845     }
1846 
1847     private void processWorkItems(String key, List<Pair<HRegionLocation, Row>> actions)
1848         throws IOException {
1849       RegionServerWriter rsw = null;
1850 
1851       long startTime = System.nanoTime();
1852       try {
1853         rsw = getRegionServerWriter(key);
1854         rsw.sink.replayEntries(actions);
1855 
1856         // Pass along summary statistics
1857         rsw.incrementEdits(actions.size());
1858         rsw.incrementNanoTime(System.nanoTime() - startTime);
1859       } catch (IOException e) {
1860         e = RemoteExceptionHandler.checkIOException(e);
1861         LOG.fatal(" Got while writing log entry to log", e);
1862         throw e;
1863       }
1864     }
1865 
1866     /**
1867      * Wait until region is online on the destination region server
1868      * @param loc
1869      * @param row
1870      * @param timeout How long to wait
1871      * @return True when region is online on the destination region server
1872      * @throws InterruptedException
1873      */
1874     private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
1875         final long timeout)
1876         throws IOException { 
1877       final long endTime = EnvironmentEdgeManager.currentTimeMillis() + timeout;
1878       final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1879         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1880       boolean reloadLocation = false;
1881       byte[] tableName = loc.getRegionInfo().getTableName();
1882       int tries = 0;
1883       Throwable cause = null;
1884       while (endTime > EnvironmentEdgeManager.currentTimeMillis()) {
1885         try {
1886           // Try and get regioninfo from the hosting server.
1887           HConnection hconn = getConnectionByTableName(tableName);
1888           if(reloadLocation) {
1889             loc = hconn.getRegionLocation(tableName, row, true);
1890           }
1891           BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
1892           HRegionInfo region = loc.getRegionInfo();
1893           if((region =ProtobufUtil.getRegionInfo(remoteSvr, region.getRegionName())) != null) {
1894             loc.getRegionInfo().setRecovering(region.isRecovering());
1895             return loc;
1896           }
1897         } catch (IOException e) {
1898           cause = e.getCause();
1899           if(!(cause instanceof RegionOpeningException)) {
1900             reloadLocation = true;
1901           }
1902         }
1903         long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
1904         try {
1905           Thread.sleep(expectedSleep);
1906         } catch (InterruptedException e) {
1907           Thread.currentThread().interrupt();
1908           throw new IOException("Interrupted when waiting regon " + 
1909               loc.getRegionInfo().getEncodedName() + " online.", e);
1910         }
1911         tries++;
1912       }
1913       
1914       throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
1915         " online for " + timeout + " milliseconds.", cause);
1916     }
1917 
1918     @Override
1919     protected boolean flush() throws IOException {
1920       String curLoc = null;
1921       int curSize = 0;
1922       List<Pair<HRegionLocation, Row>> curQueue = null;
1923       synchronized (this.serverToBufferQueueMap) {
1924         for (String locationKey : this.serverToBufferQueueMap.keySet()) {
1925           curQueue = this.serverToBufferQueueMap.get(locationKey);
1926           if (!curQueue.isEmpty()) {
1927             curSize = curQueue.size();
1928             curLoc = locationKey;
1929             break;
1930           }
1931         }
1932         if (curSize > 0) {
1933           this.serverToBufferQueueMap.remove(curLoc);
1934         }
1935       }
1936 
1937       if (curSize > 0) {
1938         this.processWorkItems(curLoc, curQueue);
1939         dataAvailable.notifyAll();
1940         return true;
1941       }
1942       return false;
1943     }
1944 
1945     void addWriterError(Throwable t) {
1946       thrown.add(t);
1947     }
1948 
1949     @Override
1950     List<Path> finishWritingAndClose() throws IOException {
1951       List<Path> result = new ArrayList<Path>();
1952       try {
1953         if (!finishWriting()) {
1954           return null;
1955         }
1956         if (hasEditsInDisablingOrDisabledTables) {
1957           result = logRecoveredEditsOutputSink.finishWritingAndClose();
1958         }
1959         // returns an empty array in order to keep interface same as old way
1960         return result;
1961       } finally {
1962         List<IOException> thrown = closeRegionServerWriters();
1963         if (thrown != null && !thrown.isEmpty()) {
1964           throw MultipleIOException.createIOException(thrown);
1965         }
1966       }
1967     }
1968 
1969     @Override
1970     int getNumOpenWriters() {
1971       return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
1972     }
1973 
1974     private List<IOException> closeRegionServerWriters() throws IOException {
1975       List<IOException> result = null;
1976       if (!writersClosed) {
1977         result = Lists.newArrayList();
1978         try {
1979           for (WriterThread t : writerThreads) {
1980             while (t.isAlive()) {
1981               t.shouldStop = true;
1982               t.interrupt();
1983               try {
1984                 t.join(10);
1985               } catch (InterruptedException e) {
1986                 IOException iie = new InterruptedIOException();
1987                 iie.initCause(e);
1988                 throw iie;
1989               }
1990             }
1991           }
1992         } finally {
1993           synchronized (writers) {
1994             for (String locationKey : writers.keySet()) {
1995               RegionServerWriter tmpW = writers.get(locationKey);
1996               try {
1997                 tmpW.close();
1998               } catch (IOException ioe) {
1999                 LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
2000                 result.add(ioe);
2001               }
2002             }
2003           }
2004 
2005           // close connections
2006           synchronized (this.tableNameToHConnectionMap) {
2007             for (byte[] tableName : this.tableNameToHConnectionMap.keySet()) {
2008               HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
2009               try {
2010                 hconn.close();
2011               } catch (IOException ioe) {
2012                 result.add(ioe);
2013               }
2014             }
2015           }
2016           writersClosed = true;
2017         }
2018       }
2019       return result;
2020     }
2021 
2022     Map<byte[], Long> getOutputCounts() {
2023       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
2024       synchronized (writers) {
2025         for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
2026           ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
2027         }
2028       }
2029       return ret;
2030     }
2031 
2032     @Override
2033     int getNumberOfRecoveredRegions() {
2034       return this.recoveredRegions.size();
2035     }
2036 
2037     /**
2038      * Get a writer and path for a log starting at the given entry. This function is threadsafe so
2039      * long as multiple threads are always acting on different regions.
2040      * @return null if this region shouldn't output any logs
2041      */
2042     private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
2043       RegionServerWriter ret = writers.get(loc);
2044       if (ret != null) {
2045         return ret;
2046       }
2047 
2048       String tableName = getTableFromLocationStr(loc);
2049       if(tableName.isEmpty()){
2050         LOG.warn("Invalid location string:" + loc + " found.");
2051       }
2052 
2053       HConnection hconn = getConnectionByTableName(Bytes.toBytes(tableName));
2054       synchronized (writers) {
2055         ret = writers.get(loc);
2056         if (ret == null) {
2057           ret = new RegionServerWriter(conf, Bytes.toBytes(tableName), hconn);
2058           writers.put(loc, ret);
2059         }
2060       }
2061       return ret;
2062     }
2063 
2064     private HConnection getConnectionByTableName(final byte[] tableName) throws IOException {
2065       HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
2066       if (hconn == null) {
2067         synchronized (this.tableNameToHConnectionMap) {
2068           hconn = this.tableNameToHConnectionMap.get(tableName);
2069           if (hconn == null) {
2070             hconn =  HConnectionManager.createConnection(conf);
2071             this.tableNameToHConnectionMap.put(tableName, hconn);
2072           }
2073         }
2074       }
2075       return hconn;
2076     }
2077     
2078     private String getTableFromLocationStr(String loc) {
2079       /**
2080        * location key is in format <server name:port>#<table name>
2081        */
2082       String[] splits = loc.split(KEY_DELIMITER);
2083       if (splits.length != 2) {
2084         return "";
2085       }
2086       return splits[1];
2087     }
2088   }
2089 
2090   /**
2091    * Private data structure that wraps a receiving RS and collecting statistics about the data
2092    * written to this newly assigned RS.
2093    */
2094   private final static class RegionServerWriter extends SinkWriter {
2095     final WALEditsReplaySink sink;
2096 
2097     RegionServerWriter(final Configuration conf, final byte[] tableName, final HConnection conn)
2098         throws IOException {
2099       this.sink = new WALEditsReplaySink(conf, tableName, conn);
2100     }
2101 
2102     void close() throws IOException {
2103     }
2104   }
2105 
2106   static class CorruptedLogFileException extends Exception {
2107     private static final long serialVersionUID = 1L;
2108 
2109     CorruptedLogFileException(String s) {
2110       super(s);
2111     }
2112   }
2113 }