1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import java.io.DataInput;
23  import java.io.DataOutput;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.OutputStream;
27  import java.io.UnsupportedEncodingException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.URLEncoder;
31  import java.util.ArrayList;
32  import java.util.Arrays;
33  import java.util.Collections;
34  import java.util.LinkedList;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.NavigableSet;
38  import java.util.SortedMap;
39  import java.util.TreeMap;
40  import java.util.TreeSet;
41  import java.util.UUID;
42  import java.util.concurrent.ConcurrentSkipListMap;
43  import java.util.concurrent.CopyOnWriteArrayList;
44  import java.util.concurrent.atomic.AtomicBoolean;
45  import java.util.concurrent.atomic.AtomicInteger;
46  import java.util.concurrent.atomic.AtomicLong;
47  import java.util.concurrent.locks.Lock;
48  import java.util.concurrent.locks.ReentrantLock;
49  import java.util.regex.Matcher;
50  import java.util.regex.Pattern;
51  
52  import org.apache.commons.logging.Log;
53  import org.apache.commons.logging.LogFactory;
54  import org.apache.hadoop.conf.Configuration;
55  import org.apache.hadoop.fs.FSDataOutputStream;
56  import org.apache.hadoop.fs.FileStatus;
57  import org.apache.hadoop.fs.FileSystem;
58  import org.apache.hadoop.fs.Path;
59  import org.apache.hadoop.fs.PathFilter;
60  import org.apache.hadoop.fs.Syncable;
61  import org.apache.hadoop.hbase.HBaseConfiguration;
62  import org.apache.hadoop.hbase.HBaseFileSystem;
63  import org.apache.hadoop.hbase.HConstants;
64  import org.apache.hadoop.hbase.HRegionInfo;
65  import org.apache.hadoop.hbase.HTableDescriptor;
66  import org.apache.hadoop.hbase.KeyValue;
67  import org.apache.hadoop.hbase.ServerName;
68  import org.apache.hadoop.hbase.util.Bytes;
69  import org.apache.hadoop.hbase.util.ClassSize;
70  import org.apache.hadoop.hbase.util.FSUtils;
71  import org.apache.hadoop.hbase.util.HasThread;
72  import org.apache.hadoop.hbase.util.Threads;
73  import org.apache.hadoop.io.Writable;
74  import org.apache.hadoop.util.StringUtils;
75  
76  /**
77   * HLog stores all the edits to the HStore.  It is the HBase write-ahead-log
78   * implementation.
79   *
80   * It performs logfile-rolling, so external callers are not aware that the
81   * underlying file is being rolled.
82   *
83   * <p>
84   * There is one HLog per RegionServer.  All edits for all Regions carried by
85   * a particular RegionServer are entered first in the HLog.
86   *
87   * <p>
88   * Each HRegion is identified by a unique long integer. HRegions do
89   * not need to declare themselves before using the HLog; they simply include
90   * their HRegion-id in the <code>append</code> or
91   * <code>completeCacheFlush</code> calls.
92   *
93   * <p>
94   * An HLog consists of multiple on-disk files, which have a chronological order.
95   * As data is flushed to other (better) on-disk structures, the log becomes
96   * obsolete. We can destroy all the log messages for a given HRegion-id up to
97   * the most-recent CACHEFLUSH message from that HRegion.
98   *
99   * <p>
100  * It's only practical to delete entire files. Thus, we delete an entire on-disk
101  * file F when all of the messages in F have a log-sequence-id that's older
102  * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has
103  * a message in F.
104  *
105  * <p>
106  * Synchronized methods can never execute in parallel. However, between the
107  * start of a cache flush and the completion point, appends are allowed but log
108  * rolling is not. To prevent log rolling taking place during this period, a
109  * separate reentrant lock is used.
110  *
111  * <p>To read an HLog, call {@link #getReader(org.apache.hadoop.fs.FileSystem,
112  * org.apache.hadoop.fs.Path, org.apache.hadoop.conf.Configuration)}.
113  *
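 * <p>Illustrative only (not part of the original class comment): a minimal
 * sketch of scanning an existing WAL file with the reader obtained above,
 * assuming <code>fs</code>, <code>walPath</code> and <code>conf</code> are
 * already in scope.
 * <pre>
 *   HLog.Reader reader = HLog.getReader(fs, walPath, conf);
 *   try {
 *     HLog.Entry entry;
 *     while ((entry = reader.next()) != null) {
 *       // entry.getKey() carries region/sequence info; entry.getEdit() the KeyValues
 *     }
 *   } finally {
 *     reader.close();
 *   }
 * </pre>
 *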
114  */
115 public class HLog implements Syncable {
116   static final Log LOG = LogFactory.getLog(HLog.class);
117   public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
118   static final byte [] METAROW = Bytes.toBytes("METAROW");
119 
120   /** File Extension used while splitting an HLog into regions (HBASE-2312) */
121   public static final String SPLITTING_EXT = "-splitting";
122   public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
123   /** The META region's HLog filename extension */
124   public static final String META_HLOG_FILE_EXTN = ".meta";
125   public static final String SEPARATE_HLOG_FOR_META = "hbase.regionserver.separate.hlog.for.meta";
126 
127   /*
128    * Name of directory that holds recovered edits written by the WAL
129    * splitting code, one per region
130    */
131   public static final String RECOVERED_EDITS_DIR = "recovered.edits";
132   private static final Pattern EDITFILES_NAME_PATTERN =
133     Pattern.compile("-?[0-9]+");
134   public static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
135   
136   private final FileSystem fs;
137   private final Path dir;
138   private final Configuration conf;
139   private final HLogFileSystem hlogFs;
140   // Listeners that are called on WAL events.
141   private List<WALActionsListener> listeners =
142     new CopyOnWriteArrayList<WALActionsListener>();
143   private final long optionalFlushInterval;
144   private final long blocksize;
145   private final String prefix;
146   private final AtomicLong unflushedEntries = new AtomicLong(0);
147   private volatile long syncedTillHere = 0;
148   private long lastDeferredTxid;
149   private final Path oldLogDir;
150   private volatile boolean logRollRunning;
151 
152   private static Class<? extends Writer> logWriterClass;
153   private static Class<? extends Reader> logReaderClass;
154 
155   private WALCoprocessorHost coprocessorHost;
156 
157   static void resetLogReaderClass() {
158     HLog.logReaderClass = null;
159   }
160 
161   private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
162   // Minimum tolerable replicas; if the actual value falls below this,
163   // rollWriter will be triggered
164   private int minTolerableReplication;
165   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
166   final static Object [] NO_ARGS = new Object []{};
167 
168   public interface Reader {
169     void init(FileSystem fs, Path path, Configuration c) throws IOException;
170     void close() throws IOException;
171     Entry next() throws IOException;
172     Entry next(Entry reuse) throws IOException;
173     void seek(long pos) throws IOException;
174     long getPosition() throws IOException;
175     void reset() throws IOException;
176   }
177 
178   public interface Writer {
179     void init(FileSystem fs, Path path, Configuration c) throws IOException;
180     void close() throws IOException;
181     void sync() throws IOException;
182     void append(Entry entry) throws IOException;
183     long getLength() throws IOException;
184   }
185 
186   /*
187    * Current log file.
188    */
189   Writer writer;
190 
191   /*
192    * Map of all log files but the current one.
193    */
194   final SortedMap<Long, Path> outputfiles =
195     Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
196 
197   /*
198    * Map of encoded region names to their most recent sequence/edit id in their
199    * memstore.
200    */
201   private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
202     new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
203 
204   private volatile boolean closed = false;
205 
206   private final AtomicLong logSeqNum = new AtomicLong(0);
207 
208   private boolean forMeta = false;
209 
210   // The timestamp (in ms) when the log file was created.
211   private volatile long filenum = -1;
212 
213   // Number of transactions in the current HLog.
214   private final AtomicInteger numEntries = new AtomicInteger(0);
215 
216   // If the live datanode count is lower than the default replica value,
217   // rollWriter would be triggered on every sync (i.e. many rolls in a
218   // short time). This counter is used as a workaround to slow down the
219   // roll frequency triggered by checkLowReplication().
220   private volatile int consecutiveLogRolls = 0;
221   private final int lowReplicationRollLimit;
222 
223   // If consecutiveLogRolls is larger than lowReplicationRollLimit,
224   // then disable the rolling in checkLowReplication().
225   // Enable it if the replications recover.
226   private volatile boolean lowReplicationRollEnabled = true;
227 
228   // If larger than this size, roll the log. This is typically 0.95 times the size
229   // of the default Hdfs block size.
230   private final long logrollsize;
231 
232   // size of current log 
233   private long curLogSize = 0;
234 
235   // The total size of hlog
236   private AtomicLong totalLogSize = new AtomicLong(0);
237   
238   // This lock prevents starting a log roll during a cache flush.
239   // synchronized is insufficient because a cache flush spans two method calls.
240   private final Lock cacheFlushLock = new ReentrantLock();
241 
242   // We synchronize on updateLock to prevent updates and to prevent a log roll
243   // during an update
244   // locked during appends
245   private final Object updateLock = new Object();
246   private final Object flushLock = new Object();
247 
248   private final boolean enabled;
249 
250   /*
251    * If more than this many logs, force a flush of the oldest region so its
252    * oldest edit goes to disk.  If we let too many accumulate and then crash,
253    * replaying them will take forever.  Keep the number of logs tidy.
254    */
255   private final int maxLogs;
256 
257   /**
258    * Thread that handles optional sync'ing
259    */
260   private final LogSyncer logSyncer;
261 
262   /** Number of log close errors tolerated before we abort */
263   private final int closeErrorsTolerated;
264 
265   private final AtomicInteger closeErrorCount = new AtomicInteger();
266 
267   /**
268    * Pattern used to validate a HLog file name
269    */
270   private static final Pattern pattern = 
271       Pattern.compile(".*\\.\\d*("+HLog.META_HLOG_FILE_EXTN+")*");
272 
273   static byte [] COMPLETE_CACHE_FLUSH;
274   static {
275     try {
276       COMPLETE_CACHE_FLUSH =
277         "HBASE::CACHEFLUSH".getBytes(HConstants.UTF8_ENCODING);
278     } catch (UnsupportedEncodingException e) {
279       assert(false);
280     }
281   }
282 
283   public static class Metric {
284     public long min = Long.MAX_VALUE;
285     public long max = 0;
286     public long total = 0;
287     public int count = 0;
288 
289     synchronized void inc(final long val) {
290       min = Math.min(min, val);
291       max = Math.max(max, val);
292       total += val;
293       ++count;
294     }
295 
296     synchronized Metric get() {
297       Metric copy = new Metric();
298       copy.min = min;
299       copy.max = max;
300       copy.total = total;
301       copy.count = count;
302       this.min = Long.MAX_VALUE;
303       this.max = 0;
304       this.total = 0;
305       this.count = 0;
306       return copy;
307     }
308   }
309 
310   // For measuring latency of writes
311   private static Metric writeTime = new Metric();
312   private static Metric writeSize = new Metric();
313   // For measuring latency of syncs
314   private static Metric syncTime = new Metric();
315   //For measuring slow HLog appends
316   private static AtomicLong slowHLogAppendCount = new AtomicLong();
317   private static Metric slowHLogAppendTime = new Metric();
318   
319   public static Metric getWriteTime() {
320     return writeTime.get();
321   }
322 
323   public static Metric getWriteSize() {
324     return writeSize.get();
325   }
326 
327   public static Metric getSyncTime() {
328     return syncTime.get();
329   }
330 
331   public static long getSlowAppendCount() {
332     return slowHLogAppendCount.get();
333   }
334   
335   public static Metric getSlowAppendTime() {
336     return slowHLogAppendTime.get();
337   }
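  // Illustrative only: a minimal sketch of how the metrics above might be
  // polled. Note that Metric.get() returns a snapshot and resets the counters,
  // so each call reports activity since the previous call.
  //
  //   HLog.Metric sync = HLog.getSyncTime();
  //   long avgSyncMs = sync.count == 0 ? 0 : sync.total / sync.count;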
338 
339   /**
340    * Constructor.
341    *
342    * @param fs filesystem handle
343    * @param dir path to where hlogs are stored
344    * @param oldLogDir path to where hlogs are archived
345    * @param conf configuration to use
346    * @throws IOException
347    */
348   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
349               final Configuration conf)
350   throws IOException {
351     this(fs, dir, oldLogDir, conf, null, true, null, false);
352   }
353 
354   /**
355    * Create an edit log at the given <code>dir</code> location.
356    *
357    * You should never have to load an existing log. If there is a log at
358    * startup, it should have already been processed and deleted by the time the
359    * HLog object is started up.
360    *
361    * @param fs filesystem handle
362    * @param dir path to where hlogs are stored
363    * @param oldLogDir path to where hlogs are archived
364    * @param conf configuration to use
365    * @param listeners Listeners on WAL events. Listeners passed here will
366    * be registered before we do anything else; e.g. before the
367    * constructor calls {@link #rollWriter()}.
368    * @param prefix should always be hostname and port in distributed env and
369    *        it will be URL encoded before being used.
370    *        If prefix is null, "hlog" will be used
371    * @throws IOException
372    */
373   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
374       final Configuration conf, final List<WALActionsListener> listeners,
375       final String prefix) throws IOException {
376     this(fs, dir, oldLogDir, conf, listeners, true, prefix, false);
377   }
378 
379   /**
380    * Create an edit log at the given <code>dir</code> location.
381    *
382    * You should never have to load an existing log. If there is a log at
383    * startup, it should have already been processed and deleted by the time the
384    * HLog object is started up.
385    *
386    * @param fs filesystem handle
387    * @param dir path to where hlogs are stored
388    * @param oldLogDir path to where hlogs are archived
389    * @param conf configuration to use
390    * @param listeners Listeners on WAL events. Listeners passed here will
391    * be registered before we do anything else; e.g. before the
392    * constructor calls {@link #rollWriter()}.
393    * @param failIfLogDirExists If true, an IOException will be thrown if dir already exists.
394    * @param prefix should always be hostname and port in distributed env and
395    *        it will be URL encoded before being used.
396    *        If prefix is null, "hlog" will be used
397    * @param forMeta if this hlog is meant for meta updates
398    * @throws IOException
399    */
400   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
401       final Configuration conf, final List<WALActionsListener> listeners,
402       final boolean failIfLogDirExists, final String prefix, boolean forMeta)
403   throws IOException {
404     super();
405     this.fs = fs;
406     this.dir = dir;
407     this.conf = conf;
408     this.hlogFs = new HLogFileSystem(conf);
409     if (listeners != null) {
410       for (WALActionsListener i: listeners) {
411         registerWALActionsListener(i);
412       }
413     }
414     this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
415         FSUtils.getDefaultBlockSize(this.fs, this.dir));
416     // Roll at 95% of block size.
417     float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
418     this.logrollsize = (long)(this.blocksize * multi);
419     this.optionalFlushInterval =
420       conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
421     boolean dirExists = false;
422     if (failIfLogDirExists && (dirExists = this.fs.exists(dir))) {
423       throw new IOException("Target HLog directory already exists: " + dir);
424     }
425     if (!dirExists && !HBaseFileSystem.makeDirOnFileSystem(fs, dir)) {
426       throw new IOException("Unable to mkdir " + dir);
427     }
428     this.oldLogDir = oldLogDir;
429     if (!fs.exists(oldLogDir) && !HBaseFileSystem.makeDirOnFileSystem(fs, oldLogDir)) {
430       throw new IOException("Unable to mkdir " + this.oldLogDir);
431     }
432     this.forMeta = forMeta;
433     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
434     this.minTolerableReplication = conf.getInt(
435         "hbase.regionserver.hlog.tolerable.lowreplication",
436         FSUtils.getDefaultReplication(this.fs, this.dir));
437     this.lowReplicationRollLimit = conf.getInt(
438         "hbase.regionserver.hlog.lowreplication.rolllimit", 5);
439     this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
440     this.closeErrorsTolerated = conf.getInt(
441         "hbase.regionserver.logroll.errors.tolerated", 0);
442 
443     LOG.info("HLog configuration: blocksize=" +
444       StringUtils.byteDesc(this.blocksize) +
445       ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
446       ", enabled=" + this.enabled +
447       ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
448     // If prefix is null||empty then just name it hlog
449     this.prefix = prefix == null || prefix.isEmpty() ?
450         "hlog" : URLEncoder.encode(prefix, "UTF8");
451     // rollWriter sets this.hdfs_out if it can.
452     rollWriter();
453 
454     // handle the reflection necessary to call getNumCurrentReplicas()
455     this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
456 
457     logSyncer = new LogSyncer(this.optionalFlushInterval);
458     // When optionalFlushInterval is set to 0, don't start a thread for deferred log sync.
459     if (this.optionalFlushInterval > 0) {
460       Threads.setDaemonThreadRunning(logSyncer.getThread(), Thread.currentThread().getName()
461           + ".logSyncer");
462     } else {
463       LOG.info("hbase.regionserver.optionallogflushinterval is set as "
464           + this.optionalFlushInterval + ". Deferred log syncing won't work. "
465           + "Any Mutation, marked to be deferred synced, will be flushed immediately.");
466     }
467     coprocessorHost = new WALCoprocessorHost(this, conf);
468   }
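  // Illustrative only: a minimal sketch of tuning the knobs read by the
  // constructor above before creating an HLog. The values are examples, not
  // recommendations, and logDir/oldLogDir are assumed to exist.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setFloat("hbase.regionserver.logroll.multiplier", 0.95f);
  //   conf.setInt("hbase.regionserver.maxlogs", 32);
  //   conf.setLong("hbase.regionserver.optionallogflushinterval", 1000L);
  //   HLog wal = new HLog(FileSystem.get(conf), logDir, oldLogDir, conf);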
469 
470   /**
471    * Find the 'getNumCurrentReplicas' method on the passed <code>os</code> stream.
472    * @return Method or null.
473    */
474   private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
475     Method m = null;
476     if (os != null) {
477       Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
478           .getClass();
479       try {
480         m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas",
481             new Class<?>[] {});
482         m.setAccessible(true);
483       } catch (NoSuchMethodException e) {
484         LOG.info("FileSystem's output stream doesn't support"
485             + " getNumCurrentReplicas; --HDFS-826 not available; fsOut="
486             + wrappedStreamClass.getName());
487       } catch (SecurityException e) {
488         LOG.info("Doesn't have access to getNumCurrentReplicas on "
489             + "FileSystems's output stream --HDFS-826 not available; fsOut="
490             + wrappedStreamClass.getName(), e);
491         m = null; // could happen on setAccessible()
492       }
493     }
494     if (m != null) {
495       LOG.info("Using getNumCurrentReplicas--HDFS-826");
496     }
497     return m;
498   }
499 
500   public void registerWALActionsListener(final WALActionsListener listener) {
501     this.listeners.add(listener);
502   }
503 
504   public boolean unregisterWALActionsListener(final WALActionsListener listener) {
505     return this.listeners.remove(listener);
506   }
507 
508   /**
509    * @return Current state of the monotonically increasing file id.
510    */
511   public long getFilenum() {
512     return this.filenum;
513   }
514 
515   /**
516    * Called by HRegionServer when it opens a new region to ensure that log
517    * sequence numbers are always greater than the latest sequence number of the
518    * region being brought on-line.
519    *
520    * @param newvalue We'll set log edit/sequence number to this value if it
521    * is greater than the current value.
522    */
523   public void setSequenceNumber(final long newvalue) {
524     for (long id = this.logSeqNum.get(); id < newvalue &&
525         !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
526       // This could spin on occasion but better the occasional spin than locking
527       // every increment of sequence number.
528       LOG.debug("Changed sequenceid from " + logSeqNum + " to " + newvalue);
529     }
530   }
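  // Illustrative only: a region server would typically call the method above
  // when opening a region, passing the highest sequence id found in the
  // region's store files so new WAL edits sort after what is already on disk.
  //
  //   wal.setSequenceNumber(maxSeqIdFromStoreFiles);   // hypothetical variable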
531 
532   /**
533    * @return log sequence number
534    */
535   public long getSequenceNumber() {
536     return logSeqNum.get();
537   }
538 
539   /**
540    * Method used internally by this class and for tests only.
541    * @return The wrapped stream our writer is using; it is not the
542    * writer's 'out' FSDataOutputStream but the stream that this 'out' wraps
543    * (in HDFS it is an instance of DFSDataOutputStream).
544    */
545   // usage: see TestLogRolling.java
546   OutputStream getOutputStream() {
547     return this.hdfs_out.getWrappedStream();
548   }
549 
550   /**
551    * Roll the log writer. That is, start writing log messages to a new file.
552    *
553    * Because a log cannot be rolled during a cache flush, and a cache flush
554    * spans two method calls, a special lock needs to be obtained so that a cache
555    * flush cannot start when the log is being rolled and the log cannot be
556    * rolled during a cache flush.
557    *
558    * <p>Note that this method cannot be synchronized: startCacheFlush could run
559    * first, obtaining the cacheFlushLock; this method could then start, obtain
560    * the lock on this, and block on the cacheFlushLock; completeCacheFlush could
561    * then be called, wait on the lock on this, and consequently never release
562    * the cacheFlushLock, producing a deadlock.
563    *
564    * @return If lots of logs, flush the returned regions so next time through
565    * we can clean logs. Returns null if nothing to flush.  Names are actual
566    * region names as returned by {@link HRegionInfo#getEncodedName()}
567    * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
568    * @throws IOException
569    */
570   public byte [][] rollWriter() throws FailedLogCloseException, IOException {
571     return rollWriter(false);
572   }
573 
574   /**
575    * Roll the log writer. That is, start writing log messages to a new file.
576    *
577    * Because a log cannot be rolled during a cache flush, and a cache flush
578    * spans two method calls, a special lock needs to be obtained so that a cache
579    * flush cannot start when the log is being rolled and the log cannot be
580    * rolled during a cache flush.
581    *
582    * <p>Note that this method cannot be synchronized: startCacheFlush could run
583    * first, obtaining the cacheFlushLock; this method could then start, obtain
584    * the lock on this, and block on the cacheFlushLock; completeCacheFlush could
585    * then be called, wait on the lock on this, and consequently never release
586    * the cacheFlushLock, producing a deadlock.
587    *
588    * @param force If true, force creation of a new writer even if no entries
589    * have been written to the current writer
590    * @return If lots of logs, flush the returned regions so next time through
591    * we can clean logs. Returns null if nothing to flush.  Names are actual
592    * region names as returned by {@link HRegionInfo#getEncodedName()}
593    * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
594    * @throws IOException
595    */
596   public byte [][] rollWriter(boolean force)
597       throws FailedLogCloseException, IOException {
598     // Return if nothing to flush.
599     if (!force && this.writer != null && this.numEntries.get() <= 0) {
600       return null;
601     }
602     byte [][] regionsToFlush = null;
603     this.cacheFlushLock.lock();
604     this.logRollRunning = true;
605     try {
606       if (closed) {
607         LOG.debug("HLog closed.  Skipping rolling of writer");
608         return regionsToFlush;
609       }
610       // Do all the preparation outside of the updateLock to block
611       // the incoming writes as little as possible
612       long currentFilenum = this.filenum;
613       Path oldPath = null;
614       if (currentFilenum > 0) {
615         // computeFilename will take care of the meta hlog filename
616         oldPath = computeFilename(currentFilenum);
617       }
618       this.filenum = System.currentTimeMillis();
619       Path newPath = computeFilename();
620 
621       // Tell our listeners that a new log is about to be created
622       if (!this.listeners.isEmpty()) {
623         for (WALActionsListener i : this.listeners) {
624           i.preLogRoll(oldPath, newPath);
625         }
626       }
627       HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
628       // Can we get at the dfsclient outputstream?  If an instance of
629       // SFLW, it'll have done the necessary reflection to get at the
630       // protected field name.
631       FSDataOutputStream nextHdfsOut = null;
632       if (nextWriter instanceof SequenceFileLogWriter) {
633         nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
634         // perform the costly sync before we get the lock to roll writers.
635         try {
636           nextWriter.sync();
637         } catch (IOException e) {
638           // optimization failed, no need to abort here.
639           LOG.warn("pre-sync failed", e);
640         }
641       }
642 
643       synchronized (updateLock) {
644         // Clean up current writer.
645         Path oldFile = cleanupCurrentWriter(currentFilenum);
646         this.writer = nextWriter;
647         this.hdfs_out = nextHdfsOut;
648 
649         long oldFileLen = 0;
650         if (oldFile != null) {
651           oldFileLen = this.fs.getFileStatus(oldFile).getLen();
652           this.totalLogSize.addAndGet(oldFileLen);
653         }
654         LOG.info((oldFile != null?
655             "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
656             this.numEntries.get() +
657             ", filesize=" + oldFileLen + ". ": "") +
658           " for " + FSUtils.getPath(newPath));
659         this.numEntries.set(0);
660       }
661       // Tell our listeners that a new log was created
662       if (!this.listeners.isEmpty()) {
663         for (WALActionsListener i : this.listeners) {
664           i.postLogRoll(oldPath, newPath);
665         }
666       }
667 
668       // Can we delete any of the old log files?
669       if (this.outputfiles.size() > 0) {
670         if (this.lastSeqWritten.isEmpty()) {
671           LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
672           // If so, then no new writes have come in since all regions were
673           // flushed (and removed from the lastSeqWritten map). Means can
674           // remove all but currently open log file.
675           for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
676             Path path = e.getValue();
677             this.totalLogSize.addAndGet(-this.fs.getFileStatus(path).getLen());
678             archiveLogFile(path, e.getKey());
679           }
680           this.outputfiles.clear();
681         } else {
682           regionsToFlush = cleanOldLogs();
683         }
684       }
685     } finally {
686       this.logRollRunning = false;
687       this.cacheFlushLock.unlock();
688     }
689     return regionsToFlush;
690   }
691 
692   /**
693    * This method allows subclasses to inject different writers without having to
694    * extend other methods like rollWriter().
695    * 
696    * @param fs
697    * @param path
698    * @param conf
699    * @return Writer instance
700    * @throws IOException
701    */
702   protected Writer createWriterInstance(final FileSystem fs, final Path path,
703       final Configuration conf) throws IOException {
704     if (forMeta) {
705       //TODO: set a higher replication for the hlog files (HBASE-6773)
706     }
707     return this.hlogFs.createWriter(fs, conf, path);
708   }
709 
710   /**
711    * Get a reader for the WAL.
712    * The proper way to tail a log that can be under construction is to first use this method
713    * to get a reader then call {@link HLog.Reader#reset()} to see the new data. It will also
714    * take care of keeping implementation-specific context (like compression).
715    * @param fs
716    * @param path
717    * @param conf
718    * @return A WAL reader.  Close when done with it.
719    * @throws IOException
720    */
721   public static Reader getReader(final FileSystem fs, final Path path,
722                                  Configuration conf)
723       throws IOException {
724     try {
725 
726       if (logReaderClass == null) {
727 
728         logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
729             SequenceFileLogReader.class, Reader.class);
730       }
731 
732 
733       HLog.Reader reader = logReaderClass.newInstance();
734       reader.init(fs, path, conf);
735       return reader;
736     } catch (IOException e) {
737       throw e;
738     }
739     catch (Exception e) {
740       throw new IOException("Cannot get log reader", e);
741     }
742   }
743 
744   /**
745    * Get a writer for the WAL.
746    * @param path
747    * @param conf
748    * @return A WAL writer.  Close when done with it.
749    * @throws IOException
750    */
751   public static Writer createWriter(final FileSystem fs,
752       final Path path, Configuration conf)
753   throws IOException {
754     try {
755       if (logWriterClass == null) {
756         logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
757             SequenceFileLogWriter.class, Writer.class);
758       }
759       HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance();
760       writer.init(fs, path, conf);
761       return writer;
762     } catch (Exception e) {
763       throw new IOException("cannot get log writer", e);
764     }
765   }
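  // Illustrative only: getReader()/createWriter() resolve their implementation
  // classes from configuration, so a custom Reader/Writer can be plugged in.
  // The class names below are hypothetical.
  //
  //   conf.setClass("hbase.regionserver.hlog.reader.impl",
  //       MyLogReader.class, HLog.Reader.class);
  //   conf.setClass("hbase.regionserver.hlog.writer.impl",
  //       MyLogWriter.class, HLog.Writer.class);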
766 
767   /*
768    * Clean up old commit logs.
769    * @return If lots of logs, flush the returned region so next time through
770    * we can clean logs. Returns null if nothing to flush.  Returns array of
771    * encoded region names to flush.
772    * @throws IOException
773    */
774   private byte [][] cleanOldLogs() throws IOException {
775     Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
776     // Get the set of all log files whose last sequence number is smaller than
777     // the oldest edit's sequence number.
778     TreeSet<Long> sequenceNumbers =
779       new TreeSet<Long>(this.outputfiles.headMap(
780         (Long.valueOf(oldestOutstandingSeqNum.longValue()))).keySet());
781     // Now remove old log files (if any)
782     int logsToRemove = sequenceNumbers.size();
783     if (logsToRemove > 0) {
784       if (LOG.isDebugEnabled()) {
785         // Find associated region; helps debugging.
786         byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
787         LOG.debug("Found " + logsToRemove + " hlogs to remove" +
788           " out of total " + this.outputfiles.size() + ";" +
789           " oldest outstanding sequenceid is " + oldestOutstandingSeqNum +
790           " from region " + Bytes.toStringBinary(oldestRegion));
791       }
792       for (Long seq : sequenceNumbers) {
793         archiveLogFile(this.outputfiles.remove(seq), seq);
794       }
795     }
796 
797     // If too many log files, figure which regions we need to flush.
798     // Array is an array of encoded region names.
799     byte [][] regions = null;
800     int logCount = this.outputfiles == null? 0: this.outputfiles.size();
801     if (logCount > this.maxLogs && logCount > 0) {
802       // This is an array of encoded region names.
803       regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
804         this.lastSeqWritten);
805       if (regions != null) {
806         StringBuilder sb = new StringBuilder();
807         for (int i = 0; i < regions.length; i++) {
808           if (i > 0) sb.append(", ");
809           sb.append(Bytes.toStringBinary(regions[i]));
810         }
811         LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
812            this.maxLogs + "; forcing flush of " + regions.length + " region(s): " +
813            sb.toString());
814       }
815     }
816     return regions;
817   }
818 
819   /**
820    * Return regions (memstores) that have edits that are equal or less than
821    * the passed <code>oldestWALseqid</code>.
822    * @param oldestWALseqid
823    * @param regionsToSeqids Encoded region names to sequence ids
824    * @return All regions whose seqid is &lt;= <code>oldestWALseqid</code> (not
825    * necessarily in order).  Null if no regions found.
826    */
827   static byte [][] findMemstoresWithEditsEqualOrOlderThan(final long oldestWALseqid,
828       final Map<byte [], Long> regionsToSeqids) {
829     // This method is static so it can be unit tested more easily.
830     List<byte []> regions = null;
831     for (Map.Entry<byte [], Long> e: regionsToSeqids.entrySet()) {
832       if (e.getValue().longValue() <= oldestWALseqid) {
833         if (regions == null) regions = new ArrayList<byte []>();
834         // Key is encoded region name.
835         regions.add(e.getKey());
836       }
837     }
838     return regions == null?
839       null: regions.toArray(new byte [][] {HConstants.EMPTY_BYTE_ARRAY});
840   }
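  // Illustrative only: a minimal sketch of exercising the static helper above
  // (it is static precisely so tests can do this). Region names and sequence
  // ids are made up.
  //
  //   Map<byte[], Long> seqids = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
  //   seqids.put(Bytes.toBytes("region-a"), 5L);
  //   seqids.put(Bytes.toBytes("region-b"), 42L);
  //   byte[][] toFlush = findMemstoresWithEditsEqualOrOlderThan(10L, seqids);
  //   // toFlush holds only "region-a" because 5 <= 10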
841 
842   /*
843    * @return Logs older than this id are safe to remove.
844    */
845   private Long getOldestOutstandingSeqNum() {
846     return Collections.min(this.lastSeqWritten.values());
847   }
848 
849   /**
850    * @param oldestOutstandingSeqNum
851    * @return (Encoded) name of oldest outstanding region.
852    */
853   private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
854     byte [] oldestRegion = null;
855     for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
856       if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
857         // Key is encoded region name.
858         oldestRegion = e.getKey();
859         break;
860       }
861     }
862     return oldestRegion;
863   }
864 
865   /*
866    * Cleans up the current writer, closing it and adding it to outputfiles.
867    * Presumes we're operating inside an updateLock scope.
868    * @return Path to current writer or null if none.
869    * @throws IOException
870    */
871   Path cleanupCurrentWriter(final long currentfilenum) throws IOException {
872     Path oldFile = null;
873     if (this.writer != null) {
874       // Close the current writer, get a new one.
875       try {
876         // Wait till all current transactions are written to the hlog.
877         // No new transactions can occur because we have the updatelock.
878         if (this.unflushedEntries.get() != this.syncedTillHere) {
879           LOG.debug("cleanupCurrentWriter " +
880                    " waiting for transactions to get synced " +
881                    " total " + this.unflushedEntries.get() +
882                    " synced till here " + syncedTillHere);
883           sync();
884         }
885         this.writer.close();
886         this.writer = null;
887         closeErrorCount.set(0);
888       } catch (IOException e) {
889         LOG.error("Failed close of HLog writer", e);
890         int errors = closeErrorCount.incrementAndGet();
891         if (errors <= closeErrorsTolerated && !hasDeferredEntries()) {
892           LOG.warn("Riding over HLog close failure! error count="+errors);
893         } else {
894           if (hasDeferredEntries()) {
895             LOG.error("Aborting due to unflushed edits in HLog");
896           }
897           // Failed close of log file.  Means we're losing edits.  For now,
898           // shut ourselves down to minimize loss.  Alternative is to try and
899           // keep going.  See HBASE-930.
900           FailedLogCloseException flce =
901             new FailedLogCloseException("#" + currentfilenum);
902           flce.initCause(e);
903           throw flce;
904         }
905       }
906       if (currentfilenum >= 0) {
907         oldFile = computeFilename(currentfilenum);
908         this.outputfiles.put(Long.valueOf(this.logSeqNum.get()), oldFile);
909       }
910     }
911     return oldFile;
912   }
913 
914   private void archiveLogFile(final Path p, final Long seqno) throws IOException {
915     Path newPath = getHLogArchivePath(this.oldLogDir, p);
916     LOG.info("moving old hlog file " + FSUtils.getPath(p) +
917       " whose highest sequenceid is " + seqno + " to " +
918       FSUtils.getPath(newPath));
919 
920     // Tell our listeners that a log is going to be archived.
921     if (!this.listeners.isEmpty()) {
922       for (WALActionsListener i : this.listeners) {
923         i.preLogArchive(p, newPath);
924       }
925     }
926     if (!HBaseFileSystem.renameAndSetModifyTime(this.fs, p, newPath)) {
927       throw new IOException("Unable to rename " + p + " to " + newPath);
928     }
929     // Tell our listeners that a log has been archived.
930     if (!this.listeners.isEmpty()) {
931       for (WALActionsListener i : this.listeners) {
932         i.postLogArchive(p, newPath);
933       }
934     }
935   }
936 
937   /**
938    * This is a convenience method that computes a new filename
939    * using the current HLog file-number.
940    * @return Path
941    */
942   protected Path computeFilename() {
943     return computeFilename(this.filenum);
944   }
945 
946   /**
947    * This is a convenience method that computes a new filename with a given
948    * file-number.
949    * @param filenum to use
950    * @return Path
951    */
952   protected Path computeFilename(long filenum) {
953     if (filenum < 0) {
954       throw new RuntimeException("hlog file number can't be < 0");
955     }
956     String child = prefix + "." + filenum;
957     if (forMeta) {
958       child += HLog.META_HLOG_FILE_EXTN;
959     }
960     return new Path(dir, child);
961   }
962 
963   public static boolean isMetaFile(Path p) {
964     if (p.getName().endsWith(HLog.META_HLOG_FILE_EXTN)) {
965       return true;
966     }
967     return false;
968   }
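  // Illustrative only: with prefix "host%2C60020%2C1234" and filenum
  // 1385152384730, computeFilename() yields
  // <dir>/host%2C60020%2C1234.1385152384730, with ".meta" appended when
  // forMeta is set; isMetaFile() above keys off that suffix.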
969 
970   /**
971    * Shut down the log and delete the log directory
972    *
973    * @throws IOException
974    */
975   public void closeAndDelete() throws IOException {
976     close();
977     if (!fs.exists(this.dir)) return;
978     FileStatus[] files = fs.listStatus(this.dir);
979     for(FileStatus file : files) {
980 
981       Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
982       // Tell our listeners that a log is going to be archived.
983       if (!this.listeners.isEmpty()) {
984         for (WALActionsListener i : this.listeners) {
985           i.preLogArchive(file.getPath(), p);
986         }
987       }
988       if (!HBaseFileSystem.renameAndSetModifyTime(fs, file.getPath(), p)) {
989         throw new IOException("Unable to rename " + file.getPath() + " to " + p);
990       }
991       // Tell our listeners that a log was archived.
992       if (!this.listeners.isEmpty()) {
993         for (WALActionsListener i : this.listeners) {
994           i.postLogArchive(file.getPath(), p);
995         }
996       }
997     }
998     LOG.debug("Moved " + files.length + " log files to " +
999       FSUtils.getPath(this.oldLogDir));
1000     if (!HBaseFileSystem.deleteDirFromFileSystem(fs, dir)) {
1001       LOG.info("Unable to delete " + dir);
1002     }
1003   }
1004 
1005   /**
1006    * Shut down the log.
1007    *
1008    * @throws IOException
1009    */
1010   public void close() throws IOException {
1011     // When optionalFlushInterval is 0, the logSyncer is not started as a Thread.
1012     if (this.optionalFlushInterval > 0) {
1013       try {
1014         logSyncer.close();
1015         // Make sure we synced everything
1016         logSyncer.join(this.optionalFlushInterval * 2);
1017       } catch (InterruptedException e) {
1018         LOG.error("Exception while waiting for syncer thread to die", e);
1019       }
1020     }
1021 
1022     cacheFlushLock.lock();
1023     try {
1024       // Tell our listeners that the log is closing
1025       if (!this.listeners.isEmpty()) {
1026         for (WALActionsListener i : this.listeners) {
1027           i.logCloseRequested();
1028         }
1029       }
1030       synchronized (updateLock) {
1031         this.closed = true;
1032         if (LOG.isDebugEnabled()) {
1033           LOG.debug("closing hlog writer in " + this.dir.toString());
1034         }
1035         if (this.writer != null) {
1036           this.writer.close();
1037         }
1038       }
1039     } finally {
1040       cacheFlushLock.unlock();
1041     }
1042   }
1043 
1044   /**
1045    * @param now
1046    * @param regionName
1047    * @param tableName
1048    * @param clusterId
1049    * @return New log key.
1050    */
1051   protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum,
1052       long now, UUID clusterId) {
1053     return new HLogKey(regionName, tableName, seqnum, now, clusterId);
1054   }
1055 
1056 
1057   /** Append an entry to the log.
1058    *
1059    * @param regionInfo
1060    * @param logEdit
1061    * @param logKey
1062    * @param doSync shall we sync after writing the transaction
1063    * @return The txid of this transaction
1064    * @throws IOException
1065    */
1066   public long append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
1067                      HTableDescriptor htd, boolean doSync)
1068   throws IOException {
1069     if (this.closed) {
1070       throw new IOException("Cannot append; log is closed");
1071     }
1072     long txid = 0;
1073     synchronized (updateLock) {
1074       long seqNum = obtainSeqNum();
1075       logKey.setLogSeqNum(seqNum);
1076       // The 'lastSeqWritten' map holds the sequence number of the oldest
1077       // write for each region (i.e. the first edit added to the particular
1078       // memstore). When the cache is flushed, the entry for the
1079       // region being flushed is removed if the sequence number of the flush
1080       // is greater than or equal to the value in lastSeqWritten.
1081       this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
1082         Long.valueOf(seqNum));
1083       doWrite(regionInfo, logKey, logEdit, htd);
1084       txid = this.unflushedEntries.incrementAndGet();
1085       this.numEntries.incrementAndGet();
1086       if (htd.isDeferredLogFlush()) {
1087         lastDeferredTxid = txid;
1088       }
1089     }
1090 
1091     // Sync if catalog region, and if not then check if that table supports
1092     // deferred log flushing
1093     if (doSync &&
1094         (regionInfo.isMetaRegion() ||
1095         !htd.isDeferredLogFlush())) {
1096       // sync txn to file system
1097       this.sync(txid);
1098     }
1099     return txid;
1100   }
1101 
1102   /**
1103    * Only used in tests.
1104    *
1105    * @param info
1106    * @param tableName
1107    * @param edits
1108    * @param now
1109    * @param htd
1110    * @throws IOException
1111    */
1112   public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
1113     final long now, HTableDescriptor htd)
1114   throws IOException {
1115     append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd);
1116   }
1117 
1118   /**
1119    * Append a set of edits to the log. Log edits are keyed by (encoded)
1120    * regionName, rowname, and log-sequence-id.
1121    *
1122    * Later, if we sort by these keys, we obtain all the relevant edits for a
1123    * given key-range of the HRegion (TODO). Any edits that do not have a
1124    * matching COMPLETE_CACHEFLUSH message can be discarded.
1125    *
1126    * <p>
1127    * Logs cannot be restarted once closed, or once the HLog process dies. Each
1128    * time the HLog starts, it must create a new log. This means that other
1129    * systems should process the log appropriately upon each startup (and prior
1130    * to initializing HLog).
1131    *
1132    * synchronized prevents appends during the completion of a cache flush or for
1133    * the duration of a log roll.
1134    *
1135    * @param info
1136    * @param tableName
1137    * @param edits
1138    * @param clusterId The originating clusterId for this edit (for replication)
1139    * @param now
1140    * @param doSync shall we sync?
1141    * @return txid of this transaction
1142    * @throws IOException
1143    */
1144   private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
1145       final long now, HTableDescriptor htd, boolean doSync)
1146     throws IOException {
1147       if (edits.isEmpty()) return this.unflushedEntries.get();
1148       if (this.closed) {
1149         throw new IOException("Cannot append; log is closed");
1150       }
1151       long txid = 0;
1152       synchronized (this.updateLock) {
1153         long seqNum = obtainSeqNum();
1154         // The 'lastSeqWritten' map holds the sequence number of the oldest
1155         // write for each region (i.e. the first edit added to the particular
1156         // memstore). When the cache is flushed, the entry for the
1157         // region being flushed is removed if the sequence number of the flush
1158         // is greater than or equal to the value in lastSeqWritten.
1159         // Use the encoded name.  It is shorter, guaranteed unique, and a
1160         // subset of the actual name.
1161         byte [] encodedRegionName = info.getEncodedNameAsBytes();
1162         this.lastSeqWritten.putIfAbsent(encodedRegionName, seqNum);
1163         HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
1164         doWrite(info, logKey, edits, htd);
1165         this.numEntries.incrementAndGet();
1166         txid = this.unflushedEntries.incrementAndGet();
1167         if (htd.isDeferredLogFlush()) {
1168           lastDeferredTxid = txid;
1169         }
1170       }
1171       // Sync if catalog region, and if not then check if that table supports
1172       // deferred log flushing
1173       if (doSync && 
1174           (info.isMetaRegion() ||
1175           !htd.isDeferredLogFlush())) {
1176         // sync txn to file system
1177         this.sync(txid);
1178       }
1179       return txid;
1180     }
1181 
1182   /**
1183    * Append a set of edits to the log. Log edits are keyed by (encoded)
1184    * regionName, rowname, and log-sequence-id. The HLog is not flushed
1185    * after this transaction is written to the log.
1186    *
1187    * @param info
1188    * @param tableName
1189    * @param edits
1190    * @param clusterId The originating clusterId for this edit (for replication)
1191    * @param now
1192    * @return txid of this transaction
1193    * @throws IOException
1194    */
1195   public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, 
1196     UUID clusterId, final long now, HTableDescriptor htd)
1197     throws IOException {
1198     return append(info, tableName, edits, clusterId, now, htd, false);
1199   }
1200 
1201   /**
1202    * Append a set of edits to the log. Log edits are keyed by (encoded)
1203    * regionName, rowname, and log-sequence-id. The HLog is flushed
1204    * after this transaction is written to the log.
1205    *
1206    * @param info
1207    * @param tableName
1208    * @param edits
1209    * @param clusterId The originating clusterId for this edit (for replication)
1210    * @param now
1211    * @return txid of this transaction
1212    * @throws IOException
1213    */
1214   public long append(HRegionInfo info, byte [] tableName, WALEdit edits, 
1215     UUID clusterId, final long now, HTableDescriptor htd)
1216     throws IOException {
1217     return append(info, tableName, edits, clusterId, now, htd, true);
1218   }
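  // Illustrative only: a minimal sketch of the append-then-sync pattern the
  // methods above support. info, tableName, edits and htd are assumed to be
  // in scope; the txid ties the later sync to this particular transaction.
  //
  //   long txid = wal.appendNoSync(info, tableName, edits,
  //       HConstants.DEFAULT_CLUSTER_ID, System.currentTimeMillis(), htd);
  //   wal.sync(txid);   // block until this transaction is durable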
1219 
1220   /**
1221    * This class is responsible for holding the HLog's appended Entry list
1222    * and for syncing those entries at a configurable interval.
1223    *
1224    * Deferred log flushing works by piggybacking on this process: the appended
1225    * Entry is simply not sync'd right away. It can also be sync'd earlier by a
1226    * non-deferred sync issued from outside this thread.
1227    */
1228   class LogSyncer extends HasThread {
1229 
1230     private final long optionalFlushInterval;
1231 
1232     private AtomicBoolean closeLogSyncer = new AtomicBoolean(false);
1233 
1234     // List of pending writes to the HLog. These correspond to transactions
1235     // that have not yet returned to the client. We keep them cached here
1236     // instead of writing them to HDFS piecemeal, because the HDFS write 
1237     // method is pretty heavyweight as far as locking is concerned. The 
1238     // goal is to increase the batchsize for writing-to-hdfs as well as
1239     // sync-to-hdfs, so that we can get better system throughput.
1240     private List<Entry> pendingWrites = new LinkedList<Entry>();
1241 
1242     LogSyncer(long optionalFlushInterval) {
1243       this.optionalFlushInterval = optionalFlushInterval;
1244     }
1245 
1246     @Override
1247     public void run() {
1248       try {
1249         // awaiting with a timeout doesn't always
1250         // throw exceptions on interrupt
1251         while(!this.isInterrupted() && !closeLogSyncer.get()) {
1252 
1253           try {
1254             if (unflushedEntries.get() <= syncedTillHere) {
1255               synchronized (closeLogSyncer) {
1256                 closeLogSyncer.wait(this.optionalFlushInterval);
1257               }
1258             }
1259             // Calling sync since we waited or had unflushed entries.
1260             // Entries appended but not sync'd are taken care of here AKA
1261             // deferred log flush
1262             sync();
1263           } catch (IOException e) {
1264             LOG.error("Error while syncing, requesting close of hlog ", e);
1265             requestLogRoll();
1266           }
1267         }
1268       } catch (InterruptedException e) {
1269         LOG.debug(getName() + " interrupted while waiting for sync requests");
1270       } finally {
1271         LOG.info(getName() + " exiting");
1272       }
1273     }
1274 
1275     // appends new writes to the pendingWrites. It is better to keep it in
1276     // our own queue rather than writing it to the HDFS output stream because
1277     // HDFSOutputStream.writeChunk is not lightweight at all.
1278     synchronized void append(Entry e) throws IOException {
1279       pendingWrites.add(e);
1280     }
1281 
1282     // Returns all currently pending writes. New writes
1283     // will accumulate in a new list.
1284     synchronized List<Entry> getPendingWrites() {
1285       List<Entry> save = this.pendingWrites;
1286       this.pendingWrites = new LinkedList<Entry>();
1287       return save;
1288     }
1289 
1290     // writes out pending entries to the HLog
1291     void hlogFlush(Writer writer, List<Entry> pending) throws IOException {
1292       if (pending == null) return;
1293 
1294       // write out all accumulated Entries to hdfs.
1295       for (Entry e : pending) {
1296         writer.append(e);
1297       }
1298     }
1299 
1300     void close() {
1301       synchronized (closeLogSyncer) {
1302         closeLogSyncer.set(true);
1303         closeLogSyncer.notifyAll();
1304       }
1305     }
1306   }
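  // Illustrative only: deferred log flush is a per-table setting; when enabled,
  // appends ride on the LogSyncer interval above instead of forcing a sync on
  // every call. A minimal sketch of enabling it on a (hypothetical) table:
  //
  //   HTableDescriptor htd = new HTableDescriptor("my_table");
  //   htd.setDeferredLogFlush(true);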
1307 
1308   // sync all known transactions
1309   private void syncer() throws IOException {
1310     syncer(this.unflushedEntries.get()); // sync all pending items
1311   }
1312 
1313   // sync all transactions up to the specified txid
1314   private void syncer(long txid) throws IOException {
1315     // if the transaction that we are interested in is already
1316     // synced, then return immediately.
1317     if (txid <= this.syncedTillHere) {
1318       return;
1319     }
1320     Writer tempWriter;
1321     synchronized (this.updateLock) {
1322       if (this.closed) return;
1323       tempWriter = this.writer; // guaranteed non-null
1324     }
1325     try {
1326       long doneUpto;
1327       long now = System.currentTimeMillis();
1328       // First flush all the pending writes to HDFS. Then 
1329       // issue the sync to HDFS. If sync is successful, then update
1330       // syncedTillHere to indicate that transactions up to this
1331       // number have been successfully synced.
1332       IOException ioe = null;
1333       List<Entry> pending = null;
1334       synchronized (flushLock) {
1335         if (txid <= this.syncedTillHere) {
1336           return;
1337         }
1338         doneUpto = this.unflushedEntries.get();
1339         pending = logSyncer.getPendingWrites();
1340         try {
1341           logSyncer.hlogFlush(tempWriter, pending);
1342         } catch(IOException io) {
1343           ioe = io;
1344           LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
1345         }
1346       }
1347       if (ioe != null && pending != null) {
1348         synchronized (this.updateLock) {
1349           synchronized (flushLock) {
1350             // HBASE-4387, HBASE-5623, retry with updateLock held
1351             tempWriter = this.writer;
1352             logSyncer.hlogFlush(tempWriter, pending);
1353           }
1354         }
1355       }
1356       // another thread might have sync'ed already; avoid double-sync'ing
1357       if (txid <= this.syncedTillHere) {
1358         return;
1359       }
1360       try {
1361         tempWriter.sync();
1362       } catch (IOException io) {
1363         synchronized (this.updateLock) {
1364           // HBASE-4387, HBASE-5623, retry with updateLock held
1365           tempWriter = this.writer;
1366           tempWriter.sync();
1367         }
1368       }
1369       this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
1370 
1371       syncTime.inc(System.currentTimeMillis() - now);
1372       if (!this.logRollRunning) {
1373         checkLowReplication();
1374         try {
1375           curLogSize = tempWriter.getLength();
1376           if (curLogSize > this.logrollsize) {
1377             requestLogRoll();
1378           }
1379         } catch (IOException x) {
1380           LOG.debug("Log roll failed and will be retried. (This is not an error)");
1381         }
1382       }
1383     } catch (IOException e) {
1384       LOG.fatal("Could not sync. Requesting close of hlog", e);
1385       requestLogRoll();
1386       throw e;
1387     }
1388   }
1389 
1390   private void checkLowReplication() {
1391     // if the number of replicas in HDFS has fallen below the configured
1392     // value, then roll logs.
1393     try {
1394       int numCurrentReplicas = getLogReplication();
1395       if (numCurrentReplicas != 0
1396           && numCurrentReplicas < this.minTolerableReplication) {
1397         if (this.lowReplicationRollEnabled) {
1398           if (this.consecutiveLogRolls < this.lowReplicationRollLimit) {
1399             LOG.warn("HDFS pipeline error detected. Found "
1400                 + numCurrentReplicas + " replicas but expecting no less than "
1401                 + this.minTolerableReplication + " replicas."
1402                 + " Requesting close of hlog.");
1403             requestLogRoll();
1404             // If rollWriter is requested, increase consecutiveLogRolls. Once it
1405             // is larger than lowReplicationRollLimit, disable the
1406             // LowReplication-Roller
1407             this.consecutiveLogRolls++;
1408           } else {
1409             LOG.warn("Too many consecutive RollWriter requests: this is a sign that "
1410                 + "the total number of live datanodes is lower than the tolerable replicas.");
1411             this.consecutiveLogRolls = 0;
1412             this.lowReplicationRollEnabled = false;
1413           }
1414         }
1415       } else if (numCurrentReplicas >= this.minTolerableReplication) {
1416 
1417         if (!this.lowReplicationRollEnabled) {
1418           // A new writer's log replication is always the default value,
1419           // so we should not enable the LowReplication-Roller yet. If numEntries
1420           // is less than or equal to 1, we consider it a new writer.
1421           if (this.numEntries.get() <= 1) {
1422             return;
1423           }
1424           // Once the live datanode number and the replicas return to normal,
1425           // enable the LowReplication-Roller.
1426           this.lowReplicationRollEnabled = true;
1427           LOG.info("LowReplication-Roller was enabled.");
1428         }
1429       }
1430     } catch (Exception e) {
1431       LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas: " + e +
1432           "; still proceeding ahead...");
1433     }
1434   }
1435 
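  // Minimal sketch of the roll-on-low-replication decision made above; the
  // method name is hypothetical, and the real code also requests the roll and
  // logs warnings:
  //
  //   boolean shouldRollForLowReplication(int currentReplicas) {
  //     if (currentReplicas == 0) return false;                // replication unknown (no HDFS-826)
  //     if (currentReplicas >= this.minTolerableReplication) return false;
  //     if (!this.lowReplicationRollEnabled) return false;     // roller disabled earlier
  //     if (this.consecutiveLogRolls >= this.lowReplicationRollLimit) {
  //       this.lowReplicationRollEnabled = false;              // too many rolls in a row
  //       this.consecutiveLogRolls = 0;
  //       return false;
  //     }
  //     this.consecutiveLogRolls++;
  //     return true;                                           // caller should requestLogRoll()
  //   }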
1436   /**
1437    * This method gets the datanode replication count for the current HLog.
1438    *
1439    * If the pipeline isn't started yet or is empty, you will get the default
1440    * replication factor.  Therefore, if this function returns 0, it means you
1441    * are not properly running with the HDFS-826 patch.
1442    * @throws IllegalArgumentException
1443    * @throws IllegalAccessException
1444    * @throws InvocationTargetException
1445    */
1448   int getLogReplication()
1449   throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
1450     if (this.getNumCurrentReplicas != null && this.hdfs_out != null) {
1451       Object repl = this.getNumCurrentReplicas.invoke(getOutputStream(), NO_ARGS);
1452       if (repl instanceof Integer) {
1453         return ((Integer)repl).intValue();
1454       }
1455     }
1456     return 0;
1457   }
1458 
1459   boolean canGetCurReplicas() {
1460     return this.getNumCurrentReplicas != null;
1461   }
1462 
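  // Illustrative sketch of how a getNumCurrentReplicas handle is typically
  // obtained via reflection (the actual lookup happens elsewhere in this
  // class; variable names here are hypothetical):
  //
  //   Method m = null;
  //   try {
  //     m = outputStream.getClass().getMethod("getNumCurrentReplicas");
  //     m.setAccessible(true);
  //   } catch (NoSuchMethodException e) {
  //     m = null;   // pre-HDFS-826: pipeline replication cannot be queried
  //   }
  //   // later: int replicas = ((Integer) m.invoke(outputStream)).intValue();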
1463   public void hsync() throws IOException {
1464     syncer();
1465   }
1466 
1467   public void hflush() throws IOException {
1468     syncer();
1469   }
1470 
1471   public void sync() throws IOException {
1472     syncer();
1473   }
1474 
1475   public void sync(long txid) throws IOException {
1476     syncer(txid);
1477   }
1478 
1479   private void requestLogRoll() {
1480     if (!this.listeners.isEmpty()) {
1481       for (WALActionsListener i: this.listeners) {
1482         i.logRollRequested();
1483       }
1484     }
1485   }
1486 
1487   protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
1488                            HTableDescriptor htd)
1489   throws IOException {
1490     if (!this.enabled) {
1491       return;
1492     }
1493     if (!this.listeners.isEmpty()) {
1494       for (WALActionsListener i: this.listeners) {
1495         i.visitLogEntryBeforeWrite(htd, logKey, logEdit);
1496       }
1497     }
1498     try {
1499       long now = System.currentTimeMillis();
1500       // coprocessor hook:
1501       if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
1502         // write to our buffer for the Hlog file.
1503         logSyncer.append(new HLog.Entry(logKey, logEdit));
1504       }
1505       long took = System.currentTimeMillis() - now;
1506       coprocessorHost.postWALWrite(info, logKey, logEdit);
1507       writeTime.inc(took);
1508       long len = 0;
1509       for (KeyValue kv : logEdit.getKeyValues()) {
1510         len += kv.getLength();
1511       }
1512       writeSize.inc(len);
1513       if (took > 1000) {
1514         LOG.warn(String.format(
1515           "%s took %d ms appending an edit to hlog; editcount=%d, len~=%s",
1516           Thread.currentThread().getName(), took, this.numEntries.get(),
1517           StringUtils.humanReadableInt(len)));
1518         slowHLogAppendCount.incrementAndGet();
1519         slowHLogAppendTime.inc(took);
1520       }
1521     } catch (IOException e) {
1522       LOG.fatal("Could not append. Requesting close of hlog", e);
1523       requestLogRoll();
1524       throw e;
1525     }
1526   }
1527 
1528 
1529   /** @return How many items have been added to the log */
1530   int getNumEntries() {
1531     return numEntries.get();
1532   }
1533 
1534   /**
1535    * Obtain a log sequence number.
1536    */
1537   public long obtainSeqNum() {
1538     return this.logSeqNum.incrementAndGet();
1539   }
1540 
1541   /** @return the number of log files in use, including current one */
1542   public int getNumLogFiles() {
1543     return outputfiles.size() + 1;
1544   }
1545   
1546   /** @return the total size of log files in use, including current one */
1547   public long getNumLogFileSize() {
1548     return totalLogSize.get() + curLogSize;
1549   }
1550 
1551   private byte[] getSnapshotName(byte[] encodedRegionName) {
1552     byte snp[] = new byte[encodedRegionName.length + 3];
1553     // An encoded region name contains only hex digits; 's', 'n' and 'p' are
1554     // not hex digits, so snapshot names can never collide with
1555     // encoded region names.
1556     snp[0] = 's'; snp[1] = 'n'; snp[2] = 'p';
1557     for (int i = 0; i < encodedRegionName.length; i++) {
1558       snp[i+3] = encodedRegionName[i];
1559     }
1560     return snp;
1561   }
1562 
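  // Illustrative example only: an encoded region name "0123abcd" maps to the
  // snapshot key "snp0123abcd".
  //
  //   byte[] snap = getSnapshotName(Bytes.toBytes("0123abcd"));
  //   // Bytes.toString(snap) -> "snp0123abcd"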
1563   /**
1564    * By acquiring a log sequence ID, we can allow log messages to continue while
1565    * we flush the cache.
1566    *
1567    * Acquire a lock so that we do not roll the log between the start and
1568    * completion of a cache-flush. Otherwise the log-seq-id for the flush will
1569    * not appear in the correct logfile.
1570    *
1571    * Ensuring that flushes and log-rolls don't happen concurrently also allows
1572    * us to temporarily put a log-seq-number in lastSeqWritten against the region
1573    * being flushed that might not be the earliest in-memory log-seq-number for
1574    * that region. By the time the flush is completed or aborted and before the
1575    * cacheFlushLock is released it is ensured that lastSeqWritten again has the
1576    * oldest in-memory edit's lsn for the region that was being flushed.
1577    *
1578    * In this method, by removing the entry in lastSeqWritten for the region
1579    * being flushed we ensure that the next edit inserted in this region will be
1580    * correctly recorded in {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)}.
1581    * The lsn of the earliest in-memory edit - which is now in the memstore snapshot -
1582    * is saved temporarily in the lastSeqWritten map while the flush is active.
1583    *
1584    * @return sequence ID to pass to
1585    *         {@link #completeCacheFlush(byte[], byte[], long, boolean)}
1587    * @see #completeCacheFlush(byte[], byte[], long, boolean)
1588    * @see #abortCacheFlush(byte[])
1589    */
1590   public long startCacheFlush(final byte[] encodedRegionName) {
1591     this.cacheFlushLock.lock();
1592     Long seq = this.lastSeqWritten.remove(encodedRegionName);
1593     // seq is the lsn of the oldest edit associated with this region. If a
1594     // snapshot already exists - because the last flush failed - then seq will
1595     // be the lsn of the oldest edit in the snapshot
1596     if (seq != null) {
1597       // keeping the earliest sequence number of the snapshot in
1598       // lastSeqWritten maintains the correctness of
1599       // getOldestOutstandingSeqNum(). But it doesn't matter really because
1600       // everything is being done inside of cacheFlush lock.
1601       Long oldseq =
1602         lastSeqWritten.put(getSnapshotName(encodedRegionName), seq);
1603       if (oldseq != null) {
1604         LOG.error("Logic Error: snapshot seq id from an earlier flush is still" +
1605             " present for region " + Bytes.toString(encodedRegionName) +
1606             "; overwritten oldseq=" + oldseq + " with new seq=" + seq);
1607         Runtime.getRuntime().halt(1);
1608       }
1609     }
1610     return obtainSeqNum();
1611   }
1612 
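  // Illustrative flush protocol as seen by a caller (hypothetical variable
  // names; error handling simplified):
  //
  //   long flushSeqId = hlog.startCacheFlush(encodedRegionName);
  //   boolean flushed = false;
  //   try {
  //     // ... write the memstore snapshot out to store files ...
  //     flushed = true;
  //   } finally {
  //     if (flushed) {
  //       hlog.completeCacheFlush(encodedRegionName, tableName, flushSeqId, false);
  //     } else {
  //       hlog.abortCacheFlush(encodedRegionName);  // restores the region's seq entry
  //     }
  //   }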
1613 
1614   /**
1615    * Complete the cache flush
1616    *
1617    * Protected by cacheFlushLock
1618    *
1619    * @param encodedRegionName
1620    * @param tableName
1621    * @param logSeqId
1622    * @throws IOException
1623    */
1624   public void completeCacheFlush(final byte [] encodedRegionName,
1625       final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
1626   throws IOException {
1627     try {
1628       if (this.closed) {
1629         return;
1630       }
1631       long txid = 0;
1632       synchronized (updateLock) {
1633         long now = System.currentTimeMillis();
1634         WALEdit edit = completeCacheFlushLogEdit();
1635         HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
1636             System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
1637         logSyncer.append(new Entry(key, edit));
1638         txid = this.unflushedEntries.incrementAndGet();
1639         writeTime.inc(System.currentTimeMillis() - now);
1640         long len = 0;
1641         for (KeyValue kv : edit.getKeyValues()) {
1642           len += kv.getLength();
1643         }
1644         writeSize.inc(len);
1645         this.numEntries.incrementAndGet();
1646       }
1647       // sync txn to file system
1648       this.sync(txid);
1649 
1650     } finally {
1651       // updateLock not needed for removing snapshot's entry
1652       // Cleaning up of lastSeqWritten is in the finally clause because we
1653       // don't want to confuse getOldestOutstandingSeqNum()
1654       this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
1655       this.cacheFlushLock.unlock();
1656     }
1657   }
1658 
1659   private WALEdit completeCacheFlushLogEdit() {
1660     KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
1661       System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
1662     WALEdit e = new WALEdit();
1663     e.add(kv);
1664     return e;
1665   }
1666 
1667   /**
1668    * Abort a cache flush.
1669    * Call if the flush fails. Note that the only recovery for an aborted flush
1670    * currently is a restart of the regionserver so the snapshot content dropped
1671    * by the failure gets restored to the memstore.
1672    */
1673   public void abortCacheFlush(byte[] encodedRegionName) {
1674     Long snapshot_seq =
1675       this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
1676     if (snapshot_seq != null) {
1677       // updateLock not necessary because we are racing against
1678       // lastSeqWritten.putIfAbsent() in append() and we will always win.
1679       // Before releasing cacheFlushLock, make sure that the region's entry in
1680       // lastSeqWritten points to the earliest edit in the region.
1681       Long current_memstore_earliest_seq =
1682         this.lastSeqWritten.put(encodedRegionName, snapshot_seq);
1683       if (current_memstore_earliest_seq != null &&
1684           (current_memstore_earliest_seq.longValue() <=
1685             snapshot_seq.longValue())) {
1686         LOG.error("Logic Error: region " + Bytes.toString(encodedRegionName) +
1687             " acquired edits out of order; current memstore seq=" +
1688             current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq);
1689         Runtime.getRuntime().halt(1);
1690       }
1691     }
1692     this.cacheFlushLock.unlock();
1693   }
1694 
1695   /**
1696    * @param family column family name to check
1697    * @return true if the family is the WAL meta family
1698    */
1699   public static boolean isMetaFamily(byte [] family) {
1700     return Bytes.equals(METAFAMILY, family);
1701   }
1702 
1703   /**
1704    * Get LowReplication-Roller status
1705    * 
1706    * @return lowReplicationRollEnabled
1707    */
1708   public boolean isLowReplicationRollEnabled() {
1709     return lowReplicationRollEnabled;
1710   }
1711 
1712   @SuppressWarnings("unchecked")
1713   public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
1714      return (Class<? extends HLogKey>)
1715        conf.getClass("hbase.regionserver.hlog.keyclass", HLogKey.class);
1716   }
1717 
1718   public static HLogKey newKey(Configuration conf) throws IOException {
1719     Class<? extends HLogKey> keyClass = getKeyClass(conf);
1720     try {
1721       return keyClass.newInstance();
1722     } catch (InstantiationException e) {
1723       throw new IOException("cannot create hlog key", e);
1724     } catch (IllegalAccessException e) {
1725       throw new IOException("cannot create hlog key", e);
1726     }
1727   }
1728 
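  // A custom key class can be plugged in via configuration. Illustrative only;
  // MyHLogKey is a hypothetical HLogKey subclass with a public no-arg
  // constructor:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setClass("hbase.regionserver.hlog.keyclass", MyHLogKey.class, HLogKey.class);
  //   HLogKey key = HLog.newKey(conf);   // returns a MyHLogKey instance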
1729   /**
1730    * Utility class that lets us keep track of the edit with its key.
1731    * Used when splitting logs and when buffering edits before they are written.
1732    */
1733   public static class Entry implements Writable {
1734     private WALEdit edit;
1735     private HLogKey key;
1736 
1737     public Entry() {
1738       edit = new WALEdit();
1739       key = new HLogKey();
1740     }
1741 
1742     /**
1743      * Constructor for both params
1744      * @param key log's key
1745      * @param edit log's edit
1746      */
1747     public Entry(HLogKey key, WALEdit edit) {
1748       super();
1749       this.key = key;
1750       this.edit = edit;
1751     }
1752     /**
1753      * Gets the edit
1754      * @return edit
1755      */
1756     public WALEdit getEdit() {
1757       return edit;
1758     }
1759     /**
1760      * Gets the key
1761      * @return key
1762      */
1763     public HLogKey getKey() {
1764       return key;
1765     }
1766 
1767     @Override
1768     public String toString() {
1769       return this.key + "=" + this.edit;
1770     }
1771 
1772     @Override
1773     public void write(DataOutput dataOutput) throws IOException {
1774       this.key.write(dataOutput);
1775       this.edit.write(dataOutput);
1776     }
1777 
1778     @Override
1779     public void readFields(DataInput dataInput) throws IOException {
1780       this.key.readFields(dataInput);
1781       this.edit.readFields(dataInput);
1782     }
1783   }
1784 
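  // Illustrative round-trip of an Entry through its Writable methods
  // (hypothetical usage; assumes the java.io stream classes are imported):
  //
  //   ByteArrayOutputStream bos = new ByteArrayOutputStream();
  //   entry.write(new DataOutputStream(bos));
  //   Entry copy = new Entry();
  //   copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));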
1785   /**
1786    * Construct the HLog directory name
1787    *
1788    * @param serverName Server name formatted as described in {@link ServerName}
1789    * @return the HLog directory name
1790    */
1791   public static String getHLogDirectoryName(final String serverName) {
1792     StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
1793     dirName.append("/");
1794     dirName.append(serverName);
1795     return dirName.toString();
1796   }
1797 
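  // Illustrative example only, assuming HConstants.HREGION_LOGDIR_NAME is ".logs":
  //
  //   String dir = getHLogDirectoryName("host.example.org,60020,1340930463011");
  //   // dir -> ".logs/host.example.org,60020,1340930463011"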
1798   /**
1799    * Get the directory we are making logs in.
1800    * 
1801    * @return dir
1802    */
1803   protected Path getDir() {
1804     return dir;
1805   }
1806   
1807   /**
1808    * @param filename name of the file to validate
1809    * @return <tt>true</tt> if the filename matches an HLog, <tt>false</tt>
1810    *         otherwise
1811    */
1812   public static boolean validateHLogFilename(String filename) {
1813     return pattern.matcher(filename).matches();
1814   }
1815 
1816   static Path getHLogArchivePath(Path oldLogDir, Path p) {
1817     return new Path(oldLogDir, p.getName());
1818   }
1819 
1820   static String formatRecoveredEditsFileName(final long seqid) {
1821     return String.format("%019d", seqid);
1822   }
1823 
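  // Illustrative example: the sequence id is zero-padded to 19 digits so that
  // lexicographic file ordering matches numeric ordering:
  //
  //   formatRecoveredEditsFileName(1234L)  -> "0000000000000001234"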
1824   /**
1825    * Returns sorted set of edit files made by wal-log splitter, excluding files
1826    * with '.temp' suffix.
1827    * @param fs
1828    * @param regiondir
1829    * @return Files in passed <code>regiondir</code> as a sorted set.
1830    * @throws IOException
1831    */
1832   public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
1833       final Path regiondir)
1834   throws IOException {
1835     NavigableSet<Path> filesSorted = new TreeSet<Path>();
1836     Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
1837     if (!fs.exists(editsdir)) return filesSorted;
1838     FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
1839       @Override
1840       public boolean accept(Path p) {
1841         boolean result = false;
1842         try {
1843           // Return files and only files that match the editfile names pattern.
1844           // There can be files in this directory other than edit files.
1845           // In particular, on error, we'll move aside the bad edit file giving
1846           // it a timestamp suffix.  See moveAsideBadEditsFile.
1847           Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
1848           result = fs.isFile(p) && m.matches();
1849           // Skip any file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
1850           // because it means a log-splitting thread is still writing that file.
1851           if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
1852             result = false;
1853           }
1854         } catch (IOException e) {
1855           LOG.warn("Failed isFile check on " + p, e);
1856         }
1857         return result;
1858       }
1859     });
1860     if (files == null) return filesSorted;
1861     for (FileStatus status: files) {
1862       filesSorted.add(status.getPath());
1863     }
1864     return filesSorted;
1865   }
1866 
1867   /**
1868    * Move aside a bad edits file.
1869    * @param fs
1870    * @param edits Edits file to move aside.
1871    * @return The name of the moved aside file.
1872    * @throws IOException
1873    */
1874   public static Path moveAsideBadEditsFile(final FileSystem fs,
1875       final Path edits)
1876   throws IOException {
1877     Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
1878       System.currentTimeMillis());
1879     if (!HBaseFileSystem.renameDirForFileSystem(fs, edits, moveAsideName)) {
1880       LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
1881     }
1882     return moveAsideName;
1883   }
1884 
1885   /**
1886    * @param regiondir This regions directory in the filesystem.
1887    * @return The directory that holds recovered edits files for the region
1888    * <code>regiondir</code>
1889    */
1890   public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
1891     return new Path(regiondir, RECOVERED_EDITS_DIR);
1892   }
1893 
1894   public static final long FIXED_OVERHEAD = ClassSize.align(
1895     ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
1896     ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1897 
1898   private static void usage() {
1899     System.err.println("Usage: HLog <ARGS>");
1900     System.err.println("Arguments:");
1901     System.err.println(" --dump  Dump a textual representation of one or more passed log files");
1902     System.err.println("         For example: HLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
1903     System.err.println(" --split Split the passed directory of WAL logs");
1904     System.err.println("         For example: HLog --split hdfs://example.com:9000/hbase/.logs/DIR");
1905   }
1906 
1907   private static void split(final Configuration conf, final Path p)
1908   throws IOException {
1909     FileSystem fs = FileSystem.get(conf);
1910     if (!fs.exists(p)) {
1911       throw new FileNotFoundException(p.toString());
1912     }
1913     final Path baseDir = new Path(conf.get(HConstants.HBASE_DIR));
1914     final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1915     if (!fs.getFileStatus(p).isDir()) {
1916       throw new IOException(p + " is not a directory");
1917     }
1918 
1919     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(
1920         conf, baseDir, p, oldLogDir, fs);
1921     logSplitter.splitLog();
1922   }
1923 
1924   /**
1925    * @return Coprocessor host.
1926    */
1927   public WALCoprocessorHost getCoprocessorHost() {
1928     return coprocessorHost;
1929   }
1930 
1931   /** @return whether there are deferred edits that have not yet been synced; exposed for tests */
1932   boolean hasDeferredEntries() {
1933     return lastDeferredTxid > syncedTillHere;
1934   }
1935 
1936   /**
1937    * Pass one or more log file names and it will either dump out a text version
1938    * on <code>stdout</code> or split the specified log files.
1939    *
1940    * @param args
1941    * @throws IOException
1942    */
1943   public static void main(String[] args) throws IOException {
1944     if (args.length < 2) {
1945       usage();
1946       System.exit(-1);
1947     }
1948     // either dump using the HLogPrettyPrinter or split, depending on args
1949     if (args[0].compareTo("--dump") == 0) {
1950       HLogPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
1951     } else if (args[0].compareTo("--split") == 0) {
1952       Configuration conf = HBaseConfiguration.create();
1953       for (int i = 1; i < args.length; i++) {
1954         try {
1955           conf.set("fs.default.name", args[i]);
1956           conf.set("fs.defaultFS", args[i]);
1957           Path logPath = new Path(args[i]);
1958           split(conf, logPath);
1959         } catch (Throwable t) {
1960           t.printStackTrace(System.err);
1961           System.exit(-1);
1962         }
1963       }
1964     } else {
1965       usage();
1966       System.exit(-1);
1967     }
1968   }
1969 }