View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import java.io.EOFException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.lang.reflect.Constructor;
26  import java.lang.reflect.InvocationTargetException;
27  import java.text.ParseException;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Set;
34  import java.util.TreeMap;
35  import java.util.TreeSet;
36  import java.util.concurrent.atomic.AtomicReference;
37  import java.util.concurrent.CountDownLatch;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FileStatus;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.HBaseFileSystem;
46  import org.apache.hadoop.hbase.HConstants;
47  import org.apache.hadoop.hbase.HTableDescriptor;
48  import org.apache.hadoop.hbase.RemoteExceptionHandler;
49  import org.apache.hadoop.hbase.io.HeapSize;
50  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
51  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
52  import org.apache.hadoop.hbase.regionserver.HRegion;
53  import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
54  import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
55  import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
56  import org.apache.hadoop.hbase.util.Bytes;
57  import org.apache.hadoop.hbase.util.CancelableProgressable;
58  import org.apache.hadoop.hbase.util.ClassSize;
59  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
60  import org.apache.hadoop.hbase.util.FSUtils;
61  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
62  import org.apache.hadoop.io.MultipleIOException;
63  
64  import com.google.common.base.Preconditions;
65  import com.google.common.collect.Lists;
66  
/**
 * This class is responsible for splitting up a bunch of regionserver commit log
 * files that are no longer being written to, into new files, one per region for
 * region to replay on startup. Delete the old log files when finished.
 */
public class HLogSplitter {
  // Configuration key naming the concrete HLogSplitter subclass to instantiate
  // in createLogSplitter().
  private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";

  /**
   * Name of file that holds recovered edits written by the wal log splitting
   * code, one per region
   */
  public static final String RECOVERED_EDITS = "recovered.edits";


  static final Log LOG = LogFactory.getLog(HLogSplitter.class);

  // Guards against reuse: splitLog() may only be invoked once per instance.
  private boolean hasSplit = false;
  // Wall-clock duration of the last split, in milliseconds (see getTime()).
  private long splitTime = 0;
  // Aggregate byte size of the hlogs that were split (see getSize()).
  private long splitSize = 0;


  // Parameters for split process
  protected final Path rootDir;
  protected final Path srcDir;
  protected final Path oldLogDir;
  protected final FileSystem fs;
  protected final Configuration conf;
  private final HLogFileSystem hlogFs;

  // Major subcomponents of the split process.
  // These are separated into inner classes to make testing easier.
  OutputSink outputSink;
  EntryBuffers entryBuffers;

  // If an exception is thrown by one of the other threads, it will be
  // stored here.
  protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();

  // Wait/notify for when data has been produced by the reader thread,
  // consumed by the reader thread, or an exception occurred
  Object dataAvailable = new Object();

  // Task shown in the TaskMonitor UI while a split is in progress; created
  // lazily by splitLog()/splitLogFile().
  private MonitoredTask status;
111 
112 
113   /**
114    * Create a new HLogSplitter using the given {@link Configuration} and the
115    * <code>hbase.hlog.splitter.impl</code> property to derived the instance
116    * class to use.
117    * <p>
118    * @param conf
119    * @param rootDir hbase directory
120    * @param srcDir logs directory
121    * @param oldLogDir directory where processed logs are archived to
122    * @param fs FileSystem
123    * @return New HLogSplitter instance
124    */
125   public static HLogSplitter createLogSplitter(Configuration conf,
126       final Path rootDir, final Path srcDir,
127       Path oldLogDir, final FileSystem fs)  {
128 
129     @SuppressWarnings("unchecked")
130     Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
131         .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
132     try {
133        Constructor<? extends HLogSplitter> constructor =
134          splitterClass.getConstructor(
135           Configuration.class, // conf
136           Path.class, // rootDir
137           Path.class, // srcDir
138           Path.class, // oldLogDir
139           FileSystem.class); // fs
140       return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs);
141     } catch (IllegalArgumentException e) {
142       throw new RuntimeException(e);
143     } catch (InstantiationException e) {
144       throw new RuntimeException(e);
145     } catch (IllegalAccessException e) {
146       throw new RuntimeException(e);
147     } catch (InvocationTargetException e) {
148       throw new RuntimeException(e);
149     } catch (SecurityException e) {
150       throw new RuntimeException(e);
151     } catch (NoSuchMethodException e) {
152       throw new RuntimeException(e);
153     }
154   }
155 
156   public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
157       Path oldLogDir, FileSystem fs) {
158     this.conf = conf;
159     this.rootDir = rootDir;
160     this.srcDir = srcDir;
161     this.oldLogDir = oldLogDir;
162     this.fs = fs;
163 
164     entryBuffers = new EntryBuffers(
165         conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
166             128*1024*1024));
167     outputSink = new OutputSink();
168     this.hlogFs = new HLogFileSystem(conf);
169   }
170 
171   /**
172    * Split up a bunch of regionserver commit log files that are no longer being
173    * written to, into new files, one per region for region to replay on startup.
174    * Delete the old log files when finished.
175    *
176    * @throws IOException will throw if corrupted hlogs aren't tolerated
177    * @return the list of splits
178    */
179   public List<Path> splitLog()
180       throws IOException {
181     return splitLog((CountDownLatch) null);
182   }
183   
184   /**
185    * Split up a bunch of regionserver commit log files that are no longer being
186    * written to, into new files, one per region for region to replay on startup.
187    * Delete the old log files when finished.
188    *
189    * @param latch
190    * @throws IOException will throw if corrupted hlogs aren't tolerated
191    * @return the list of splits
192    */
193   public List<Path> splitLog(CountDownLatch latch)
194       throws IOException {
195     Preconditions.checkState(!hasSplit,
196         "An HLogSplitter instance may only be used once");
197     hasSplit = true;
198 
199     status = TaskMonitor.get().createStatus(
200         "Splitting logs in " + srcDir);
201     
202     long startTime = EnvironmentEdgeManager.currentTimeMillis();
203     
204     status.setStatus("Determining files to split...");
205     List<Path> splits = null;
206     if (!fs.exists(srcDir)) {
207       // Nothing to do
208       status.markComplete("No log directory existed to split.");
209       return splits;
210     }
211     FileStatus[] logfiles = fs.listStatus(srcDir);
212     if (logfiles == null || logfiles.length == 0) {
213       // Nothing to do
214       return splits;
215     }
216     logAndReport("Splitting " + logfiles.length + " hlog(s) in "
217     + srcDir.toString());
218     splits = splitLog(logfiles, latch);
219 
220     splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
221     String msg = "hlog file splitting completed in " + splitTime +
222         " ms for " + srcDir.toString();
223     status.markComplete(msg);
224     LOG.info(msg);
225     return splits;
226   }
227   
  // Record msg on the monitored task (visible in the TaskMonitor UI) and
  // mirror it to the server log.
  private void logAndReport(String msg) {
    status.setStatus(msg);
    LOG.info(msg);
  }
232 
233   /**
234    * @return time that this split took
235    */
236   public long getTime() {
237     return this.splitTime;
238   }
239 
240   /**
241    * @return aggregate size of hlogs that were split
242    */
243   public long getSize() {
244     return this.splitSize;
245   }
246 
247   /**
248    * @return a map from encoded region ID to the number of edits written out
249    * for that region.
250    */
251   Map<byte[], Long> getOutputCounts() {
252     Preconditions.checkState(hasSplit);
253     return outputSink.getOutputCounts();
254   }
255 
256   /**
257    * Splits the HLog edits in the given list of logfiles (that are a mix of edits
258    * on multiple regions) by region and then splits them per region directories,
259    * in batches of (hbase.hlog.split.batch.size)
260    * <p>
261    * This process is split into multiple threads. In the main thread, we loop
262    * through the logs to be split. For each log, we:
263    * <ul>
264    *   <li> Recover it (take and drop HDFS lease) to ensure no other process can write</li>
265    *   <li> Read each edit (see {@link #parseHLog}</li>
266    *   <li> Mark as "processed" or "corrupt" depending on outcome</li>
267    * </ul>
268    * <p>
269    * Each edit is passed into the EntryBuffers instance, which takes care of
270    * memory accounting and splitting the edits by region.
271    * <p>
272    * The OutputSink object then manages N other WriterThreads which pull chunks
273    * of edits from EntryBuffers and write them to the output region directories.
274    * <p>
275    * After the process is complete, the log files are archived to a separate
276    * directory.
277    */
278   private List<Path> splitLog(final FileStatus[] logfiles, CountDownLatch latch)
279       throws IOException {
280     List<Path> processedLogs = new ArrayList<Path>();
281     List<Path> corruptedLogs = new ArrayList<Path>();
282     List<Path> splits = null;
283 
284     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
285 
286     countTotalBytes(logfiles);
287     splitSize = 0;
288 
289     outputSink.startWriterThreads(entryBuffers);
290 
291     try {
292       int i = 0;
293       for (FileStatus log : logfiles) {
294        Path logPath = log.getPath();
295         long logLength = log.getLen();
296         splitSize += logLength;
297         logAndReport("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
298             + ": " + logPath + ", length=" + logLength);
299         Reader in;
300         try {
301           //actually, for meta-only hlogs, we don't need to go thru the process
302           //of parsing and segregating by regions since all the logs are for
303           //meta only. However, there is a sequence number that can be obtained
304           //only by parsing.. so we parse for all files currently
305           //TODO: optimize this part somehow
306           in = getReader(fs, log, conf, skipErrors);
307           if (in != null) {
308             parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);
309             try {
310               in.close();
311             } catch (IOException e) {
312               LOG.warn("Close log reader threw exception -- continuing",
313                   e);
314             }
315           }
316           processedLogs.add(logPath);
317         } catch (CorruptedLogFileException e) {
318           LOG.info("Got while parsing hlog " + logPath +
319               ". Marking as corrupted", e);
320           corruptedLogs.add(logPath);
321           continue;
322         }
323       }
324       status.setStatus("Log splits complete. Checking for orphaned logs.");
325       
326       if (latch != null) {
327         try {
328           latch.await();
329         } catch (InterruptedException ie) {
330           LOG.warn("wait for latch interrupted");
331           Thread.currentThread().interrupt();
332         }
333       }
334       FileStatus[] currFiles = fs.listStatus(srcDir);
335       if (currFiles.length > processedLogs.size()
336           + corruptedLogs.size()) {
337         throw new OrphanHLogAfterSplitException(
338           "Discovered orphan hlog after split. Maybe the "
339             + "HRegionServer was not dead when we started");
340       }
341     } finally {
342       status.setStatus("Finishing writing output logs and closing down.");
343       splits = outputSink.finishWritingAndClose();
344     }
345     status.setStatus("Archiving logs after completed split");
346     archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
347     return splits;
348   }
349 
350   /**
351    * @return the total size of the passed list of files.
352    */
353   private static long countTotalBytes(FileStatus[] logfiles) {
354     long ret = 0;
355     for (FileStatus stat : logfiles) {
356       ret += stat.getLen();
357     }
358     return ret;
359   }
360 
361   /**
362    * Splits a HLog file into region's recovered-edits directory
363    * <p>
364    * If the log file has N regions then N recovered.edits files will be
365    * produced. There is no buffering in this code. Instead it relies on the
366    * buffering in the SequenceFileWriter.
367    * <p>
368    * @param rootDir
369    * @param logfile
370    * @param fs
371    * @param conf
372    * @param reporter
373    * @return false if it is interrupted by the progress-able.
374    * @throws IOException
375    */
376   static public boolean splitLogFile(Path rootDir, FileStatus logfile,
377       FileSystem fs, Configuration conf, CancelableProgressable reporter)
378       throws IOException {
379     HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */,
380         fs);
381     return s.splitLogFile(logfile, reporter);
382   }
383 
384   public boolean splitLogFile(FileStatus logfile,
385       CancelableProgressable reporter) throws IOException {
386     final Map<byte[], Object> logWriters = Collections.
387     synchronizedMap(new TreeMap<byte[], Object>(Bytes.BYTES_COMPARATOR));
388     boolean isCorrupted = false;
389     
390     Preconditions.checkState(status == null);
391     status = TaskMonitor.get().createStatus(
392         "Splitting log file " + logfile.getPath() +
393         "into a temporary staging area.");
394 
395     Object BAD_WRITER = new Object();
396 
397     boolean progress_failed = false;
398 
399     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
400         HLog.SPLIT_SKIP_ERRORS_DEFAULT);
401     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
402     // How often to send a progress report (default 1/2 the zookeeper session
403     // timeout of if that not set, the split log DEFAULT_TIMEOUT)
404     int period = conf.getInt("hbase.splitlog.report.period",
405       conf.getInt("hbase.splitlog.manager.timeout", ZKSplitLog.DEFAULT_TIMEOUT) / 2);
406     int numOpenedFilesBeforeReporting =
407       conf.getInt("hbase.splitlog.report.openedfiles", 3);
408     Path logPath = logfile.getPath();
409     long logLength = logfile.getLen();
410     LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
411     status.setStatus("Opening log file");
412     Reader in = null;
413     try {
414       in = getReader(fs, logfile, conf, skipErrors);
415     } catch (CorruptedLogFileException e) {
416       LOG.warn("Could not get reader, corrupted log file " + logPath, e);
417       ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
418       isCorrupted = true;
419     }
420     if (in == null) {
421       status.markComplete("Was nothing to split in log file");
422       LOG.warn("Nothing to split in log file " + logPath);
423       return true;
424     }
425     long t = EnvironmentEdgeManager.currentTimeMillis();
426     long last_report_at = t;
427     if (reporter != null && reporter.progress() == false) {
428       status.markComplete("Failed: reporter.progress asked us to terminate");
429       return false;
430     }
431     // Report progress every so many edits and/or files opened (opening a file
432     // takes a bit of time).
433     int editsCount = 0;
434     int numNewlyOpenedFiles = 0;
435     Entry entry;
436     try {
437       while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
438         byte[] region = entry.getKey().getEncodedRegionName();
439         Object o = logWriters.get(region);
440         if (o == BAD_WRITER) {
441           continue;
442         }
443         WriterAndPath wap = (WriterAndPath)o;
444         if (wap == null) {
445           wap = createWAP(region, entry, rootDir, fs, conf);
446           numNewlyOpenedFiles++;
447           if (wap == null) {
448             // ignore edits from this region. It doesn't exist anymore.
449             // It was probably already split.
450             logWriters.put(region, BAD_WRITER);
451             continue;
452           } else {
453             logWriters.put(region, wap);
454           }
455         }
456         wap.w.append(entry);
457         outputSink.updateRegionMaximumEditLogSeqNum(entry);
458         editsCount++;
459         // If sufficient edits have passed OR we've opened a few files, check if
460         // we should report progress.
461         if (editsCount % interval == 0 ||
462             (numNewlyOpenedFiles > numOpenedFilesBeforeReporting)) {
463           // Zero out files counter each time we fall in here.
464           numNewlyOpenedFiles = 0;
465           String countsStr = "edits=" + editsCount + ", files=" + logWriters.size();
466           status.setStatus("Split " + countsStr);
467           long t1 = EnvironmentEdgeManager.currentTimeMillis();
468           if ((t1 - last_report_at) > period) {
469             last_report_at = t;
470             if (reporter != null && reporter.progress() == false) {
471               status.markComplete("Failed: reporter.progress asked us to terminate; " + countsStr);
472               progress_failed = true;
473               return false;
474             }
475           }
476         }
477       }
478     } catch (CorruptedLogFileException e) {
479       LOG.warn("Could not parse, corrupted log file " + logPath, e);
480       ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
481       isCorrupted = true;
482     } catch (IOException e) {
483       e = RemoteExceptionHandler.checkIOException(e);
484       throw e;
485     } finally {
486       boolean allWritersClosed = false;
487       try {
488         int n = 0;
489         for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
490           Object o = logWritersEntry.getValue();
491           long t1 = EnvironmentEdgeManager.currentTimeMillis();
492           if ((t1 - last_report_at) > period) {
493             last_report_at = t;
494             if ((progress_failed == false) && (reporter != null) && (reporter.progress() == false)) {
495               progress_failed = true;
496             }
497           }
498           if (o == BAD_WRITER) {
499             continue;
500           }
501           n++;
502           WriterAndPath wap = (WriterAndPath) o;
503           wap.writerClosed = true;
504           wap.w.close();
505           LOG.debug("Closed " + wap.p);
506           Path dst = getCompletedRecoveredEditsFilePath(wap.p,
507               outputSink.getRegionMaximumEditLogSeqNum(logWritersEntry.getKey()));
508           if (!dst.equals(wap.p) && fs.exists(dst)) {
509             LOG.warn("Found existing old edits file. It could be the "
510                 + "result of a previous failed split attempt. Deleting " + dst + ", length="
511                 + fs.getFileStatus(dst).getLen());
512             if (!HBaseFileSystem.deleteFileFromFileSystem(fs, dst)) {
513               LOG.warn("Failed deleting of old " + dst);
514               throw new IOException("Failed deleting of old " + dst);
515             }
516           }
517           // Skip the unit tests which create a splitter that reads and writes the
518           // data without touching disk. TestHLogSplit#testThreading is an
519           // example.
520           if (fs.exists(wap.p)) {
521             if (!HBaseFileSystem.renameDirForFileSystem(fs, wap.p, dst)) {
522               throw new IOException("Failed renaming " + wap.p + " to " + dst);
523             }
524             LOG.debug("Rename " + wap.p + " to " + dst);
525           }
526         }
527         allWritersClosed = true;
528         String msg = "Processed " + editsCount + " edits across " + n + " regions"
529             + " threw away edits for " + (logWriters.size() - n) + " regions" + "; log file="
530             + logPath + " is corrupted = " + isCorrupted + " progress failed = " + progress_failed;
531         LOG.info(msg);
532         status.markComplete(msg);
533       } finally {
534         if (!allWritersClosed) {
535           for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
536             Object o = logWritersEntry.getValue();
537             if (o != BAD_WRITER) {
538               WriterAndPath wap = (WriterAndPath) o;
539               try {
540                 if (!wap.writerClosed) {
541                   wap.writerClosed = true;
542                   wap.w.close();
543                 }
544               } catch (IOException e) {
545                 LOG.debug("Exception while closing the writer :", e);
546               }
547             }
548           }
549         }
550         in.close();
551       }
552     }
553     return !progress_failed;
554   }
555 
556   /**
557    * Completes the work done by splitLogFile by archiving logs
558    * <p>
559    * It is invoked by SplitLogManager once it knows that one of the
560    * SplitLogWorkers have completed the splitLogFile() part. If the master
561    * crashes then this function might get called multiple times.
562    * <p>
563    * @param logfile
564    * @param conf
565    * @throws IOException
566    */
567   public static void finishSplitLogFile(String logfile, Configuration conf)
568       throws IOException {
569     Path rootdir = FSUtils.getRootDir(conf);
570     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
571     finishSplitLogFile(rootdir, oldLogDir, logfile, conf);
572   }
573 
574   public static void finishSplitLogFile(Path rootdir, Path oldLogDir,
575       String logfile, Configuration conf) throws IOException {
576     List<Path> processedLogs = new ArrayList<Path>();
577     List<Path> corruptedLogs = new ArrayList<Path>();
578     FileSystem fs;
579     fs = rootdir.getFileSystem(conf);
580     Path logPath = new Path(logfile);
581     if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
582       corruptedLogs.add(logPath);
583     } else {
584       processedLogs.add(logPath);
585     }
586     archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf);
587     Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
588     HBaseFileSystem.deleteDirFromFileSystem(fs, stagingDir);
589   }
590 
591   /**
592    * Moves processed logs to a oldLogDir after successful processing Moves
593    * corrupted logs (any log that couldn't be successfully parsed to corruptDir
594    * (.corrupt) for later investigation
595    *
596    * @param corruptedLogs
597    * @param processedLogs
598    * @param oldLogDir
599    * @param fs
600    * @param conf
601    * @throws IOException
602    */
603   private static void archiveLogs(
604       final Path srcDir,
605       final List<Path> corruptedLogs,
606       final List<Path> processedLogs, final Path oldLogDir,
607       final FileSystem fs, final Configuration conf) throws IOException {
608     final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
609         "hbase.regionserver.hlog.splitlog.corrupt.dir",  HConstants.CORRUPT_DIR_NAME));
610 
611     if (!HBaseFileSystem.makeDirOnFileSystem(fs, corruptDir)) {
612       LOG.info("Unable to mkdir " + corruptDir);
613     }
614     HBaseFileSystem.makeDirOnFileSystem(fs, oldLogDir);
615 
616     // this method can get restarted or called multiple times for archiving
617     // the same log files.
618     for (Path corrupted : corruptedLogs) {
619       Path p = new Path(corruptDir, corrupted.getName());
620       if (fs.exists(corrupted)) {
621         if (!HBaseFileSystem.renameDirForFileSystem(fs, corrupted, p)) {
622           LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
623         } else {
624           LOG.warn("Moving corrupted log " + corrupted + " to " + p);
625         }
626       }
627     }
628 
629     for (Path p : processedLogs) {
630       Path newPath = HLog.getHLogArchivePath(oldLogDir, p);
631       if (fs.exists(p)) {
632         if (!HBaseFileSystem.renameAndSetModifyTime(fs, p, newPath)) {
633           LOG.warn("Unable to move  " + p + " to " + newPath);
634         } else {
635           LOG.debug("Archived processed log " + p + " to " + newPath);
636         }
637       }
638     }
639 
640     // distributed log splitting removes the srcDir (region's log dir) later
641     // when all the log files in that srcDir have been successfully processed
642     if (srcDir != null && !HBaseFileSystem.deleteDirFromFileSystem(fs, srcDir)) {
643       throw new IOException("Unable to delete src dir: " + srcDir);
644     }
645   }
646 
  /**
   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
   * <code>logEntry</code> named for the sequenceid in the passed
   * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
   * This method also ensures existence of RECOVERED_EDITS_DIR under the region
   * creating it if necessary.
   * @param fs filesystem the region directories live on
   * @param logEntry entry whose key supplies the table, region and sequence id
   * @param rootDir HBase root dir.
   * @param isCreate if true, create the recovered.edits directory when missing
   * @return Path to file into which to dump split log edits, or null if the
   *     region's directory no longer exists (edits can be discarded).
   * @throws IOException
   */
  static Path getRegionSplitEditsPath(final FileSystem fs,
      final Entry logEntry, final Path rootDir, boolean isCreate)
  throws IOException {
    Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey()
        .getTablename());
    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
    Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
    Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir);

    // Region directory gone: the region was most likely already split/merged,
    // so its edits are obsolete.
    if (!fs.exists(regiondir)) {
      LOG.info("This region's directory doesn't exist: "
          + regiondir.toString() + ". It is very likely that it was" +
          " already split so it's safe to discard those edits.");
      return null;
    }
    // Legacy layout: recovered.edits used to be a FILE; sideline it so a
    // directory of the same name can take its place.
    if (fs.exists(dir) && fs.isFile(dir)) {
      // NOTE(review): sidelines into a hard-coded "/tmp" on this filesystem —
      // confirm this is intended rather than a configurable temp location.
      Path tmp = new Path("/tmp");
      if (!fs.exists(tmp)) {
        fs.mkdirs(tmp);
      }
      tmp = new Path(tmp,
        HLog.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
      LOG.warn("Found existing old file: " + dir + ". It could be some "
        + "leftover of an old installation. It should be a folder instead. "
        + "So moving it to " + tmp);
      if (!HBaseFileSystem.renameDirForFileSystem(fs, dir, tmp)) {
        LOG.warn("Failed to sideline old file " + dir);
      }
    }
    // Best-effort mkdir; a later write will surface any real failure.
    if (isCreate && !fs.exists(dir) &&
        !HBaseFileSystem.makeDirOnFileSystem(fs, dir)) {
      LOG.warn("mkdir failed on " + dir);
    }
    // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
    // region's replayRecoveredEdits will not delete it
    String fileName = formatRecoveredEditsFileName(logEntry.getKey()
        .getLogSeqNum());
    fileName = getTmpRecoveredEditsFileName(fileName);
    return new Path(dir, fileName);
  }
699 
700   static String getTmpRecoveredEditsFileName(String fileName) {
701     return fileName + HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
702   }
703 
704   /**
705    * Get the completed recovered edits file path, renaming it to be by last edit
706    * in the file from its first edit. Then we could use the name to skip
707    * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
708    * @param srcPath
709    * @param maximumEditLogSeqNum
710    * @return dstPath take file's last edit log seq num as the name
711    */
712   static Path getCompletedRecoveredEditsFilePath(Path srcPath,
713       Long maximumEditLogSeqNum) {
714     String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
715     return new Path(srcPath.getParent(), fileName);
716   }
717 
718   static String formatRecoveredEditsFileName(final long seqid) {
719     return String.format("%019d", seqid);
720   }
721 
722   /**
723    * Parse a single hlog and put the edits in entryBuffers
724    *
725    * @param in the hlog reader
726    * @param path the path of the log file
727    * @param entryBuffers the buffer to hold the parsed edits
728    * @param fs the file system
729    * @param conf the configuration
730    * @param skipErrors indicator if CorruptedLogFileException should be thrown instead of IOException
731    * @throws IOException
732    * @throws CorruptedLogFileException if hlog is corrupted
733    */
734   private void parseHLog(final Reader in, Path path,
735 		EntryBuffers entryBuffers, final FileSystem fs,
736     final Configuration conf, boolean skipErrors)
737 	throws IOException, CorruptedLogFileException {
738     int editsCount = 0;
739     try {
740       Entry entry;
741       while ((entry = getNextLogLine(in, path, skipErrors)) != null) {
742         entryBuffers.appendEntry(entry);
743         editsCount++;
744       }
745     } catch (InterruptedException ie) {
746       IOException t = new InterruptedIOException();
747       t.initCause(ie);
748       throw t;
749     } finally {
750       LOG.debug("Pushed=" + editsCount + " entries from " + path);
751     }
752   }
753 
  /**
   * Create a new {@link Reader} for reading logs to split.
   *
   * @param fs filesystem the log lives on
   * @param file status of the log file to open
   * @param conf configuration
   * @param skipErrors if true, an open failure marks the log corrupted
   *     (CorruptedLogFileException) instead of propagating the IOException
   * @return A new Reader instance, or null if the file is empty/unreadable-EOF
   * @throws IOException
   * @throws CorruptedLogFileException
   */
  protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
      boolean skipErrors)
      throws IOException, CorruptedLogFileException {
    Path path = file.getPath();
    long length = file.getLen();
    Reader in;


    // Check for possibly empty file. With appends, currently Hadoop reports a
    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
    // HDFS-878 is committed.
    if (length <= 0) {
      LOG.warn("File " + path + " might be still open, length is 0");
    }

    try {
      // Take and drop the HDFS lease so no other process can keep writing.
      FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf);
      try {
        in = getReader(fs, path, conf);
      } catch (EOFException e) {
        // Both branches return null; kept separate for the distinct logging
        // of the "reported-empty" case.
        if (length <= 0) {
          // TODO should we ignore an empty, not-last log file if skip.errors
          // is false? Either way, the caller should decide what to do. E.g.
          // ignore if this is the last log in sequence.
          // TODO is this scenario still possible if the log has been
          // recovered (i.e. closed)
          LOG.warn("Could not open " + path + " for reading. File is empty", e);
          return null;
        } else {
          // EOFException being ignored
          return null;
        }
      }
    } catch (IOException e) {
      if (!skipErrors) {
        throw e;
      }
      // skipErrors: convert the open failure into a corruption marker so the
      // caller quarantines the log instead of aborting the split.
      CorruptedLogFileException t =
        new CorruptedLogFileException("skipErrors=true Could not open hlog " +
            path + " ignoring");
      t.initCause(e);
      throw t;
    }
    return in;
  }
809 
810   static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
811   throws CorruptedLogFileException, IOException {
812     try {
813       return in.next();
814     } catch (EOFException eof) {
815       // truncated files are expected if a RS crashes (see HBASE-2643)
816       LOG.info("EOF from hlog " + path + ".  continuing");
817       return null;
818     } catch (IOException e) {
819       // If the IOE resulted from bad file format,
820       // then this problem is idempotent and retrying won't help
821       if (e.getCause() != null &&
822           (e.getCause() instanceof ParseException ||
823            e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
824         LOG.warn("Parse exception " + e.getCause().toString() + " from hlog "
825            + path + ".  continuing");
826         return null;
827       }
828       if (!skipErrors) {
829         throw e;
830       }
831       CorruptedLogFileException t =
832         new CorruptedLogFileException("skipErrors=true Ignoring exception" +
833             " while parsing hlog " + path + ". Marking as corrupted");
834       t.initCause(e);
835       throw t;
836     }
837   }
838 
839 
  /**
   * Record an error raised by a writer thread. Only the first error is kept:
   * compareAndSet succeeds only while the slot is still null, so later
   * failures are intentionally dropped.
   */
  private void writerThreadError(Throwable t) {
    thrown.compareAndSet(null, t);
  }
843 
844   /**
845    * Check for errors in the writer threads. If any is found, rethrow it.
846    */
847   private void checkForErrors() throws IOException {
848     Throwable thrown = this.thrown.get();
849     if (thrown == null) return;
850     if (thrown instanceof IOException) {
851       throw (IOException)thrown;
852     } else {
853       throw new RuntimeException(thrown);
854     }
855   }
  /**
   * Create a new {@link Writer} for writing log splits.
   *
   * @param fs filesystem to create the file on
   * @param logfile path the new writer will write to
   * @param conf configuration
   * @return a writer obtained from the hlogFs factory
   * @throws IOException if the writer cannot be created
   */
  protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
      throws IOException {
    return hlogFs.createWriter(fs, conf, logfile);
  }
863 
  /**
   * Create a new {@link Reader} for reading logs to split.
   *
   * @param fs filesystem holding the log
   * @param curLogFile log file to open
   * @param conf configuration
   * @return a reader obtained from HLog.getReader
   * @throws IOException if the reader cannot be created
   */
  protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
      throws IOException {
    return HLog.getReader(fs, curLogFile, conf);
  }
871 
  /**
   * Class which accumulates edits and separates them into a buffer per region
   * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
   * a predefined threshold.
   *
   * Writer threads then pull region-specific buffers from this class.
   */
  class EntryBuffers {
    // One pending buffer per encoded region name; guarded by `this`.
    Map<byte[], RegionEntryBuffer> buffers =
      new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);

    /* Track which regions are currently in the middle of writing. We don't allow
       an IO thread to pick up bytes from a region if we're already writing
       data for that region in a different IO thread. */
    Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);

    // Total heap bytes buffered across all regions; guarded by `dataAvailable`.
    long totalBuffered = 0;
    // Threshold above which appendEntry() blocks waiting for writer threads.
    long maxHeapUsage;

    EntryBuffers(long maxHeapUsage) {
      this.maxHeapUsage = maxHeapUsage;
    }

    /**
     * Append a log entry into the corresponding region buffer.
     * Blocks if the total heap usage has crossed the specified threshold.
     *
     * @throws InterruptedException if interrupted while waiting for the
     *         writer threads to drain buffered data
     * @throws IOException if a writer thread has recorded an error
     */
    void appendEntry(Entry entry) throws InterruptedException, IOException {
      HLogKey key = entry.getKey();

      RegionEntryBuffer buffer;
      long incrHeap;
      synchronized (this) {
        // Lazily create the per-region buffer on the first edit for a region.
        buffer = buffers.get(key.getEncodedRegionName());
        if (buffer == null) {
          buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
          buffers.put(key.getEncodedRegionName(), buffer);
        }
        incrHeap= buffer.appendEntry(entry);        
      }

      // If we crossed the chunk threshold, wait for more space to be available
      synchronized (dataAvailable) {
        totalBuffered += incrHeap;
        // Also leave the wait loop if a writer thread has failed, so we don't
        // block forever; checkForErrors() below rethrows the failure.
        while (totalBuffered > maxHeapUsage && thrown.get() == null) {
          LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
          dataAvailable.wait(3000);
        }
        // Wake writer threads that may be waiting for new data.
        dataAvailable.notifyAll();
      }
      checkForErrors();
    }

    /**
     * Hand out the largest buffered region that is not already being written,
     * marking that region as in-flight until doneWriting() is called.
     *
     * @return the chosen buffer, or null if no region is eligible right now
     */
    synchronized RegionEntryBuffer getChunkToWrite() {
      long biggestSize=0;
      byte[] biggestBufferKey=null;

      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
        long size = entry.getValue().heapSize();
        if (size > biggestSize && !currentlyWriting.contains(entry.getKey())) {
          biggestSize = size;
          biggestBufferKey = entry.getKey();
        }
      }
      if (biggestBufferKey == null) {
        return null;
      }

      // Remove from the pending map and mark in-flight so no other IO thread
      // picks up this region concurrently.
      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
      currentlyWriting.add(biggestBufferKey);
      return buffer;
    }

    /**
     * Mark a buffer returned by getChunkToWrite() as fully written, releasing
     * its heap accounting and waking any appenders blocked on the threshold.
     */
    void doneWriting(RegionEntryBuffer buffer) {
      synchronized (this) {
        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
        assert removed;
      }
      long size = buffer.heapSize();

      synchronized (dataAvailable) {
        totalBuffered -= size;
        // We may unblock writers
        dataAvailable.notifyAll();
      }
    }

    // Whether an IO thread currently holds this region's buffer.
    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
      return currentlyWriting.contains(region);
    }
  }
966 
967   /**
968    * A buffer of some number of edits for a given region.
969    * This accumulates edits and also provides a memory optimization in order to
970    * share a single byte array instance for the table and region name.
971    * Also tracks memory usage of the accumulated edits.
972    */
973   static class RegionEntryBuffer implements HeapSize {
974     long heapInBuffer = 0;
975     List<Entry> entryBuffer;
976     byte[] tableName;
977     byte[] encodedRegionName;
978 
979     RegionEntryBuffer(byte[] table, byte[] region) {
980       this.tableName = table;
981       this.encodedRegionName = region;
982       this.entryBuffer = new LinkedList<Entry>();
983     }
984 
985     long appendEntry(Entry entry) {
986       internify(entry);
987       entryBuffer.add(entry);
988       long incrHeap = entry.getEdit().heapSize() +
989         ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
990         0; // TODO linkedlist entry
991       heapInBuffer += incrHeap;
992       return incrHeap;
993     }
994 
995     private void internify(Entry entry) {
996       HLogKey k = entry.getKey();
997       k.internTableName(this.tableName);
998       k.internEncodedRegionName(this.encodedRegionName);
999     }
1000 
1001     public long heapSize() {
1002       return heapInBuffer;
1003     }
1004   }
1005 
1006 
  /**
   * Worker thread that pulls per-region chunks from entryBuffers and writes
   * them out via outputSink. Any throwable escaping the loop is recorded
   * through writerThreadError() for the splitting thread to rethrow.
   */
  class WriterThread extends Thread {
    // Set under the dataAvailable lock by finish(); volatile so the loop
    // observes it promptly.
    private volatile boolean shouldStop = false;

    WriterThread(int i) {
      super("WriterThread-" + i);
    }

    public void run()  {
      try {
        doRun();
      } catch (Throwable t) {
        // Record the failure so checkForErrors() can surface it to the caller.
        LOG.error("Error in log splitting write thread", t);
        writerThreadError(t);
      }
    }

    // Main loop: repeatedly grab the biggest available chunk and write it,
    // waiting on dataAvailable when nothing is buffered.
    private void doRun() throws IOException {
      LOG.debug("Writer thread " + this + ": starting");
      while (true) {
        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available, wait on some more to show up
          synchronized (dataAvailable) {
            // Check shouldStop under the lock so a finish() notification
            // cannot slip in between the check and the wait.
            if (shouldStop) return;
            try {
              dataAvailable.wait(1000);
            } catch (InterruptedException ie) {
              // An interrupt during shutdown is expected; otherwise escalate.
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          // Always release the region so its heap accounting is freed and
          // another thread may write it later.
          entryBuffers.doneWriting(buffer);
        }
      }
    }


    // Write one region's buffered edits to its recovered-edits output.
    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
      List<Entry> entries = buffer.entryBuffer;
      if (entries.isEmpty()) {
        LOG.warn(this.getName() + " got an empty buffer, skipping");
        return;
      }

      WriterAndPath wap = null;

      long startTime = System.nanoTime();
      try {
        int editsCount = 0;

        for (Entry logEntry : entries) {
          if (wap == null) {
            // Lazily resolve the output writer from the first entry's region.
            wap = outputSink.getWriterAndPath(logEntry);
            if (wap == null) {
              // getWriterAndPath decided we don't need to write these edits
              // Message was already logged
              return;
            }
          }
          wap.w.append(logEntry);
          outputSink.updateRegionMaximumEditLogSeqNum(logEntry);
          editsCount++;
        }
        // Pass along summary statistics
        wap.incrementEdits(editsCount);
        wap.incrementNanoTime(System.nanoTime() - startTime);
      } catch (IOException e) {
        e = RemoteExceptionHandler.checkIOException(e);
        LOG.fatal(this.getName() + " Got while writing log entry to log", e);
        throw e;
      }
    }

    // Ask this thread to exit once it next finds no chunk to write.
    void finish() {
      synchronized (dataAvailable) {
        shouldStop = true;
        dataAvailable.notifyAll();
      }
    }
  }
1095 
1096   private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir,
1097       FileSystem fs, Configuration conf)
1098   throws IOException {
1099     Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1100     if (regionedits == null) {
1101       return null;
1102     }
1103     if (fs.exists(regionedits)) {
1104       LOG.warn("Found existing old edits file. It could be the "
1105           + "result of a previous failed split attempt. Deleting "
1106           + regionedits + ", length="
1107           + fs.getFileStatus(regionedits).getLen());
1108       if (!HBaseFileSystem.deleteFileFromFileSystem(fs, regionedits)) {
1109         LOG.warn("Failed delete of old " + regionedits);
1110       }
1111     }
1112     Writer w = createWriter(fs, regionedits, conf);
1113     LOG.debug("Creating writer path=" + regionedits + " region="
1114         + Bytes.toStringBinary(region));
1115     return (new WriterAndPath(regionedits, w));
1116   }
1117 
1118   Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
1119     List<String> components = new ArrayList<String>(10);
1120     do {
1121       components.add(edits.getName());
1122       edits = edits.getParent();
1123     } while (edits.depth() > rootdir.depth());
1124     Path ret = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
1125     for (int i = components.size() - 1; i >= 0; i--) {
1126       ret = new Path(ret, components.get(i));
1127     }
1128     try {
1129       if (fs.exists(ret)) {
1130         LOG.warn("Found existing old temporary edits file. It could be the "
1131             + "result of a previous failed split attempt. Deleting "
1132             + ret + ", length="
1133             + fs.getFileStatus(ret).getLen());
1134         if (!HBaseFileSystem.deleteFileFromFileSystem(fs, ret)) {
1135           LOG.warn("Failed delete of old " + ret);
1136         }
1137       }
1138       Path dir = ret.getParent();
1139       if (!fs.exists(dir) && !HBaseFileSystem.makeDirOnFileSystem(fs, dir)) { 
1140           LOG.warn("mkdir failed on " + dir);
1141       }
1142     } catch (IOException e) {
1143       LOG.warn("Could not prepare temp staging area ", e);
1144       // ignore, exceptions will be thrown elsewhere
1145     }
1146     return ret;
1147   }
1148 
  /**
   * Class that manages the output streams from the log splitting process.
   */
  class OutputSink {
    // One open recovered-edits writer per encoded region name.
    private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
          new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
    // Highest edit-log sequence number seen per region; used when naming the
    // completed recovered-edits file.
    private final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
        .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
    private final List<WriterThread> writerThreads = Lists.newArrayList();

    /* Set of regions which we've decided should not output edits */
    private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(
        new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));

    // True once closeStreams() has finished closing and renaming all outputs.
    private boolean closeAndCleanCompleted = false;

    // True once closeLogWriters() has closed every writer.
    private boolean logWritersClosed  = false;

    /**
     * Start the threads that will pump data from the entryBuffers
     * to the output files.
     */
    synchronized void startWriterThreads(EntryBuffers entryBuffers) {
      // More threads could potentially write faster at the expense
      // of causing more disk seeks as the logs are split.
      // After a certain setting (probably around 3) the
      // process will be bound on the reader in the current
      // implementation anyway.
      int numThreads = conf.getInt(
          "hbase.regionserver.hlog.splitlog.writer.threads", 3);

      for (int i = 0; i < numThreads; i++) {
        WriterThread t = new WriterThread(i);
        t.start();
        writerThreads.add(t);
      }
    }

    /**
     * Ask every writer thread to finish, wait for them all, then close and
     * rename the output streams.
     *
     * @return the list of completed recovered-edits paths
     * @throws IOException if a writer thread failed, or closing/renaming an
     *         output failed
     */
    List<Path> finishWritingAndClose() throws IOException {
      LOG.info("Waiting for split writer threads to finish");
      try {
        for (WriterThread t : writerThreads) {
          t.finish();
        }
        for (WriterThread t : writerThreads) {
          try {
            t.join();
          } catch (InterruptedException ie) {
            throw new IOException(ie);
          }
          // Surface any error the thread recorded before it exited.
          checkForErrors();
        }
        LOG.info("Split writers finished");

        return closeStreams();
      } finally {
        // Ensure writers are closed even if the joins or closeStreams failed.
        List<IOException> thrown = closeLogWriters(null);
        if (thrown != null && !thrown.isEmpty()) {
          throw MultipleIOException.createIOException(thrown);
        }
      }
    }

    /**
     * Close all of the output streams.
     * @return the list of paths written.
     */
    private List<Path> closeStreams() throws IOException {
      Preconditions.checkState(!closeAndCleanCompleted);

      List<Path> paths = new ArrayList<Path>();
      List<IOException> thrown = Lists.newArrayList();
      closeLogWriters(thrown);
      for (Map.Entry<byte[], WriterAndPath> logWritersEntry : logWriters
          .entrySet()) {
        WriterAndPath wap = logWritersEntry.getValue();
        // Final name embeds the region's maximum observed sequence number.
        Path dst = getCompletedRecoveredEditsFilePath(wap.p,
            regionMaximumEditLogSeqNum.get(logWritersEntry.getKey()));
        try {
          if (!dst.equals(wap.p) && fs.exists(dst)) {
            LOG.warn("Found existing old edits file. It could be the "
                + "result of a previous failed split attempt. Deleting " + dst
                + ", length=" + fs.getFileStatus(dst).getLen());
            if (!HBaseFileSystem.deleteFileFromFileSystem(fs, dst)) {
              LOG.warn("Failed deleting of old " + dst);
              throw new IOException("Failed deleting of old " + dst);
            }
          }
          // Skip the unit tests which create a splitter that reads and writes
          // the data without touching disk. TestHLogSplit#testThreading is an
          // example.
          if (fs.exists(wap.p)) {
            if (!HBaseFileSystem.renameDirForFileSystem(fs, wap.p, dst)) {
              throw new IOException("Failed renaming " + wap.p + " to " + dst);
            }
            LOG.debug("Rename " + wap.p + " to " + dst);
          }
        } catch (IOException ioe) {
          // Collect per-file failures and keep going; all rethrown as one below.
          LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
          thrown.add(ioe);
          continue;
        }
        paths.add(dst);
      }
      if (!thrown.isEmpty()) {
        throw MultipleIOException.createIOException(thrown);
      }

      closeAndCleanCompleted = true;
      return paths;
    }

    /**
     * Stop all writer threads and close every open log writer, collecting
     * (rather than immediately throwing) close failures.
     *
     * @param thrown list to accumulate IOExceptions into; may be null, in
     *        which case a new list is created
     * @return the list of accumulated IOExceptions
     * @throws IOException if interrupted while joining a writer thread
     */
    private List<IOException> closeLogWriters(List<IOException> thrown)
        throws IOException {
      if (!logWritersClosed) {
        if (thrown == null) {
          thrown = Lists.newArrayList();
        }
        try {
          for (WriterThread t : writerThreads) {
            // Repeatedly nudge the thread until it actually exits.
            while (t.isAlive()) {
              t.shouldStop = true;
              t.interrupt();
              try {
                t.join(10);
              } catch (InterruptedException e) {
                IOException iie = new InterruptedIOException();
                iie.initCause(e);
                throw iie;
              }
            }
          }
        } finally {
          // Close writers even if joining the threads above failed.
          synchronized (logWriters) {
            for (WriterAndPath wap : logWriters.values()) {
              try {
                wap.w.close();
              } catch (IOException ioe) {
                LOG.error("Couldn't close log at " + wap.p, ioe);
                thrown.add(ioe);
                continue;
              }
              LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
                  + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
            }
          }
          logWritersClosed = true;
        }
      }
      return thrown;
    }

    /**
     * Get a writer and path for a log starting at the given entry.
     *
     * This function is threadsafe so long as multiple threads are always
     * acting on different regions.
     *
     * @return null if this region shouldn't output any logs
     */
    WriterAndPath getWriterAndPath(Entry entry) throws IOException {
      byte region[] = entry.getKey().getEncodedRegionName();
      WriterAndPath ret = logWriters.get(region);
      if (ret != null) {
        return ret;
      }
      // If we already decided that this region doesn't get any output
      // we don't need to check again.
      if (blacklistedRegions.contains(region)) {
        return null;
      }
      // First edit for this region: create its writer, or blacklist it when
      // createWAP decides no output is needed.
      ret = createWAP(region, entry, rootDir, fs, conf);
      if (ret == null) {
        blacklistedRegions.add(region);
        return null;
      }
      logWriters.put(region, ret);
      return ret;
    }

    /**
     * Update region's maximum edit log SeqNum.
     */
    void updateRegionMaximumEditLogSeqNum(Entry entry) {
      synchronized (regionMaximumEditLogSeqNum) {
        Long currentMaxSeqNum=regionMaximumEditLogSeqNum.get(entry.getKey().getEncodedRegionName());
        if (currentMaxSeqNum == null
            || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
          regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(),
              entry.getKey().getLogSeqNum());
        }
      }

    }

    // Highest sequence number recorded for the region, or null if none seen.
    Long getRegionMaximumEditLogSeqNum(byte[] region) {
      return regionMaximumEditLogSeqNum.get(region);
    }

    /**
     * @return a map from encoded region ID to the number of edits written out
     * for that region.
     */
    private Map<byte[], Long> getOutputCounts() {
      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(
          Bytes.BYTES_COMPARATOR);
      synchronized (logWriters) {
        for (Map.Entry<byte[], WriterAndPath> entry : logWriters.entrySet()) {
          ret.put(entry.getKey(), entry.getValue().editsWritten);
        }
      }
      return ret;
    }
  }
1364 
1365 
1366 
  /**
   *  Private data structure that wraps a Writer and its Path,
   *  also collecting statistics about the data written to this
   *  output.
   */
  private final static class WriterAndPath {
    final Path p;
    final Writer w;

    /* Count of edits written to this path */
    long editsWritten = 0;
    /* Number of nanos spent writing to this log */
    long nanosSpent = 0;

    /* To check whether a close has already been tried on the
     * writer
     */
    boolean writerClosed = false;

    WriterAndPath(final Path p, final Writer w) {
      this.p = p;
      this.w = w;
    }

    // Accumulate the count of edits appended to this writer.
    void incrementEdits(int edits) {
      editsWritten += edits;
    }

    // Accumulate time spent writing to this output, in nanoseconds.
    void incrementNanoTime(long nanos) {
      nanosSpent += nanos;
    }
  }
1399 
  /**
   * Thrown when an hlog file is detected as corrupt, so the caller can move
   * it aside (when skipErrors is enabled) instead of aborting the split.
   */
  static class CorruptedLogFileException extends Exception {
    private static final long serialVersionUID = 1L;
    CorruptedLogFileException(String s) {
      super(s);
    }
  }
1406 }