/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;

/**
 * An asynchronous implementation of FSWAL.
 * <p>
 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
 * <p>
 * For append, we process it as follows:
 * <ol>
 * <li>In the caller thread (typically, in the rpc handler thread):
 * <ol>
 * <li>Insert the entry into 'waitingConsumePayloads'. Use the ringbuffer sequence as txid.</li>
 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details.
 * </li>
 * </ol>
 * </li>
 * <li>In the consumer task (executed in a single-threaded thread pool):
 * <ol>
 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
 * {@link #toWriteAppends}</li>
 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into
 * {@link #unackedAppends}</li>
 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call
 * sync on the AsyncWriter.</li>
 * <li>In the callback methods:
 * <ul>
 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li>
 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and
 * wait for writing them again.</li>
 * </ul>
 * </li>
 * </ol>
 * </li>
 * </ol>
 * For sync, the processing stages are almost the same. Unlike FSHLog, if we hit a sync error we
 * will open a new writer, rewrite the unacked entries to the new writer, and sync again.
 * <p>
 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as
 * in FSHLog.<br>
 * For a normal roll request (for example, we have reached the log roll size):
 * <ol>
 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and
 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling} (see
 * {@link #waitForSafePoint()}).</li>
 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out.
 * </li>
 * <li>If there is unflushed data in the writer, sync it.</li>
 * <li>When all outgoing sync requests are finished, i.e., the {@link #unackedAppends} is empty,
 * signal the {@link #readyForRollingCond}.</li>
 * <li>Back to the log roller thread, now we can confirm that there are no outgoing entries, i.e.,
 * we have reached a safe point. So it is safe to replace the old writer with the new writer
 * now.</li>
 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
 * <li>Schedule the consumer task.</li>
 * <li>Schedule a background task to close the old writer.</li>
 * </ol>
 * For a broken writer roll request, the only difference is that we can bypass the wait for the
 * safe point stage.
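 * <p>
 * A simplified usage sketch (for illustration only; {@code appendData}/{@code sync} are the
 * WAL-interface entry points and the surrounding objects are assumed to already exist, as in
 * practice the WAL is driven by the region server rather than called directly):
 *
 * <pre>
 * {@code
 * // append returns a txid backed by the ring buffer sequence
 * long txid = wal.appendData(regionInfo, walKey, walEdit);
 * // blocks until the edit is durable, or throws if the sync failed
 * wal.sync(txid);
 * }
 * </pre>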
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);

  private static final Comparator<SyncFuture> SEQ_COMPARATOR =
    Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode);

  public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
  public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

  public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
    "hbase.wal.async.use-shared-event-loop";
  public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;

  public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS =
    "hbase.wal.async.wait.on.shutdown.seconds";
  public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;

  private final EventLoopGroup eventLoopGroup;

  private final ExecutorService consumeExecutor;

  private final Class<? extends Channel> channelClass;

  private final Lock consumeLock = new ReentrantLock();

  private final Runnable consumer = this::consume;

  // check if there is already a consumer task in the event loop's task queue
  private final Supplier<Boolean> hasConsumerTask;

  private static final int MAX_EPOCH = 0x3FFFFFFF;
  // the lowest bit is waitingRoll, which means a new writer has been created and we are waiting
  // for the old writer to be closed.
  // the second lowest bit is writerBroken, which means the current writer is broken and rollWriter
  // is needed.
  // all other bits are the epoch number of the current writer; this is used to detect whether the
  // writer is still the same one as when the sync was issued.
  // notice that modification to this field is only allowed under the protection of consumeLock.
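  // Example: an epochAndState value of 0b101 decodes to waitingRoll true, writerBroken false and
  // epoch 1.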
  private volatile int epochAndState;

  private boolean readyForRolling;

  private final Condition readyForRollingCond = consumeLock.newCondition();

  private final RingBuffer<RingBufferTruck> waitingConsumePayloads;

  private final Sequence waitingConsumePayloadsGatingSequence;

  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);

  private final long batchSize;

  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

  private volatile AsyncFSOutput fsOut;

  private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();

  private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();

  private final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);

  // the highest txid of WAL entries being processed
  private long highestProcessedAppendTxid;

  // file length when we issued the last sync request on the writer
  private long fileLengthAtLastSync;

  private long highestProcessedAppendTxidAtLastSync;

  private final int waitOnShutdownInSeconds;

  private final StreamSlowMonitor streamSlowMonitor;

  public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
    Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
    String suffix, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
    throws FailedLogCloseException, IOException {
    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
      eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, "monitorForSuffix"));
  }

  public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
    throws FailedLogCloseException, IOException {
    super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
      suffix);
    this.eventLoopGroup = eventLoopGroup;
    this.channelClass = channelClass;
    this.streamSlowMonitor = monitor;
    Supplier<Boolean> hasConsumerTask;
    if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
      this.consumeExecutor = eventLoopGroup.next();
      if (consumeExecutor instanceof SingleThreadEventExecutor) {
        try {
          Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
          field.setAccessible(true);
          Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
          hasConsumerTask = () -> queue.peek() == consumer;
        } catch (Exception e) {
          LOG.warn("Cannot get the task queue of " + consumeExecutor
            + ", this is not necessary, just give up", e);
          hasConsumerTask = () -> false;
        }
      } else {
        hasConsumerTask = () -> false;
      }
    } else {
      ThreadPoolExecutor threadPool =
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
          new ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d-" + rootDir.toString()
            + "-prefix:" + (prefix == null ? "default" : prefix).replace("%", "%%")).setDaemon(true)
            .build());
      hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
      this.consumeExecutor = threadPool;
    }

    this.hasConsumerTask = hasConsumerTask;
    int preallocatedEventCount =
      conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
    waitingConsumePayloads =
      RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
    waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);

    // increase the ringbuffer sequence so that our txid starts from 1
    waitingConsumePayloads.publish(waitingConsumePayloads.next());
    waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());

    batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
    waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
      DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
  }

  /**
   * Helper that marks the future as DONE and offers it back to the cache.
   */
  private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
    future.done(txid, t);
    syncFutureCache.offer(future);
  }

  private static boolean waitingRoll(int epochAndState) {
    return (epochAndState & 1) != 0;
  }

  private static boolean writerBroken(int epochAndState) {
    return ((epochAndState >>> 1) & 1) != 0;
  }

  private static int epoch(int epochAndState) {
    return epochAndState >>> 2;
  }

  // return whether we have successfully set readyForRolling to true.
  private boolean trySetReadyForRolling() {
    // Check without holding the lock first. Usually we will just return here.
    // epochAndState is volatile and unackedAppends is only accessed inside the event loop, so it
    // is safe to check them outside the consumeLock.
    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
      return false;
    }
    consumeLock.lock();
    try {
      // 1. a roll is requested
      // 2. all outgoing entries have been acked (we have confirmed above).
      if (waitingRoll(epochAndState)) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
        return true;
      } else {
        return false;
      }
    } finally {
      consumeLock.unlock();
    }
  }

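  // Called on the consume executor when a sync on the current writer fails: mark the writer as
  // broken, move the unacked appends back to toWriteAppends so they can be written again, and
  // request a log roll unless one is already in progress.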
  private void syncFailed(long epochWhenSync, Throwable error) {
    LOG.warn("sync failed", error);
    boolean shouldRequestLogRoll = true;
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
        // this is not the previous writer, which means we have already rolled the writer, or this
        // is still the current writer but we have already marked it as broken and requested a
        // roll.
        return;
      }
      this.epochAndState = currentEpochAndState | 0b10;
      if (waitingRoll(currentEpochAndState)) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
        // this means we are already in the middle of a rollWriter call, so just tell the roller
        // thread that it can continue without requesting an extra log roll.
        shouldRequestLogRoll = false;
      }
    } finally {
      consumeLock.unlock();
    }
    for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
      toWriteAppends.addFirst(iter.next());
    }
    highestUnsyncedTxid = highestSyncedTxid.get();
    if (shouldRequestLogRoll) {
      // request a roll.
      requestLogRoll(ERROR);
    }
  }

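  // Called on the consume executor when a sync on the writer succeeds: release the acked appends,
  // complete the corresponding sync futures, and check whether a roll is pending or needed.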
  private void syncCompleted(long epochWhenSync, AsyncWriter writer, long processedTxid,
    long startTimeNs) {
    // Please see the last several comments on HBASE-22761. It is possible that we get a
    // syncCompleted which acks a previous sync request after we received a syncFailed on the same
    // writer. So here we will also check the epoch and state: if the epoch has already been
    // changed, i.e., we have already rolled the writer, or the writer is already broken, we should
    // just skip here, to avoid messing up the state or accidentally releasing some WAL entries and
    // causing data corruption.
    // The syncCompleted call is on the critical write path, so we should try our best to make it
    // fast. So here we do not hold the consumeLock, for better performance. It is safe because
    // there are only 3 possible situations:
    // 1. For the normal case, the only place where we change epochAndState is when rolling the
    // writer. Before the roll actually happens, we will only change the state to waitingRoll,
    // which is a different bit from writerBroken, and when we actually change the epoch, we can
    // make sure that there is no outgoing sync request. So we will always pass the check here and
    // there is no problem.
    // 2. The writer is broken, but we have not called syncFailed yet. In this case, since
    // syncFailed and syncCompleted are executed in the same thread, we will just face the same
    // situation as #1.
    // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are
    // only 2 possible situations:
    // a. we arrive before we actually roll the writer, then we will find out the writer is broken
    // and give up.
    // b. we arrive after we actually roll the writer, then we will find out the epoch is changed
    // and give up.
    // For both #a and #b, we do not need to hold the consumeLock as we will always update the
    // epochAndState as a whole.
    // So in general, for all the cases above, we do not need to hold the consumeLock.
    int epochAndState = this.epochAndState;
    if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) {
      LOG.warn("Got a sync complete call after the writer is broken, skip");
      return;
    }
    highestSyncedTxid.set(processedTxid);
    for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      if (entry.getTxid() <= processedTxid) {
        entry.release();
        iter.remove();
      } else {
        break;
      }
    }
    postSync(System.nanoTime() - startTimeNs, finishSync());
    if (trySetReadyForRolling()) {
      // we have just finished a roll, so we do not need to check for log rolling; the writer will
      // be closed soon.
      return;
    }
    // If we haven't already requested a roll, check if we have exceeded logrollsize
    if (!isLogRollRequested() && writer.getLength() > logrollsize) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength()
          + ", logrollsize=" + logrollsize);
      }
      requestLogRoll(SIZE);
    }
  }

  // find all the sync futures between these two txids to see if we need to issue an hsync; if
  // there are no sync futures, just use the default.
  private boolean isHsync(long beginTxid, long endTxid) {
    SortedSet<SyncFuture> futures = syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
      new SyncFuture().reset(endTxid + 1, false));
    if (futures.isEmpty()) {
      return useHsync;
    }
    for (SyncFuture future : futures) {
      if (future.isForceSync()) {
        return true;
      }
    }
    return false;
  }

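  // Issue a sync on the writer for everything appended so far. The current epoch is captured so
  // the callbacks can detect whether the writer has been rolled or broken in the meantime.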
  private void sync(AsyncWriter writer) {
    fileLengthAtLastSync = writer.getLength();
    long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
    boolean shouldUseHsync =
      isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
    highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
    final long startTimeNs = System.nanoTime();
    final long epoch = (long) epochAndState >>> 2L;
    addListener(writer.sync(shouldUseHsync), (result, error) -> {
      if (error != null) {
        syncFailed(epoch, error);
      } else {
        syncCompleted(epoch, writer, currentHighestProcessedAppendTxid, startTimeNs);
      }
    }, consumeExecutor);
  }

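  // complete and remove all sync futures whose txid is less than or equal to the given txid, and
  // return the number of sync futures completed.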
  private int finishSyncLowerThanTxid(long txid) {
    int finished = 0;
    for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
      SyncFuture sync = iter.next();
      if (sync.getTxid() <= txid) {
        markFutureDoneAndOffer(sync, txid, null);
        iter.remove();
        finished++;
      } else {
        break;
      }
    }
    return finished;
  }

  // try advancing the highestSyncedTxid as much as possible
  private int finishSync() {
    if (unackedAppends.isEmpty()) {
      // All outstanding appends have been acked.
      if (toWriteAppends.isEmpty()) {
        // Also no appends waiting to be written out, so just finish all pending syncs.
        long maxSyncTxid = highestSyncedTxid.get();
        for (SyncFuture sync : syncFutures) {
          maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
          markFutureDoneAndOffer(sync, maxSyncTxid, null);
        }
        highestSyncedTxid.set(maxSyncTxid);
        int finished = syncFutures.size();
        syncFutures.clear();
        return finished;
      } else {
        // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so
        // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between
        // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
        long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
        assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
        long doneTxid = lowestUnprocessedAppendTxid - 1;
        highestSyncedTxid.set(doneTxid);
        return finishSyncLowerThanTxid(doneTxid);
      }
    } else {
      // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
      // first unacked append minus 1.
      long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
      long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
      highestSyncedTxid.set(doneTxid);
      return finishSyncLowerThanTxid(doneTxid);
    }
  }

  // confirm non-empty before calling
  private static long getLastTxid(Deque<FSWALEntry> queue) {
    return queue.peekLast().getTxid();
  }

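  // The core of the consumer task: drain toWriteAppends into the writer and issue a sync once the
  // buffered size reaches batchSize. Other sync decisions are deferred to the consume method.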
  private void appendAndSync() {
    final AsyncWriter writer = this.writer;
    // maybe a sync request was not queued when we issued a sync, so check here to see if we can
    // finish some.
    finishSync();
    long newHighestProcessedAppendTxid = -1L;
    // this is used to avoid calling peekLast every time on unackedAppends; appendAndSync is single
    // threaded, so this could save us some cycles
    boolean addedToUnackedAppends = false;
    for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      boolean appended;
      try {
        appended = appendEntry(writer, entry);
      } catch (IOException e) {
        throw new AssertionError("should not happen", e);
      }
      newHighestProcessedAppendTxid = entry.getTxid();
      iter.remove();
      if (appended) {
        // This is possible: when we fail to sync, we will add the unackedAppends back to
        // toWriteAppends, so here we may get an entry which is already in the unackedAppends.
        if (
          addedToUnackedAppends || unackedAppends.isEmpty()
            || getLastTxid(unackedAppends) < entry.getTxid()
        ) {
          unackedAppends.addLast(entry);
          addedToUnackedAppends = true;
        }
        // See HBASE-25905: here we need to make sure that we always write all the entries in
        // unackedAppends out. The code in the consume method assumes that the entries in
        // unackedAppends have all been sent out, so if there is a roll request and unackedAppends
        // is not empty, we can just return, as later there will be a syncCompleted call to clear
        // the unackedAppends, or a syncFailed to lead us to another state.
        // There could be other ways to fix this, such as changing the logic in the consume method,
        // but that would break the assumption and then (may) lead to a big refactoring. So here
        // let's use this way to fix it first; we can optimize later.
        if (
          writer.getLength() - fileLengthAtLastSync >= batchSize
            && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))
        ) {
          break;
        }
      }
    }
    // if we have a newer transaction id, update it.
    // otherwise, use the previous transaction id.
    if (newHighestProcessedAppendTxid > 0) {
      highestProcessedAppendTxid = newHighestProcessedAppendTxid;
    } else {
      newHighestProcessedAppendTxid = highestProcessedAppendTxid;
    }

    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
      // sync because of the buffer size limit.
      sync(writer);
      return;
    }
    if (writer.getLength() == fileLengthAtLastSync) {
      // we haven't written anything out, so just advance the highestSyncedTxid since we may only
      // have stamped some region sequence ids.
      if (unackedAppends.isEmpty()) {
        highestSyncedTxid.set(highestProcessedAppendTxid);
        finishSync();
        trySetReadyForRolling();
      }
      return;
    }
    // reaching here means that we have some unsynced data but haven't reached the batch size yet,
    // but we will not issue a sync directly here even if there are sync requests, because we may
    // have some new data in the ringbuffer, so let's just return here and delay the decision of
    // whether to issue a sync to the caller method.
  }

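  // Used when the WAL only accepts marker edits (see markerEditOnly): drop everything up to and
  // including the last non-marker edit in toWriteAppends, release those entries (and the unacked
  // ones), and fail the sync futures that can no longer succeed.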
  private void drainNonMarkerEditsAndFailSyncs() {
    if (toWriteAppends.isEmpty()) {
      return;
    }
    boolean hasNonMarkerEdits = false;
    Iterator<FSWALEntry> iter = toWriteAppends.descendingIterator();
    while (iter.hasNext()) {
      FSWALEntry entry = iter.next();
      if (!entry.getEdit().isMetaEdit()) {
        entry.release();
        hasNonMarkerEdits = true;
        break;
      }
    }
    if (hasNonMarkerEdits) {
      for (;;) {
        iter.remove();
        if (!iter.hasNext()) {
          break;
        }
        iter.next().release();
      }
      for (FSWALEntry entry : unackedAppends) {
        entry.release();
      }
      unackedAppends.clear();
      // fail the sync futures which are below the txid of the first remaining edit; if there are
      // none, fail all the sync futures.
      long txid = toWriteAppends.isEmpty() ? Long.MAX_VALUE : toWriteAppends.peek().getTxid();
      IOException error = new IOException("WAL is closing, only marker edit is allowed");
      for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) {
        SyncFuture future = syncIter.next();
        if (future.getTxid() < txid) {
          markFutureDoneAndOffer(future, future.getTxid(), error);
          syncIter.remove();
        } else {
          break;
        }
      }
    }
  }

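  // The consumer task: poll the published entries from the ring buffer into toWriteAppends and
  // syncFutures, call appendAndSync, and reschedule itself while there is still work to do.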
  private void consume() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState)) {
        return;
      }
      if (waitingRoll(currentEpochAndState)) {
        if (writer.getLength() > fileLengthAtLastSync) {
          // issue a sync
          sync(writer);
        } else {
          if (unackedAppends.isEmpty()) {
            readyForRolling = true;
            readyForRollingCond.signalAll();
          }
        }
        return;
      }
    } finally {
      consumeLock.unlock();
    }
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
        <= cursorBound; nextCursor++) {
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case APPEND:
          toWriteAppends.addLast(truck.unloadAppend());
          break;
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
          break;
      }
      waitingConsumePayloadsGatingSequence.set(nextCursor);
    }
    if (markerEditOnly()) {
      drainNonMarkerEditsAndFailSyncs();
    }
    appendAndSync();
    if (hasConsumerTask.get()) {
      return;
    }
    if (toWriteAppends.isEmpty()) {
      if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
        consumerScheduled.set(false);
        // recheck here since in appendAndSync we do not hold the consumeLock. Things may happen
        // like:
        // 1. we check the cursor, no new entry
        // 2. someone publishes a new entry to the ringbuffer, sees that consumerScheduled is
        // true, and gives up scheduling the consumer task.
        // 3. we set consumerScheduled to false and also give up scheduling the consumer task.
        if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
          // we will give up consuming, so if there is some unsynced data we need to issue a sync.
          if (
            writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty()
              && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync
          ) {
            // no new data in the ringbuffer and we have at least one sync request
            sync(writer);
          }
          return;
        } else {
          // maybe someone has grabbed this before us
          if (!consumerScheduled.compareAndSet(false, true)) {
            return;
          }
        }
      }
    }
    // reschedule if we still have something to write.
    consumeExecutor.execute(consumer);
  }

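  // schedule the consumer task only if the writer is usable (not broken and not waiting for a
  // roll) and no consumer task is scheduled yet.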
  private boolean shouldScheduleConsumer() {
    int currentEpochAndState = epochAndState;
    if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
      return false;
    }
    return consumerScheduled.compareAndSet(false, true);
  }

  // This is used by sync replication, where we are going to close the wal soon after we reopen all
  // the regions. Will be overridden by subclasses.
  protected boolean markerEditOnly() {
    return false;
  }

  @Override
  protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
    throws IOException {
    if (markerEditOnly() && !edits.isMetaEdit()) {
      throw new IOException("WAL is closing, only marker edit is allowed");
    }
    long txid =
      stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    return txid;
  }

  @Override
  protected void doSync(boolean forceSync) throws IOException {
    long txid = waitingConsumePayloads.next();
    SyncFuture future;
    try {
      future = getSyncFuture(txid, forceSync);
      RingBufferTruck truck = waitingConsumePayloads.get(txid);
      truck.load(future);
    } finally {
      waitingConsumePayloads.publish(txid);
    }
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    blockOnSync(future);
  }

  @Override
  protected void doSync(long txid, boolean forceSync) throws IOException {
    if (highestSyncedTxid.get() >= txid) {
      return;
    }
    // here we do not use the ring buffer sequence as the txid
    long sequence = waitingConsumePayloads.next();
    SyncFuture future;
    try {
      future = getSyncFuture(txid, forceSync);
      RingBufferTruck truck = waitingConsumePayloads.get(sequence);
      truck.load(future);
    } finally {
      waitingConsumePayloads.publish(sequence);
    }
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    blockOnSync(future);
  }

  protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {
    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
      eventLoopGroup, channelClass, streamSlowMonitor);
  }

  @Override
  protected AsyncWriter createWriterInstance(Path path) throws IOException {
    return createAsyncWriter(fs, path);
  }

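  // block until the consumer reaches a safe point for rolling, i.e. there are no outgoing entries
  // on the current writer. See the class comment for the detailed handshake.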
  private void waitForSafePoint() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState) || this.writer == null) {
        return;
      }
      consumerScheduled.set(true);
      epochAndState = currentEpochAndState | 1;
      readyForRolling = false;
      consumeExecutor.execute(consumer);
      while (!readyForRolling) {
        readyForRollingCond.awaitUninterruptibly();
      }
    } finally {
      consumeLock.unlock();
    }
  }

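  // close the given writer asynchronously on the close executor and return its current length, or
  // 0 if there is no writer. The writer stays in inflightWALClosures until the close completes.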
  protected final long closeWriter(AsyncWriter writer, Path path) {
    if (writer != null) {
      inflightWALClosures.put(path.getName(), writer);
      long fileLength = writer.getLength();
      closeExecutor.execute(() -> {
        try {
          writer.close();
        } catch (IOException e) {
          LOG.warn("close old writer failed", e);
        } finally {
          inflightWALClosures.remove(path.getName());
        }
      });
      return fileLength;
    } else {
      return 0L;
    }
  }

  @Override
  protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
    throws IOException {
    Preconditions.checkNotNull(nextWriter);
    waitForSafePoint();
    long oldFileLen = closeWriter(this.writer, oldPath);
    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
    this.writer = nextWriter;
    if (nextWriter instanceof AsyncProtobufLogWriter) {
      this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
    }
    this.fileLengthAtLastSync = nextWriter.getLength();
    this.highestProcessedAppendTxidAtLastSync = 0L;
    consumeLock.lock();
    try {
      consumerScheduled.set(true);
      int currentEpoch = epochAndState >>> 2;
      int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
      // set a new epoch and also clear waitingRoll and writerBroken
      this.epochAndState = nextEpoch << 2;
      // Reset rollRequested status
      rollRequested.set(false);
      consumeExecutor.execute(consumer);
    } finally {
      consumeLock.unlock();
    }
  }

  @Override
  protected void doShutdown() throws IOException {
    waitForSafePoint();
    closeWriter(this.writer, getOldPath());
    this.writer = null;
    closeExecutor.shutdown();
    try {
      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
          + " the close of the async writer doesn't complete."
          + " Please check the status of the underlying filesystem"
          + " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
          + "\"");
      }
    } catch (InterruptedException e) {
      LOG.error("The wait for the close of the async writer was interrupted");
      Thread.currentThread().interrupt();
    }
    IOException error = new IOException("WAL has been closed");
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    // drain all the pending sync requests
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
        <= cursorBound; nextCursor++) {
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          break;
      }
    }
    // and fail them
    syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
    if (!(consumeExecutor instanceof EventLoop)) {
      consumeExecutor.shutdown();
    }
  }

  @Override
  protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
    writer.append(entry);
  }

  @Override
  DatanodeInfo[] getPipeline() {
    AsyncFSOutput output = this.fsOut;
    return output != null ? output.getPipeline() : new DatanodeInfo[0];
  }

  @Override
  int getLogReplication() {
    return getPipeline().length;
  }

  @Override
  protected boolean doCheckLogLowReplication() {
    // unlike FSHLog, AsyncFSOutput will fail immediately if there are errors writing to the DNs,
    // so typically there is no 'low replication' state, only a 'broken' state.
    AsyncFSOutput output = this.fsOut;
    return output != null && output.isBroken();
  }
}