001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
021import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023
024import com.lmax.disruptor.RingBuffer;
025import com.lmax.disruptor.Sequence;
026import com.lmax.disruptor.Sequencer;
027import java.io.IOException;
028import java.lang.reflect.Field;
029import java.util.ArrayDeque;
030import java.util.Comparator;
031import java.util.Deque;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Queue;
035import java.util.SortedSet;
036import java.util.TreeSet;
037import java.util.concurrent.ExecutorService;
038import java.util.concurrent.Executors;
039import java.util.concurrent.LinkedBlockingQueue;
040import java.util.concurrent.ThreadPoolExecutor;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicBoolean;
043import java.util.concurrent.locks.Condition;
044import java.util.concurrent.locks.Lock;
045import java.util.concurrent.locks.ReentrantLock;
046import java.util.function.Supplier;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.FileSystem;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.Abortable;
051import org.apache.hadoop.hbase.HBaseInterfaceAudience;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
054import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
055import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
056import org.apache.hadoop.hbase.wal.WALEdit;
057import org.apache.hadoop.hbase.wal.WALKeyImpl;
058import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
059import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
060import org.apache.yetus.audience.InterfaceAudience;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
065import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
066import org.apache.hbase.thirdparty.io.netty.channel.Channel;
067import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
068import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
069import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;
070
071/**
072 * An asynchronous implementation of FSWAL.
073 * <p>
074 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
075 * <p>
 * For append, we process it as follows:
077 * <ol>
078 * <li>In the caller thread(typically, in the rpc handler thread):
079 * <ol>
080 * <li>Insert the entry into 'waitingConsumePayloads'. Use ringbuffer sequence as txid.</li>
081 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details.
082 * </li>
083 * </ol>
084 * </li>
085 * <li>In the consumer task(executed in a single threaded thread pool)
086 * <ol>
087 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
088 * {@link #toWriteAppends}</li>
089 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into
090 * {@link #unackedAppends}</li>
091 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call
092 * sync on the AsyncWriter.</li>
093 * <li>In the callback methods:
094 * <ul>
095 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li>
096 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and
097 * wait for writing them again.</li>
098 * </ul>
099 * </li>
100 * </ol>
101 * </li>
102 * </ol>
 * For sync, the processing stages are almost the same. Unlike FSHLog, if we hit a sync error we
 * will open a new writer, rewrite the unacked entries to the new writer and sync again.
 * <p>
 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as
 * in FSHLog.<br>
108 * For a normal roll request(for example, we have reached the log roll size):
109 * <ol>
110 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and
111 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling}(see
112 * {@link #waitForSafePoint()}).</li>
113 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
114 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out.
115 * </li>
 * <li>If there is unflushed data in the writer, sync it.</li>
 * <li>When all outgoing sync requests have finished, i.e., {@link #unackedAppends} is empty,
 * signal the {@link #readyForRollingCond}.</li>
119 * <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e.,
120 * we reach a safe point. So it is safe to replace old writer with new writer now.</li>
121 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
122 * <li>Schedule the consumer task.</li>
123 * <li>Schedule a background task to close the old writer.</li>
124 * </ol>
125 * For a broken writer roll request, the only difference is that we can bypass the wait for safe
126 * point stage.
127 */
128@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
129public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
130
  private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);

  // Orders sync futures by txid first; futures with the same txid are ordered by identity hash
  // code, so distinct futures sharing a txid can coexist in the sorted set (barring an identity
  // hash collision).
  private static final Comparator<SyncFuture> SEQ_COMPARATOR =
    Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode);

  // Config key for the batch size: once the number of bytes written to the writer since the last
  // sync reaches this value, a sync is issued.
  public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
  // 64 KB
  public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

  // Config key selecting where the consumer task runs: on an event loop taken from the supplied
  // EventLoopGroup when true, on a dedicated single threaded pool otherwise.
  public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
    "hbase.wal.async.use-shared-event-loop";
  public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;

  // Config key for how many seconds to wait for the close executor to terminate on shutdown.
  public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS =
    "hbase.wal.async.wait.on.shutdown.seconds";
  public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;

  // passed through to the writer factory when creating a new writer
  private final EventLoopGroup eventLoopGroup;

  // runs the consumer task and the sync completion callbacks
  private final ExecutorService consumeExecutor;

  // passed through to the writer factory when creating a new writer
  private final Class<? extends Channel> channelClass;

  // protects modifications of epochAndState and readyForRolling
  private final Lock consumeLock = new ReentrantLock();

  // a single shared task instance, so it can be recognized by identity in the executor's queue
  private final Runnable consumer = this::consume;

  // check if there is already a consumer task in the event loop's task queue
  private final Supplier<Boolean> hasConsumerTask;

  // the largest epoch value that fits in the bits left over after the two state bits
  private static final int MAX_EPOCH = 0x3FFFFFFF;
  // the lowest bit is waitingRoll, which means new writer is created and we are waiting for old
  // writer to be closed.
  // the second lowest bit is writerBroken which means the current writer is broken and rollWriter
  // is needed.
  // all other bits are the epoch number of the current writer, this is used to detect whether the
  // writer is still the one when you issue the sync.
  // notice that, modification to this field is only allowed under the protection of consumeLock.
  private volatile int epochAndState;

  // set to true, under consumeLock, once the roller thread may go on replacing the writer
  private boolean readyForRolling;

  // signalled whenever readyForRolling is set to true
  private final Condition readyForRollingCond = consumeLock.newCondition();

  // append and sync requests published by the caller threads and not yet picked up by the consumer
  private final RingBuffer<RingBufferTruck> waitingConsumePayloads;

  // the sequence of the last ring buffer slot that has been consumed
  private final Sequence waitingConsumePayloadsGatingSequence;

  // whether a consumer task is considered scheduled; callers only schedule a new one when they
  // manage to flip this from false to true
  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);

  // value configured through WAL_BATCH_SIZE
  private final long batchSize;

  // closes replaced writers in the background, on daemon threads
  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

  // the output of the current writer; only updated when the new writer is an
  // AsyncProtobufLogWriter
  private volatile AsyncFSOutput fsOut;

  // entries taken from the ring buffer which have not been appended to the writer yet
  private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();

  // entries appended to the writer whose sync has not been acknowledged yet
  private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();

  // pending sync requests, ordered by SEQ_COMPARATOR
  private final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);

  // the highest txid of WAL entries being processed
  private long highestProcessedAppendTxid;

  // file length when we issue last sync request on the writer
  private long fileLengthAtLastSync;

  // value of highestProcessedAppendTxid when we issued the last sync request on the writer
  private long highestProcessedAppendTxidAtLastSync;

  // value configured through ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
  private final int waitOnShutdownInSeconds;

  // passed through to the writer factory when creating a new writer
  private final StreamSlowMonitor streamSlowMonitor;
204
205  public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
206    Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
207    String suffix, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
208    throws FailedLogCloseException, IOException {
209    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
210      eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, "monitorForSuffix"));
211  }
212
213  public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
214    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
215    boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
216    Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
217    throws FailedLogCloseException, IOException {
218    super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
219      suffix);
220    this.eventLoopGroup = eventLoopGroup;
221    this.channelClass = channelClass;
222    this.streamSlowMonitor = monitor;
223    Supplier<Boolean> hasConsumerTask;
224    if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
225      this.consumeExecutor = eventLoopGroup.next();
226      if (consumeExecutor instanceof SingleThreadEventExecutor) {
227        try {
228          Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
229          field.setAccessible(true);
230          Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
231          hasConsumerTask = () -> queue.peek() == consumer;
232        } catch (Exception e) {
233          LOG.warn("Can not get task queue of " + consumeExecutor
234            + ", this is not necessary, just give up", e);
235          hasConsumerTask = () -> false;
236        }
237      } else {
238        hasConsumerTask = () -> false;
239      }
240    } else {
241      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
242        new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder()
243          .setNameFormat("AsyncFSWAL-%d-" + rootDir.toString()).setDaemon(true).build());
244      hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
245      this.consumeExecutor = threadPool;
246    }
247
248    this.hasConsumerTask = hasConsumerTask;
249    int preallocatedEventCount =
250      conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
251    waitingConsumePayloads =
252      RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
253    waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
254    waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);
255
256    // inrease the ringbuffer sequence so our txid is start from 1
257    waitingConsumePayloads.publish(waitingConsumePayloads.next());
258    waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());
259
260    batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
261    waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
262      DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
263  }
264
265  /**
266   * Helper that marks the future as DONE and offers it back to the cache.
267   */
268  private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
269    future.done(txid, t);
270    syncFutureCache.offer(future);
271  }
272
273  private static boolean waitingRoll(int epochAndState) {
274    return (epochAndState & 1) != 0;
275  }
276
277  private static boolean writerBroken(int epochAndState) {
278    return ((epochAndState >>> 1) & 1) != 0;
279  }
280
281  private static int epoch(int epochAndState) {
282    return epochAndState >>> 2;
283  }
284
285  // return whether we have successfully set readyForRolling to true.
286  private boolean trySetReadyForRolling() {
287    // Check without holding lock first. Usually we will just return here.
288    // waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to
289    // check them outside the consumeLock.
290    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
291      return false;
292    }
293    consumeLock.lock();
294    try {
295      // 1. a roll is requested
296      // 2. all out-going entries have been acked(we have confirmed above).
297      if (waitingRoll(epochAndState)) {
298        readyForRolling = true;
299        readyForRollingCond.signalAll();
300        return true;
301      } else {
302        return false;
303      }
304    } finally {
305      consumeLock.unlock();
306    }
307  }
308
309  private void syncFailed(long epochWhenSync, Throwable error) {
310    LOG.warn("sync failed", error);
311    boolean shouldRequestLogRoll = true;
312    consumeLock.lock();
313    try {
314      int currentEpochAndState = epochAndState;
315      if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
316        // this is not the previous writer which means we have already rolled the writer.
317        // or this is still the current writer, but we have already marked it as broken and request
318        // a roll.
319        return;
320      }
321      this.epochAndState = currentEpochAndState | 0b10;
322      if (waitingRoll(currentEpochAndState)) {
323        readyForRolling = true;
324        readyForRollingCond.signalAll();
325        // this means we have already in the middle of a rollWriter so just tell the roller thread
326        // that you can continue without requesting an extra log roll.
327        shouldRequestLogRoll = false;
328      }
329    } finally {
330      consumeLock.unlock();
331    }
332    for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
333      toWriteAppends.addFirst(iter.next());
334    }
335    highestUnsyncedTxid = highestSyncedTxid.get();
336    if (shouldRequestLogRoll) {
337      // request a roll.
338      requestLogRoll(ERROR);
339    }
340  }
341
342  private void syncCompleted(long epochWhenSync, AsyncWriter writer, long processedTxid,
343    long startTimeNs) {
344    // Please see the last several comments on HBASE-22761, it is possible that we get a
345    // syncCompleted which acks a previous sync request after we received a syncFailed on the same
346    // writer. So here we will also check on the epoch and state, if the epoch has already been
347    // changed, i.e, we have already rolled the writer, or the writer is already broken, we should
348    // just skip here, to avoid mess up the state or accidentally release some WAL entries and
349    // cause data corruption.
350    // The syncCompleted call is on the critical write path so we should try our best to make it
351    // fast. So here we do not hold consumeLock, for increasing performance. It is safe because
352    // there are only 3 possible situations:
353    // 1. For normal case, the only place where we change epochAndState is when rolling the writer.
354    // Before rolling actually happen, we will only change the state to waitingRoll which is another
355    // bit than writerBroken, and when we actually change the epoch, we can make sure that there is
356    // no out going sync request. So we will always pass the check here and there is no problem.
357    // 2. The writer is broken, but we have not called syncFailed yet. In this case, since
358    // syncFailed and syncCompleted are executed in the same thread, we will just face the same
359    // situation with #1.
360    // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are
361    // only 2 possible situations:
362    // a. we arrive before we actually roll the writer, then we will find out the writer is broken
363    // and give up.
364    // b. we arrive after we actually roll the writer, then we will find out the epoch is changed
365    // and give up.
366    // For both #a and #b, we do not need to hold the consumeLock as we will always update the
367    // epochAndState as a whole.
368    // So in general, for all the cases above, we do not need to hold the consumeLock.
369    int epochAndState = this.epochAndState;
370    if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) {
371      LOG.warn("Got a sync complete call after the writer is broken, skip");
372      return;
373    }
374    highestSyncedTxid.set(processedTxid);
375    for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
376      FSWALEntry entry = iter.next();
377      if (entry.getTxid() <= processedTxid) {
378        entry.release();
379        iter.remove();
380      } else {
381        break;
382      }
383    }
384    postSync(System.nanoTime() - startTimeNs, finishSync());
385    if (trySetReadyForRolling()) {
386      // we have just finished a roll, then do not need to check for log rolling, the writer will be
387      // closed soon.
388      return;
389    }
390    // If we haven't already requested a roll, check if we have exceeded logrollsize
391    if (!isLogRollRequested() && writer.getLength() > logrollsize) {
392      if (LOG.isDebugEnabled()) {
393        LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength()
394          + ", logrollsize=" + logrollsize);
395      }
396      requestLogRoll(SIZE);
397    }
398  }
399
400  // find all the sync futures between these two txids to see if we need to issue a hsync, if no
401  // sync futures then just use the default one.
402  private boolean isHsync(long beginTxid, long endTxid) {
403    SortedSet<SyncFuture> futures = syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
404      new SyncFuture().reset(endTxid + 1, false));
405    if (futures.isEmpty()) {
406      return useHsync;
407    }
408    for (SyncFuture future : futures) {
409      if (future.isForceSync()) {
410        return true;
411      }
412    }
413    return false;
414  }
415
416  private void sync(AsyncWriter writer) {
417    fileLengthAtLastSync = writer.getLength();
418    long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
419    boolean shouldUseHsync =
420      isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
421    highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
422    final long startTimeNs = System.nanoTime();
423    final long epoch = (long) epochAndState >>> 2L;
424    addListener(writer.sync(shouldUseHsync), (result, error) -> {
425      if (error != null) {
426        syncFailed(epoch, error);
427      } else {
428        syncCompleted(epoch, writer, currentHighestProcessedAppendTxid, startTimeNs);
429      }
430    }, consumeExecutor);
431  }
432
433  private int finishSyncLowerThanTxid(long txid) {
434    int finished = 0;
435    for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
436      SyncFuture sync = iter.next();
437      if (sync.getTxid() <= txid) {
438        markFutureDoneAndOffer(sync, txid, null);
439        iter.remove();
440        finished++;
441      } else {
442        break;
443      }
444    }
445    return finished;
446  }
447
448  // try advancing the highestSyncedTxid as much as possible
449  private int finishSync() {
450    if (unackedAppends.isEmpty()) {
451      // All outstanding appends have been acked.
452      if (toWriteAppends.isEmpty()) {
453        // Also no appends that wait to be written out, then just finished all pending syncs.
454        long maxSyncTxid = highestSyncedTxid.get();
455        for (SyncFuture sync : syncFutures) {
456          maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
457          sync.done(maxSyncTxid, null);
458        }
459        highestSyncedTxid.set(maxSyncTxid);
460        int finished = syncFutures.size();
461        syncFutures.clear();
462        return finished;
463      } else {
464        // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so
465        // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between
466        // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
467        long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
468        assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
469        long doneTxid = lowestUnprocessedAppendTxid - 1;
470        highestSyncedTxid.set(doneTxid);
471        return finishSyncLowerThanTxid(doneTxid);
472      }
473    } else {
474      // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
475      // first unacked append minus 1.
476      long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
477      long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
478      highestSyncedTxid.set(doneTxid);
479      return finishSyncLowerThanTxid(doneTxid);
480    }
481  }
482
483  // confirm non-empty before calling
484  private static long getLastTxid(Deque<FSWALEntry> queue) {
485    return queue.peekLast().getTxid();
486  }
487
  /**
   * Appends the entries queued in {@link #toWriteAppends} to the current writer and decides whether
   * a sync has to be issued right away.
   * <p>
   * Each entry is removed from {@link #toWriteAppends} once it has been handed to
   * {@code appendEntry}; if it was really appended it is also tracked in {@link #unackedAppends},
   * unless it is already there. The loop stops early once the bytes written since the last sync
   * reach {@link #batchSize}.
   * <p>
   * Afterwards there are three outcomes: a sync is issued if the batch size has been reached; if
   * nothing has been written since the last sync and nothing is unacked, the synced txid is
   * advanced and pending syncs are finished; otherwise the decision is left to the caller.
   */
  private void appendAndSync() {
    final AsyncWriter writer = this.writer;
    // maybe a sync request is not queued when we issue a sync, so check here to see if we could
    // finish some.
    finishSync();
    long newHighestProcessedAppendTxid = -1L;
    // this is used to avoid calling peekLast every time on unackedAppends, appendAndSync is single
    // threaded, this could save us some cycles
    boolean addedToUnackedAppends = false;
    for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      boolean appended;
      try {
        appended = appendEntry(writer, entry);
      } catch (IOException e) {
        // an IOException is not expected here, so treat it as a programming error
        throw new AssertionError("should not happen", e);
      }
      // the entry counts as processed even if nothing was appended for it
      newHighestProcessedAppendTxid = entry.getTxid();
      iter.remove();
      if (appended) {
        // This is possible, when we fail to sync, we will add the unackedAppends back to
        // toWriteAppends, so here we may get an entry which is already in the unackedAppends.
        if (
          addedToUnackedAppends || unackedAppends.isEmpty()
            || getLastTxid(unackedAppends) < entry.getTxid()
        ) {
          unackedAppends.addLast(entry);
          addedToUnackedAppends = true;
        }
        // See HBASE-25905, here we need to make sure that, we will always write all the entries in
        // unackedAppends out. As the code in the consume method will assume that, the entries in
        // unackedAppends have all been sent out so if there is roll request and unackedAppends is
        // not empty, we could just return as later there will be a syncCompleted call to clear the
        // unackedAppends, or a syncFailed to lead us to another state.
        // There could be other ways to fix, such as changing the logic in the consume method, but
        // it will break the assumption and then (may) lead to a big refactoring. So here let's use
        // this way to fix first, can optimize later.
        if (
          writer.getLength() - fileLengthAtLastSync >= batchSize
            && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))
        ) {
          break;
        }
      }
    }
    // if we have a newer transaction id, update it.
    // otherwise, use the previous transaction id.
    if (newHighestProcessedAppendTxid > 0) {
      highestProcessedAppendTxid = newHighestProcessedAppendTxid;
    } else {
      newHighestProcessedAppendTxid = highestProcessedAppendTxid;
    }

    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
      // sync because buffer size limit.
      sync(writer);
      return;
    }
    if (writer.getLength() == fileLengthAtLastSync) {
      // we haven't written anything out, just advance the highestSyncedSequence since we may only
      // stamped some region sequence id.
      if (unackedAppends.isEmpty()) {
        highestSyncedTxid.set(highestProcessedAppendTxid);
        finishSync();
        trySetReadyForRolling();
      }
      return;
    }
    // reach here means that we have some unsynced data but haven't reached the batch size yet
    // but we will not issue a sync directly here even if there are sync requests because we may
    // have some new data in the ringbuffer, so let's just return here and delay the decision of
    // whether to issue a sync in the caller method.
  }
561
  /**
   * The consumer task. It moves the published requests from the ring buffer into
   * {@link #toWriteAppends} and {@link #syncFutures}, writes the appends out through
   * {@link #appendAndSync()} and finally decides whether it has to be scheduled again.
   * <p>
   * Nothing is consumed while the writer is marked as broken or a roll is pending. In the latter
   * case the task only syncs the data that has not been synced yet or, if there is none and
   * nothing is unacked, signals that we are ready for rolling.
   */
  private void consume() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState)) {
        return;
      }
      if (waitingRoll(currentEpochAndState)) {
        if (writer.getLength() > fileLengthAtLastSync) {
          // issue a sync
          sync(writer);
        } else {
          // nothing left to sync, so the roll can go on as soon as everything has been acked
          if (unackedAppends.isEmpty()) {
            readyForRolling = true;
            readyForRollingCond.signalAll();
          }
        }
        return;
      }
    } finally {
      consumeLock.unlock();
    }
    // drain the slots published after the last one we have consumed
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
        <= cursorBound; nextCursor++) {
      // stop at the first slot that has been claimed but not published yet
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case APPEND:
          toWriteAppends.addLast(truck.unloadAppend());
          break;
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
          break;
      }
      // mark the slot as consumed
      waitingConsumePayloadsGatingSequence.set(nextCursor);
    }
    appendAndSync();
    // a consumer task is already at the head of the executor's queue, let it do the rest
    if (hasConsumerTask.get()) {
      return;
    }
    if (toWriteAppends.isEmpty()) {
      if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
        consumerScheduled.set(false);
        // recheck here since in append and sync we do not hold the consumeLock. Thing may
        // happen like
        // 1. we check cursor, no new entry
        // 2. someone publishes a new entry to ringbuffer and the consumerScheduled is true and
        // give up scheduling the consumer task.
        // 3. we set consumerScheduled to false and also give up scheduling consumer task.
        if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
          // we will give up consuming so if there are some unsynced data we need to issue a sync.
          if (
            writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty()
              && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync
          ) {
            // no new data in the ringbuffer and we have at least one sync request
            sync(writer);
          }
          return;
        } else {
          // maybe someone has grabbed this before us
          if (!consumerScheduled.compareAndSet(false, true)) {
            return;
          }
        }
      }
    }
    // reschedule if we still have something to write.
    consumeExecutor.execute(consumer);
  }
638
639  private boolean shouldScheduleConsumer() {
640    int currentEpochAndState = epochAndState;
641    if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
642      return false;
643    }
644    return consumerScheduled.compareAndSet(false, true);
645  }
646
647  @Override
648  protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
649    throws IOException {
650    long txid =
651      stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
652    if (shouldScheduleConsumer()) {
653      consumeExecutor.execute(consumer);
654    }
655    return txid;
656  }
657
658  @Override
659  protected void doSync(boolean forceSync) throws IOException {
660    long txid = waitingConsumePayloads.next();
661    SyncFuture future;
662    try {
663      future = getSyncFuture(txid, forceSync);
664      RingBufferTruck truck = waitingConsumePayloads.get(txid);
665      truck.load(future);
666    } finally {
667      waitingConsumePayloads.publish(txid);
668    }
669    if (shouldScheduleConsumer()) {
670      consumeExecutor.execute(consumer);
671    }
672    blockOnSync(future);
673  }
674
675  @Override
676  protected void doSync(long txid, boolean forceSync) throws IOException {
677    if (highestSyncedTxid.get() >= txid) {
678      return;
679    }
680    // here we do not use ring buffer sequence as txid
681    long sequence = waitingConsumePayloads.next();
682    SyncFuture future;
683    try {
684      future = getSyncFuture(txid, forceSync);
685      RingBufferTruck truck = waitingConsumePayloads.get(sequence);
686      truck.load(future);
687    } finally {
688      waitingConsumePayloads.publish(sequence);
689    }
690    if (shouldScheduleConsumer()) {
691      consumeExecutor.execute(consumer);
692    }
693    blockOnSync(future);
694  }
695
696  @Override
697  protected AsyncWriter createWriterInstance(Path path) throws IOException {
698    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
699      eventLoopGroup, channelClass, streamSlowMonitor);
700  }
701
702  private void waitForSafePoint() {
703    consumeLock.lock();
704    try {
705      int currentEpochAndState = epochAndState;
706      if (writerBroken(currentEpochAndState) || this.writer == null) {
707        return;
708      }
709      consumerScheduled.set(true);
710      epochAndState = currentEpochAndState | 1;
711      readyForRolling = false;
712      consumeExecutor.execute(consumer);
713      while (!readyForRolling) {
714        readyForRollingCond.awaitUninterruptibly();
715      }
716    } finally {
717      consumeLock.unlock();
718    }
719  }
720
721  protected final long closeWriter(AsyncWriter writer, Path path) {
722    if (writer != null) {
723      inflightWALClosures.put(path.getName(), writer);
724      long fileLength = writer.getLength();
725      closeExecutor.execute(() -> {
726        try {
727          writer.close();
728        } catch (IOException e) {
729          LOG.warn("close old writer failed", e);
730        } finally {
731          inflightWALClosures.remove(path.getName());
732        }
733      });
734      return fileLength;
735    } else {
736      return 0L;
737    }
738  }
739
740  @Override
741  protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
742    throws IOException {
743    Preconditions.checkNotNull(nextWriter);
744    waitForSafePoint();
745    long oldFileLen = closeWriter(this.writer, oldPath);
746    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
747    this.writer = nextWriter;
748    if (nextWriter instanceof AsyncProtobufLogWriter) {
749      this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
750    }
751    this.fileLengthAtLastSync = nextWriter.getLength();
752    this.highestProcessedAppendTxidAtLastSync = 0L;
753    consumeLock.lock();
754    try {
755      consumerScheduled.set(true);
756      int currentEpoch = epochAndState >>> 2;
757      int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
758      // set a new epoch and also clear waitingRoll and writerBroken
759      this.epochAndState = nextEpoch << 2;
760      // Reset rollRequested status
761      rollRequested.set(false);
762      consumeExecutor.execute(consumer);
763    } finally {
764      consumeLock.unlock();
765    }
766  }
767
768  @Override
769  protected void doShutdown() throws IOException {
770    waitForSafePoint();
771    closeWriter(this.writer, getOldPath());
772    this.writer = null;
773    closeExecutor.shutdown();
774    try {
775      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
776        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
777          + " the close of async writer doesn't complete."
778          + "Please check the status of underlying filesystem"
779          + " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
780          + "\"");
781      }
782    } catch (InterruptedException e) {
783      LOG.error("The wait for close of async writer is interrupted");
784      Thread.currentThread().interrupt();
785    }
786    IOException error = new IOException("WAL has been closed");
787    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
788    // drain all the pending sync requests
789    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
790        <= cursorBound; nextCursor++) {
791      if (!waitingConsumePayloads.isPublished(nextCursor)) {
792        break;
793      }
794      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
795      switch (truck.type()) {
796        case SYNC:
797          syncFutures.add(truck.unloadSync());
798          break;
799        default:
800          break;
801      }
802    }
803    // and fail them
804    syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
805    if (!(consumeExecutor instanceof EventLoop)) {
806      consumeExecutor.shutdown();
807    }
808  }
809
810  @Override
811  protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
812    writer.append(entry);
813  }
814
815  @Override
816  DatanodeInfo[] getPipeline() {
817    AsyncFSOutput output = this.fsOut;
818    return output != null ? output.getPipeline() : new DatanodeInfo[0];
819  }
820
821  @Override
822  int getLogReplication() {
823    return getPipeline().length;
824  }
825
826  @Override
827  protected boolean doCheckLogLowReplication() {
828    // not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so
829    // typically there is no 'low replication' state, only a 'broken' state.
830    AsyncFSOutput output = this.fsOut;
831    return output != null && output.isBroken();
832  }
833}