1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.FileNotFoundException;
22  import java.io.IOException;
23  import java.io.OutputStream;
24  import java.lang.reflect.InvocationTargetException;
25  import java.lang.reflect.Method;
26  import java.net.URLEncoder;
27  import java.util.ArrayList;
28  import java.util.Arrays;
29  import java.util.Collections;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.SortedMap;
34  import java.util.TreeMap;
35  import java.util.TreeSet;
36  import java.util.UUID;
37  import java.util.concurrent.ConcurrentSkipListMap;
38  import java.util.concurrent.CopyOnWriteArrayList;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.atomic.AtomicInteger;
41  import java.util.concurrent.atomic.AtomicLong;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FSDataOutputStream;
48  import org.apache.hadoop.fs.FileStatus;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.fs.Syncable;
52  import org.apache.hadoop.hbase.HBaseConfiguration;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.HRegionInfo;
55  import org.apache.hadoop.hbase.HTableDescriptor;
56  import org.apache.hadoop.hbase.KeyValue;
57  import org.apache.hadoop.hbase.exceptions.FailedLogCloseException;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.util.ClassSize;
60  import org.apache.hadoop.hbase.util.DrainBarrier;
61  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
62  import org.apache.hadoop.hbase.util.FSUtils;
63  import org.apache.hadoop.hbase.util.HasThread;
64  import org.apache.hadoop.hbase.util.Threads;
65  import org.apache.hadoop.util.StringUtils;
66  
67  /**
 * HLog stores all the edits to the HStore. It's the HBase write-ahead-log (WAL)
 * implementation.
70   *
71   * It performs logfile-rolling, so external callers are not aware that the
72   * underlying file is being rolled.
73   *
74   * <p>
75   * There is one HLog per RegionServer.  All edits for all Regions carried by
76   * a particular RegionServer are entered first in the HLog.
77   *
78   * <p>
 * Each HRegion is identified by a unique id (its encoded region name). HRegions do
 * not need to declare themselves before using the HLog; they simply include
 * that id in the <code>append</code> or
 * <code>completeCacheFlush</code> calls.
83   *
84   * <p>
85   * An HLog consists of multiple on-disk files, which have a chronological order.
86   * As data is flushed to other (better) on-disk structures, the log becomes
87   * obsolete. We can destroy all the log messages for a given HRegion-id up to
88   * the most-recent CACHEFLUSH message from that HRegion.
89   *
90   * <p>
91   * It's only practical to delete entire files. Thus, we delete an entire on-disk
92   * file F when all of the messages in F have a log-sequence-id that's older
93   * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has
94   * a message in F.
95   *
96   * <p>
97   * Synchronized methods can never execute in parallel. However, between the
98   * start of a cache flush and the completion point, appends are allowed but log
99   * rolling is not. To prevent log rolling taking place during this period, a
100  * separate reentrant lock is used.
101  *
102  * <p>To read an HLog, call {@link HLogFactory#createReader(org.apache.hadoop.fs.FileSystem,
103  * org.apache.hadoop.fs.Path, org.apache.hadoop.conf.Configuration)}.
104  *
105  */
106 @InterfaceAudience.Private
107 class FSHLog implements HLog, Syncable {
  static final Log LOG = LogFactory.getLog(FSHLog.class);

  private final FileSystem fs;
  // Root directory; both the live log dir and the archive dir are resolved under it.
  private final Path rootDir;
  // Directory holding this instance's live log files (rootDir/logDir).
  private final Path dir;
  private final Configuration conf;
  // Listeners that are called on WAL events.
  private List<WALActionsListener> listeners =
    new CopyOnWriteArrayList<WALActionsListener>();
  // Interval (ms) of the optional/deferred log syncer; 0 disables the syncer thread.
  private final long optionalFlushInterval;
  private final long blocksize;
  // File-name prefix for log files; "hlog" when none was supplied.
  private final String prefix;
  private final AtomicLong unflushedEntries = new AtomicLong(0);
  private volatile long syncedTillHere = 0;
  private long lastDeferredTxid;
  // Directory rolled log files are archived to (rootDir/oldLogDir).
  private final Path oldLogDir;
  // True while rollWriter(boolean) is executing its roll.
  private volatile boolean logRollRunning;

  private WALCoprocessorHost coprocessorHost;

  // Underlying stream of the current writer, when it exposes one (ProtobufLogWriter);
  // null otherwise.
  private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
  // Minimum tolerable replicas, if the actual value is lower than it,
  // rollWriter will be triggered
  private int minTolerableReplication;
  private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
  // Presumably the empty argument list for invoking getNumCurrentReplicas reflectively.
  final static Object [] NO_ARGS = new Object []{};

  /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */
  private DrainBarrier closeBarrier = new DrainBarrier();

  /**
   * Current log file writer; null before the first roll and after close().
   */
  Writer writer;

  /**
   * Map of all log files but the current one, keyed by the log sequence number that was
   * current when the file was rolled (i.e. the highest sequence id it may contain).
   * Callers must synchronize on the map when iterating any of its views.
   */
  final SortedMap<Long, Path> outputfiles =
    Collections.synchronizedSortedMap(new TreeMap<Long, Path>());


  /**
   * This lock synchronizes all operations on oldestUnflushedSeqNums and oldestFlushingSeqNums,
   * with the exception of append's putIfAbsent into oldestUnflushedSeqNums.
   * We only use these to find out the low bound seqNum, or to find regions with old seqNums to
   * force flush them, so we don't care about these numbers messing with anything. */
  private final Object oldestSeqNumsLock = new Object();

  /**
   * This lock makes sure only one log roll runs at the same time. Should not be taken while
   * any other lock is held. We don't just use synchronized because that results in bogus and
   * tedious findbugs warning when it thinks synchronized controls writer thread safety */
  private final Object rollWriterLock = new Object();

  /**
   * Map of encoded region names to the sequence id of the OLDEST edit still unflushed in
   * their memstore (append only inserts via putIfAbsent, so the first edit's id is kept).
   */
  private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedSeqNums =
    new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
  /**
   * Map of encoded region names to the oldest sequence id in the memstore snapshot that is
   * currently being flushed. That way we can store two numbers for the flushing and the
   * non-flushing (oldestUnflushedSeqNums) memstore of the same region.
   * Guarded by oldestSeqNumsLock.
   */
  private final Map<byte[], Long> oldestFlushingSeqNums =
    new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  private volatile boolean closed = false;

  // Highest log sequence number assigned so far; only ever raised by setSequenceNumber.
  private final AtomicLong logSeqNum = new AtomicLong(0);

  // True if this WAL holds meta-region edits; adds META_HLOG_FILE_EXTN to file names.
  private boolean forMeta = false;

  // The timestamp (in ms) when the current log file was created; -1 before the first roll.
  private volatile long filenum = -1;

  //number of transactions in the current Hlog.
  private final AtomicInteger numEntries = new AtomicInteger(0);

  // If live datanode count is lower than the default replicas value,
  // RollWriter will be triggered in each sync(So the RollWriter will be
  // triggered one by one in a short time). Using it as a workaround to slow
  // down the roll frequency triggered by checkLowReplication().
  private AtomicInteger consecutiveLogRolls = new AtomicInteger(0);
  private final int lowReplicationRollLimit;

  // If consecutiveLogRolls is larger than lowReplicationRollLimit,
  // then disable the rolling in checkLowReplication().
  // Enable it if the replications recover.
  private volatile boolean lowReplicationRollEnabled = true;

  // If the current log grows beyond this size (bytes), roll it. Computed as
  // blocksize * hbase.regionserver.logroll.multiplier (default 0.95).
  private final long logrollsize;

  // We synchronize on updateLock to prevent updates and to prevent a log roll
  // during an update
  // locked during appends
  private final Object updateLock = new Object();
  // NOTE(review): usage not visible in this chunk; presumably serializes flush/sync work.
  private final Object flushLock = new Object();

  // Value of hbase.regionserver.hlog.enabled (default true).
  private final boolean enabled;

  /*
   * If more than this many logs, force flush of oldest region to oldest edit
   * goes to disk.  If too many and we crash, then will take forever replaying.
   * Keep the number of logs tidy. Configured by hbase.regionserver.maxlogs (default 32).
   */
  private final int maxLogs;

  /**
   * Thread that handles optional sync'ing; only started when optionalFlushInterval > 0.
   */
  private final LogSyncer logSyncer;

  /** Number of log close errors tolerated before we abort */
  private final int closeErrorsTolerated;

  // Consecutive failed writer closes; reset to 0 after a successful close.
  private final AtomicInteger closeErrorCount = new AtomicInteger();
  private final MetricsWAL metrics;
229 
230   /**
231    * Constructor.
232    *
233    * @param fs filesystem handle
234    * @param root path for stored and archived hlogs
235    * @param logDir dir where hlogs are stored
236    * @param conf configuration to use
237    * @throws IOException
238    */
239   public FSHLog(final FileSystem fs, final Path root, final String logDir,
240                 final Configuration conf)
241   throws IOException {
242     this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
243         conf, null, true, null, false);
244   }
245 
246   /**
247    * Constructor.
248    *
249    * @param fs filesystem handle
250    * @param root path for stored and archived hlogs
251    * @param logDir dir where hlogs are stored
252    * @param oldLogDir dir where hlogs are archived
253    * @param conf configuration to use
254    * @throws IOException
255    */
256   public FSHLog(final FileSystem fs, final Path root, final String logDir,
257                 final String oldLogDir, final Configuration conf)
258   throws IOException {
259     this(fs, root, logDir, oldLogDir,
260         conf, null, true, null, false);
261   }
262 
263   /**
264    * Create an edit log at the given <code>dir</code> location.
265    *
266    * You should never have to load an existing log. If there is a log at
267    * startup, it should have already been processed and deleted by the time the
268    * HLog object is started up.
269    *
270    * @param fs filesystem handle
271    * @param root path for stored and archived hlogs
272    * @param logDir dir where hlogs are stored
273    * @param conf configuration to use
274    * @param listeners Listeners on WAL events. Listeners passed here will
275    * be registered before we do anything else; e.g. the
276    * Constructor {@link #rollWriter()}.
277    * @param prefix should always be hostname and port in distributed env and
278    *        it will be URL encoded before being used.
279    *        If prefix is null, "hlog" will be used
280    * @throws IOException
281    */
282   public FSHLog(final FileSystem fs, final Path root, final String logDir,
283       final Configuration conf, final List<WALActionsListener> listeners,
284       final String prefix) throws IOException {
285     this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
286         conf, listeners, true, prefix, false);
287   }
288 
289   /**
290    * Create an edit log at the given <code>dir</code> location.
291    *
292    * You should never have to load an existing log. If there is a log at
293    * startup, it should have already been processed and deleted by the time the
294    * HLog object is started up.
295    *
296    * @param fs filesystem handle
297    * @param root path to where logs and oldlogs
298    * @param logDir dir where hlogs are stored
299    * @param oldLogDir dir where hlogs are archived
300    * @param conf configuration to use
301    * @param listeners Listeners on WAL events. Listeners passed here will
302    * be registered before we do anything else; e.g. the
303    * Constructor {@link #rollWriter()}.
304    * @param failIfLogDirExists If true IOException will be thrown if dir already exists.
305    * @param prefix should always be hostname and port in distributed env and
306    *        it will be URL encoded before being used.
307    *        If prefix is null, "hlog" will be used
308    * @param forMeta if this hlog is meant for meta updates
309    * @throws IOException
310    */
311   public FSHLog(final FileSystem fs, final Path root, final String logDir,
312       final String oldLogDir, final Configuration conf,
313       final List<WALActionsListener> listeners,
314       final boolean failIfLogDirExists, final String prefix, boolean forMeta)
315   throws IOException {
316     super();
317     this.fs = fs;
318     this.rootDir = root;
319     this.dir = new Path(this.rootDir, logDir);
320     this.oldLogDir = new Path(this.rootDir, oldLogDir);
321     this.forMeta = forMeta;
322     this.conf = conf;
323 
324     if (listeners != null) {
325       for (WALActionsListener i: listeners) {
326         registerWALActionsListener(i);
327       }
328     }
329 
330     this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
331         FSUtils.getDefaultBlockSize(this.fs, this.dir));
332     // Roll at 95% of block size.
333     float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
334     this.logrollsize = (long)(this.blocksize * multi);
335     this.optionalFlushInterval =
336       conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
337 
338     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
339     this.minTolerableReplication = conf.getInt(
340         "hbase.regionserver.hlog.tolerable.lowreplication",
341         FSUtils.getDefaultReplication(fs, this.dir));
342     this.lowReplicationRollLimit = conf.getInt(
343         "hbase.regionserver.hlog.lowreplication.rolllimit", 5);
344     this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
345     this.closeErrorsTolerated = conf.getInt(
346         "hbase.regionserver.logroll.errors.tolerated", 0);
347 
348     this.logSyncer = new LogSyncer(this.optionalFlushInterval);
349 
350     LOG.info("WAL/HLog configuration: blocksize=" +
351       StringUtils.byteDesc(this.blocksize) +
352       ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
353       ", enabled=" + this.enabled +
354       ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
355     // If prefix is null||empty then just name it hlog
356     this.prefix = prefix == null || prefix.isEmpty() ?
357         "hlog" : URLEncoder.encode(prefix, "UTF8");
358 
359     boolean dirExists = false;
360     if (failIfLogDirExists && (dirExists = this.fs.exists(dir))) {
361       throw new IOException("Target HLog directory already exists: " + dir);
362     }
363     if (!dirExists && !fs.mkdirs(dir)) {
364       throw new IOException("Unable to mkdir " + dir);
365     }
366 
367     if (!fs.exists(this.oldLogDir)) {
368       if (!fs.mkdirs(this.oldLogDir)) {
369         throw new IOException("Unable to mkdir " + this.oldLogDir);
370       }
371     }
372     // rollWriter sets this.hdfs_out if it can.
373     rollWriter();
374 
375     // handle the reflection necessary to call getNumCurrentReplicas()
376     this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
377 
378     // When optionalFlushInterval is set as 0, don't start a thread for deferred log sync.
379     if (this.optionalFlushInterval > 0) {
380       Threads.setDaemonThreadRunning(logSyncer.getThread(), Thread.currentThread().getName()
381           + ".logSyncer");
382     } else {
383       LOG.info("hbase.regionserver.optionallogflushinterval is set as "
384           + this.optionalFlushInterval + ". Deferred log syncing won't work. "
385           + "Any Mutation, marked to be deferred synced, will be flushed immediately.");
386     }
387     coprocessorHost = new WALCoprocessorHost(this, conf);
388 
389     this.metrics = new MetricsWAL();
390   }
391 
392   /**
393    * Find the 'getNumCurrentReplicas' on the passed <code>os</code> stream.
394    * @return Method or null.
395    */
396   private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
397     Method m = null;
398     if (os != null) {
399       Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
400           .getClass();
401       try {
402         m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas",
403             new Class<?>[] {});
404         m.setAccessible(true);
405       } catch (NoSuchMethodException e) {
406         LOG.info("FileSystem's output stream doesn't support"
407             + " getNumCurrentReplicas; --HDFS-826 not available; fsOut="
408             + wrappedStreamClass.getName());
409       } catch (SecurityException e) {
410         LOG.info("Doesn't have access to getNumCurrentReplicas on "
411             + "FileSystems's output stream --HDFS-826 not available; fsOut="
412             + wrappedStreamClass.getName(), e);
413         m = null; // could happen on setAccessible()
414       }
415     }
416     if (m != null) {
417       if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas--HDFS-826");
418     }
419     return m;
420   }
421 
422   @Override
423   public void registerWALActionsListener(final WALActionsListener listener) {
424     this.listeners.add(listener);
425   }
426 
427   @Override
428   public boolean unregisterWALActionsListener(final WALActionsListener listener) {
429     return this.listeners.remove(listener);
430   }
431 
432   @Override
433   public long getFilenum() {
434     return this.filenum;
435   }
436 
437   @Override
438   public void setSequenceNumber(final long newvalue) {
439     for (long id = this.logSeqNum.get(); id < newvalue &&
440         !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
441       // This could spin on occasion but better the occasional spin than locking
442       // every increment of sequence number.
443       LOG.debug("Changed sequenceid from " + id + " to " + newvalue);
444     }
445   }
446 
447   @Override
448   public long getSequenceNumber() {
449     return logSeqNum.get();
450   }
451 
452   /**
453    * Method used internal to this class and for tests only.
454    * @return The wrapped stream our writer is using; its not the
455    * writer's 'out' FSDatoOutputStream but the stream that this 'out' wraps
456    * (In hdfs its an instance of DFSDataOutputStream).
457    *
458    * usage: see TestLogRolling.java
459    */
460   OutputStream getOutputStream() {
461     return this.hdfs_out.getWrappedStream();
462   }
463 
464   @Override
465   public byte [][] rollWriter() throws FailedLogCloseException, IOException {
466     return rollWriter(false);
467   }
468 
469   @Override
470   public byte [][] rollWriter(boolean force)
471       throws FailedLogCloseException, IOException {
472     synchronized (rollWriterLock) {
473       // Return if nothing to flush.
474       if (!force && this.writer != null && this.numEntries.get() <= 0) {
475         return null;
476       }
477       byte [][] regionsToFlush = null;
478       if (closed) {
479         LOG.debug("HLog closed. Skipping rolling of writer");
480         return null;
481       }
482       try {
483         this.logRollRunning = true;
484         if (!closeBarrier.beginOp()) {
485           LOG.debug("HLog closing. Skipping rolling of writer");
486           return regionsToFlush;
487         }
488         // Do all the preparation outside of the updateLock to block
489         // as less as possible the incoming writes
490         long currentFilenum = this.filenum;
491         Path oldPath = null;
492         if (currentFilenum > 0) {
493           //computeFilename  will take care of meta hlog filename
494           oldPath = computeFilename(currentFilenum);
495         }
496         this.filenum = System.currentTimeMillis();
497         Path newPath = computeFilename();
498 
499         // Tell our listeners that a new log is about to be created
500         if (!this.listeners.isEmpty()) {
501           for (WALActionsListener i : this.listeners) {
502             i.preLogRoll(oldPath, newPath);
503           }
504         }
505         FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
506         // Can we get at the dfsclient outputstream?
507         FSDataOutputStream nextHdfsOut = null;
508         if (nextWriter instanceof ProtobufLogWriter) {
509           nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
510         }
511 
512         Path oldFile = null;
513         int oldNumEntries = 0;
514         synchronized (updateLock) {
515           // Clean up current writer.
516           oldNumEntries = this.numEntries.get();
517           oldFile = cleanupCurrentWriter(currentFilenum);
518           this.writer = nextWriter;
519           this.hdfs_out = nextHdfsOut;
520           this.numEntries.set(0);
521         }
522         if (oldFile == null) LOG.info("New WAL " + FSUtils.getPath(newPath));
523         else LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries=" + oldNumEntries +
524           ", filesize=" + StringUtils.humanReadableInt(this.fs.getFileStatus(oldFile).getLen()) +
525           "; new WAL " + FSUtils.getPath(newPath));
526 
527         // Tell our listeners that a new log was created
528         if (!this.listeners.isEmpty()) {
529           for (WALActionsListener i : this.listeners) {
530             i.postLogRoll(oldPath, newPath);
531           }
532         }
533 
534         // Can we delete any of the old log files?
535         if (getNumLogFiles() > 0) {
536           cleanOldLogs();
537           regionsToFlush = getRegionsToForceFlush();
538         }
539       } finally {
540         this.logRollRunning = false;
541         closeBarrier.endOp();
542       }
543       return regionsToFlush;
544     }
545   }
546 
547   /**
548    * This method allows subclasses to inject different writers without having to
549    * extend other methods like rollWriter().
550    *
551    * @param fs
552    * @param path
553    * @param conf
554    * @return Writer instance
555    * @throws IOException
556    */
557   protected Writer createWriterInstance(final FileSystem fs, final Path path,
558       final Configuration conf) throws IOException {
559     if (forMeta) {
560       //TODO: set a higher replication for the hlog files (HBASE-6773)
561     }
562     return HLogFactory.createWriter(fs, path, conf);
563   }
564 
565   /*
566    * Clean up old commit logs.
567    * @return If lots of logs, flush the returned region so next time through
568    * we can clean logs. Returns null if nothing to flush.  Returns array of
569    * encoded region names to flush.
570    * @throws IOException
571    */
572   private void cleanOldLogs() throws IOException {
573     long oldestOutstandingSeqNum = Long.MAX_VALUE;
574     synchronized (oldestSeqNumsLock) {
575       Long oldestFlushing = (oldestFlushingSeqNums.size() > 0)
576         ? Collections.min(oldestFlushingSeqNums.values()) : Long.MAX_VALUE;
577       Long oldestUnflushed = (oldestUnflushedSeqNums.size() > 0)
578         ? Collections.min(oldestUnflushedSeqNums.values()) : Long.MAX_VALUE;
579       oldestOutstandingSeqNum = Math.min(oldestFlushing, oldestUnflushed);
580     }
581 
582     // Get the set of all log files whose last sequence number is smaller than
583     // the oldest edit's sequence number.
584     TreeSet<Long> sequenceNumbers = new TreeSet<Long>(this.outputfiles.headMap(
585         oldestOutstandingSeqNum).keySet());
586     // Now remove old log files (if any)
587     if (LOG.isDebugEnabled()) {
588       if (sequenceNumbers.size() > 0) {
589         LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove" +
590           " out of total " + this.outputfiles.size() + ";" +
591           " oldest outstanding sequenceid is " + oldestOutstandingSeqNum);
592       }
593     }
594     for (Long seq : sequenceNumbers) {
595       archiveLogFile(this.outputfiles.remove(seq), seq);
596     }
597   }
598 
599   /**
600    * Return regions that have edits that are equal or less than a certain sequence number.
601    * Static due to some old unit test.
602    * @param walSeqNum The sequence number to compare with.
603    * @param regionsToSeqNums Encoded region names to sequence ids
604    * @return All regions whose seqNum <= walSeqNum. Null if no regions found.
605    */
606   static byte[][] findMemstoresWithEditsEqualOrOlderThan(
607       final long walSeqNum, final Map<byte[], Long> regionsToSeqNums) {
608     List<byte[]> regions = null;
609     for (Map.Entry<byte[], Long> e : regionsToSeqNums.entrySet()) {
610       if (e.getValue().longValue() <= walSeqNum) {
611         if (regions == null) regions = new ArrayList<byte[]>();
612         regions.add(e.getKey());
613       }
614     }
615     return regions == null ? null : regions
616         .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
617   }
618 
619   private byte[][] getRegionsToForceFlush() throws IOException {
620     // If too many log files, figure which regions we need to flush.
621     // Array is an array of encoded region names.
622     byte [][] regions = null;
623     int logCount = getNumLogFiles();
624     if (logCount > this.maxLogs && logCount > 0) {
625       // This is an array of encoded region names.
626       synchronized (oldestSeqNumsLock) {
627         regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
628           this.oldestUnflushedSeqNums);
629       }
630       if (regions != null) {
631         StringBuilder sb = new StringBuilder();
632         for (int i = 0; i < regions.length; i++) {
633           if (i > 0) sb.append(", ");
634           sb.append(Bytes.toStringBinary(regions[i]));
635         }
636         LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
637            this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
638            sb.toString());
639       }
640     }
641     return regions;
642   }
643 
644   /*
645    * Cleans up current writer closing and adding to outputfiles.
646    * Presumes we're operating inside an updateLock scope.
647    * @return Path to current writer or null if none.
648    * @throws IOException
649    */
650   Path cleanupCurrentWriter(final long currentfilenum) throws IOException {
651     Path oldFile = null;
652     if (this.writer != null) {
653       // Close the current writer, get a new one.
654       try {
655         // Wait till all current transactions are written to the hlog.
656         // No new transactions can occur because we have the updatelock.
657         if (this.unflushedEntries.get() != this.syncedTillHere) {
658           LOG.debug("cleanupCurrentWriter " +
659                    " waiting for transactions to get synced " +
660                    " total " + this.unflushedEntries.get() +
661                    " synced till here " + syncedTillHere);
662           sync();
663         }
664         this.writer.close();
665         this.writer = null;
666         closeErrorCount.set(0);
667       } catch (IOException e) {
668         LOG.error("Failed close of HLog writer", e);
669         int errors = closeErrorCount.incrementAndGet();
670         if (errors <= closeErrorsTolerated && !hasDeferredEntries()) {
671           LOG.warn("Riding over HLog close failure! error count="+errors);
672         } else {
673           if (hasDeferredEntries()) {
674             LOG.error("Aborting due to unflushed edits in HLog");
675           }
676           // Failed close of log file.  Means we're losing edits.  For now,
677           // shut ourselves down to minimize loss.  Alternative is to try and
678           // keep going.  See HBASE-930.
679           FailedLogCloseException flce =
680             new FailedLogCloseException("#" + currentfilenum);
681           flce.initCause(e);
682           throw flce;
683         }
684       }
685       if (currentfilenum >= 0) {
686         oldFile = computeFilename(currentfilenum);
687         this.outputfiles.put(Long.valueOf(this.logSeqNum.get()), oldFile);
688       }
689     }
690     return oldFile;
691   }
692 
693   private void archiveLogFile(final Path p, final Long seqno) throws IOException {
694     Path newPath = getHLogArchivePath(this.oldLogDir, p);
695     LOG.info("moving old hlog file " + FSUtils.getPath(p) +
696       " whose highest sequenceid is " + seqno + " to " +
697       FSUtils.getPath(newPath));
698 
699     // Tell our listeners that a log is going to be archived.
700     if (!this.listeners.isEmpty()) {
701       for (WALActionsListener i : this.listeners) {
702         i.preLogArchive(p, newPath);
703       }
704     }
705     if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
706       throw new IOException("Unable to rename " + p + " to " + newPath);
707     }
708     // Tell our listeners that a log has been archived.
709     if (!this.listeners.isEmpty()) {
710       for (WALActionsListener i : this.listeners) {
711         i.postLogArchive(p, newPath);
712       }
713     }
714   }
715 
716   /**
717    * This is a convenience method that computes a new filename with a given
718    * using the current HLog file-number
719    * @return Path
720    */
721   protected Path computeFilename() {
722     return computeFilename(this.filenum);
723   }
724 
725   /**
726    * This is a convenience method that computes a new filename with a given
727    * file-number.
728    * @param filenum to use
729    * @return Path
730    */
731   protected Path computeFilename(long filenum) {
732     if (filenum < 0) {
733       throw new RuntimeException("hlog file number can't be < 0");
734     }
735     String child = prefix + "." + filenum;
736     if (forMeta) {
737       child += HLog.META_HLOG_FILE_EXTN;
738     }
739     return new Path(dir, child);
740   }
741 
742   @Override
743   public void closeAndDelete() throws IOException {
744     close();
745     if (!fs.exists(this.dir)) return;
746     FileStatus[] files = fs.listStatus(this.dir);
747     if (files != null) {
748       for(FileStatus file : files) {
749 
750         Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
751         // Tell our listeners that a log is going to be archived.
752         if (!this.listeners.isEmpty()) {
753           for (WALActionsListener i : this.listeners) {
754             i.preLogArchive(file.getPath(), p);
755           }
756         }
757 
758         if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
759           throw new IOException("Unable to rename " + file.getPath() + " to " + p);
760         }
761         // Tell our listeners that a log was archived.
762         if (!this.listeners.isEmpty()) {
763           for (WALActionsListener i : this.listeners) {
764             i.postLogArchive(file.getPath(), p);
765           }
766         }
767       }
768       LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.oldLogDir));
769     }
770     if (!fs.delete(dir, true)) {
771       LOG.info("Unable to delete " + dir);
772     }
773   }
774 
775   @Override
776   public void close() throws IOException {
777     if (this.closed) {
778       return;
779     }
780     // When optionalFlushInterval is 0, the logSyncer is not started as a Thread.
781     if (this.optionalFlushInterval > 0) {
782       try {
783         logSyncer.close();
784         // Make sure we synced everything
785         logSyncer.join(this.optionalFlushInterval * 2);
786       } catch (InterruptedException e) {
787         LOG.error("Exception while waiting for syncer thread to die", e);
788         Thread.currentThread().interrupt();
789       }
790     }
791     try {
792       // Prevent all further flushing and rolling.
793       closeBarrier.stopAndDrainOps();
794     } catch (InterruptedException e) {
795       LOG.error("Exception while waiting for cache flushes and log rolls", e);
796       Thread.currentThread().interrupt();
797     }
798 
799     // Tell our listeners that the log is closing
800     if (!this.listeners.isEmpty()) {
801       for (WALActionsListener i : this.listeners) {
802         i.logCloseRequested();
803       }
804     }
805     synchronized (updateLock) {
806       this.closed = true;
807       if (LOG.isDebugEnabled()) {
808         LOG.debug("Closing WAL writer in " + this.dir.toString());
809       }
810       if (this.writer != null) {
811         this.writer.close();
812         this.writer = null;
813       }
814     }
815   }
816 
817   /**
818    * @param now
819    * @param encodedRegionName Encoded name of the region as returned by
820    * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
821    * @param tableName
822    * @param clusterId
823    * @return New log key.
824    */
825   protected HLogKey makeKey(byte[] encodedRegionName, byte[] tableName, long seqnum,
826       long now, UUID clusterId) {
827     return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterId);
828   }
829 
830   @Override
831   public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
832     final long now, HTableDescriptor htd)
833   throws IOException {
834     append(info, tableName, edits, now, htd, true);
835   }
836 
837   @Override
838   public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
839     final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException {
840     append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore);
841   }
842 
843   /**
844    * Append a set of edits to the log. Log edits are keyed by (encoded)
845    * regionName, rowname, and log-sequence-id.
846    *
847    * Later, if we sort by these keys, we obtain all the relevant edits for a
848    * given key-range of the HRegion (TODO). Any edits that do not have a
849    * matching COMPLETE_CACHEFLUSH message can be discarded.
850    *
851    * <p>
852    * Logs cannot be restarted once closed, or once the HLog process dies. Each
853    * time the HLog starts, it must create a new log. This means that other
854    * systems should process the log appropriately upon each startup (and prior
855    * to initializing HLog).
856    *
857    * synchronized prevents appends during the completion of a cache flush or for
858    * the duration of a log roll.
859    *
860    * @param info
861    * @param tableName
862    * @param edits
863    * @param clusterId The originating clusterId for this edit (for replication)
864    * @param now
865    * @param doSync shall we sync?
866    * @return txid of this transaction
867    * @throws IOException
868    */
869   private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
870       final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore)
871     throws IOException {
872       if (edits.isEmpty()) return this.unflushedEntries.get();
873       if (this.closed) {
874         throw new IOException("Cannot append; log is closed");
875       }
876       long txid = 0;
877       synchronized (this.updateLock) {
878         long seqNum = obtainSeqNum();
879         // The 'lastSeqWritten' map holds the sequence number of the oldest
880         // write for each region (i.e. the first edit added to the particular
881         // memstore). . When the cache is flushed, the entry for the
882         // region being flushed is removed if the sequence number of the flush
883         // is greater than or equal to the value in lastSeqWritten.
884         // Use encoded name.  Its shorter, guaranteed unique and a subset of
885         // actual  name.
886         byte [] encodedRegionName = info.getEncodedNameAsBytes();
887         if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
888         HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
889         doWrite(info, logKey, edits, htd);
890         this.numEntries.incrementAndGet();
891         txid = this.unflushedEntries.incrementAndGet();
892         if (htd.isDeferredLogFlush()) {
893           lastDeferredTxid = txid;
894         }
895       }
896       // Sync if catalog region, and if not then check if that table supports
897       // deferred log flushing
898       if (doSync &&
899           (info.isMetaRegion() ||
900           !htd.isDeferredLogFlush())) {
901         // sync txn to file system
902         this.sync(txid);
903       }
904       return txid;
905     }
906 
907   @Override
908   public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
909     UUID clusterId, final long now, HTableDescriptor htd)
910     throws IOException {
911     return append(info, tableName, edits, clusterId, now, htd, false, true);
912   }
913 
914   /**
915    * This class is responsible to hold the HLog's appended Entry list
916    * and to sync them according to a configurable interval.
917    *
918    * Deferred log flushing works first by piggy backing on this process by
919    * simply not sync'ing the appended Entry. It can also be sync'd by other
920    * non-deferred log flushed entries outside of this thread.
921    */
922   class LogSyncer extends HasThread {
923 
924     private final long optionalFlushInterval;
925 
926     private final AtomicBoolean closeLogSyncer = new AtomicBoolean(false);
927 
928     // List of pending writes to the HLog. There corresponds to transactions
929     // that have not yet returned to the client. We keep them cached here
930     // instead of writing them to HDFS piecemeal, because the HDFS write
931     // method is pretty heavyweight as far as locking is concerned. The
932     // goal is to increase the batchsize for writing-to-hdfs as well as
933     // sync-to-hdfs, so that we can get better system throughput.
934     private List<Entry> pendingWrites = new LinkedList<Entry>();
935 
936     LogSyncer(long optionalFlushInterval) {
937       this.optionalFlushInterval = optionalFlushInterval;
938     }
939 
940     @Override
941     public void run() {
942       try {
943         // awaiting with a timeout doesn't always
944         // throw exceptions on interrupt
945         while(!this.isInterrupted() && !closeLogSyncer.get()) {
946 
947           try {
948             if (unflushedEntries.get() <= syncedTillHere) {
949               synchronized (closeLogSyncer) {
950                 closeLogSyncer.wait(this.optionalFlushInterval);
951               }
952             }
953             // Calling sync since we waited or had unflushed entries.
954             // Entries appended but not sync'd are taken care of here AKA
955             // deferred log flush
956             sync();
957           } catch (IOException e) {
958             LOG.error("Error while syncing, requesting close of hlog ", e);
959             requestLogRoll();
960             Threads.sleep(this.optionalFlushInterval);
961           }
962         }
963       } catch (InterruptedException e) {
964         LOG.debug(getName() + " interrupted while waiting for sync requests");
965       } finally {
966         LOG.info(getName() + " exiting");
967       }
968     }
969 
970     // appends new writes to the pendingWrites. It is better to keep it in
971     // our own queue rather than writing it to the HDFS output stream because
972     // HDFSOutputStream.writeChunk is not lightweight at all.
973     synchronized void append(Entry e) throws IOException {
974       pendingWrites.add(e);
975     }
976 
977     // Returns all currently pending writes. New writes
978     // will accumulate in a new list.
979     synchronized List<Entry> getPendingWrites() {
980       List<Entry> save = this.pendingWrites;
981       this.pendingWrites = new LinkedList<Entry>();
982       return save;
983     }
984 
985     // writes out pending entries to the HLog
986     void hlogFlush(Writer writer, List<Entry> pending) throws IOException {
987       if (pending == null) return;
988 
989       // write out all accumulated Entries to hdfs.
990       for (Entry e : pending) {
991         writer.append(e);
992       }
993     }
994 
995     void close() {
996       synchronized (closeLogSyncer) {
997         closeLogSyncer.set(true);
998         closeLogSyncer.notifyAll();
999       }
1000     }
1001   }
1002 
1003   // sync all known transactions
1004   private void syncer() throws IOException {
1005     syncer(this.unflushedEntries.get()); // sync all pending items
1006   }
1007 
1008   // sync all transactions upto the specified txid
1009   private void syncer(long txid) throws IOException {
1010     // if the transaction that we are interested in is already
1011     // synced, then return immediately.
1012     if (txid <= this.syncedTillHere) {
1013       return;
1014     }
1015     Writer tempWriter;
1016     synchronized (this.updateLock) {
1017       if (this.closed) return;
1018       // Guaranteed non-null.
1019       // Note that parallel sync can close tempWriter.
1020       // The current method of dealing with this is to catch exceptions.
1021       // See HBASE-4387, HBASE-5623, HBASE-7329.
1022       tempWriter = this.writer;
1023     }
1024     try {
1025       long doneUpto;
1026       long now = EnvironmentEdgeManager.currentTimeMillis();
1027       // First flush all the pending writes to HDFS. Then
1028       // issue the sync to HDFS. If sync is successful, then update
1029       // syncedTillHere to indicate that transactions till this
1030       // number has been successfully synced.
1031       IOException ioe = null;
1032       List<Entry> pending = null;
1033       synchronized (flushLock) {
1034         if (txid <= this.syncedTillHere) {
1035           return;
1036         }
1037         doneUpto = this.unflushedEntries.get();
1038         pending = logSyncer.getPendingWrites();
1039         try {
1040           logSyncer.hlogFlush(tempWriter, pending);
1041         } catch(IOException io) {
1042           ioe = io;
1043           LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
1044         }
1045       }
1046       if (ioe != null && pending != null) {
1047         synchronized (this.updateLock) {
1048           synchronized (flushLock) {
1049             // HBASE-4387, HBASE-5623, retry with updateLock held
1050             tempWriter = this.writer;
1051             logSyncer.hlogFlush(tempWriter, pending);
1052           }
1053         }
1054       }
1055       // another thread might have sync'ed avoid double-sync'ing
1056       if (txid <= this.syncedTillHere) {
1057         return;
1058       }
1059       try {
1060         if (tempWriter != null) tempWriter.sync();
1061       } catch(IOException ex) {
1062         synchronized (this.updateLock) {
1063           // HBASE-4387, HBASE-5623, retry with updateLock held
1064           // TODO: we don't actually need to do it for concurrent close - what is the point
1065           //       of syncing new unrelated writer? Keep behavior for now.
1066           tempWriter = this.writer;
1067           if (tempWriter != null) tempWriter.sync();
1068         }
1069       }
1070       this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
1071 
1072       this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
1073       // TODO: preserving the old behavior for now, but this check is strange. It's not
1074       //       protected by any locks here, so for all we know rolling locks might start
1075       //       as soon as we enter the "if". Is this best-effort optimization check?
1076       if (!this.logRollRunning) {
1077         checkLowReplication();
1078         try {
1079           if (tempWriter.getLength() > this.logrollsize) {
1080             requestLogRoll();
1081           }
1082         } catch (IOException x) {
1083           LOG.debug("Log roll failed and will be retried. (This is not an error)");
1084         }
1085       }
1086     } catch (IOException e) {
1087       LOG.fatal("Could not sync. Requesting roll of hlog", e);
1088       requestLogRoll();
1089       throw e;
1090     }
1091   }
1092 
1093   private void checkLowReplication() {
1094     // if the number of replicas in HDFS has fallen below the configured
1095     // value, then roll logs.
1096     try {
1097       int numCurrentReplicas = getLogReplication();
1098       if (numCurrentReplicas != 0
1099           && numCurrentReplicas < this.minTolerableReplication) {
1100         if (this.lowReplicationRollEnabled) {
1101           if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
1102             LOG.warn("HDFS pipeline error detected. " + "Found "
1103                 + numCurrentReplicas + " replicas but expecting no less than "
1104                 + this.minTolerableReplication + " replicas. "
1105                 + " Requesting close of hlog.");
1106             requestLogRoll();
1107             // If rollWriter is requested, increase consecutiveLogRolls. Once it
1108             // is larger than lowReplicationRollLimit, disable the
1109             // LowReplication-Roller
1110             this.consecutiveLogRolls.getAndIncrement();
1111           } else {
1112             LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
1113                 + "the total number of live datanodes is lower than the tolerable replicas.");
1114             this.consecutiveLogRolls.set(0);
1115             this.lowReplicationRollEnabled = false;
1116           }
1117         }
1118       } else if (numCurrentReplicas >= this.minTolerableReplication) {
1119 
1120         if (!this.lowReplicationRollEnabled) {
1121           // The new writer's log replicas is always the default value.
1122           // So we should not enable LowReplication-Roller. If numEntries
1123           // is lower than or equals 1, we consider it as a new writer.
1124           if (this.numEntries.get() <= 1) {
1125             return;
1126           }
1127           // Once the live datanode number and the replicas return to normal,
1128           // enable the LowReplication-Roller.
1129           this.lowReplicationRollEnabled = true;
1130           LOG.info("LowReplication-Roller was enabled.");
1131         }
1132       }
1133     } catch (Exception e) {
1134       LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
1135           " still proceeding ahead...");
1136     }
1137   }
1138 
1139   /**
1140    * This method gets the datanode replication count for the current HLog.
1141    *
1142    * If the pipeline isn't started yet or is empty, you will get the default
1143    * replication factor.  Therefore, if this function returns 0, it means you
1144    * are not properly running with the HDFS-826 patch.
1145    * @throws InvocationTargetException
1146    * @throws IllegalAccessException
1147    * @throws IllegalArgumentException
1148    *
1149    * @throws Exception
1150    */
1151   int getLogReplication()
1152   throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
1153     if (this.getNumCurrentReplicas != null && this.hdfs_out != null) {
1154       Object repl = this.getNumCurrentReplicas.invoke(getOutputStream(), NO_ARGS);
1155       if (repl instanceof Integer) {
1156         return ((Integer)repl).intValue();
1157       }
1158     }
1159     return 0;
1160   }
1161 
1162   boolean canGetCurReplicas() {
1163     return this.getNumCurrentReplicas != null;
1164   }
1165 
1166   @Override
1167   public void hsync() throws IOException {
1168     syncer();
1169   }
1170 
1171   @Override
1172   public void hflush() throws IOException {
1173     syncer();
1174   }
1175 
1176   @Override
1177   public void sync() throws IOException {
1178     syncer();
1179   }
1180 
1181   @Override
1182   public void sync(long txid) throws IOException {
1183     syncer(txid);
1184   }
1185 
1186   private void requestLogRoll() {
1187     if (!this.listeners.isEmpty()) {
1188       for (WALActionsListener i: this.listeners) {
1189         i.logRollRequested();
1190       }
1191     }
1192   }
1193 
1194   // TODO: Remove info.  Unused.
1195   protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
1196                            HTableDescriptor htd)
1197   throws IOException {
1198     if (!this.enabled) {
1199       return;
1200     }
1201     if (!this.listeners.isEmpty()) {
1202       for (WALActionsListener i: this.listeners) {
1203         i.visitLogEntryBeforeWrite(htd, logKey, logEdit);
1204       }
1205     }
1206     try {
1207       long now = EnvironmentEdgeManager.currentTimeMillis();
1208       // coprocessor hook:
1209       if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
1210         if (logEdit.isReplay()) {
1211           // set replication scope null so that this won't be replicated
1212           logKey.setScopes(null);
1213         }
1214         // write to our buffer for the Hlog file.
1215         logSyncer.append(new FSHLog.Entry(logKey, logEdit));
1216       }
1217       long took = EnvironmentEdgeManager.currentTimeMillis() - now;
1218       coprocessorHost.postWALWrite(info, logKey, logEdit);
1219       long len = 0;
1220       for (KeyValue kv : logEdit.getKeyValues()) {
1221         len += kv.getLength();
1222       }
1223       this.metrics.finishAppend(took, len);
1224     } catch (IOException e) {
1225       LOG.fatal("Could not append. Requesting close of hlog", e);
1226       requestLogRoll();
1227       throw e;
1228     }
1229   }
1230 
1231 
1232   /** @return How many items have been added to the log */
1233   int getNumEntries() {
1234     return numEntries.get();
1235   }
1236 
1237   @Override
1238   public long obtainSeqNum() {
1239     return this.logSeqNum.incrementAndGet();
1240   }
1241 
1242   /** @return the number of log files in use */
1243   int getNumLogFiles() {
1244     return outputfiles.size();
1245   }
1246 
1247   @Override
1248   public Long startCacheFlush(final byte[] encodedRegionName) {
1249     Long oldRegionSeqNum = null;
1250     if (!closeBarrier.beginOp()) {
1251       return null;
1252     }
1253     synchronized (oldestSeqNumsLock) {
1254       oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
1255       if (oldRegionSeqNum != null) {
1256         Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
1257         assert oldValue == null : "Flushing map not cleaned up for "
1258           + Bytes.toString(encodedRegionName);
1259       }
1260     }
1261     if (oldRegionSeqNum == null) {
1262       // TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either
1263       //       the region is already flushing (which would make this call invalid), or there
1264       //       were no appends after last flush, so why are we starting flush? Maybe we should
1265       //       assert not null, and switch to "long" everywhere. Less rigorous, but safer,
1266       //       alternative is telling the caller to stop. For now preserve old logic.
1267       LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
1268         + Bytes.toString(encodedRegionName) + "]");
1269     }
1270     return obtainSeqNum();
1271   }
1272 
1273   @Override
1274   public void completeCacheFlush(final byte [] encodedRegionName)
1275   {
1276     synchronized (oldestSeqNumsLock) {
1277       this.oldestFlushingSeqNums.remove(encodedRegionName);
1278     }
1279     closeBarrier.endOp();
1280   }
1281 
1282   @Override
1283   public void abortCacheFlush(byte[] encodedRegionName) {
1284     Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
1285     synchronized (oldestSeqNumsLock) {
1286       seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName);
1287       if (seqNumBeforeFlushStarts != null) {
1288         currentSeqNum =
1289           this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts);
1290       }
1291     }
1292     closeBarrier.endOp();
1293     if ((currentSeqNum != null)
1294         && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
1295       String errorStr = "Region " + Bytes.toString(encodedRegionName) +
1296           "acquired edits out of order current memstore seq=" + currentSeqNum
1297           + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
1298       LOG.error(errorStr);
1299       assert false : errorStr;
1300       Runtime.getRuntime().halt(1);
1301     }
1302   }
1303 
1304   @Override
1305   public boolean isLowReplicationRollEnabled() {
1306       return lowReplicationRollEnabled;
1307   }
1308 
1309   /**
1310    * Get the directory we are making logs in.
1311    *
1312    * @return dir
1313    */
1314   protected Path getDir() {
1315     return dir;
1316   }
1317 
1318   static Path getHLogArchivePath(Path oldLogDir, Path p) {
1319     return new Path(oldLogDir, p.getName());
1320   }
1321 
1322   static String formatRecoveredEditsFileName(final long seqid) {
1323     return String.format("%019d", seqid);
1324   }
1325 
  /**
   * Estimated fixed heap size of an instance, in bytes: object header plus
   * 5 references, one AtomicInteger, one int and three longs, aligned with
   * ClassSize.align.
   * NOTE(review): these counts are maintained by hand; this class uses more
   * than 5 object fields (e.g. fs, dir, writer, listeners, logSyncer), so
   * confirm the estimate still matches the actual field layout.
   */
  public static final long FIXED_OVERHEAD = ClassSize.align(
    ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
    ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1329 
1330   private static void usage() {
1331     System.err.println("Usage: HLog <ARGS>");
1332     System.err.println("Arguments:");
1333     System.err.println(" --dump  Dump textual representation of passed one or more files");
1334     System.err.println("         For example: HLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
1335     System.err.println(" --split Split the passed directory of WAL logs");
1336     System.err.println("         For example: HLog --split hdfs://example.com:9000/hbase/.logs/DIR");
1337   }
1338 
1339   private static void split(final Configuration conf, final Path p)
1340   throws IOException {
1341     FileSystem fs = FileSystem.get(conf);
1342     if (!fs.exists(p)) {
1343       throw new FileNotFoundException(p.toString());
1344     }
1345     final Path baseDir = FSUtils.getRootDir(conf);
1346     final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1347     if (!fs.getFileStatus(p).isDir()) {
1348       throw new IOException(p + " is not a directory");
1349     }
1350 
1351     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(
1352         conf, baseDir, p, oldLogDir, fs);
1353     logSplitter.splitLog();
1354   }
1355 
1356   @Override
1357   public WALCoprocessorHost getCoprocessorHost() {
1358     return coprocessorHost;
1359   }
1360 
1361   /** Provide access to currently deferred sequence num for tests */
1362   boolean hasDeferredEntries() {
1363     return lastDeferredTxid > syncedTillHere;
1364   }
1365 
1366   @Override
1367   public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
1368     Long result = oldestUnflushedSeqNums.get(encodedRegionName);
1369     return result == null ? HConstants.NO_SEQNUM : result.longValue();
1370   }
1371 
1372   /**
1373    * Pass one or more log file names and it will either dump out a text version
1374    * on <code>stdout</code> or split the specified log files.
1375    *
1376    * @param args
1377    * @throws IOException
1378    */
1379   public static void main(String[] args) throws IOException {
1380     if (args.length < 2) {
1381       usage();
1382       System.exit(-1);
1383     }
1384     // either dump using the HLogPrettyPrinter or split, depending on args
1385     if (args[0].compareTo("--dump") == 0) {
1386       HLogPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
1387     } else if (args[0].compareTo("--split") == 0) {
1388       Configuration conf = HBaseConfiguration.create();
1389       for (int i = 1; i < args.length; i++) {
1390         try {
1391           Path logPath = new Path(args[i]);
1392           FSUtils.setFsDefault(conf, logPath);
1393           split(conf, logPath);
1394         } catch (Throwable t) {
1395           t.printStackTrace(System.err);
1396           System.exit(-1);
1397         }
1398       }
1399     } else {
1400       usage();
1401       System.exit(-1);
1402     }
1403   }
1404 }