001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
021import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023
024import com.lmax.disruptor.RingBuffer;
025import com.lmax.disruptor.Sequence;
026import com.lmax.disruptor.Sequencer;
027import java.io.IOException;
028import java.lang.reflect.Field;
029import java.util.ArrayDeque;
030import java.util.Comparator;
031import java.util.Deque;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Queue;
035import java.util.SortedSet;
036import java.util.TreeSet;
037import java.util.concurrent.ExecutorService;
038import java.util.concurrent.Executors;
039import java.util.concurrent.LinkedBlockingQueue;
040import java.util.concurrent.ThreadPoolExecutor;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicBoolean;
043import java.util.concurrent.locks.Condition;
044import java.util.concurrent.locks.Lock;
045import java.util.concurrent.locks.ReentrantLock;
046import java.util.function.Supplier;
047
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.fs.FileSystem;
050import org.apache.hadoop.fs.Path;
051import org.apache.hadoop.hbase.Abortable;
052import org.apache.hadoop.hbase.HBaseInterfaceAudience;
053import org.apache.hadoop.hbase.client.RegionInfo;
054import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
055import org.apache.hadoop.hbase.trace.TraceUtil;
056import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
057import org.apache.hadoop.hbase.wal.WALEdit;
058import org.apache.hadoop.hbase.wal.WALKeyImpl;
059import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
060import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
061import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
062import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
063import org.apache.htrace.core.TraceScope;
064import org.apache.yetus.audience.InterfaceAudience;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067import org.apache.hbase.thirdparty.io.netty.channel.Channel;
068import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
069import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
070import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;
071
072
073/**
074 * An asynchronous implementation of FSWAL.
075 * <p>
076 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
077 * <p>
 * For append, we process it as follows:
079 * <ol>
080 * <li>In the caller thread(typically, in the rpc handler thread):
081 * <ol>
082 * <li>Insert the entry into 'waitingConsumePayloads'. Use ringbuffer sequence as txid.</li>
083 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details.
084 * </li>
085 * </ol>
086 * </li>
087 * <li>In the consumer task(executed in a single threaded thread pool)
088 * <ol>
089 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
090 * {@link #toWriteAppends}</li>
091 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into
092 * {@link #unackedAppends}</li>
093 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call
094 * sync on the AsyncWriter.</li>
095 * <li>In the callback methods:
096 * <ul>
097 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li>
098 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and
099 * wait for writing them again.</li>
100 * </ul>
101 * </li>
102 * </ol>
103 * </li>
104 * </ol>
105 * For sync, the processing stages are almost same. And different from FSHLog, we will open a new
106 * writer and rewrite unacked entries to the new writer and sync again if we hit a sync error.
107 * <p>
108 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is same with
109 * FSHLog.<br>
110 * For a normal roll request(for example, we have reached the log roll size):
111 * <ol>
112 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and
113 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling}(see
114 * {@link #waitForSafePoint()}).</li>
115 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
116 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out.
117 * </li>
 * <li>If there is unflushed data in the writer, sync it.</li>
119 * <li>When all out-going sync request is finished, i.e, the {@link #unackedAppends} is empty,
120 * signal the {@link #readyForRollingCond}.</li>
121 * <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e.,
122 * we reach a safe point. So it is safe to replace old writer with new writer now.</li>
123 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
124 * <li>Schedule the consumer task.</li>
125 * <li>Schedule a background task to close the old writer.</li>
126 * </ol>
127 * For a broken writer roll request, the only difference is that we can bypass the wait for safe
128 * point stage.
129 */
130@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
131public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
132
  private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);

  // Orders sync futures by txid; futures that share a txid are separated by their identity hash
  // code so that several of them can coexist in the sorted set.
  // NOTE(review): two distinct futures with the same txid and the same identity hash code would
  // compare as equal here -- confirm this is acceptable.
  private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
    int c = Long.compare(o1.getTxid(), o2.getTxid());
    return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
  };

  // Config key and default for batchSize: once the writer length has grown by at least this much
  // since the last sync, appendAndSync issues a sync.
  public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
  public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

  // Config key and default deciding whether the consumer task runs on an event loop taken from
  // the supplied EventLoopGroup instead of a dedicated single thread pool.
  public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
    "hbase.wal.async.use-shared-event-loop";
  public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;

  // Config key and default for how long doShutdown waits for the close executor to terminate.
  public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS =
    "hbase.wal.async.wait.on.shutdown.seconds";
  public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;

  // passed to AsyncFSWALProvider when creating a writer, see createWriterInstance
  private final EventLoopGroup eventLoopGroup;

  // executor that runs the consumer task and the sync callbacks
  private final ExecutorService consumeExecutor;

  // passed to AsyncFSWALProvider when creating a writer, see createWriterInstance
  private final Class<? extends Channel> channelClass;

  // guards modifications of epochAndState and readyForRolling
  private final Lock consumeLock = new ReentrantLock();

  // A single shared Runnable instance, so that it can be recognized by identity in the executor's
  // task queue (see hasConsumerTask).
  private final Runnable consumer = this::consume;

  // check if there is already a consumer task in the event loop's task queue
  private final Supplier<Boolean> hasConsumerTask;

  // largest epoch value; doReplaceWriter wraps the epoch back to 0 after reaching it
  private static final int MAX_EPOCH = 0x3FFFFFFF;
  // the lowest bit is waitingRoll, which means new writer is created and we are waiting for old
  // writer to be closed.
  // the second lowest bit is writerBroken which means the current writer is broken and rollWriter
  // is needed.
  // all other bits are the epoch number of the current writer, this is used to detect whether the
  // writer is still the one when you issue the sync.
  // notice that, modification to this field is only allowed under the protection of consumeLock.
  private volatile int epochAndState;

  // set to true (under consumeLock) once the roller thread blocked in waitForSafePoint may go on
  private boolean readyForRolling;

  // signalled whenever readyForRolling is set to true
  private final Condition readyForRollingCond = consumeLock.newCondition();

  // ring buffer through which caller threads hand append and sync requests to the consumer task
  private final RingBuffer<RingBufferTruck> waitingConsumePayloads;

  // sequence of the last ring buffer slot that the consumer task has taken
  private final Sequence waitingConsumePayloadsGatingSequence;

  // true while a consumer task is considered scheduled; callers win the right to schedule one by
  // flipping it from false to true
  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);

  // see WAL_BATCH_SIZE
  private final long batchSize;

  // runs the close of replaced writers in the background, see closeWriter
  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

  // output of the current writer when it is an AsyncProtobufLogWriter, used for pipeline and
  // broken-state queries
  private volatile AsyncFSOutput fsOut;

  // entries taken from the ring buffer that still have to be appended to the writer
  private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();

  // entries appended to the writer whose sync has not completed yet
  private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();

  // pending sync requests, ordered by SEQ_COMPARATOR
  private final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);

  // the highest txid of WAL entries being processed
  private long highestProcessedAppendTxid;

  // file length when we issue last sync request on the writer
  private long fileLengthAtLastSync;

  // value of highestProcessedAppendTxid when we issued the last sync request on the writer
  private long highestProcessedAppendTxidAtLastSync;

  // see ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
  private final int waitOnShutdownInSeconds;
206
  /**
   * Creates an AsyncFSWAL without an {@link Abortable}: delegates to the full constructor,
   * passing {@code null} as the abortable.
   */
  public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
      Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
      String prefix, String suffix, EventLoopGroup eventLoopGroup,
      Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
        eventLoopGroup, channelClass);
  }
214
215  public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
216      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
217      boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
218      Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
219    super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
220        suffix);
221    this.eventLoopGroup = eventLoopGroup;
222    this.channelClass = channelClass;
223    Supplier<Boolean> hasConsumerTask;
224    if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
225      this.consumeExecutor = eventLoopGroup.next();
226      if (consumeExecutor instanceof SingleThreadEventExecutor) {
227        try {
228          Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
229          field.setAccessible(true);
230          Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
231          hasConsumerTask = () -> queue.peek() == consumer;
232        } catch (Exception e) {
233          LOG.warn("Can not get task queue of " + consumeExecutor +
234            ", this is not necessary, just give up", e);
235          hasConsumerTask = () -> false;
236        }
237      } else {
238        hasConsumerTask = () -> false;
239      }
240    } else {
241      ThreadPoolExecutor threadPool =
242        new ThreadPoolExecutor(1, 1, 0L,
243          TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
244            new ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d-" + rootDir.toString()).
245              setDaemon(true).build());
246      hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
247      this.consumeExecutor = threadPool;
248    }
249
250    this.hasConsumerTask = hasConsumerTask;
251    int preallocatedEventCount =
252      conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
253    waitingConsumePayloads =
254      RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
255    waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
256    waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);
257
258    // inrease the ringbuffer sequence so our txid is start from 1
259    waitingConsumePayloads.publish(waitingConsumePayloads.next());
260    waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());
261
262    batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
263    waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
264      DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
265  }
266
267  private static boolean waitingRoll(int epochAndState) {
268    return (epochAndState & 1) != 0;
269  }
270
271  private static boolean writerBroken(int epochAndState) {
272    return ((epochAndState >>> 1) & 1) != 0;
273  }
274
275  private static int epoch(int epochAndState) {
276    return epochAndState >>> 2;
277  }
278
279  // return whether we have successfully set readyForRolling to true.
280  private boolean trySetReadyForRolling() {
281    // Check without holding lock first. Usually we will just return here.
282    // waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to
283    // check them outside the consumeLock.
284    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
285      return false;
286    }
287    consumeLock.lock();
288    try {
289      // 1. a roll is requested
290      // 2. all out-going entries have been acked(we have confirmed above).
291      if (waitingRoll(epochAndState)) {
292        readyForRolling = true;
293        readyForRollingCond.signalAll();
294        return true;
295      } else {
296        return false;
297      }
298    } finally {
299      consumeLock.unlock();
300    }
301  }
302
303  private void syncFailed(long epochWhenSync, Throwable error) {
304    LOG.warn("sync failed", error);
305    boolean shouldRequestLogRoll = true;
306    consumeLock.lock();
307    try {
308      int currentEpochAndState = epochAndState;
309      if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
310        // this is not the previous writer which means we have already rolled the writer.
311        // or this is still the current writer, but we have already marked it as broken and request
312        // a roll.
313        return;
314      }
315      this.epochAndState = currentEpochAndState | 0b10;
316      if (waitingRoll(currentEpochAndState)) {
317        readyForRolling = true;
318        readyForRollingCond.signalAll();
319        // this means we have already in the middle of a rollWriter so just tell the roller thread
320        // that you can continue without requesting an extra log roll.
321        shouldRequestLogRoll = false;
322      }
323    } finally {
324      consumeLock.unlock();
325    }
326    for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
327      toWriteAppends.addFirst(iter.next());
328    }
329    highestUnsyncedTxid = highestSyncedTxid.get();
330    if (shouldRequestLogRoll) {
331      // request a roll.
332      requestLogRoll(ERROR);
333    }
334  }
335
336  private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
337    highestSyncedTxid.set(processedTxid);
338    for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
339      FSWALEntry entry = iter.next();
340      if (entry.getTxid() <= processedTxid) {
341        entry.release();
342        iter.remove();
343      } else {
344        break;
345      }
346    }
347    postSync(System.nanoTime() - startTimeNs, finishSync(true));
348    if (trySetReadyForRolling()) {
349      // we have just finished a roll, then do not need to check for log rolling, the writer will be
350      // closed soon.
351      return;
352    }
353    // If we haven't already requested a roll, check if we have exceeded logrollsize
354    if (!isLogRollRequested() && writer.getLength() > logrollsize) {
355      if (LOG.isDebugEnabled()) {
356        LOG.debug("Requesting log roll because of file size threshold; length=" +
357          writer.getLength() + ", logrollsize=" + logrollsize);
358      }
359      requestLogRoll(SIZE);
360    }
361  }
362
363  // find all the sync futures between these two txids to see if we need to issue a hsync, if no
364  // sync futures then just use the default one.
365  private boolean isHsync(long beginTxid, long endTxid) {
366    SortedSet<SyncFuture> futures =
367      syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1));
368    if (futures.isEmpty()) {
369      return useHsync;
370    }
371    for (SyncFuture future : futures) {
372      if (future.isForceSync()) {
373        return true;
374      }
375    }
376    return false;
377  }
378
379  private void sync(AsyncWriter writer) {
380    fileLengthAtLastSync = writer.getLength();
381    long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
382    boolean shouldUseHsync =
383      isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
384    highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
385    final long startTimeNs = System.nanoTime();
386    final long epoch = (long) epochAndState >>> 2L;
387    addListener(writer.sync(shouldUseHsync), (result, error) -> {
388      if (error != null) {
389        syncFailed(epoch, error);
390      } else {
391        syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs);
392      }
393    }, consumeExecutor);
394  }
395
  /**
   * Adds a timeline annotation to the current trace.
   * @param future currently unused, see the TODO below
   * @param annotation the text of the annotation
   */
  private void addTimeAnnotation(SyncFuture future, String annotation) {
    TraceUtil.addTimelineAnnotation(annotation);
    // TODO handle htrace API change, see HBASE-18895
    // future.setSpan(scope.getSpan());
  }
401
402  private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
403    int finished = 0;
404    for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
405      SyncFuture sync = iter.next();
406      if (sync.getTxid() <= txid) {
407        sync.done(txid, null);
408        iter.remove();
409        finished++;
410        if (addSyncTrace) {
411          addTimeAnnotation(sync, "writer synced");
412        }
413      } else {
414        break;
415      }
416    }
417    return finished;
418  }
419
420  // try advancing the highestSyncedTxid as much as possible
421  private int finishSync(boolean addSyncTrace) {
422    if (unackedAppends.isEmpty()) {
423      // All outstanding appends have been acked.
424      if (toWriteAppends.isEmpty()) {
425        // Also no appends that wait to be written out, then just finished all pending syncs.
426        long maxSyncTxid = highestSyncedTxid.get();
427        for (SyncFuture sync : syncFutures) {
428          maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
429          sync.done(maxSyncTxid, null);
430          if (addSyncTrace) {
431            addTimeAnnotation(sync, "writer synced");
432          }
433        }
434        highestSyncedTxid.set(maxSyncTxid);
435        int finished = syncFutures.size();
436        syncFutures.clear();
437        return finished;
438      } else {
439        // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so
440        // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between
441        // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
442        long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
443        assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
444        long doneTxid = lowestUnprocessedAppendTxid - 1;
445        highestSyncedTxid.set(doneTxid);
446        return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
447      }
448    } else {
449      // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
450      // first unacked append minus 1.
451      long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
452      long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
453      highestSyncedTxid.set(doneTxid);
454      return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
455    }
456  }
457
  /**
   * Appends the entries queued in {@link #toWriteAppends} to the current writer, stopping once
   * {@link #batchSize} bytes have been written since the last sync, and issues a sync when that
   * threshold is reached. If nothing has been written at all, the sync state is advanced
   * directly instead.
   */
  private void appendAndSync() {
    final AsyncWriter writer = this.writer;
    // maybe a sync request is not queued when we issue a sync, so check here to see if we could
    // finish some.
    finishSync(false);
    long newHighestProcessedAppendTxid = -1L;
    for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      boolean appended;
      try {
        appended = appendEntry(writer, entry);
      } catch (IOException e) {
        throw new AssertionError("should not happen", e);
      }
      // the entry counts as processed no matter whether anything has been written for it
      newHighestProcessedAppendTxid = entry.getTxid();
      iter.remove();
      if (appended) {
        // This is possible, when we fail to sync, we will add the unackedAppends back to
        // toWriteAppends, so here we may get an entry which is already in the unackedAppends.
        if (unackedAppends.isEmpty() || unackedAppends.peekLast().getTxid() < entry.getTxid()) {
          unackedAppends.addLast(entry);
        }
        // stop appending once a full batch has been buffered, the sync is issued below
        if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
          break;
        }
      }
    }
    // if we have a newer transaction id, update it.
    // otherwise, use the previous transaction id.
    if (newHighestProcessedAppendTxid > 0) {
      highestProcessedAppendTxid = newHighestProcessedAppendTxid;
    } else {
      newHighestProcessedAppendTxid = highestProcessedAppendTxid;
    }

    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
      // sync because buffer size limit.
      sync(writer);
      return;
    }
    if (writer.getLength() == fileLengthAtLastSync) {
      // we haven't written anything out, just advance the highestSyncedSequence since we may only
      // stamped some region sequence id.
      if (unackedAppends.isEmpty()) {
        highestSyncedTxid.set(highestProcessedAppendTxid);
        finishSync(false);
        trySetReadyForRolling();
      }
      return;
    }
    // reach here means that we have some unsynced data but haven't reached the batch size yet
    // but we will not issue a sync directly here even if there are sync requests because we may
    // have some new data in the ringbuffer, so let's just return here and delay the decision of
    // whether to issue a sync in the caller method.
  }
513
  /**
   * The consumer task. Moves the published requests from the ring buffer into
   * {@link #toWriteAppends} and {@link #syncFutures}, writes them out through
   * {@link #appendAndSync()}, and finally decides whether to reschedule itself or to give up the
   * scheduled state.
   * <p>
   * Nothing is consumed while the writer is marked as broken. While a roll is being waited for,
   * the task only syncs the unsynced data, or reports the safe point if there is nothing left to
   * wait for.
   */
  private void consume() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState)) {
        return;
      }
      if (waitingRoll(currentEpochAndState)) {
        if (writer.getLength() > fileLengthAtLastSync) {
          // issue a sync
          sync(writer);
        } else {
          // nothing unsynced; if nothing is unacked either then the safe point is reached
          if (unackedAppends.isEmpty()) {
            readyForRolling = true;
            readyForRollingCond.signalAll();
          }
        }
        return;
      }
    } finally {
      consumeLock.unlock();
    }
    // drain everything that has been published to the ring buffer so far
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound;
      nextCursor++) {
      // stop at the first slot which has been claimed but not published yet
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case APPEND:
          toWriteAppends.addLast(truck.unloadAppend());
          break;
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
          break;
      }
      // advance the gating sequence to mark the slot as taken
      waitingConsumePayloadsGatingSequence.set(nextCursor);
    }
    appendAndSync();
    // another consumer task is already at the head of the task queue, leave the rest to it
    if (hasConsumerTask.get()) {
      return;
    }
    if (toWriteAppends.isEmpty()) {
      if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
        consumerScheduled.set(false);
        // recheck here since in append and sync we do not hold the consumeLock. Thing may
        // happen like
        // 1. we check cursor, no new entry
        // 2. someone publishes a new entry to ringbuffer and the consumerScheduled is true and
        // give up scheduling the consumer task.
        // 3. we set consumerScheduled to false and also give up scheduling consumer task.
        if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
          // we will give up consuming so if there are some unsynced data we need to issue a sync.
          if (writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() &&
            syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) {
            // no new data in the ringbuffer and we have at least one sync request
            sync(writer);
          }
          return;
        } else {
          // maybe someone has grabbed this before us
          if (!consumerScheduled.compareAndSet(false, true)) {
            return;
          }
        }
      }
    }
    // reschedule if we still have something to write.
    consumeExecutor.execute(consumer);
  }
588
589  private boolean shouldScheduleConsumer() {
590    int currentEpochAndState = epochAndState;
591    if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
592      return false;
593    }
594    return consumerScheduled.compareAndSet(false, true);
595  }
596
597  @Override
598  protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
599      throws IOException {
600    long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
601      waitingConsumePayloads);
602    if (shouldScheduleConsumer()) {
603      consumeExecutor.execute(consumer);
604    }
605    return txid;
606  }
607
  /** Syncs with the default hsync setting, i.e. delegates to {@code sync(useHsync)}. */
  @Override
  public void sync() throws IOException {
    sync(useHsync);
  }
612
  /**
   * Syncs up to the given txid with the default hsync setting, i.e. delegates to
   * {@code sync(txid, useHsync)}.
   */
  @Override
  public void sync(long txid) throws IOException {
    sync(txid, useHsync);
  }
617
618  @Override
619  public void sync(boolean forceSync) throws IOException {
620    try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
621      long txid = waitingConsumePayloads.next();
622      SyncFuture future;
623      try {
624        future = getSyncFuture(txid, forceSync);
625        RingBufferTruck truck = waitingConsumePayloads.get(txid);
626        truck.load(future);
627      } finally {
628        waitingConsumePayloads.publish(txid);
629      }
630      if (shouldScheduleConsumer()) {
631        consumeExecutor.execute(consumer);
632      }
633      blockOnSync(future);
634    }
635  }
636
637  @Override
638  public void sync(long txid, boolean forceSync) throws IOException {
639    if (highestSyncedTxid.get() >= txid) {
640      return;
641    }
642    try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
643      // here we do not use ring buffer sequence as txid
644      long sequence = waitingConsumePayloads.next();
645      SyncFuture future;
646      try {
647        future = getSyncFuture(txid, forceSync);
648        RingBufferTruck truck = waitingConsumePayloads.get(sequence);
649        truck.load(future);
650      } finally {
651        waitingConsumePayloads.publish(sequence);
652      }
653      if (shouldScheduleConsumer()) {
654        consumeExecutor.execute(consumer);
655      }
656      blockOnSync(future);
657    }
658  }
659
  /**
   * Creates an async writer for the given path through {@link AsyncFSWALProvider}, using the
   * configuration, file system, block size, event loop group and channel class of this WAL.
   * NOTE(review): the meaning of the literal {@code false} argument is not visible here --
   * confirm against AsyncFSWALProvider.createAsyncWriter.
   */
  @Override
  protected AsyncWriter createWriterInstance(Path path) throws IOException {
    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false,
        this.blocksize, eventLoopGroup, channelClass);
  }
665
  /**
   * Blocks the calling thread until the consumer side reports that it is safe to replace the
   * writer. Returns immediately if the writer is already marked as broken or there is no writer.
   */
  private void waitForSafePoint() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState) || this.writer == null) {
        return;
      }
      // the consumer task is scheduled below, so mark it as scheduled first
      consumerScheduled.set(true);
      // set the waitingRoll bit
      epochAndState = currentEpochAndState | 1;
      readyForRolling = false;
      consumeExecutor.execute(consumer);
      // loop to guard against waking up before readyForRolling has actually been set
      while (!readyForRolling) {
        readyForRollingCond.awaitUninterruptibly();
      }
    } finally {
      consumeLock.unlock();
    }
  }
684
685  protected final long closeWriter(AsyncWriter writer) {
686    if (writer != null) {
687      long fileLength = writer.getLength();
688      closeExecutor.execute(() -> {
689        try {
690          writer.close();
691        } catch (IOException e) {
692          LOG.warn("close old writer failed", e);
693        }
694      });
695      return fileLength;
696    } else {
697      return 0L;
698    }
699  }
700
  /**
   * Replaces the current writer with the given one: waits for the safe point, schedules the
   * close of the old writer, installs the new writer, and finally starts a new epoch with both
   * state bits cleared and schedules the consumer task again.
   * @param oldPath passed to logRollAndSetupWalProps
   * @param newPath passed to logRollAndSetupWalProps
   * @param nextWriter the writer to switch to, must not be null
   */
  @Override
  protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
      throws IOException {
    Preconditions.checkNotNull(nextWriter);
    waitForSafePoint();
    long oldFileLen = closeWriter(this.writer);
    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
    this.writer = nextWriter;
    // fsOut is only replaced when the new writer is an AsyncProtobufLogWriter
    if (nextWriter instanceof AsyncProtobufLogWriter) {
      this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
    }
    // restart the sync bookkeeping for the new file
    this.fileLengthAtLastSync = nextWriter.getLength();
    this.highestProcessedAppendTxidAtLastSync = 0L;
    consumeLock.lock();
    try {
      // the consumer task is scheduled below, so mark it as scheduled first
      consumerScheduled.set(true);
      int currentEpoch = epochAndState >>> 2;
      // wrap around once the largest epoch has been reached
      int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
      // set a new epoch and also clear waitingRoll and writerBroken
      this.epochAndState = nextEpoch << 2;
      // Reset rollRequested status
      rollRequested.set(false);
      consumeExecutor.execute(consumer);
    } finally {
      consumeLock.unlock();
    }
  }
728
729  @Override
730  protected void doShutdown() throws IOException {
731    waitForSafePoint();
732    closeWriter(this.writer);
733    this.writer = null;
734    closeExecutor.shutdown();
735    try {
736      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
737        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" +
738          " the close of async writer doesn't complete." +
739          "Please check the status of underlying filesystem" +
740          " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS +
741          "\"");
742      }
743    } catch (InterruptedException e) {
744      LOG.error("The wait for close of async writer is interrupted");
745      Thread.currentThread().interrupt();
746    }
747    IOException error = new IOException("WAL has been closed");
748    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
749    // drain all the pending sync requests
750    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound;
751      nextCursor++) {
752      if (!waitingConsumePayloads.isPublished(nextCursor)) {
753        break;
754      }
755      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
756      switch (truck.type()) {
757        case SYNC:
758          syncFutures.add(truck.unloadSync());
759          break;
760        default:
761          break;
762      }
763    }
764    // and fail them
765    syncFutures.forEach(f -> f.done(f.getTxid(), error));
766    if (!(consumeExecutor instanceof EventLoop)) {
767      consumeExecutor.shutdown();
768    }
769  }
770
  /** Appends the entry to the given writer by calling {@code writer.append(entry)}. */
  @Override
  protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
    writer.append(entry);
  }
775
776  @Override
777  DatanodeInfo[] getPipeline() {
778    AsyncFSOutput output = this.fsOut;
779    return output != null ? output.getPipeline() : new DatanodeInfo[0];
780  }
781
  /** Returns the length of the array returned by {@link #getPipeline()}. */
  @Override
  int getLogReplication() {
    return getPipeline().length;
  }
786
787  @Override
788  protected boolean doCheckLogLowReplication() {
789    // not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so
790    // typically there is no 'low replication' state, only a 'broken' state.
791    AsyncFSOutput output = this.fsOut;
792    return output != null && output.isBroken();
793  }
794}