1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.wal;
19  
20  import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
21  
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.io.OutputStream;
26  import java.lang.reflect.InvocationTargetException;
27  import java.lang.reflect.Method;
28  import java.net.URLEncoder;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Comparator;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.NavigableMap;
36  import java.util.Set;
37  import java.util.TreeMap;
38  import java.util.UUID;
39  import java.util.concurrent.BlockingQueue;
40  import java.util.concurrent.ConcurrentHashMap;
41  import java.util.concurrent.ConcurrentMap;
42  import java.util.concurrent.ConcurrentSkipListMap;
43  import java.util.concurrent.CopyOnWriteArrayList;
44  import java.util.concurrent.CountDownLatch;
45  import java.util.concurrent.ExecutionException;
46  import java.util.concurrent.ExecutorService;
47  import java.util.concurrent.Executors;
48  import java.util.concurrent.LinkedBlockingQueue;
49  import java.util.concurrent.TimeUnit;
50  import java.util.concurrent.atomic.AtomicBoolean;
51  import java.util.concurrent.atomic.AtomicInteger;
52  import java.util.concurrent.atomic.AtomicLong;
53  import java.util.concurrent.locks.ReentrantLock;
54  
55  import org.apache.commons.logging.Log;
56  import org.apache.commons.logging.LogFactory;
57  import org.apache.hadoop.conf.Configuration;
58  import org.apache.hadoop.fs.FSDataOutputStream;
59  import org.apache.hadoop.fs.FileStatus;
60  import org.apache.hadoop.fs.FileSystem;
61  import org.apache.hadoop.fs.Path;
62  import org.apache.hadoop.fs.PathFilter;
63  import org.apache.hadoop.hbase.Cell;
64  import org.apache.hadoop.hbase.CellUtil;
65  import org.apache.hadoop.hbase.HBaseConfiguration;
66  import org.apache.hadoop.hbase.HConstants;
67  import org.apache.hadoop.hbase.HRegionInfo;
68  import org.apache.hadoop.hbase.HTableDescriptor;
69  import org.apache.hadoop.hbase.TableName;
70  import org.apache.hadoop.hbase.classification.InterfaceAudience;
71  import org.apache.hadoop.hbase.util.Bytes;
72  import org.apache.hadoop.hbase.util.ClassSize;
73  import org.apache.hadoop.hbase.util.DrainBarrier;
74  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
75  import org.apache.hadoop.hbase.util.FSUtils;
76  import org.apache.hadoop.hbase.util.HasThread;
77  import org.apache.hadoop.hbase.util.Threads;
78  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
79  import org.apache.hadoop.hbase.wal.WAL;
80  import org.apache.hadoop.hbase.wal.WALFactory;
81  import org.apache.hadoop.hbase.wal.WALKey;
82  import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
83  import org.apache.hadoop.hbase.wal.WALProvider.Writer;
84  import org.apache.hadoop.hbase.wal.WALSplitter;
85  import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
86  import org.apache.hadoop.util.StringUtils;
87  import org.htrace.NullScope;
88  import org.htrace.Span;
89  import org.htrace.Trace;
90  import org.htrace.TraceScope;
91  
92  import com.google.common.annotations.VisibleForTesting;
93  import com.google.common.collect.Maps;
94  import com.lmax.disruptor.BlockingWaitStrategy;
95  import com.lmax.disruptor.EventHandler;
96  import com.lmax.disruptor.ExceptionHandler;
97  import com.lmax.disruptor.LifecycleAware;
98  import com.lmax.disruptor.TimeoutException;
99  import com.lmax.disruptor.dsl.Disruptor;
100 import com.lmax.disruptor.dsl.ProducerType;
101 
102 /**
103  * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS.
104  * Only one WAL is ever being written at a time.  When a WAL hits a configured maximum size,
105  * it is rolled.  This is done internal to the implementation.
106  *
107  * <p>As data is flushed from the MemStore to other on-disk structures (files sorted by
108  * key, hfiles), a WAL becomes obsolete. We can let go of all the log edits/entries for a given
109  * HRegion-sequence id.  A bunch of work in the below is done keeping account of these region
110  * sequence ids -- what is flushed out to hfiles, and what is yet in WAL and in memory only.
111  *
112  * <p>It is only practical to delete entire files. Thus, we delete an entire on-disk file
113  * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
114  * (smaller) than the most-recent flush.
115  *
116  * <p>To read a WAL, call {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem,
117  * org.apache.hadoop.fs.Path)}.
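 * <p>A rough read-side sketch; this is illustrative only and assumes the static
 * <code>WALFactory.createReader(FileSystem, Path, Configuration)</code> helper and the local
 * variable names shown, none of which come from this class:
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * FileSystem fs = FileSystem.get(conf);
 * Path walFile = ...; // an existing WAL file under the WAL directory
 * try (WAL.Reader reader = WALFactory.createReader(fs, walFile, conf)) {
 *   WAL.Entry entry;
 *   while ((entry = reader.next()) != null) {
 *     // entry.getKey() is the WALKey, entry.getEdit() the WALEdit
 *   }
 * }
 * </pre>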
118  */
119 @InterfaceAudience.Private
120 public class FSHLog implements WAL {
121   // IMPLEMENTATION NOTES:
122   //
123   // At the core is a ring buffer.  Our ring buffer is the LMAX Disruptor.  It tries to
124   // minimize synchronizations and volatile writes when multiple contending threads append and
125   // sync on a single WAL, as is the case here.  The Disruptor is configured to handle multiple
126   // producers but it has one consumer only (the producers in HBase are IPC Handlers calling append
127   // and then sync).  The single consumer/writer pulls the appends and syncs off the ring buffer.
128   // When a handler calls sync, it is given back a future. The producer 'blocks' on the future so
129   // it does not return until the sync completes.  The future is passed over the ring buffer from
130   // the producer/handler to the consumer thread where it does its best to batch up the producer
131   // syncs so one WAL sync actually spans multiple producer sync invocations.  How well the
132   // batching works depends on the write rate; i.e. we tend to batch more in times of
133   // high writes/syncs.
134   //
135   // Calls to append now also wait until the append has been done on the consumer side of the
136   // disruptor.  We used to not wait but it makes the implementation easier to grok if we have
137   // the region edit/sequence id after the append returns.
138   // 
139   // TODO: Handlers need to coordinate appending AND syncing.  Can we have the threads contend
140   // once only?  Probably hard given syncs take way longer than an append.
141   //
142   // The consumer threads pass the syncs off to multiple syncing threads in a round robin fashion
143   // to ensure we keep up back-to-back FS sync calls (FS sync calls are the long pole writing the
144   // WAL).  The consumer thread passes the futures to the sync threads for them to complete
145   // the futures when done.
146   //
147   // The 'sequence' in the below is the sequence of the append/sync on the ringbuffer.  It
148   // acts as a sort-of transaction id.  It is always incrementing.
149   //
150   // The RingBufferEventHandler class hosts the ring buffer consuming code.  The threads that
151   // do the actual FS sync are implementations of SyncRunner.  SafePointZigZagLatch is a
152   // synchronization class used to halt the consumer at a safe point --  just after all outstanding
153   // syncs and appends have completed -- so the log roller can swap the WAL out under it.
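  //
  // A minimal producer-side sketch of the pattern described above (illustrative only; the
  // variable names here are not from this class) -- a handler appends and then blocks on sync:
  //
  //   long txid = wal.append(htd, regionInfo, walKey, walEdit, regionSequenceId, true, memstoreCells);
  //   wal.sync(txid);  // returns only after the consumer and sync threads complete the SyncFuture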
154 
155   static final Log LOG = LogFactory.getLog(FSHLog.class);
156 
157   private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
158   
159   /**
160    * The nexus at which all incoming handlers meet.  Does appends and syncs with an ordering.
161    * Appends and syncs are each put on the ring which means handlers need to
162    * smash up against the ring twice (can we make it once only? ... maybe not since time to append
163    * is so different from time to sync and sometimes we don't want to sync or we want to async
164    * the sync).  The ring is where we make sure of our ordering and it is also where we do
165    * batching up of handler sync calls.
166    */
167   private final Disruptor<RingBufferTruck> disruptor;
168 
169   /**
170    * An ExecutorService that runs the disruptor AppendEventHandler append executor.
171    */
172   private final ExecutorService appendExecutor;
173 
174   /**
175    * This fellow is run by the above appendExecutor service but it is all about batching up appends
176    * and syncs; it may shut down without cleaning out the last few appends or syncs.  To guard
177    * against this, keep a reference to this handler and do an explicit close on the way out to
178    * make sure all is flushed out before we exit.
179    */
180   private final RingBufferEventHandler ringBufferEventHandler;
181 
182   /**
183    * Map of {@link SyncFuture}s keyed by Handler objects.  Used so we reuse SyncFutures.
184    * TODO: Reuse FSWALEntry instances rather than create them anew each time as we do SyncFutures here.
185    * TODO: Add a FSWalEntry and SyncFuture as thread locals on handlers rather than have them
186    * get them from this Map?
187    */
188   private final Map<Thread, SyncFuture> syncFuturesByHandler;
189 
190   /**
191    * The highest known outstanding unsync'd WALEdit sequence number where sequence number is the
192    * ring buffer sequence.  Maintained by the ring buffer consumer.
193    */
194   private volatile long highestUnsyncedSequence = -1;
195 
196   /**
197    * Updated to the ring buffer sequence of the last successful sync call.  This can be less than
198    * {@link #highestUnsyncedSequence} for the case where we have an append for which a sync has
199    * not yet come in.  Maintained by the syncing threads.
200    */
201   private final AtomicLong highestSyncedSequence = new AtomicLong(0);
202 
203   /**
204    * file system instance
205    */
206   private final FileSystem fs;
207   /**
208    * WAL directory, where all WAL files would be placed.
209    */
210   private final Path fullPathLogDir;
211   /**
212    * dir path where old logs are kept.
213    */
214   private final Path fullPathArchiveDir;
215 
216   /**
217    * Matches just those wal files that belong to this wal instance.
218    */
219   private final PathFilter ourFiles;
220 
221   /**
222    * Prefix of a WAL file, usually the region server name it is hosted on.
223    */
224   private final String logFilePrefix;
225 
226   /**
227    * Suffix included on generated wal file names 
228    */
229   private final String logFileSuffix;
230 
231   /**
232    * Prefix used when checking for wal membership.
233    */
234   private final String prefixPathStr;
235 
236   private final WALCoprocessorHost coprocessorHost;
237 
238   /**
239    * conf object
240    */
241   private final Configuration conf;
242   /** Listeners that are called on WAL events. */
243   private final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
244 
245   @Override
246   public void registerWALActionsListener(final WALActionsListener listener) {
247     this.listeners.add(listener);
248   }
249   
250   @Override
251   public boolean unregisterWALActionsListener(final WALActionsListener listener) {
252     return this.listeners.remove(listener);
253   }
254 
255   @Override
256   public WALCoprocessorHost getCoprocessorHost() {
257     return coprocessorHost;
258   }
259   /**
260    * FSDataOutputStream associated with the current WAL writer
261    */
262   private FSDataOutputStream hdfs_out;
263 
264   // All about log rolling if not enough replicas outstanding.
265 
266   // Minimum tolerable replicas; if the actual value is lower than this, rollWriter will be triggered.
267   private final int minTolerableReplication;
268 
269   // DFSOutputStream.getNumCurrentReplicas method instance gotten via reflection.
270   private final Method getNumCurrentReplicas;
271   private final Method getPipeLine; // refers to DFSOutputStream.getPipeLine
272   private final int slowSyncNs;
273 
274   private final static Object [] NO_ARGS = new Object []{};
275 
276   // If the live datanode count is lower than the default replica count,
277   // rollWriter will be triggered in each sync (so rolls would be triggered
278   // one after another in a short time). Used as a workaround to slow
279   // down the roll frequency triggered by checkLowReplication().
280   private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0);
281 
282   private final int lowReplicationRollLimit;
283 
284   // If consecutiveLogRolls is larger than lowReplicationRollLimit,
285   // then disable the rolling in checkLowReplication().
286   // Enable it again if replication recovers.
287   private volatile boolean lowReplicationRollEnabled = true;
288 
289   /**
290    * Current log file.
291    */
292   volatile Writer writer;
293 
294   /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */
295   private final DrainBarrier closeBarrier = new DrainBarrier();
296 
297   /**
298    * This lock makes sure only one log roll runs at a time. Should not be taken while any other
299    * lock is held. We don't just use synchronized because that results in bogus and tedious
300    * findbugs warning when it thinks synchronized controls writer thread safety.  It is held when
301    * we are actually rolling the log.  It is checked when we are looking to see if we should roll
302    * the log or not.
303    */
304   private final ReentrantLock rollWriterLock = new ReentrantLock(true);
305 
306   private volatile boolean closed = false;
307   private final AtomicBoolean shutdown = new AtomicBoolean(false);
308 
309   // The timestamp (in ms) when the log file was created.
310   private final AtomicLong filenum = new AtomicLong(-1);
311 
312   // Number of transactions in the current WAL.
313   private final AtomicInteger numEntries = new AtomicInteger(0);
314 
315   // If > than this size, roll the log.
316   private final long logrollsize;
317 
318   /**
319    * The total size of the WAL files
320    */
321   private AtomicLong totalLogSize = new AtomicLong(0);
322 
323   /*
324    * If we have more than this many logs, force a flush of the oldest region so its oldest edit
325    * goes to disk.  If there are too many logs and we crash, replay will take forever.
326    * Keep the number of logs tidy.
327    */
328   private final int maxLogs;
329 
330   /** Number of log close errors tolerated before we abort */
331   private final int closeErrorsTolerated;
332 
333   private final AtomicInteger closeErrorCount = new AtomicInteger();
334 
335   // Region sequence id accounting across flushes and for knowing when we can GC a WAL.  These
336   // sequence id numbers are by region and unrelated to the ring buffer sequence number accounting
337   // done above in highestUnsyncedSequence, highestSyncedSequence, etc.
338   /**
339    * This lock ties all operations on lowestFlushingStoreSequenceIds and
340    * oldestUnflushedStoreSequenceIds Maps with the exception of append's putIfAbsent call into
341    * oldestUnflushedStoreSequenceIds. We use these Maps to find out the lower-bound region
342    * sequence id, or to find regions with old sequence ids to force flush; we are interested in
343    * old stuff not the new additions (TODO: IS THIS SAFE?  CHECK!).
344    */
345   private final Object regionSequenceIdLock = new Object();
346 
347   /**
348    * Map of encoded region names and family names to their OLDEST -- i.e. their first,
349    * the longest-lived -- sequence id in memstore. Note that this sequence id is the region
350    * sequence id.  This is not related to the id we use above for {@link #highestSyncedSequence}
351    * and {@link #highestUnsyncedSequence} which is the sequence from the disruptor
352    * ring buffer.
353    */
354   private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> oldestUnflushedStoreSequenceIds
355     = new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>(
356       Bytes.BYTES_COMPARATOR);
357 
358   /**
359    * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id in
360    * memstore currently being flushed out to hfiles. Entries are moved here from
361    * {@link #oldestUnflushedStoreSequenceIds} while the lock {@link #regionSequenceIdLock} is held
362    * (so movement between the Maps is atomic). This is not related to the id we use above for
363    * {@link #highestSyncedSequence} and {@link #highestUnsyncedSequence} which is the sequence from
364    * the disruptor ring buffer, an internal detail.
365    */
366   private final Map<byte[], Map<byte[], Long>> lowestFlushingStoreSequenceIds =
367     new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR);
368 
369  /**
370   * Map of region encoded names to the latest region sequence id.  Updated on each append of
371   * WALEdits to the WAL. We create one map for each WAL file at the time it is rolled.
372   * <p>When deciding whether to archive a WAL file, we compare the sequence IDs in this map to
373   * {@link #lowestFlushingStoreSequenceIds} and {@link #oldestUnflushedStoreSequenceIds}.
374   * See {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} for more info.
375   * <p>
376   * This map uses byte[] as the key, and uses reference equality. It works in our use case as we
377   * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns
378   * the same array.
379   */
380   private Map<byte[], Long> highestRegionSequenceIds = new HashMap<byte[], Long>();
381 
382   /**
383    * WAL Comparator; it compares the timestamp (log filenum) present in the log file name.
384    * Throws an IllegalArgumentException if used to compare paths from different wals.
385    */
386   public final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
387     @Override
388     public int compare(Path o1, Path o2) {
389       long t1 = getFileNumFromFileName(o1);
390       long t2 = getFileNumFromFileName(o2);
391       if (t1 == t2) return 0;
392       return (t1 > t2) ? 1 : -1;
393     }
394   };
395 
396   /**
397    * Map of WAL log file to the latest sequence ids of all regions for which it has entries.
398    * The map is sorted by the log file creation timestamp (contained in the log file name).
399    */
400   private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds =
401     new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR);
402 
403   /**
404    * Exception handler to pass to the disruptor ring buffer.  Same as the native implementation,
405    * only it logs using our logger instead of the java native logger.
406    */
407   static class RingBufferExceptionHandler implements ExceptionHandler {
408     @Override
409     public void handleEventException(Throwable ex, long sequence, Object event) {
410       LOG.error("Sequence=" + sequence + ", event=" + event, ex);
411       throw new RuntimeException(ex);
412     }
413 
414     @Override
415     public void handleOnStartException(Throwable ex) {
416       LOG.error(ex);
417       throw new RuntimeException(ex);
418     }
419 
420     @Override
421     public void handleOnShutdownException(Throwable ex) {
422       LOG.error(ex);
423       throw new RuntimeException(ex);
424     }
425   }
426 
427   /**
428    * Constructor.
429    *
430    * @param fs filesystem handle
431    * @param root path for stored and archived wals
432    * @param logDir dir where wals are stored
433    * @param conf configuration to use
434    * @throws IOException
435    */
436   public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
437       throws IOException {
438     this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
439   }
440 
441   /**
442    * Create an edit log at the given <code>dir</code> location.
443    *
444    * You should never have to load an existing log. If there is a log at
445    * startup, it should have already been processed and deleted by the time the
446    * WAL object is started up.
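   * <p>A construction sketch (illustrative values only; the server name and directory names are
   * assumptions, not requirements of this class):
   * <pre>
   * FSHLog wal = new FSHLog(fs, rootDir, "WALs/server1,60020,1253684692111",
   *     HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, "server1,60020,1253684692111", null);
   * </pre>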
447    *
448    * @param fs filesystem handle
449    * @param rootDir path to where logs and oldlogs are kept
450    * @param logDir dir where wals are stored
451    * @param archiveDir dir where wals are archived
452    * @param conf configuration to use
453    * @param listeners Listeners on WAL events. Listeners passed here will
454    * be registered before we do anything else; e.g. before the
455    * constructor calls {@link #rollWriter()}.
456    * @param failIfWALExists If true, an IOException will be thrown if files related to this wal
457    *        already exist.
458    * @param prefix should always be hostname and port in distributed env and
459    *        it will be URL encoded before being used.
460    *        If prefix is null, "wal" will be used
461    * @param suffix will be url encoded. null is treated as empty. non-empty must start with
462    *        {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER}
463    * @throws IOException
464    */
465   public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
466       final String archiveDir, final Configuration conf,
467       final List<WALActionsListener> listeners,
468       final boolean failIfWALExists, final String prefix, final String suffix)
469       throws IOException {
470     this.fs = fs;
471     this.fullPathLogDir = new Path(rootDir, logDir);
472     this.fullPathArchiveDir = new Path(rootDir, archiveDir);
473     this.conf = conf;
474 
475     if (!fs.exists(fullPathLogDir) && !fs.mkdirs(fullPathLogDir)) {
476       throw new IOException("Unable to mkdir " + fullPathLogDir);
477     }
478 
479     if (!fs.exists(this.fullPathArchiveDir)) {
480       if (!fs.mkdirs(this.fullPathArchiveDir)) {
481         throw new IOException("Unable to mkdir " + this.fullPathArchiveDir);
482       }
483     }
484 
485     // If prefix is null||empty then just name it wal
486     this.logFilePrefix =
487       prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
488     // we only correctly differentiate suffixes when numeric ones start with '.'
489     if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
490       throw new IllegalArgumentException("wal suffix must start with '" + WAL_FILE_NAME_DELIMITER +
491           "' but instead was '" + suffix + "'");
492     }
493     this.logFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
494     this.prefixPathStr = new Path(fullPathLogDir,
495         logFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
496 
497     this.ourFiles = new PathFilter() {
498       @Override
499       public boolean accept(final Path fileName) {
500         // The path should start with dir/<prefix> and end with our suffix
501         final String fileNameString = fileName.toString();
502         if (!fileNameString.startsWith(prefixPathStr)) {
503           return false;
504         }
505         if (logFileSuffix.isEmpty()) {
506           // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
507           return org.apache.commons.lang.StringUtils.isNumeric(
508               fileNameString.substring(prefixPathStr.length()));
509         } else if (!fileNameString.endsWith(logFileSuffix)) {
510           return false;
511         }
512         return true;
513       }
514     };
515 
516     if (failIfWALExists) {
517       final FileStatus[] walFiles = FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
518       if (null != walFiles && 0 != walFiles.length) {
519         throw new IOException("Target WAL already exists within directory " + fullPathLogDir);
520       }
521     }
522 
523     // Register listeners.  TODO: Should this exist anymore?  We have CPs?
524     if (listeners != null) {
525       for (WALActionsListener i: listeners) {
526         registerWALActionsListener(i);
527       }
528     }
529     this.coprocessorHost = new WALCoprocessorHost(this, conf);
530 
531     // Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks
532     // (crossing blocks costs a little).
533     final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
534         FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir));
535     this.logrollsize =
536       (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
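    // For example (illustrative numbers only): with a 128 MB HDFS block size and the default 0.95
    // multiplier, logrollsize works out to about 121.6 MB, so the roll is requested before the
    // writer crosses into a second block.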
537 
538     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
539     this.minTolerableReplication = conf.getInt( "hbase.regionserver.hlog.tolerable.lowreplication",
540         FSUtils.getDefaultReplication(fs, this.fullPathLogDir));
541     this.lowReplicationRollLimit =
542       conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5);
543     this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 0);
544     int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
545 
546     LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) +
547       ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
548       ", prefix=" + this.logFilePrefix + ", suffix=" + logFileSuffix + ", logDir=" +
549       this.fullPathLogDir + ", archiveDir=" + this.fullPathArchiveDir);
550 
551     // rollWriter sets this.hdfs_out if it can.
552     rollWriter();
553 
554     this.slowSyncNs =
555         1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
556           DEFAULT_SLOW_SYNC_TIME_MS);
557     // handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with
558     // HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection.
559     this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
560     this.getPipeLine = getGetPipeline(this.hdfs_out);
561 
562     // This is the 'writer' -- a single threaded executor.  This single thread 'consumes' what is
563     // put on the ring buffer.
564     String hostingThreadName = Thread.currentThread().getName();
565     this.appendExecutor = Executors.
566       newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append"));
567     // Preallocate objects to use on the ring buffer.  The way that appends and syncs work, we will
568     // be stuck and make no progress if the buffer is filled with appends only and there is no
569     // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
570     // before they return.
571     final int preallocatedEventCount =
572       this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
573     // Using BlockingWaitStrategy.  Stuff that is going on here takes so long it makes no sense
574     // spinning as other strategies do.
575     this.disruptor =
576       new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount,
577         this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy());
578     // Advance the ring buffer sequence so that it starts from 1 instead of 0,
579     // because SyncFuture.NOT_DONE = 0.
580     this.disruptor.getRingBuffer().next();
581     this.ringBufferEventHandler =
582       new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5),
583         maxHandlersCount);
584     this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
585     this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
586     // Presize our map of SyncFutures by handler objects.
587     this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount);
588     // Starting up threads in a constructor is a no-no; the interface should have an init call.
589     this.disruptor.start();
590   }
591 
592   /**
593    * Get the backing files associated with this WAL.
594    * @return may be null if there are no files.
595    */
596   protected FileStatus[] getFiles() throws IOException {
597     return FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
598   }
599 
600   /**
601    * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate
602    * the default behavior (such as setting the maxRecoveryErrorCount value; see
603    * {@link TestWALReplay#testReplayEditsWrittenIntoWAL()}). This is done using reflection on the
604    * underlying HDFS OutputStream.
605    * NOTE: This could be removed once Hadoop1 support is removed.
606    * @return null if underlying stream is not ready.
607    */
608   @VisibleForTesting
609   OutputStream getOutputStream() {
610     return this.hdfs_out.getWrappedStream();
611   }
612 
613   @Override
614   public byte [][] rollWriter() throws FailedLogCloseException, IOException {
615     return rollWriter(false);
616   }
617 
618   /**
619    * retrieve the next path to use for writing.
620    * Increments the internal filenum.
621    */
622   private Path getNewPath() throws IOException {
623     this.filenum.set(System.currentTimeMillis());
624     Path newPath = getCurrentFileName();
625     while (fs.exists(newPath)) {
626       this.filenum.incrementAndGet();
627       newPath = getCurrentFileName();
628     }
629     return newPath;
630   }
631 
632   Path getOldPath() {
633     long currentFilenum = this.filenum.get();
634     Path oldPath = null;
635     if (currentFilenum > 0) {
636       // ComputeFilename  will take care of meta wal filename
637       oldPath = computeFilename(currentFilenum);
638     } // I presume if currentFilenum is <= 0, this is the first file and null for oldPath is fine?
639     return oldPath;
640   }
641 
642   /**
643    * Tell listeners about pre log roll.
644    * @throws IOException 
645    */
646   private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
647   throws IOException {
648     if (!this.listeners.isEmpty()) {
649       for (WALActionsListener i : this.listeners) {
650         i.preLogRoll(oldPath, newPath);
651       }
652     }
653   }
654 
655   /**
656    * Tell listeners about post log roll.
657    * @throws IOException 
658    */
659   private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
660   throws IOException {
661     if (!this.listeners.isEmpty()) {
662       for (WALActionsListener i : this.listeners) {
663         i.postLogRoll(oldPath, newPath);
664       }
665     }
666   }
667 
668   /**
669    * Run a sync after opening to set up the pipeline.
670    * @param nextWriter
672    */
673   private void preemptiveSync(final ProtobufLogWriter nextWriter) {
674     long startTimeNanos = System.nanoTime();
675     try {
676       nextWriter.sync();
677       postSync(System.nanoTime() - startTimeNanos, 0);
678     } catch (IOException e) {
679       // optimization failed, no need to abort here.
680       LOG.warn("pre-sync failed but an optimization so keep going", e);
681     }
682   }
683 
684   @Override
685   public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
686     rollWriterLock.lock();
687     try {
688       // Return if nothing to flush.
689       if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null;
690       byte [][] regionsToFlush = null;
691       if (this.closed) {
692         LOG.debug("WAL closed. Skipping rolling of writer");
693         return regionsToFlush;
694       }
695       if (!closeBarrier.beginOp()) {
696         LOG.debug("WAL closing. Skipping rolling of writer");
697         return regionsToFlush;
698       }
699       TraceScope scope = Trace.startSpan("FSHLog.rollWriter");
700       try {
701         Path oldPath = getOldPath();
702         Path newPath = getNewPath();
703         // Any exception from here on is catastrophic, non-recoverable so we currently abort.
704         Writer nextWriter = this.createWriterInstance(newPath);
705         FSDataOutputStream nextHdfsOut = null;
706         if (nextWriter instanceof ProtobufLogWriter) {
707           nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
708           // If a ProtobufLogWriter, go ahead and try and sync to force setup of pipeline.
709           // If this fails, we just keep going.... it is an optimization, not the end of the world.
710           preemptiveSync((ProtobufLogWriter)nextWriter);
711         }
712         tellListenersAboutPreLogRoll(oldPath, newPath);
713         // NewPath could be equal to oldPath if replaceWriter fails.
714         newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut);
715         tellListenersAboutPostLogRoll(oldPath, newPath);
716         // Can we delete any of the old log files?
717         if (getNumRolledLogFiles() > 0) {
718           cleanOldLogs();
719           regionsToFlush = findRegionsToForceFlush();
720         }
721       } finally {
722         closeBarrier.endOp();
723         assert scope == NullScope.INSTANCE || !scope.isDetached();
724         scope.close();
725       }
726       return regionsToFlush;
727     } finally {
728       rollWriterLock.unlock();
729     }
730   }
731 
732   /**
733    * This method allows subclasses to inject different writers without having to
734    * extend other methods like rollWriter().
735    *
736    * @return Writer instance
737    */
738   protected Writer createWriterInstance(final Path path) throws IOException {
739     return DefaultWALProvider.createWriter(conf, fs, path, false);
740   }
741 
742   private long getLowestSeqId(Map<byte[], Long> seqIdMap) {
743     long result = HConstants.NO_SEQNUM;
744     for (Long seqNum: seqIdMap.values()) {
745       if (result == HConstants.NO_SEQNUM || seqNum.longValue() < result) {
746         result = seqNum.longValue();
747       }
748     }
749     return result;
750   }
751 
752   private <T extends Map<byte[], Long>> Map<byte[], Long> copyMapWithLowestSeqId(
753       Map<byte[], T> mapToCopy) {
754     Map<byte[], Long> copied = Maps.newHashMap();
755     for (Map.Entry<byte[], T> entry: mapToCopy.entrySet()) {
756       long lowestSeqId = getLowestSeqId(entry.getValue());
757       if (lowestSeqId != HConstants.NO_SEQNUM) {
758         copied.put(entry.getKey(), lowestSeqId);
759       }
760     }
761     return copied;
762   }
763 
764   /**
765    * Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits
766    * have been flushed to hfiles.
767    * <p>
768    * For each log file, it compares its region-to-sequenceId map
769    * ({@link FSHLog#highestRegionSequenceIds}) with the corresponding region entries in
770    * {@link FSHLog#lowestFlushingStoreSequenceIds} and
771    * {@link FSHLog#oldestUnflushedStoreSequenceIds}. If all the regions in the map have flushed
772    * past their value, then the wal is eligible for archiving.
773    * @throws IOException
774    */
775   private void cleanOldLogs() throws IOException {
776     Map<byte[], Long> lowestFlushingRegionSequenceIdsLocal = null;
777     Map<byte[], Long> oldestUnflushedRegionSequenceIdsLocal = null;
778     List<Path> logsToArchive = new ArrayList<Path>();
779     // make a local copy so as to avoid locking when we iterate over these maps.
780     synchronized (regionSequenceIdLock) {
781       lowestFlushingRegionSequenceIdsLocal =
782           copyMapWithLowestSeqId(this.lowestFlushingStoreSequenceIds);
783       oldestUnflushedRegionSequenceIdsLocal =
784           copyMapWithLowestSeqId(this.oldestUnflushedStoreSequenceIds);
785     }
786     for (Map.Entry<Path, Map<byte[], Long>> e : byWalRegionSequenceIds.entrySet()) {
787       // iterate over the log file.
788       Path log = e.getKey();
789       Map<byte[], Long> sequenceNums = e.getValue();
790       // iterate over the map for this log file, and tell whether it should be archived or not.
791       if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal,
792           oldestUnflushedRegionSequenceIdsLocal)) {
793         logsToArchive.add(log);
794         LOG.debug("WAL file ready for archiving " + log);
795       }
796     }
797     for (Path p : logsToArchive) {
798       this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
799       archiveLogFile(p);
800       this.byWalRegionSequenceIds.remove(p);
801     }
802   }
803 
804   /**
805    * Takes a region:sequenceId map for a WAL file, and checks whether the file can be archived.
806    * It compares the region entries present in the passed sequenceNums map with the local copy of
807    * {@link #oldestUnflushedStoreSequenceIds} and {@link #lowestFlushingStoreSequenceIds}. If,
808    * for all regions, the value is less than the minimum of the values present in the
809    * oldestFlushing/UnflushedSeqNums, then the wal file is eligible for archiving.
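   * <p>A worked example (illustrative values only): if <code>sequenceNums</code> maps region
   * <code>r1</code> to 5 and the unflushed map holds 7 for <code>r1</code> (with no flushing
   * entry), then min(MAX_VALUE, 7) = 7 &gt; 5 and the WAL may be archived; if <code>r1</code>
   * instead still had an unflushed edit at sequence id 4, the minimum would be 4 &lt;= 5 and the
   * WAL must be kept.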
810    * @param sequenceNums for a WAL, at the time when it was rolled.
811    * @param oldestFlushingMap
812    * @param oldestUnflushedMap
813    * @return true if wal is eligible for archiving, false otherwise.
814    */
815    static boolean areAllRegionsFlushed(Map<byte[], Long> sequenceNums,
816       Map<byte[], Long> oldestFlushingMap, Map<byte[], Long> oldestUnflushedMap) {
817     for (Map.Entry<byte[], Long> regionSeqIdEntry : sequenceNums.entrySet()) {
818       // find region entries in the flushing/unflushed map. If there is no entry, it means
819       // a region doesn't have any unflushed entry.
820       long oldestFlushing = oldestFlushingMap.containsKey(regionSeqIdEntry.getKey()) ?
821           oldestFlushingMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
822       long oldestUnFlushed = oldestUnflushedMap.containsKey(regionSeqIdEntry.getKey()) ?
823           oldestUnflushedMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
824       // take the minimum to be sure we cover the oldest sequence id
825       long minSeqNum = Math.min(oldestFlushing, oldestUnFlushed);
826       if (minSeqNum <= regionSeqIdEntry.getValue()) return false;// can't archive
827     }
828     return true;
829   }
830 
831   /**
832    * Iterates over the given map of regions, and compares their sequence numbers with corresponding
833    * entries in {@link #oldestUnflushedStoreSequenceIds}. If the sequence number is greater than or
834    * equal, the region is eligible to flush; otherwise, there is no benefit to flush (from the
835    * perspective of the passed regionsSequenceNums map), because the region has already flushed the
836    * entries present in the WAL file for which this method is called (typically, the oldest
837    * wal file).
838    * @param regionsSequenceNums
839    * @return regions which should be flushed (whose sequence numbers are larger than their
840    * corresponding un-flushed entries).
841    */
842   private byte[][] findEligibleMemstoresToFlush(Map<byte[], Long> regionsSequenceNums) {
843     List<byte[]> regionsToFlush = null;
844     // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
845     synchronized (regionSequenceIdLock) {
846       for (Map.Entry<byte[], Long> e: regionsSequenceNums.entrySet()) {
847         long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey());
848         if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) {
849           if (regionsToFlush == null)
850             regionsToFlush = new ArrayList<byte[]>();
851           regionsToFlush.add(e.getKey());
852         }
853       }
854     }
855     return regionsToFlush == null ? null : regionsToFlush
856         .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
857   }
858 
859   /**
860    * If the number of un-archived WAL files is greater than maximum allowed, it checks
861    * the first (oldest) WAL file, and returns the regions which should be flushed so that it could
862    * be archived.
863    * @return regions to flush in order to archive oldest wal file.
864    * @throws IOException
865    */
866   byte[][] findRegionsToForceFlush() throws IOException {
867     byte [][] regions = null;
868     int logCount = getNumRolledLogFiles();
869     if (logCount > this.maxLogs && logCount > 0) {
870       Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
871         this.byWalRegionSequenceIds.firstEntry();
872       regions = findEligibleMemstoresToFlush(firstWALEntry.getValue());
873     }
874     if (regions != null) {
875       StringBuilder sb = new StringBuilder();
876       for (int i = 0; i < regions.length; i++) {
877         if (i > 0) sb.append(", ");
878         sb.append(Bytes.toStringBinary(regions[i]));
879       }
880       LOG.info("Too many wals: logs=" + logCount + ", maxlogs=" +
881          this.maxLogs + "; forcing flush of " + regions.length + " region(s): " +
882          sb.toString());
883     }
884     return regions;
885   }
886 
887   /**
888    * Cleans up current writer closing it and then puts in place the passed in
889    * <code>nextWriter</code>.
890    *
891    * In the case of creating a new WAL, oldPath will be null.
892    *
893    * In the case of rolling over from one file to the next, none of the params will be null.
894    *
895    * In the case of closing out this FSHLog with no further use, newPath, nextWriter, and
896    * nextHdfsOut will be null.
897    *
898    * @param oldPath may be null
899    * @param newPath may be null
900    * @param nextWriter may be null
901    * @param nextHdfsOut may be null
902    * @return the passed in <code>newPath</code>
903    * @throws IOException if there is a problem flushing or closing the underlying FS
904    */
905   Path replaceWriter(final Path oldPath, final Path newPath, Writer nextWriter,
906       final FSDataOutputStream nextHdfsOut)
907   throws IOException {
908     // Ask the ring buffer writer to pause at a safe point.  Once we do this, the writer
909     // thread will eventually pause. An error hereafter needs to release the writer thread
910     // regardless -- hence the finally block below.  Note, this method is called from the FSHLog
911       // constructor BEFORE the ring buffer is set running so it is null the first time through
912     // here; allow for that.
913     SyncFuture syncFuture = null;
914     SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)?
915       null: this.ringBufferEventHandler.attainSafePoint();
916     TraceScope scope = Trace.startSpan("FSHFile.replaceWriter");
917     try {
918       // Wait on the safe point to be achieved.  Send in a sync in case nothing has hit the
919       // ring buffer between the above notification of writer that we want it to go to
920       // 'safe point' and then here where we are waiting on it to attain safe point.  Use
921       // 'sendSync' instead of 'sync' because we do not want this thread to block waiting on it
922       // to come back.  Cleanup this syncFuture down below after we are ready to run again.
923       try {
924         if (zigzagLatch != null) {
925           Trace.addTimelineAnnotation("awaiting safepoint");
926           syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer());
927         }
928       } catch (FailedSyncBeforeLogCloseException e) {
929         if (isUnflushedEntries()) throw e;
930         // Else, let it pass through to the close.
931         LOG.warn("Failed last sync but no outstanding unsync edits so falling through to close; " +
932           e.getMessage());
933       }
934 
935       // It is at the safe point.  Swap out writer from under the blocked writer thread.
936       // TODO: This close is inline with the critical section.  Should it happen in the background?
937       try {
938         if (this.writer != null) {
939           Trace.addTimelineAnnotation("closing writer");
940           this.writer.close();
941           Trace.addTimelineAnnotation("writer closed");
942         }
943         this.closeErrorCount.set(0);
944       } catch (IOException ioe) {
945         int errors = closeErrorCount.incrementAndGet();
946         if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) {
947           LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" +
948             ioe.getMessage() + "\", errors=" + errors +
949             "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
950         } else {
951           throw ioe;
952         }
953       }
954       this.writer = nextWriter;
955       this.hdfs_out = nextHdfsOut;
956       int oldNumEntries = this.numEntries.get();
957       this.numEntries.set(0);
958       final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
959       if (oldPath != null) {
960         this.byWalRegionSequenceIds.put(oldPath, this.highestRegionSequenceIds);
961         this.highestRegionSequenceIds = new HashMap<byte[], Long>();
962         long oldFileLen = this.fs.getFileStatus(oldPath).getLen();
963         this.totalLogSize.addAndGet(oldFileLen);
964         LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
965           ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " +
966           newPathString);
967       } else {
968         LOG.info("New WAL " + newPathString);
969       }
970     } catch (InterruptedException ie) {
971       // Perpetuate the interrupt
972       Thread.currentThread().interrupt();
973     } catch (IOException e) {
974       long count = getUnflushedEntriesCount();
975       LOG.error("Failed close of WAL writer " + oldPath + ", unflushedEntries=" + count, e);
976       throw new FailedLogCloseException(oldPath + ", unflushedEntries=" + count, e);
977     } finally {
978       try {
979         // Let the writer thread go regardless, whether error or not.
980         if (zigzagLatch != null) {
981           zigzagLatch.releaseSafePoint();
982           // It will be null if we failed our wait on safe point above.
983           if (syncFuture != null) blockOnSync(syncFuture);
984         }
985       } finally {
986         scope.close();
987       }
988     }
989     return newPath;
990   }
991 
992   long getUnflushedEntriesCount() {
993     long highestSynced = this.highestSyncedSequence.get();
994     return highestSynced > this.highestUnsyncedSequence?
995       0: this.highestUnsyncedSequence - highestSynced;
996   }
997 
998   boolean isUnflushedEntries() {
999     return getUnflushedEntriesCount() > 0;
1000   }
1001 
1002   /*
1003    * Only public so WALSplitter can use it.
1004    * @return archived location of a WAL file with the given path p
1005    */
1006   public static Path getWALArchivePath(Path archiveDir, Path p) {
1007     return new Path(archiveDir, p.getName());
1008   }
1009 
1010   private void archiveLogFile(final Path p) throws IOException {
1011     Path newPath = getWALArchivePath(this.fullPathArchiveDir, p);
1012     // Tell our listeners that a log is going to be archived.
1013     if (!this.listeners.isEmpty()) {
1014       for (WALActionsListener i : this.listeners) {
1015         i.preLogArchive(p, newPath);
1016       }
1017     }
1018     if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
1019       throw new IOException("Unable to rename " + p + " to " + newPath);
1020     }
1021     // Tell our listeners that a log has been archived.
1022     if (!this.listeners.isEmpty()) {
1023       for (WALActionsListener i : this.listeners) {
1024         i.postLogArchive(p, newPath);
1025       }
1026     }
1027   }
1028 
1029   /**
1030    * This is a convenience method that computes a new filename with a given
1031    * file-number.
1032    * @param filenum to use
1033    * @return Path
1034    */
1035   protected Path computeFilename(final long filenum) {
1036     if (filenum < 0) {
1037       throw new RuntimeException("wal file number can't be < 0");
1038     }
1039     String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix;
1040     return new Path(fullPathLogDir, child);
1041   }
1042 
1043   /**
1044    * This is a convenience method that computes a new filename
1045    * using the current WAL file-number.
1046    * @return Path
1047    */
1048   public Path getCurrentFileName() {
1049     return computeFilename(this.filenum.get());
1050   }
1051 
1052   @Override
1053   public String toString() {
1054     return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")";
1055   }
1056 
1057 /**
1058  * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}).
1059  * This helper method returns the creation timestamp from a given log file.
1060  * It extracts the timestamp assuming the filename is created with the
1061  * {@link #computeFilename(long filenum)} method.
1062  * @param fileName
1063  * @return timestamp, as in the log file name.
1064  */
1065   protected long getFileNumFromFileName(Path fileName) {
1066     if (fileName == null) throw new IllegalArgumentException("file name can't be null");
1067     if (!ourFiles.accept(fileName)) {
1068       throw new IllegalArgumentException("The log file " + fileName +
1069           " doesn't belong to this wal. (" + toString() + ")");
1070     }
1071     final String fileNameString = fileName.toString();
1072     String chompedPath = fileNameString.substring(prefixPathStr.length(),
1073         (fileNameString.length() - logFileSuffix.length()));
1074     return Long.parseLong(chompedPath);
1075   }
1076 
1077   @Override
1078   public void close() throws IOException {
1079     shutdown();
1080     final FileStatus[] files = getFiles();
1081     if (null != files && 0 != files.length) {
1082       for (FileStatus file : files) {
1083         Path p = getWALArchivePath(this.fullPathArchiveDir, file.getPath());
1084         // Tell our listeners that a log is going to be archived.
1085         if (!this.listeners.isEmpty()) {
1086           for (WALActionsListener i : this.listeners) {
1087             i.preLogArchive(file.getPath(), p);
1088           }
1089         }
1090 
1091         if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
1092           throw new IOException("Unable to rename " + file.getPath() + " to " + p);
1093         }
1094         // Tell our listeners that a log was archived.
1095         if (!this.listeners.isEmpty()) {
1096           for (WALActionsListener i : this.listeners) {
1097             i.postLogArchive(file.getPath(), p);
1098           }
1099         }
1100       }
1101       LOG.debug("Moved " + files.length + " WAL file(s) to " +
1102         FSUtils.getPath(this.fullPathArchiveDir));
1103     }
1104     LOG.info("Closed WAL: " + toString() );
1105   }
1106 
1107   @Override
1108   public void shutdown() throws IOException {
1109     if (shutdown.compareAndSet(false, true)) {
1110       try {
1111         // Prevent all further flushing and rolling.
1112         closeBarrier.stopAndDrainOps();
1113       } catch (InterruptedException e) {
1114         LOG.error("Exception while waiting for cache flushes and log rolls", e);
1115         Thread.currentThread().interrupt();
1116       }
1117 
1118       // Shutdown the disruptor.  Will stop after all entries have been processed.  Make sure we
1119       // have stopped incoming appends before calling this else it will not shut down.  We are
1120       // conservative below, waiting a long time and, if the shutdown has not completed by then, halting.
1121       if (this.disruptor != null) {
1122         long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
1123         try {
1124           this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
1125         } catch (TimeoutException e) {
1126           LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " +
1127             "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
1128           this.disruptor.halt();
1129           this.disruptor.shutdown();
1130         }
1131       }
1132       // With disruptor down, this is safe to let go.
1133       if (this.appendExecutor !=  null) this.appendExecutor.shutdown();
1134 
1135       // Tell our listeners that the log is closing
1136       if (!this.listeners.isEmpty()) {
1137         for (WALActionsListener i : this.listeners) {
1138           i.logCloseRequested();
1139         }
1140       }
1141       this.closed = true;
1142       if (LOG.isDebugEnabled()) {
1143         LOG.debug("Closing WAL writer in " + FSUtils.getPath(fullPathLogDir));
1144       }
1145       if (this.writer != null) {
1146         this.writer.close();
1147         this.writer = null;
1148       }
1149     }
1150   }
1151 
1152   /**
1153    * @param now
1154    * @param encodedRegionName Encoded name of the region as returned by
1155    * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
1156    * @param tableName
1157    * @param clusterIds that have consumed the change
1158    * @return New log key.
1159    */
1160   protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
1161       long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
1162     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
1163     return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
1164   }
1165   
1166   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
1167       justification="Will never be null")
1168   @Override
1169   public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
1170       final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, 
1171       final List<Cell> memstoreCells) throws IOException {
1172     if (this.closed) throw new IOException("Cannot append; log is closed");
1173     // Make a trace scope for the append.  It is closed on other side of the ring buffer by the
1174     // single consuming thread.  Don't have to worry about it.
1175     TraceScope scope = Trace.startSpan("FSHLog.append");
1176 
1177     // This is crazy how much it takes to make an edit.  Do we need all this stuff!!!!????  We need
1178     // all this to make a key and then below to append the edit, we need to carry htd, info,
1179     // etc. all over the ring buffer.
1180     FSWALEntry entry = null;
1181     long sequence = this.disruptor.getRingBuffer().next();
1182     try {
1183       RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
1184       // Construction of FSWALEntry sets a latch.  The latch is thrown just after we stamp the
1185       // edit with its edit/sequence id.  The below entry.getRegionSequenceId will wait on the
1186       // latch to be thrown.  TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
1187       entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells);
1188       truck.loadPayload(entry, scope.detach());
1189     } finally {
1190       this.disruptor.getRingBuffer().publish(sequence);
1191     }
1192     return sequence;
1193   }
1194 
1195   /**
1196    * Thread to runs the hdfs sync call. This call takes a while to complete.  This is the longest
1197    * pole adding edits to the WAL and this must complete to be sure all edits are persisted.  We run
1198    * multiple threads sync'ng rather than one that just syncs in series so we have better
1199    * latencies; otherwise, an edit that arrived just after a sync started, might have to wait
1200    * almost the length of two sync invocations before it is marked done.
1201    * <p>When the sync completes, it marks all the passed in futures done.  On the other end of the
1202    * sync future is a blocked thread, usually a regionserver Handler.  There may be more than one
1203    * future passed in the case where a few threads arrive at about the same time and all invoke
1204    * 'sync'.  In this case we'll batch up the invocations and run one filesystem sync only for a
1205    * batch of Handler sync invocations.  Do not confuse these Handler SyncFutures with the futures
1206    * an ExecutorService returns when you call submit. We have no use for these in this model. These
1207    * SyncFutures are 'artificial', something to hold the Handler until the filesystem sync
1208    * completes.
1209    */
1210   private class SyncRunner extends HasThread {
1211     private volatile long sequence;
1212     private final BlockingQueue<SyncFuture> syncFutures;
1213  
1214     /**
1215      * @param name Name for this sync thread.
1216      * @param maxHandlersCount Count of handler threads that may be waiting on syncs; used to
1217      * bound the internal queue of outstanding {@link SyncFuture}s.  When the actual hdfs sync
1218      * call completes, we put its result on each queued future; all appends up to that point
1219      * have then been flushed/synced/pushed to datanodes.  If the sync fails, the queued
1220      * futures will return the exception to their clients; some of the edits may have made it
1221      * out to data nodes but we will report all that were part of this session as failed.
1222      */
1224     SyncRunner(final String name, final int maxHandlersCount) {
1225       super(name);
1226       // LinkedBlockingQueue because of
1227       // http://www.javacodegeeks.com/2010/09/java-best-practices-queue-battle-and.html
1228       // Could use other blockingqueues here or concurrent queues.
1229       //
1230       // We could let the capacity be 'open' but bound it so we get alerted in pathological case
1231       // where we cannot sync and we have a bunch of threads all backed up waiting on their syncs
1232       // to come in.  LinkedBlockingQueue actually shrinks when you remove elements so Q should
1233       // stay neat and tidy in usual case.  Let the max size be three times the maximum handlers.
1234       // The passed in maxHandlerCount is the user-level handlers which is what we put up most of
1235       // but HBase has other handlers running too -- opening region handlers which want to write
1236       // the meta table when successful (i.e. sync), closing handlers -- etc.  These are usually
1237       // much fewer in number than the user-space handlers so Q-size should be user handlers plus
1238       // some space for these other handlers.  Let's multiply by 3 for good measure.
1239       this.syncFutures = new LinkedBlockingQueue<SyncFuture>(maxHandlersCount * 3);
1240     }
1241 
1242     void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount) {
1243       // Set sequence first because the add to the queue will wake the thread if sleeping.
1244       this.sequence = sequence;
1245       this.syncFutures.addAll(Arrays.asList(syncFutures).subList(0, syncFutureCount));
1246     }
1247 
1248     /**
1249      * Release the passed <code>syncFuture</code>.
1250      * @param syncFuture Future to mark done.
1251      * @param currentSequence Sequence to mark the future done with.
1252      * @param t Error to set on the future, or null if the sync succeeded.
1253      * @return Returns 1, the count of futures released.
1254      */
1255     private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence,
1256         final Throwable t) {
1257       if (!syncFuture.done(currentSequence, t)) throw new IllegalStateException();
1258       // This function releases one sync future only.
1259       return 1;
1260     }
1261  
1262     /**
1263      * Release all SyncFutures whose sequence is <= <code>currentSequence</code>.
1264      * @param currentSequence Sequence up to which (inclusive) SyncFutures are released.
1265      * @param t May be non-null if we are processing SyncFutures because an exception was thrown.
1266      * @return Count of SyncFutures we let go.
1267      */
1268     private int releaseSyncFutures(final long currentSequence, final Throwable t) {
1269       int syncCount = 0;
1270       for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) {
1271         if (syncFuture.getRingBufferSequence() > currentSequence) break;
1272         releaseSyncFuture(syncFuture, currentSequence, t);
1273         if (!this.syncFutures.remove(syncFuture)) {
1274           throw new IllegalStateException(syncFuture.toString());
1275         }
1276         syncCount++;
1277       }
1278       return syncCount;
1279     }
1280 
1281     /**
1282      * @param sequence The sequence we ran the filesystem sync against.
1283      * @return Current highest synced sequence.
1284      */
1285     private long updateHighestSyncedSequence(long sequence) {
1286       long currentHighestSyncedSequence;
1287       // Set the highestSyncedSequence IFF our current sequence id is the 'highest'.
1288       do {
1289         currentHighestSyncedSequence = highestSyncedSequence.get();
1290         if (currentHighestSyncedSequence >= sequence) {
1291           // Set the sync number to current highwater mark; might be able to let go more
1292           // queued sync futures
1293           sequence = currentHighestSyncedSequence;
1294           break;
1295         }
1296       } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence));
1297       return sequence;
1298     }
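    // The loop above is the usual compare-and-set "monotonic maximum" idiom: only advance
    // highestSyncedSequence if our sequence is higher, retrying on contention.  A generic sketch
    // of the same idiom over any AtomicLong (illustrative only, not used by this class):
    //
    //   static long advanceToAtLeast(final AtomicLong highest, final long candidate) {
    //     long current;
    //     do {
    //       current = highest.get();
    //       if (current >= candidate) return current;  // someone else got further; keep theirs
    //     } while (!highest.compareAndSet(current, candidate));
    //     return candidate;
    //   }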
1299 
1300     public void run() {
1301       long currentSequence;
1302       while (!isInterrupted()) {
1303         int syncCount = 0;
1304         SyncFuture takeSyncFuture;
1305         try {
1306           while (true) {
1307             // We have to process what we 'take' from the queue
1308             takeSyncFuture = this.syncFutures.take();
1309             currentSequence = this.sequence;
1310             long syncFutureSequence = takeSyncFuture.getRingBufferSequence();
1311             if (syncFutureSequence > currentSequence) {
1312               throw new IllegalStateException("currentSequence=" + currentSequence +
1313                 ", syncFutureSequence=" + syncFutureSequence);
1314             }
1315             // See if we can process any syncfutures BEFORE we go sync.
1316             long currentHighestSyncedSequence = highestSyncedSequence.get();
1317             if (currentSequence < currentHighestSyncedSequence) {
1318               syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
1319               // Done with the 'take'.  Go around again and do a new 'take'.
1320               continue;
1321             }
1322             break;
1323           }
1324           // I got something.  Let's run.  Save off the current sequence number in case it changes
1325           // while we run.
1326           TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
1327           long start = System.nanoTime();
1328           Throwable t = null;
1329           try {
1330             Trace.addTimelineAnnotation("syncing writer");
1331             writer.sync();
1332             Trace.addTimelineAnnotation("writer synced");
1333             currentSequence = updateHighestSyncedSequence(currentSequence);
1334           } catch (IOException e) {
1335             LOG.error("Error syncing, request close of wal ", e);
1336             t = e;
1337           } catch (Exception e) {
1338             LOG.warn("UNEXPECTED", e);
1339             t = e;
1340           } finally {
1341             // reattach the span to the future before releasing.
1342             takeSyncFuture.setSpan(scope.detach());
1343             // First release what we 'took' from the queue.
1344             syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t);
1345             // Can we release other syncs?
1346             syncCount += releaseSyncFutures(currentSequence, t);
1347             if (t != null) {
1348               requestLogRoll();
1349             } else checkLogRoll();
1350           }
1351           postSync(System.nanoTime() - start, syncCount);
1352         } catch (InterruptedException e) {
1353           // Presume legit interrupt.
1354           Thread.currentThread().interrupt();
1355         } catch (Throwable t) {
1356           LOG.warn("UNEXPECTED, continuing", t);
1357         }
1358       }
1359     }
1360   }
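  // Illustrative flow of a handler sync through a SyncRunner; a sketch of the code above, not
  // additional behavior:
  //
  //   Handler thread                RingBufferEventHandler             SyncRunner.run()
  //   --------------                ----------------------             ----------------
  //   sync()/sync(txid) publishes   batches the SyncFutures, then      take() a SyncFuture,
  //   a SyncFuture and blocks  -->  syncRunner.offer(sequence,    -->  writer.sync(), then
  //   on it                           syncFutures, count)              releaseSyncFuture(s)
  //                                                                    unblocks the handlers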
1361 
1362   /**
1363    * Schedule a log roll if needed.
1364    */
1365   void checkLogRoll() {
1366     // Will return immediately if we are in the middle of a WAL log roll currently.
1367     if (!rollWriterLock.tryLock()) return;
1368     boolean lowReplication;
1369     try {
1370       lowReplication = checkLowReplication();
1371     } finally {
1372       rollWriterLock.unlock();
1373     }
1374     try {
1375       if (lowReplication || (writer != null && writer.getLength() > logrollsize)) {
1376         requestLogRoll(lowReplication);
1377       }
1378     } catch (IOException e) {
1379       LOG.warn("Writer.getLength() failed; continuing", e);
1380     }
1381   }
1382 
1383   /*
1384    * @return true if number of replicas for the WAL is lower than threshold
1385    */
1386   private boolean checkLowReplication() {
1387     boolean logRollNeeded = false;
1388     // if the number of replicas in HDFS has fallen below the configured
1389     // value, then roll logs.
1390     try {
1391       int numCurrentReplicas = getLogReplication();
1392       if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) {
1393         if (this.lowReplicationRollEnabled) {
1394           if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
1395             LOG.warn("HDFS pipeline error detected. " + "Found "
1396                 + numCurrentReplicas + " replicas but expecting no less than "
1397                 + this.minTolerableReplication + " replicas. "
1398                 + " Requesting close of wal.");
1399             logRollNeeded = true;
1400             // If rollWriter is requested, increase consecutiveLogRolls. Once it
1401             // is larger than lowReplicationRollLimit, disable the
1402             // LowReplication-Roller
1403             this.consecutiveLogRolls.getAndIncrement();
1404           } else {
1405             LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
1406                 + "the total number of live datanodes is lower than the tolerable replicas.");
1407             this.consecutiveLogRolls.set(0);
1408             this.lowReplicationRollEnabled = false;
1409           }
1410         }
1411       } else if (numCurrentReplicas >= this.minTolerableReplication) {
1412         if (!this.lowReplicationRollEnabled) {
1413           // The new writer's log replication is always the default value.
1414           // So we should not enable LowReplication-Roller. If numEntries
1415           // is less than or equal to 1, we consider it a new writer.
1416           if (this.numEntries.get() <= 1) {
1417             return logRollNeeded;
1418           }
1419           // Once the live datanode number and the replicas return to normal,
1420           // enable the LowReplication-Roller.
1421           this.lowReplicationRollEnabled = true;
1422           LOG.info("LowReplication-Roller was enabled.");
1423         }
1424       }
1425     } catch (Exception e) {
1426       LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
1427         " still proceeding ahead...");
1428     }
1429     return logRollNeeded;
1430   }
1431 
1432   private SyncFuture publishSyncOnRingBuffer() {
1433     return publishSyncOnRingBuffer(null);
1434   }
1435 
1436   private SyncFuture publishSyncOnRingBuffer(Span span) {
1437     long sequence = this.disruptor.getRingBuffer().next();
1438     SyncFuture syncFuture = getSyncFuture(sequence, span);
1439     try {
1440       RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
1441       truck.loadPayload(syncFuture);
1442     } finally {
1443       this.disruptor.getRingBuffer().publish(sequence);
1444     }
1445     return syncFuture;
1446   }
1447 
1448   // Sync all known transactions
1449   private Span publishSyncThenBlockOnCompletion(Span span) throws IOException {
1450     return blockOnSync(publishSyncOnRingBuffer(span));
1451   }
1452 
1453   private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
1454     // Now that we have published to the ringbuffer, block the current thread until we get an answer back.
1455     try {
1456       syncFuture.get();
1457       return syncFuture.getSpan();
1458     } catch (InterruptedException ie) {
1459       LOG.warn("Interrupted", ie);
1460       throw convertInterruptedExceptionToIOException(ie);
1461     } catch (ExecutionException e) {
1462       throw ensureIOException(e.getCause());
1463     }
1464   }
1465 
1466   private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
1467     Thread.currentThread().interrupt();
1468     IOException ioe = new InterruptedIOException();
1469     ioe.initCause(ie);
1470     return ioe;
1471   }
1472 
1473   private SyncFuture getSyncFuture(final long sequence, Span span) {
1474     SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
1475     if (syncFuture == null) {
1476       syncFuture = new SyncFuture();
1477       this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
1478     }
1479     return syncFuture.reset(sequence, span);
1480   }
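  // The syncFuturesByHandler map above is effectively a per-thread cache so each handler reuses
  // one SyncFuture across calls.  An equivalent idiom (illustrative sketch only; this class keeps
  // the explicit map) would be a ThreadLocal:
  //
  //   private final ThreadLocal<SyncFuture> cachedSyncFuture = new ThreadLocal<SyncFuture>() {
  //     @Override protected SyncFuture initialValue() { return new SyncFuture(); }
  //   };
  //   ...
  //   return cachedSyncFuture.get().reset(sequence, span);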
1481 
1482   private void postSync(final long timeInNanos, final int handlerSyncs) {
1483     if (timeInNanos > this.slowSyncNs) {
1484       String msg =
1485           new StringBuilder().append("Slow sync cost: ")
1486               .append(timeInNanos / 1000000).append(" ms, current pipeline: ")
1487               .append(Arrays.toString(getPipeLine())).toString();
1488       Trace.addTimelineAnnotation(msg);
1489       LOG.info(msg);
1490     }
1491     if (!listeners.isEmpty()) {
1492       for (WALActionsListener listener : listeners) {
1493         listener.postSync(timeInNanos, handlerSyncs);
1494       }
1495     }
1496   }
1497 
1498   private long postAppend(final Entry e, final long elapsedTime) {
1499     long len = 0;
1500     if (!listeners.isEmpty()) {
1501       for (Cell cell : e.getEdit().getCells()) {
1502         len += CellUtil.estimatedSerializedSizeOf(cell);
1503       }
1504       for (WALActionsListener listener : listeners) {
1505         listener.postAppend(len, elapsedTime);
1506       }
1507     }
1508     return len;
1509   }
1510 
1511   /**
1512    * Find the 'getNumCurrentReplicas' method on the passed <code>os</code> stream.
1513    * This is used for getting the current replica count of a file being written.
1514    * @return Method or null.
1515    */
1516   private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
1517     // TODO: Remove all this and use the now publicly available
1518     // HdfsDataOutputStream#getCurrentBlockReplication()
1519     Method m = null;
1520     if (os != null) {
1521       Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass();
1522       try {
1523         m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class<?>[] {});
1524         m.setAccessible(true);
1525       } catch (NoSuchMethodException e) {
1526         LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " +
1527          "HDFS-826 not available; fsOut=" + wrappedStreamClass.getName());
1528       } catch (SecurityException e) {
1529         LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " +
1530           "not available; fsOut=" + wrappedStreamClass.getName(), e);
1531         m = null; // could happen on setAccessible()
1532       }
1533     }
1534     if (m != null) {
1535       if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas");
1536     }
1537     return m;
1538   }
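  // Regarding the TODO above: on Hadoop versions where HdfsDataOutputStream is public API, a
  // reflection-free sketch (an assumption about the Hadoop version in use; verify before relying
  // on it) would be:
  //
  //   if (os instanceof org.apache.hadoop.hdfs.client.HdfsDataOutputStream) {
  //     return ((org.apache.hadoop.hdfs.client.HdfsDataOutputStream) os).getCurrentBlockReplication();
  //   }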
1539 
1540   /**
1541    * This method gets the datanode replication count for the current WAL.
1542    *
1543    * If the pipeline isn't started yet or is empty, you will get the default
1544    * replication factor.  Therefore, if this function returns 0, it means you
1545    * are not properly running with the HDFS-826 patch.
1546    * @throws InvocationTargetException
1547    * @throws IllegalAccessException
1548    * @throws IllegalArgumentException
1551    */
1552   @VisibleForTesting
1553   int getLogReplication()
1554   throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
1555     final OutputStream stream = getOutputStream();
1556     if (this.getNumCurrentReplicas != null && stream != null) {
1557       Object repl = this.getNumCurrentReplicas.invoke(stream, NO_ARGS);
1558       if (repl instanceof Integer) {
1559         return ((Integer)repl).intValue();
1560       }
1561     }
1562     return 0;
1563   }
1564 
1565   @Override
1566   public void sync() throws IOException {
1567     TraceScope scope = Trace.startSpan("FSHLog.sync");
1568     try {
1569       scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
1570     } finally {
1571       assert scope == NullScope.INSTANCE || !scope.isDetached();
1572       scope.close();
1573     }
1574   }
1575 
1576   @Override
1577   public void sync(long txid) throws IOException {
1578     if (this.highestSyncedSequence.get() >= txid){
1579       // Already sync'd.
1580       return;
1581     }
1582     TraceScope scope = Trace.startSpan("FSHLog.sync");
1583     try {
1584       scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
1585     } finally {
1586       assert scope == NullScope.INSTANCE || !scope.isDetached();
1587       scope.close();
1588     }
1589   }
1590 
1591   // public only until class moves to o.a.h.h.wal
1592   public void requestLogRoll() {
1593     requestLogRoll(false);
1594   }
1595 
1596   private void requestLogRoll(boolean tooFewReplicas) {
1597     if (!this.listeners.isEmpty()) {
1598       for (WALActionsListener i: this.listeners) {
1599         i.logRollRequested(tooFewReplicas);
1600       }
1601     }
1602   }
1603 
1604   // public only until class moves to o.a.h.h.wal
1605   /** @return the number of rolled log files */
1606   public int getNumRolledLogFiles() {
1607     return byWalRegionSequenceIds.size();
1608   }
1609 
1610   // public only until class moves to o.a.h.h.wal
1611   /** @return the number of log files in use */
1612   public int getNumLogFiles() {
1613     // +1 for the current WAL file in use
1614     return getNumRolledLogFiles() + 1;
1615   }
1616 
1617   // public only until class moves to o.a.h.h.wal
1618   /** @return the size of log files in use */
1619   public long getLogFileSize() {
1620     return this.totalLogSize.get();
1621   }
1622 
1623   @Override
1624   public boolean startCacheFlush(final byte[] encodedRegionName,
1625       Set<byte[]> flushedFamilyNames) {
1626     Map<byte[], Long> oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
1627     if (!closeBarrier.beginOp()) {
1628       LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
1629         " - because the server is closing.");
1630       return false;
1631     }
1632     synchronized (regionSequenceIdLock) {
1633       ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
1634           oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1635       if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
1636         for (byte[] familyName: flushedFamilyNames) {
1637           Long seqId = oldestUnflushedStoreSequenceIdsOfRegion.remove(familyName);
1638           if (seqId != null) {
1639             oldStoreSeqNum.put(familyName, seqId);
1640           }
1641         }
1642         if (!oldStoreSeqNum.isEmpty()) {
1643           Map<byte[], Long> oldValue = this.lowestFlushingStoreSequenceIds.put(
1644               encodedRegionName, oldStoreSeqNum);
1645           assert oldValue == null: "Flushing map not cleaned up for "
1646               + Bytes.toString(encodedRegionName);
1647         }
1648         if (oldestUnflushedStoreSequenceIdsOfRegion.isEmpty()) {
1649           // Remove it, otherwise it will stay in oldestUnflushedStoreSequenceIds forever,
1650           // even if the region has already moved to another server.
1651           // Do not worry about data races: we hold the region's write lock when calling
1652           // startCacheFlush, so no one can add values to the map we removed.
1653           oldestUnflushedStoreSequenceIds.remove(encodedRegionName);
1654         }
1655       }
1656     }
1657     if (oldStoreSeqNum.isEmpty()) {
1658       // TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either
1659       // the region is already flushing (which would make this call invalid), or there
1660       // were no appends after last flush, so why are we starting flush? Maybe we should
1661       // assert not empty. Less rigorous, but safer, alternative is telling the caller to stop.
1662       // For now preserve old logic.
1663       LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
1664         + Bytes.toString(encodedRegionName) + "]");
1665     }
1666     return true;
1667   }
1668 
1669   @Override
1670   public void completeCacheFlush(final byte [] encodedRegionName) {
1671     synchronized (regionSequenceIdLock) {
1672       this.lowestFlushingStoreSequenceIds.remove(encodedRegionName);
1673     }
1674     closeBarrier.endOp();
1675   }
1676 
1677   private ConcurrentMap<byte[], Long> getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(
1678       byte[] encodedRegionName) {
1679     ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
1680         oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1681     if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
1682       return oldestUnflushedStoreSequenceIdsOfRegion;
1683     }
1684     oldestUnflushedStoreSequenceIdsOfRegion =
1685         new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1686     ConcurrentMap<byte[], Long> alreadyPut =
1687         oldestUnflushedStoreSequenceIds.put(encodedRegionName,
1688           oldestUnflushedStoreSequenceIdsOfRegion);
1689     return alreadyPut == null ? oldestUnflushedStoreSequenceIdsOfRegion : alreadyPut;
1690   }
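  // Illustrative alternative for the create-if-absent dance above, using the standard
  // ConcurrentMap#putIfAbsent idiom (a sketch only, not a statement about this class's locking
  // assumptions):
  //
  //   ConcurrentMap<byte[], Long> created =
  //       new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
  //   ConcurrentMap<byte[], Long> existing =
  //       oldestUnflushedStoreSequenceIds.putIfAbsent(encodedRegionName, created);
  //   return existing == null ? created : existing;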
1691 
1692   @Override
1693   public void abortCacheFlush(byte[] encodedRegionName) {
1694     Map<byte[], Long> storeSeqNumsBeforeFlushStarts;
1695     Map<byte[], Long> currentStoreSeqNums = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1696     synchronized (regionSequenceIdLock) {
1697       storeSeqNumsBeforeFlushStarts = this.lowestFlushingStoreSequenceIds.remove(
1698         encodedRegionName);
1699       if (storeSeqNumsBeforeFlushStarts != null) {
1700         ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
1701             getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
1702         for (Map.Entry<byte[], Long> familyNameAndSeqId: storeSeqNumsBeforeFlushStarts
1703             .entrySet()) {
1704           currentStoreSeqNums.put(familyNameAndSeqId.getKey(),
1705             oldestUnflushedStoreSequenceIdsOfRegion.put(familyNameAndSeqId.getKey(),
1706               familyNameAndSeqId.getValue()));
1707         }
1708       }
1709     }
1710     closeBarrier.endOp();
1711     if (storeSeqNumsBeforeFlushStarts != null) {
1712       for (Map.Entry<byte[], Long> familyNameAndSeqId : storeSeqNumsBeforeFlushStarts.entrySet()) {
1713         Long currentSeqNum = currentStoreSeqNums.get(familyNameAndSeqId.getKey());
1714         if (currentSeqNum != null
1715             && currentSeqNum.longValue() <= familyNameAndSeqId.getValue().longValue()) {
1716           String errorStr =
1717               "Region " + Bytes.toString(encodedRegionName) + " family "
1718                   + Bytes.toString(familyNameAndSeqId.getKey())
1719                   + " acquired edits out of order current memstore seq=" + currentSeqNum
1720                   + ", previous oldest unflushed id=" + familyNameAndSeqId.getValue();
1721           LOG.error(errorStr);
1722           Runtime.getRuntime().halt(1);
1723         }
1724       }
1725     }
1726   }
1727 
1728   @VisibleForTesting
1729   boolean isLowReplicationRollEnabled() {
1730       return lowReplicationRollEnabled;
1731   }
1732 
1733   public static final long FIXED_OVERHEAD = ClassSize.align(
1734     ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
1735     ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1736 
1737   private static void split(final Configuration conf, final Path p)
1738   throws IOException {
1739     FileSystem fs = FileSystem.get(conf);
1740     if (!fs.exists(p)) {
1741       throw new FileNotFoundException(p.toString());
1742     }
1743     if (!fs.getFileStatus(p).isDirectory()) {
1744       throw new IOException(p + " is not a directory");
1745     }
1746 
1747     final Path baseDir = FSUtils.getRootDir(conf);
1748     final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1749     WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
1750   }
1751 
1752 
1753   @Override
1754   public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
1755     ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
1756         this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1757     return oldestUnflushedStoreSequenceIdsOfRegion != null ?
1758         getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM;
1759   }
1760 
1761   @Override
1762   public long getEarliestMemstoreSeqNum(byte[] encodedRegionName,
1763       byte[] familyName) {
1764     ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
1765         this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1766     if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
1767       Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName);
1768       return result != null ? result.longValue() : HConstants.NO_SEQNUM;
1769     } else {
1770       return HConstants.NO_SEQNUM;
1771     }
1772   }
1773 
1774   /**
1775    * This class is used to coordinate two threads, holding one thread at a
1776    * 'safe point' while the orchestrating thread does some work that requires the first thread
1777    * paused: e.g. holding the WAL writer while its WAL is swapped out from under it by another
1778    * thread.
1779    *
1780    * <p>Thread A signals Thread B to hold when it gets to a 'safe point'.  Thread A waits until
1781    * Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A.
1782    * Thread B then holds at the 'safe point'.  Thread A, on notification that Thread B is paused,
1783    * goes ahead and does the work it needs to do while Thread B is holding.  When Thread A is done,
1784    * it flags B and then Thread A and Thread B continue along on their merry way.  Pausing and
1785    * signalling 'zigzag' between the two participating threads.  We use two latches -- one the
1786    * inverse of the other -- pausing and signaling when states are achieved.
1787    *
1788    * <p>To start up the drama, Thread A creates an instance of this class each time it would do
1789    * this zigzag dance and passes it to Thread B (these classes use latches so it is one shot
1790    * only). Thread B notices the new instance (via reading a volatile reference or however) and it
1791    * starts to work toward the 'safe point'.  Thread A calls {@link #waitSafePoint()} when it
1792    * cannot proceed until the Thread B 'safe point' is attained. Thread A will be held inside
1793    * {@link #waitSafePoint()} until Thread B reaches the 'safe point'.  Once there, Thread B
1794    * frees Thread A by calling {@link #safePointAttained()}.  Thread A now knows Thread B
1795    * is at the 'safe point' and that it is holding there (when Thread B calls
1796    * {@link #safePointAttained()} it blocks there until Thread A calls {@link #releaseSafePoint()}).
1797    * Thread A proceeds to do what it needs to do while Thread B is paused.  When finished,
1798    * it lets Thread B loose by calling {@link #releaseSafePoint()} and away go both threads again.
1799    */
1800   static class SafePointZigZagLatch {
1801     /**
1802      * Count down this latch when safe point attained.
1803      */
1804     private volatile CountDownLatch safePointAttainedLatch = new CountDownLatch(1);
1805     /**
1806      * Latch to wait on.  Will be released when we can proceed.
1807      */
1808     private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
1809  
1810     /**
1811      * For Thread A to call when it is ready to wait on the 'safe point' to be attained.
1812      * Thread A will be held in here until Thread B calls {@link #safePointAttained()}
1813      * @throws InterruptedException
1814      * @throws ExecutionException
1815      * @param syncFuture We need this as a barometer on outstanding syncs.  If it comes home with
1816      * an exception, then something is wrong with our syncing.
1817      * @return The passed <code>syncFuture</code>
1818      * @throws FailedSyncBeforeLogCloseException 
1819      */
1820     SyncFuture waitSafePoint(final SyncFuture syncFuture)
1821     throws InterruptedException, FailedSyncBeforeLogCloseException {
1822       while (true) {
1823         if (this.safePointAttainedLatch.await(1, TimeUnit.NANOSECONDS)) break;
1824         if (syncFuture.isThrowable()) {
1825           throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
1826         }
1827       }
1828       return syncFuture;
1829     }
1830  
1831     /**
1832      * Called by Thread B when it attains the 'safe point'.  In this method, Thread B signals
1833      * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
1834      * is called by Thread A.
1835      * @throws InterruptedException
1836      */
1837     void safePointAttained() throws InterruptedException {
1838       this.safePointAttainedLatch.countDown();
1839       this.safePointReleasedLatch.await();
1840     }
1841 
1842     /**
1843      * Called by Thread A when it is done with the work it needs to do while Thread B is
1844      * halted.  This will release the Thread B held in a call to {@link #safePointAttained()}
1845      */
1846     void releaseSafePoint() {
1847       this.safePointReleasedLatch.countDown();
1848     }
1849 
1850     /**
1851      * @return True if this is a 'cocked', fresh instance, and not one that has already fired.
1852      */
1853     boolean isCocked() {
1854       return this.safePointAttainedLatch.getCount() > 0 &&
1855         this.safePointReleasedLatch.getCount() > 0;
1856     }
1857   }
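  // Illustrative call-sequence sketch for SafePointZigZagLatch (comments only; per the class
  // comment above, Thread A is the orchestrating thread, e.g. the log roller, and Thread B is
  // the ring buffer consumer):
  //
  //   Thread A                                     Thread B
  //   --------                                     --------
  //   zigzagLatch = handler.attainSafePoint();
  //   zigzagLatch.waitSafePoint(syncFuture);  <--  zigzagLatch.safePointAttained();  // B blocks
  //   ... swap out the WAL writer ...
  //   zigzagLatch.releaseSafePoint();         -->  // B resumes appending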
1858 
1859   /**
1860    * Handler that is run by the disruptor ringbuffer consumer. Consumer is a SINGLE
1861    * 'writer/appender' thread.  Appends edits and starts up sync runs.  Tries its best to batch up
1862    * syncs.  There is no discernible benefit to batching appends, so we just append as they come in
1863    * because it simplifies the below implementation.  See metrics for batching effectiveness
1864    * (In measurement, at 100 concurrent handlers writing 1k, we are batching > 10 appends and 10
1865    * handler sync invocations for every actual dfsclient sync call; at 10 concurrent handlers,
1866    * YMMV).
1867    * <p>Herein, we have an array into which we store the sync futures as they come in.  When we
1868    * have a 'batch', we'll then pass what we have collected to a SyncRunner thread to do the
1869    * filesystem sync.  When it completes, it will then call
1870    * {@link SyncFuture#done(long, Throwable)} on each of SyncFutures in the batch to release
1871    * blocked Handler threads.
1872    * <p>I've tried various approaches to keep latencies low while keeping throughput high.
1873    * I've tried keeping a single Queue of SyncFutures in this class, appending to its tail as
1874    * syncs come in and having sync runner threads poll off the head to 'finish' completed
1875    * SyncFutures.  I've tried LinkedList, and various queues from concurrent utils, whether
1876    * LinkedBlockingQueue or ArrayBlockingQueue, etc.  The more points of synchronization, the
1877    * more 'work' (according to 'perf stats') that has to be done; small increases in stall
1878    * percentages seem to have a big impact on throughput/latencies.  The below model where we have
1879    * an array into which we stash the syncs and then hand them off to the sync thread seemed like
1880    * a decent compromise.  See HBASE-8755 for more detail.
1881    */
1882   class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware {
1883     private final SyncRunner [] syncRunners;
1884     private final SyncFuture [] syncFutures;
1885     // Had 'interesting' issues when this was non-volatile.  On occasion, we'd not pass all
1886     // syncFutures to the next sync'ing thread.
1887     private volatile int syncFuturesCount = 0;
1888     private volatile SafePointZigZagLatch zigzagLatch;
1889     /**
1890      * Object to block on while waiting on safe point.
1891      */
1892     private final Object safePointWaiter = new Object();
1893     private volatile boolean shutdown = false;
1894 
1895     /**
1896      * Which syncrunner to use next.
1897      */
1898     private int syncRunnerIndex;
1899 
1900     RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount) {
1901       this.syncFutures = new SyncFuture[maxHandlersCount];
1902       this.syncRunners = new SyncRunner[syncRunnerCount];
1903       for (int i = 0; i < syncRunnerCount; i++) {
1904         this.syncRunners[i] = new SyncRunner("sync." + i, maxHandlersCount);
1905       }
1906     }
1907 
1908     private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) {
1909       for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e);
1910       this.syncFuturesCount = 0;
1911     }
1912 
1913     @Override
1914     // We can set endOfBatch in the below method if we are at the end of the this.syncFutures array
1915     public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)
1916     throws Exception {
1917       // Appends and syncs are coming in order off the ringbuffer.  We depend on this fact.  We'll
1918       // add appends to dfsclient as they come in.  Batching appends doesn't give any significant
1919       // benefit on measurement.  Handler sync calls we will batch up.
1920 
1921       try {
1922         if (truck.hasSyncFuturePayload()) {
1923           this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();
1924           // Force flush of syncs if we are carrying a full complement of syncFutures.
1925           if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true;
1926         } else if (truck.hasFSWALEntryPayload()) {
1927           TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
1928           try {
1929             append(truck.unloadFSWALEntryPayload());
1930           } catch (Exception e) {
1931             // If append fails, presume any pending syncs will fail too; let all waiting handlers
1932             // know of the exception.
1933             cleanupOutstandingSyncsOnException(sequence, e);
1934             // Return to keep processing.
1935             return;
1936           } finally {
1937             assert scope == NullScope.INSTANCE || !scope.isDetached();
1938             scope.close(); // append scope is complete
1939           }
1940         } else {
1941           // Neither an append nor a sync payload; this should not happen.  Fail all up to this point.
1942           cleanupOutstandingSyncsOnException(sequence,
1943             new IllegalStateException("Neither append nor sync"));
1944           // Return to keep processing.
1945           return;
1946         }
1947 
1948         // TODO: Check size and if big go ahead and call a sync if we have enough data.
1949 
1950         // If not a batch, return to consume more events from the ring buffer before proceeding;
1951         // we want to get up a batch of syncs and appends before we go do a filesystem sync.
1952         if (!endOfBatch || this.syncFuturesCount <= 0) return;
1953 
1954         // Now we have a batch.
1955 
1956         if (LOG.isTraceEnabled()) {
1957           LOG.trace("Sequence=" + sequence + ", syncCount=" + this.syncFuturesCount);
1958         }
1959 
1960         // Below expects that the offer 'transfers' responsibility for the outstanding syncs to the
1961         // syncRunner. We should never get an exception in here. HBASE-11145 was because queue
1962         // was sized exactly to the count of user handlers but we could have more if we factor in
1963         // meta handlers doing opens and closes.
1964         int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
1965         try {
1966           this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount);
1967         } catch (Exception e) {
1968           cleanupOutstandingSyncsOnException(sequence, e);
1969           throw e;
1970         }
1971         attainSafePoint(sequence);
1972         this.syncFuturesCount = 0;
1973       } catch (Throwable t) {
1974         LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
1975       }
1976     }
1977 
1978     SafePointZigZagLatch attainSafePoint() {
1979       this.zigzagLatch = new SafePointZigZagLatch();
1980       return this.zigzagLatch;
1981     }
1982 
1983     /**
1984      * Check if we should attain the safe point.  If so, go there and then wait until signalled
1985      * before we proceed.
1986      */
1987     private void attainSafePoint(final long currentSequence) {
1988       if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return;
1989       // If here, another thread is waiting on us to get to safe point.  Don't leave it hanging.
1990       try {
1991         // Wait on outstanding syncers; wait for them to finish syncing (unless we've been
1992         // shutdown or unless our latch has been thrown because we have been aborted).
1993         while (!this.shutdown && this.zigzagLatch.isCocked() &&
1994             highestSyncedSequence.get() < currentSequence) {
1995           synchronized (this.safePointWaiter) {
1996             this.safePointWaiter.wait(0, 1);
1997           }
1998         }
1999         // Tell waiting thread we've attained safe point
2000         this.zigzagLatch.safePointAttained();
2001       } catch (InterruptedException e) {
2002         LOG.warn("Interrupted ", e);
2003         Thread.currentThread().interrupt();
2004       }
2005     }
2006 
2007     private void updateOldestUnflushedSequenceIds(byte[] encodedRegionName,
2008         Set<byte[]> familyNameSet, Long lRegionSequenceId) {
2009       ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
2010           getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
2011       for (byte[] familyName : familyNameSet) {
2012         oldestUnflushedStoreSequenceIdsOfRegion.putIfAbsent(familyName, lRegionSequenceId);
2013       }
2014     }
2015 
2016     /**
2017      * Append to the WAL.  Does all CP and WAL listener calls.
2018      * @param entry Entry to append to the WAL.
2019      * @throws Exception
2020      */
2021     void append(final FSWALEntry entry) throws Exception {
2022       // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
2023       atHeadOfRingBufferEventHandlerAppend();
2024 
2025       long start = EnvironmentEdgeManager.currentTime();
2026       byte [] encodedRegionName = entry.getKey().getEncodedRegionName();
2027       long regionSequenceId = WALKey.NO_SEQUENCE_ID;
2028       try {
2029         // We are about to append this edit; update the region-scoped sequence number.  Do it
2030         // here inside this single appending/writing thread.  Events are ordered on the ringbuffer
2031         // so region sequenceids will also be in order.
2032         regionSequenceId = entry.stampRegionSequenceId();
2033         
2034         // Edits are empty, there is nothing to append.  Maybe empty when we are looking for a 
2035         // region sequence id only, a region edit/sequence id that is not associated with an actual 
2036         // edit. It has to go through all the rigmarole to be sure we have the right ordering.
2037         if (entry.getEdit().isEmpty()) {
2038           return;
2039         }
2040         
2041         // Coprocessor hook.
2042         if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
2043             entry.getEdit())) {
2044           if (entry.getEdit().isReplay()) {
2045             // Set replication scope null so that this won't be replicated
2046             entry.getKey().setScopes(null);
2047           }
2048         }
2049         if (!listeners.isEmpty()) {
2050           for (WALActionsListener i: listeners) {
2051             // TODO: Why does listener take a table description and CPs take a regioninfo?  Fix.
2052             i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(),
2053               entry.getEdit());
2054           }
2055         }
2056 
2057         writer.append(entry);
2058         assert highestUnsyncedSequence < entry.getSequence();
2059         highestUnsyncedSequence = entry.getSequence();
2060         Long lRegionSequenceId = Long.valueOf(regionSequenceId);
2061         highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
2062         if (entry.isInMemstore()) {
2063           updateOldestUnflushedSequenceIds(encodedRegionName,
2064               entry.getFamilyNames(), lRegionSequenceId);
2065         }
2066 
2067         coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
2068         // Update metrics.
2069         postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
2070       } catch (Exception e) {
2071         LOG.fatal("Could not append. Requesting close of wal", e);
2072         requestLogRoll();
2073         throw e;
2074       }
2075       numEntries.incrementAndGet();
2076     }
2077 
2078     @Override
2079     public void onStart() {
2080       for (SyncRunner syncRunner: this.syncRunners) syncRunner.start();
2081     }
2082 
2083     @Override
2084     public void onShutdown() {
2085       for (SyncRunner syncRunner: this.syncRunners) syncRunner.interrupt();
2086     }
2087   }
2088 
2089   /**
2090    * Exposed for testing only.  Used for tricks like halting the ring buffer appending.
2091    */
2092   @VisibleForTesting
2093   void atHeadOfRingBufferEventHandlerAppend() {
2094     // Noop
2095   }
2096 
2097   private static IOException ensureIOException(final Throwable t) {
2098     return (t instanceof IOException)? (IOException)t: new IOException(t);
2099   }
2100 
2101   private static void usage() {
2102     System.err.println("Usage: FSHLog <ARGS>");
2103     System.err.println("Arguments:");
2104     System.err.println(" --dump  Dump textual representation of passed one or more files");
2105     System.err.println("         For example: " +
2106       "FSHLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
2107     System.err.println(" --split Split the passed directory of WAL logs");
2108     System.err.println("         For example: " +
2109       "FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR");
2110   }
2111 
2112   /**
2113    * Pass one or more log file names and it will either dump out a text version
2114    * on <code>stdout</code> or split the specified log files.
2115    *
2116    * @param args
2117    * @throws IOException
2118    */
2119   public static void main(String[] args) throws IOException {
2120     if (args.length < 2) {
2121       usage();
2122       System.exit(-1);
2123     }
2124     // either dump using the WALPrettyPrinter or split, depending on args
2125     if (args[0].compareTo("--dump") == 0) {
2126       WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
2127     } else if (args[0].compareTo("--perf") == 0) {
2128       LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
2129       LOG.fatal("\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " +
2130           args[1]);
2131       System.exit(-1);
2132     } else if (args[0].compareTo("--split") == 0) {
2133       Configuration conf = HBaseConfiguration.create();
2134       for (int i = 1; i < args.length; i++) {
2135         try {
2136           Path logPath = new Path(args[i]);
2137           FSUtils.setFsDefault(conf, logPath);
2138           split(conf, logPath);
2139         } catch (IOException t) {
2140           t.printStackTrace(System.err);
2141           System.exit(-1);
2142         }
2143       }
2144     } else {
2145       usage();
2146       System.exit(-1);
2147     }
2148   }
2149   
2150   /**
2151    * Find the 'getPipeline' method on the passed <code>os</code> stream.
2152    * @return Method or null.
2153    */
2154   private Method getGetPipeline(final FSDataOutputStream os) {
2155     Method m = null;
2156     if (os != null) {
2157       Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
2158           .getClass();
2159       try {
2160         m = wrappedStreamClass.getDeclaredMethod("getPipeline",
2161           new Class<?>[] {});
2162         m.setAccessible(true);
2163       } catch (NoSuchMethodException e) {
2164         LOG.info("FileSystem's output stream doesn't support"
2165             + " getPipeline; not available; fsOut="
2166             + wrappedStreamClass.getName());
2167       } catch (SecurityException e) {
2168         LOG.info(
2169           "Doesn't have access to getPipeline on "
2170               + "FileSystems's output stream ; fsOut="
2171               + wrappedStreamClass.getName(), e);
2172         m = null; // could happen on setAccessible()
2173       }
2174     }
2175     return m;
2176   }
2177 
2178   /**
2179    * This method gets the pipeline for the current WAL.
2180    */
2181   @VisibleForTesting
2182   DatanodeInfo[] getPipeLine() {
2183     if (this.getPipeLine != null && this.hdfs_out != null) {
2184       Object repl;
2185       try {
2186         repl = this.getPipeLine.invoke(getOutputStream(), NO_ARGS);
2187         if (repl instanceof DatanodeInfo[]) {
2188           return ((DatanodeInfo[]) repl);
2189         }
2190       } catch (Exception e) {
2191         LOG.info("Get pipeline failed", e);
2192       }
2193     }
2194     return new DatanodeInfo[0];
2195   }
2196 }