/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The original implementation of FSWAL.
 */
@InterfaceAudience.Private
public class FSHLog extends AbstractFSWAL<Writer> {
  // IMPLEMENTATION NOTES:
  //
  // At the core is a ring buffer. Our ring buffer is the LMAX Disruptor. It tries to
  // minimize synchronizations and volatile writes when multiple threads contend, as is the case
  // here with many threads appending and syncing on a single WAL. The Disruptor is configured to
  // handle multiple producers but it has one consumer only (the producers in HBase are IPC
  // Handlers calling append and then sync). The single consumer/writer pulls the appends and
  // syncs off the ring buffer. When a handler calls sync, it is given back a future. The producer
  // 'blocks' on the future so it does not return until the sync completes. The future is passed
  // over the ring buffer from the producer/handler to the consumer thread where it does its best
  // to batch up the producer syncs so one WAL sync actually spans multiple producer sync
  // invocations. How well the batching works depends on the write rate; i.e. we tend to batch
  // more in times of high writes/syncs.
  //
  // Calls to append now also wait until the append has been done on the consumer side of the
  // disruptor. We used to not wait but it makes the implementation easier to grok if we have
  // the region edit/sequence id after the append returns.
  //
  // TODO: Handlers need to coordinate appending AND syncing. Can we have the threads contend
  // once only? Probably hard given syncs take way longer than an append.
  //
  // The consumer thread passes the syncs off to multiple syncing threads in a round robin fashion
  // to ensure we keep up back-to-back FS sync calls (FS sync calls are the long pole writing the
  // WAL). The consumer thread passes the futures to the sync threads, which complete them when
  // their sync is done.
  //
  // The 'sequence' in the below is the sequence of the append/sync on the ringbuffer. It
  // acts as a sort-of transaction id. It is always incrementing.
  //
  // The RingBufferEventHandler class hosts the ring buffer consuming code. The threads that
  // do the actual FS sync are implementations of SyncRunner. SafePointZigZagLatch is a
  // synchronization class used to halt the consumer at a safe point -- just after all outstanding
  // syncs and appends have completed -- so the log roller can swap the WAL out under it.
  //
  // We use ring buffer sequence as txid of FSWALEntry and SyncFuture.
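  //
  // A rough illustration of the batching described above (txids are made up, and all three
  // requests are assumed to land on the same SyncRunner):
  //
  //   Handler-1: append(edit A) -> txid 10, then sync(10)  \
  //   Handler-2: append(edit B) -> txid 11, then sync(11)   }  all three block on their futures
  //   Handler-3: append(edit C) -> txid 12, then sync(12)  /
  //
  //   The SyncRunner takes the request for txid 10, sees the highest appended txid is already 12,
  //   issues a single hflush/hsync against the filesystem, and then completes every queued future
  //   whose txid is <= 12, i.e. 10, 11 and 12 in one go. One filesystem sync thus satisfies
  //   several handler sync invocations.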
  private static final Logger LOG = LoggerFactory.getLogger(FSHLog.class);

  private static final String TOLERABLE_LOW_REPLICATION =
    "hbase.regionserver.hlog.tolerable.lowreplication";
  private static final String LOW_REPLICATION_ROLL_LIMIT =
    "hbase.regionserver.hlog.lowreplication.rolllimit";
  private static final int DEFAULT_LOW_REPLICATION_ROLL_LIMIT = 5;
  private static final String SYNCER_COUNT = "hbase.regionserver.hlog.syncer.count";
  private static final int DEFAULT_SYNCER_COUNT = 5;
  private static final String MAX_BATCH_COUNT = "hbase.regionserver.wal.sync.batch.count";
  private static final int DEFAULT_MAX_BATCH_COUNT = 200;

  private static final String FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS =
    "hbase.wal.fshlog.wait.on.shutdown.seconds";
  private static final int DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;
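
  // The knobs above can be set in hbase-site.xml. A minimal, illustrative example (the values
  // shown are simply the compile-time defaults above; the batch count also falls back to
  // hbase.regionserver.handler.count when that is configured):
  //
  //   <property>
  //     <name>hbase.regionserver.hlog.syncer.count</name>
  //     <value>5</value>
  //   </property>
  //   <property>
  //     <name>hbase.regionserver.wal.sync.batch.count</name>
  //     <value>200</value>
  //   </property>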

  private static final IOException WRITER_REPLACED_EXCEPTION =
    new IOException("Writer was replaced!");
  private static final IOException WRITER_BROKEN_EXCEPTION = new IOException("Writer was broken!");
  private static final IOException WAL_CLOSE_EXCEPTION = new IOException("WAL was closed!");

  /**
   * FSDataOutputStream associated with the current SequenceFile.writer
   */
  private FSDataOutputStream hdfs_out;

  // All about log rolling if not enough replicas outstanding.

  // Minimum tolerable replicas; if the actual replica count falls below this value, rollWriter
  // will be triggered.
  private final int minTolerableReplication;

  // If the live datanode count is lower than the default replication value, rollWriter will be
  // requested on every sync, i.e. many rolls in a short time. The counter below is used as a
  // workaround to slow down the roll frequency triggered by checkLowReplication().
  private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0);

  private final int lowReplicationRollLimit;

  // If consecutiveLogRolls is larger than lowReplicationRollLimit,
  // then disable the rolling in checkLowReplication().
  // Enable it if the replications recover.
  private volatile boolean lowReplicationRollEnabled = true;

  private final int syncerCount;
  private final int maxSyncRequestCount;

  /**
   * Which syncrunner to use next.
   */
  private int syncRunnerIndex = 0;

  private SyncRunner[] syncRunners = null;

  /**
   * Constructor.
   * @param fs     filesystem handle
   * @param root   path for stored and archived wals
   * @param logDir dir where wals are stored
   * @param conf   configuration to use
   */
  public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
    throws IOException {
    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
  }

  public FSHLog(final FileSystem fs, Abortable abortable, final Path root, final String logDir,
    final Configuration conf) throws IOException {
    this(fs, abortable, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
      null, null, null);
  }

  public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
    final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
    final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
      null, null);
  }

  /**
   * Create an edit log at the given <code>dir</code> location. You should never have to load an
   * existing log. If there is a log at startup, it should have already been processed and deleted
   * by the time the WAL object is started up.
   * @param fs              filesystem handle
   * @param abortable       Abortable - the server here
   * @param rootDir         path under which logs and oldlogs are stored
   * @param logDir          dir where wals are stored
   * @param archiveDir      dir where wals are archived
   * @param conf            configuration to use
   * @param listeners       Listeners on WAL events. Listeners passed here will be registered before
   *                        we do anything else; e.g. before the constructor calls
   *                        {@link #rollWriter()}.
   * @param failIfWALExists If true IOException will be thrown if files related to this wal already
   *                        exist.
   * @param prefix          should always be hostname and port in distributed env and it will be URL
   *                        encoded before being used. If prefix is null, "wal" will be used
   * @param suffix          will be URL encoded. null is treated as empty. non-empty must start with
   *                        {@link org.apache.hadoop.hbase.wal.AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER}
   */
  public FSHLog(final FileSystem fs, final Abortable abortable, final Path rootDir,
    final String logDir, final String archiveDir, final Configuration conf,
    final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
    final String suffix, FileSystem remoteFs, Path remoteWALDir) throws IOException {
    super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
      suffix, remoteFs, remoteWALDir);
    this.minTolerableReplication =
      conf.getInt(TOLERABLE_LOW_REPLICATION, CommonFSUtils.getDefaultReplication(fs, this.walDir));
    this.lowReplicationRollLimit =
      conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT);

    // Advance the ring buffer sequence so that it starts from 1 instead of 0,
    // because SyncFuture.NOT_DONE = 0.

    this.syncerCount = conf.getInt(SYNCER_COUNT, DEFAULT_SYNCER_COUNT);
    this.maxSyncRequestCount = conf.getInt(MAX_BATCH_COUNT,
      conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, DEFAULT_MAX_BATCH_COUNT));

    this.createSingleThreadPoolConsumeExecutor("FSHLog", rootDir, prefix);

    this.setWaitOnShutdownInSeconds(
      conf.getInt(FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS, DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS),
      FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS);
  }

  @Override
  public void init() throws IOException {
    super.init();
    this.createSyncRunnersAndStart();
  }

  private void createSyncRunnersAndStart() {
    this.syncRunnerIndex = 0;
    this.syncRunners = new SyncRunner[syncerCount];
    for (int i = 0; i < syncerCount; i++) {
      this.syncRunners[i] = new SyncRunner("sync." + i, maxSyncRequestCount);
      this.syncRunners[i].start();
    }
  }

  /**
   * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate the
   * default behavior (such as setting the maxRecoveryErrorCount value). This is done using
   * reflection on the underlying HDFS OutputStream. NOTE: This could be removed once Hadoop1
   * support is removed.
   * @return null if underlying stream is not ready.
   */
  OutputStream getOutputStream() {
    FSDataOutputStream fsdos = this.hdfs_out;
    return fsdos != null ? fsdos.getWrappedStream() : null;
  }

  /**
   * Run a sync after opening to set up the pipeline.
   */
  private void preemptiveSync(final ProtobufLogWriter nextWriter) {
    long startTimeNanos = System.nanoTime();
    try {
      nextWriter.sync(useHsync);
      postSync(System.nanoTime() - startTimeNanos, 0);
    } catch (IOException e) {
      // optimization failed, no need to abort here.
      LOG.warn("pre-sync failed but it is only an optimization so keep going", e);
    }
  }

  /**
   * This method allows subclasses to inject different writers without having to extend other
   * methods like rollWriter().
   * @return Writer instance
   */
  @Override
  protected Writer createWriterInstance(FileSystem fs, Path path) throws IOException {
    Writer writer = FSHLogProvider.createWriter(conf, fs, path, false, this.blocksize);
    if (writer instanceof ProtobufLogWriter) {
      preemptiveSync((ProtobufLogWriter) writer);
    }
    return writer;
  }

  @Override
  protected void doAppend(Writer writer, FSWALEntry entry) throws IOException {
    writer.append(entry);
  }

  @Override
  protected void onWriterReplaced(Writer nextWriter) {
    if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
      this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();
    } else {
      this.hdfs_out = null;
    }
    this.createSyncRunnersAndStart();
  }

  @Override
  protected void doCleanUpResources() {
    this.shutDownSyncRunners();
  }

  private void shutDownSyncRunners() {
    SyncRunner[] syncRunnersToUse = this.syncRunners;
    if (syncRunnersToUse != null) {
      for (SyncRunner syncRunner : syncRunnersToUse) {
        syncRunner.shutDown();
      }
    }
    this.syncRunners = null;
  }

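  /**
   * Hands the sync off to one of the {@link SyncRunner} threads. The returned future is completed
   * by that runner with the highest txid known to be synced, or completed exceptionally if the
   * sync fails, the writer has been replaced or broken, or no runner can accept the request.
   */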
  @Override
  protected CompletableFuture<Long> doWriterSync(Writer writer, boolean shouldUseHSync,
    long txidWhenSync) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    SyncRequest syncRequest = new SyncRequest(writer, shouldUseHSync, txidWhenSync, future);
    this.offerSyncRequest(syncRequest);
    return future;
  }

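  /**
   * Distributes sync requests over the syncer threads in a round-robin fashion, starting from the
   * runner after the one used last. If no runner can accept the request (all queues full or all
   * runners shutting down), the request's future is failed so the caller is not left blocked.
   */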
  private void offerSyncRequest(SyncRequest syncRequest) {
    for (int i = 0; i < this.syncRunners.length; i++) {
      this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length;
      if (this.syncRunners[this.syncRunnerIndex].offer(syncRequest)) {
        return;
      }
    }
    syncRequest.completableFuture
      .completeExceptionally(new IOException("There is no available syncRunner."));
  }

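  /**
   * A single queued sync: the writer to sync, whether an hsync was requested, the txid at the time
   * the sync was issued, and the future to complete once that txid (or a later one) has been
   * synced.
   */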
  static class SyncRequest {
    private final Writer writer;
    private final boolean shouldUseHSync;
    private final long sequenceWhenSync;
    private final CompletableFuture<Long> completableFuture;

    public SyncRequest(Writer writer, boolean shouldUseHSync, long txidWhenSync,
      CompletableFuture<Long> completableFuture) {
      this.writer = writer;
      this.shouldUseHSync = shouldUseHSync;
      this.sequenceWhenSync = txidWhenSync;
      this.completableFuture = completableFuture;
    }

  }

  /**
   * Thread that runs the hdfs sync call. This call takes a while to complete. This is the longest
   * pole adding edits to the WAL and this must complete to be sure all edits are persisted. We run
   * multiple threads syncing rather than one that just syncs in series so we have better latencies;
   * otherwise, an edit that arrived just after a sync started, might have to wait almost the length
   * of two sync invocations before it is marked done.
   * <p>
   * When the sync completes, it marks all the passed in futures done. On the other end of the sync
   * future is a blocked thread, usually a regionserver Handler. There may be more than one future
   * passed in the case where a few threads arrive at about the same time and all invoke 'sync'. In
   * this case we'll batch up the invocations and run one filesystem sync only for a batch of
   * Handler sync invocations. Do not confuse these Handler SyncFutures with the futures an
   * ExecutorService returns when you call submit. We have no use for these in this model. These
   * SyncFutures are 'artificial', something to hold the Handler until the filesystem sync
   * completes.
   */
  private class SyncRunner extends Thread {
    // Pending sync requests handed to this runner; each is completed (normally or exceptionally)
    // as its filesystem sync finishes.
    private final BlockingQueue<SyncRequest> syncRequests;
    private volatile boolean shutDown = false;

    SyncRunner(final String name, final int maxHandlersCount) {
      super(name);
      // LinkedBlockingQueue because of
      // http://www.javacodegeeks.com/2010/09/java-best-practices-queue-battle-and.html
      // Could use other blockingqueues here or concurrent queues.
      //
      // We could let the capacity be 'open' but bound it so we get alerted in pathological case
      // where we cannot sync and we have a bunch of threads all backed up waiting on their syncs
      // to come in. LinkedBlockingQueue actually shrinks when you remove elements so Q should
      // stay neat and tidy in usual case. Let the max size be three times the maximum handlers.
      // The passed in maxHandlersCount is the user-level handlers which is what we put up most of
      // but HBase has other handlers running too -- opening region handlers which want to write
      // the meta table when successful (i.e. sync), closing handlers -- etc. These are usually
      // much fewer in number than the user-space handlers so Q-size should be user handlers plus
      // some space for these other handlers. Lets multiply by 3 for good-measure.
      this.syncRequests = new LinkedBlockingQueue<>(maxHandlersCount * 3);
    }

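    /**
     * Try to queue a sync request on this runner. Returns false if the runner is shutting down or
     * its queue is full, in which case the caller will try the next runner.
     */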
    boolean offer(SyncRequest syncRequest) {
      if (this.shutDown) {
        return false;
      }

      if (!this.syncRequests.offer(syncRequest)) {
        return false;
      }

      // Recheck shutDown after the offer: a shutdown may have raced in between the first check
      // and the offer. If it did and we manage to pull our request back out of the queue, report
      // failure; if the request was already consumed, treat the offer as accepted.
      if (this.shutDown) {
        if (this.syncRequests.remove(syncRequest)) {
          return false;
        }
      }
      return true;
    }

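    /**
     * Complete the given request with the sequence id that was just synced, then complete any
     * queued requests whose txid is covered by that sync (txid less than or equal to the synced
     * sequence id). This is how one filesystem sync satisfies a whole batch of handler syncs.
     */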
    private void completeSyncRequests(SyncRequest syncRequest, long syncedSequenceId) {
      if (syncRequest != null) {
        syncRequest.completableFuture.complete(syncedSequenceId);
      }
      while (true) {
        SyncRequest head = this.syncRequests.peek();
        if (head == null) {
          break;
        }
        if (head.sequenceWhenSync > syncedSequenceId) {
          break;
        }
        head.completableFuture.complete(syncedSequenceId);
        this.syncRequests.poll();
      }
    }

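    /**
     * Fail the given request and any queued requests that target the same writer. Requests queued
     * against a different (i.e. replacement) writer are left in place for a later sync attempt.
     */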
    private void completeExceptionallySyncRequests(SyncRequest syncRequest, Exception exception) {
      if (syncRequest != null) {
        syncRequest.completableFuture.completeExceptionally(exception);
      }
      while (true) {
        SyncRequest head = this.syncRequests.peek();
        if (head == null) {
          break;
        }
        if (head.writer != syncRequest.writer) {
          break;
        }
        head.completableFuture.completeExceptionally(exception);
        this.syncRequests.poll();
      }
    }

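    /**
     * Block until a sync request is available. Requests whose txid has already been covered by a
     * completed sync are finished immediately here, without issuing another filesystem sync.
     */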
    private SyncRequest takeSyncRequest() throws InterruptedException {
      while (true) {
        // We have to process what we 'take' from the queue
        SyncRequest syncRequest = this.syncRequests.take();
        // See if we can complete this sync request BEFORE we go sync.
        long currentHighestSyncedSequence = highestSyncedTxid.get();
        if (syncRequest.sequenceWhenSync < currentHighestSyncedSequence) {
          syncRequest.completableFuture.complete(currentHighestSyncedSequence);
          continue;
        }
        return syncRequest;
      }
    }

    @Override
    public void run() {
      while (!this.shutDown) {
        try {
          SyncRequest syncRequest = this.takeSyncRequest();
          // I got something. Let's run. Save off current sequence number in case it changes
          // while we run.
          long currentSequenceToUse = syncRequest.sequenceWhenSync;
          boolean writerBroken = isWriterBroken();
          long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
          Writer currentWriter = writer;
          if (currentWriter != syncRequest.writer) {
            syncRequest.completableFuture.completeExceptionally(WRITER_REPLACED_EXCEPTION);
            continue;
          }
          if (writerBroken) {
            syncRequest.completableFuture.completeExceptionally(WRITER_BROKEN_EXCEPTION);
            continue;
          }
          if (currentHighestProcessedAppendTxid > currentSequenceToUse) {
            currentSequenceToUse = currentHighestProcessedAppendTxid;
          }
          Exception lastException = null;
          try {
            writer.sync(syncRequest.shouldUseHSync);
          } catch (IOException e) {
            LOG.error("Error syncing", e);
            lastException = e;
          } catch (Exception e) {
            LOG.warn("UNEXPECTED", e);
            lastException = e;
          } finally {
            if (lastException != null) {
              this.completeExceptionallySyncRequests(syncRequest, lastException);
            } else {
              this.completeSyncRequests(syncRequest, currentSequenceToUse);
            }
          }
        } catch (InterruptedException e) {
          // Presume legit interrupt.
          LOG.info("interrupted");
        } catch (Throwable t) {
          LOG.warn("UNEXPECTED, continuing", t);
        }
      }
      this.clearSyncRequestsWhenShutDown();
    }

    private void clearSyncRequestsWhenShutDown() {
      while (true) {
        SyncRequest syncRequest = this.syncRequests.poll();
        if (syncRequest == null) {
          break;
        }
        syncRequest.completableFuture.completeExceptionally(WAL_CLOSE_EXCEPTION);
      }
    }

    void shutDown() {
      try {
        this.shutDown = true;
        this.interrupt();
        this.join();
      } catch (InterruptedException e) {
        LOG.warn("interrupted", e);
        Thread.currentThread().interrupt();
      }
    }
  }


  @Override
  protected void checkSlowSyncCount() {
    if (isLogRollRequested()) {
      return;
    }
    if (doCheckSlowSync()) {
      // We log this already in checkSlowSync
      requestLogRoll(SLOW_SYNC);
    }
  }

  /** Returns true if number of replicas for the WAL is lower than threshold */
  @Override
  protected boolean doCheckLogLowReplication() {
    boolean logRollNeeded = false;
    // if the number of replicas in HDFS has fallen below the configured
    // value, then roll logs.
    try {
      int numCurrentReplicas = getLogReplication();
      if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) {
        if (this.lowReplicationRollEnabled) {
          if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
            LOG.warn("HDFS pipeline error detected. Found " + numCurrentReplicas
              + " replicas but expecting no less than " + this.minTolerableReplication
              + " replicas. Requesting close of WAL. current pipeline: "
              + Arrays.toString(getPipeline()));
            logRollNeeded = true;
            // If rollWriter is requested, increase consecutiveLogRolls. Once it
            // is larger than lowReplicationRollLimit, disable the
            // LowReplication-Roller
            this.consecutiveLogRolls.getAndIncrement();
          } else {
            LOG.warn("Too many consecutive RollWriter requests, it's a sign that the total number "
              + "of live datanodes is lower than the tolerable replicas.");
            this.consecutiveLogRolls.set(0);
            this.lowReplicationRollEnabled = false;
          }
        }
      } else if (numCurrentReplicas >= this.minTolerableReplication) {
        if (!this.lowReplicationRollEnabled) {
          // The new writer's replication is always the default value.
          // So we should not enable LowReplication-Roller. If numEntries
          // is lower than or equal to 1, we consider it a new writer.
          if (this.numEntries.get() <= 1) {
            return logRollNeeded;
          }
          // Once the live datanode number and the replicas return to normal,
          // enable the LowReplication-Roller.
          this.lowReplicationRollEnabled = true;
          LOG.info("LowReplication-Roller was enabled.");
        }
      }
    } catch (Exception e) {
      LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e + ", continuing...");
    }
    return logRollNeeded;
  }

  /**
   * {@inheritDoc}
   * <p>
   * If the pipeline isn't started yet or is empty, you will get the default replication factor.
   * Therefore, if this function returns 0, it means you are not properly running with the HDFS-826
   * patch.
   */
  @Override
  int getLogReplication() {
    try {
      // in standalone mode, it will return 0
      if (this.hdfs_out instanceof HdfsDataOutputStream) {
        return ((HdfsDataOutputStream) this.hdfs_out).getCurrentBlockReplication();
      }
    } catch (IOException e) {
      LOG.info("", e);
    }
    return 0;
  }

  boolean isLowReplicationRollEnabled() {
    return lowReplicationRollEnabled;
  }

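  // Rough fixed heap overhead of an FSHLog instance: object header, 5 references, 2 atomic
  // integers, 3 ints and 4 longs, aligned. This is a hand-maintained estimate used for heap-size
  // accounting and is not automatically kept in sync with the fields declared above.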
  public static final long FIXED_OVERHEAD =
    ClassSize.align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + (2 * ClassSize.ATOMIC_INTEGER)
      + (3 * Bytes.SIZEOF_INT) + (4 * Bytes.SIZEOF_LONG));

  /**
   * This method gets the pipeline for the current WAL.
   */
  @Override
  DatanodeInfo[] getPipeline() {
    if (this.hdfs_out != null) {
      if (this.hdfs_out.getWrappedStream() instanceof DFSOutputStream) {
        return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline();
      }
    }
    return new DatanodeInfo[0];
  }

  @Override
  protected Writer createCombinedWriter(Writer localWriter, Writer remoteWriter) {
    // put remote writer first as usually it will cost more time to finish, so we write to it first
    return CombinedWriter.create(remoteWriter, localWriter);
  }
}