/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;

/**
 * An asynchronous implementation of FSWAL.
 * <p>
 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
 * <p>
 * For append, we process it as follows:
 * <ol>
 * <li>In the caller thread (typically, in the rpc handler thread):
 * <ol>
 * <li>Insert the entry into 'waitingConsumePayloads'. Use the ringbuffer sequence as the txid.</li>
 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details.
 * </li>
 * </ol>
 * </li>
 * <li>In the consumer task (executed in a single-threaded thread pool)
 * <ol>
 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
 * {@link #toWriteAppends}</li>
 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into
 * {@link #unackedAppends}</li>
 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call
 * sync on the AsyncWriter.</li>
 * <li>In the callback methods:
 * <ul>
 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li>
 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and
 * wait for writing them again.</li>
 * </ul>
 * </li>
 * </ol>
 * </li>
 * </ol>
 * For sync, the processing stages are almost the same. Unlike FSHLog, if we hit a sync error we
 * will open a new writer, rewrite the unacked entries to the new writer, and sync again.
 * <p>
 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as
 * in FSHLog.<br>
 * For a normal roll request (for example, we have reached the log roll size):
 * <ol>
 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and
 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling} (see
 * {@link #waitForSafePoint()}).</li>
 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
 * {@link #waitingRoll} is true, and also stop writing out the entries in {@link #toWriteAppends}.
 * </li>
 * <li>If there is unflushed data in the writer, sync it.</li>
 * <li>When all outgoing sync requests are finished, i.e., {@link #unackedAppends} is empty,
 * signal the {@link #readyForRollingCond}.</li>
 * <li>Back to the log roller thread, now we can confirm that there are no outgoing entries, i.e.,
 * we have reached a safe point. So it is safe to replace the old writer with the new one now.</li>
 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
 * <li>Schedule the consumer task.</li>
 * <li>Schedule a background task to close the old writer.</li>
 * </ol>
 * For a broken writer roll request, the only difference is that we can bypass the
 * wait-for-safe-point stage.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
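
  // A minimal usage sketch (illustrative only; in practice instances are normally created
  // through AsyncFSWALProvider rather than constructed directly):
  //
  //   AsyncFSWAL wal = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf,
  //       listeners, true, prefix, suffix, eventLoopGroup, channelClass);
  //   long txid = wal.append(regionInfo, walKey, edits, true); // publish to the ring buffer
  //   wal.sync(txid); // block until the edit is durable on the filesystem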

  private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);

  private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
    int c = Long.compare(o1.getTxid(), o2.getTxid());
    return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
  };

  public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
  public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

  public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
    "hbase.wal.async.use-shared-event-loop";
  public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;

  public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS =
    "hbase.wal.async.wait.on.shutdown.seconds";
  public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;
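
  // Example (hypothetical values): these knobs are plain Configuration entries, e.g.
  //   conf.setLong(WAL_BATCH_SIZE, 128L * 1024);               // sync after ~128KB is buffered
  //   conf.setBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, true);  // consume on a netty event loop
  //   conf.setInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS, 10);  // wait longer for writer close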

  private final EventLoopGroup eventLoopGroup;

  private final ExecutorService consumeExecutor;

  private final Class<? extends Channel> channelClass;

  private final Lock consumeLock = new ReentrantLock();

  private final Runnable consumer = this::consume;

  // check if there is already a consumer task in the event loop's task queue
  private final Supplier<Boolean> hasConsumerTask;

  private static final int MAX_EPOCH = 0x3FFFFFFF;
  // the lowest bit is waitingRoll, which means a new writer is created and we are waiting for the
  // old writer to be closed.
  // the second lowest bit is writerBroken, which means the current writer is broken and a
  // rollWriter is needed.
  // all other bits form the epoch number of the current writer, which is used to detect whether
  // the writer is still the same one as when the sync was issued.
  // note that modifications to this field are only allowed under the protection of consumeLock.
  private volatile int epochAndState;
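
  // Layout illustration (mirrors the decode helpers waitingRoll/writerBroken/epoch below):
  //   bit 0      -> waitingRoll
  //   bit 1      -> writerBroken
  //   bits 2..31 -> epoch (30 bits, hence MAX_EPOCH = 0x3FFFFFFF)
  // e.g. epochAndState = (epoch << 2) | (broken ? 0b10 : 0) | (waitingRoll ? 1 : 0)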

  private boolean readyForRolling;

  private final Condition readyForRollingCond = consumeLock.newCondition();

  private final RingBuffer<RingBufferTruck> waitingConsumePayloads;

  private final Sequence waitingConsumePayloadsGatingSequence;

  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);

  private final long batchSize;

  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

  private volatile AsyncFSOutput fsOut;

  private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();

  private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();

  private final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);

  // the highest txid of WAL entries being processed
  private long highestProcessedAppendTxid;

  // file length when we issued the last sync request on the writer
  private long fileLengthAtLastSync;

  private long highestProcessedAppendTxidAtLastSync;

  private final int waitOnShutdownInSeconds;

  public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
      Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
      String prefix, String suffix, EventLoopGroup eventLoopGroup,
      Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
    super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
    this.eventLoopGroup = eventLoopGroup;
    this.channelClass = channelClass;
    Supplier<Boolean> hasConsumerTask;
    if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
      this.consumeExecutor = eventLoopGroup.next();
      if (consumeExecutor instanceof SingleThreadEventExecutor) {
        try {
          Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
          field.setAccessible(true);
          Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
          hasConsumerTask = () -> queue.peek() == consumer;
        } catch (Exception e) {
          LOG.warn("Cannot get task queue of " + consumeExecutor +
            ", this is not necessary, just give up", e);
          hasConsumerTask = () -> false;
        }
      } else {
        hasConsumerTask = () -> false;
      }
    } else {
      ThreadPoolExecutor threadPool =
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
            new ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d").setDaemon(true).build());
      hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
      this.consumeExecutor = threadPool;
    }

    this.hasConsumerTask = hasConsumerTask;
    int preallocatedEventCount =
      conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
    waitingConsumePayloads =
      RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
    waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);

    // increase the ringbuffer sequence so that our txid starts from 1
    waitingConsumePayloads.publish(waitingConsumePayloads.next());
    waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());

    batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
    waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
      DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
  }

  private static boolean waitingRoll(int epochAndState) {
    return (epochAndState & 1) != 0;
  }

  private static boolean writerBroken(int epochAndState) {
    return ((epochAndState >>> 1) & 1) != 0;
  }

  private static int epoch(int epochAndState) {
    return epochAndState >>> 2;
  }

  // return whether we have successfully set readyForRolling to true.
  private boolean trySetReadyForRolling() {
    // Check without holding lock first. Usually we will just return here.
    // epochAndState is volatile and unackedAppends is only accessed inside the event loop, so it
    // is safe to check them outside the consumeLock.
    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
      return false;
    }
    consumeLock.lock();
    try {
      // 1. a roll is requested
      // 2. all outgoing entries have been acked (we confirmed this above).
      if (waitingRoll(epochAndState)) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
        return true;
      } else {
        return false;
      }
    } finally {
      consumeLock.unlock();
    }
  }

  private void syncFailed(long epochWhenSync, Throwable error) {
    LOG.warn("sync failed", error);
    boolean shouldRequestLogRoll = true;
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
        // this is not the previous writer, which means we have already rolled the writer,
        // or this is still the current writer but we have already marked it as broken and
        // requested a roll.
        return;
      }
      this.epochAndState = currentEpochAndState | 0b10;
      if (waitingRoll(currentEpochAndState)) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
        // this means we are already in the middle of a rollWriter call, so just tell the roller
        // thread that it can continue without requesting an extra log roll.
        shouldRequestLogRoll = false;
      }
    } finally {
      consumeLock.unlock();
    }
    for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
      toWriteAppends.addFirst(iter.next());
    }
    highestUnsyncedTxid = highestSyncedTxid.get();
    if (shouldRequestLogRoll) {
      // request a roll.
      requestLogRoll(ERROR);
    }
  }

  private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
    highestSyncedTxid.set(processedTxid);
    for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
      if (iter.next().getTxid() <= processedTxid) {
        iter.remove();
      } else {
        break;
      }
    }
    postSync(System.nanoTime() - startTimeNs, finishSync(true));
    if (trySetReadyForRolling()) {
      // we have just finished a roll, so we do not need to check for log rolling; the writer
      // will be closed soon.
      return;
    }
    // If we haven't already requested a roll, check if we have exceeded logrollsize
    if (!isLogRollRequested() && writer.getLength() > logrollsize) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Requesting log roll because of file size threshold; length=" +
          writer.getLength() + ", logrollsize=" + logrollsize);
      }
      requestLogRoll(SIZE);
    }
  }

  private void sync(AsyncWriter writer) {
    fileLengthAtLastSync = writer.getLength();
    long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
    highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
    final long startTimeNs = System.nanoTime();
    final long epoch = (long) epochAndState >>> 2L;
    addListener(writer.sync(), (result, error) -> {
      if (error != null) {
        syncFailed(epoch, error);
      } else {
        syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs);
      }
    }, consumeExecutor);
  }

  private void addTimeAnnotation(SyncFuture future, String annotation) {
    TraceUtil.addTimelineAnnotation(annotation);
    // TODO handle htrace API change, see HBASE-18895
    // future.setSpan(scope.getSpan());
  }

  private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
    int finished = 0;
    for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
      SyncFuture sync = iter.next();
      if (sync.getTxid() <= txid) {
        sync.done(txid, null);
        iter.remove();
        finished++;
        if (addSyncTrace) {
          addTimeAnnotation(sync, "writer synced");
        }
      } else {
        break;
      }
    }
    return finished;
  }

  // try advancing the highestSyncedTxid as much as possible
  private int finishSync(boolean addSyncTrace) {
    if (unackedAppends.isEmpty()) {
      // All outstanding appends have been acked.
      if (toWriteAppends.isEmpty()) {
        // Also no appends waiting to be written out, so just finish all pending syncs.
        long maxSyncTxid = highestSyncedTxid.get();
        for (SyncFuture sync : syncFutures) {
          maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
          sync.done(maxSyncTxid, null);
          if (addSyncTrace) {
            addTimeAnnotation(sync, "writer synced");
          }
        }
        highestSyncedTxid.set(maxSyncTxid);
        int finished = syncFutures.size();
        syncFutures.clear();
        return finished;
      } else {
        // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid,
        // so if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid is
        // between highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
        long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
        assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
        long doneTxid = lowestUnprocessedAppendTxid - 1;
        highestSyncedTxid.set(doneTxid);
        return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
      }
    } else {
      // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
      // first unacked append minus 1.
      long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
      long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
      highestSyncedTxid.set(doneTxid);
      return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
    }
  }
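
  // Worked example for finishSync (illustrative): if unackedAppends holds txids {8, 9} and
  // highestSyncedTxid is 5, then doneTxid = max(8 - 1, 5) = 7, every SyncFuture with
  // txid <= 7 completes, and highestSyncedTxid advances to 7.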

  private void appendAndSync() {
    final AsyncWriter writer = this.writer;
    // maybe a sync request was not yet queued when we issued the last sync, so check here to see
    // if we can finish some.
    finishSync(false);
    long newHighestProcessedAppendTxid = -1L;
    for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      boolean appended;
      try {
        appended = append(writer, entry);
      } catch (IOException e) {
        throw new AssertionError("should not happen", e);
      }
      newHighestProcessedAppendTxid = entry.getTxid();
      iter.remove();
      if (appended) {
        // This is possible: when we fail to sync, we add the unackedAppends back to
        // toWriteAppends, so here we may get an entry which is already in unackedAppends.
        if (unackedAppends.isEmpty() || unackedAppends.peekLast().getTxid() < entry.getTxid()) {
          unackedAppends.addLast(entry);
        }
        if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
          break;
        }
      }
    }
    // if we have a newer transaction id, update it.
    // otherwise, use the previous transaction id.
    if (newHighestProcessedAppendTxid > 0) {
      highestProcessedAppendTxid = newHighestProcessedAppendTxid;
    } else {
      newHighestProcessedAppendTxid = highestProcessedAppendTxid;
    }

    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
      // sync because we have reached the buffer size limit.
      sync(writer);
      return;
    }
    if (writer.getLength() == fileLengthAtLastSync) {
      // we haven't written anything out, just advance the highestSyncedTxid since we may have
      // only stamped some region sequence ids.
      if (unackedAppends.isEmpty()) {
        highestSyncedTxid.set(highestProcessedAppendTxid);
        finishSync(false);
        trySetReadyForRolling();
      }
      return;
    }
    // reaching here means that we have some unsynced data but haven't reached the batch size yet,
    // but we will not issue a sync directly here even if there are sync requests, because we may
    // have some new data in the ringbuffer, so let's just return here and delay the decision of
    // whether to issue a sync to the caller method.
  }
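
  // For example, with the default batchSize of 64KB: once appends have pushed the writer length
  // 64KB or more past fileLengthAtLastSync, appendAndSync issues the sync itself; below that
  // threshold the decision is deferred to consume(), which only syncs when there is no more
  // incoming work and a sync request is still pending.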

  private void drainNonMarkerEditsAndFailSyncs() {
    if (toWriteAppends.isEmpty()) {
      return;
    }
    boolean hasNonMarkerEdits = false;
    Iterator<FSWALEntry> iter = toWriteAppends.descendingIterator();
    while (iter.hasNext()) {
      FSWALEntry entry = iter.next();
      if (!entry.getEdit().isMetaEdit()) {
        hasNonMarkerEdits = true;
        break;
      }
    }
    if (hasNonMarkerEdits) {
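      // the descending iterator is now positioned at the newest non-marker edit; remove it and
      // every older entry before it, so only the marker edits queued after it remain.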
      for (;;) {
        iter.remove();
        if (!iter.hasNext()) {
          break;
        }
        iter.next();
      }
      unackedAppends.clear();
      // fail the sync futures whose txid is below that of the first remaining edit; if there is
      // none, fail all the sync futures.
      long txid = toWriteAppends.isEmpty() ? Long.MAX_VALUE : toWriteAppends.peek().getTxid();
      IOException error = new IOException("WAL is closing, only marker edit is allowed");
      for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) {
        SyncFuture future = syncIter.next();
        if (future.getTxid() < txid) {
          future.done(future.getTxid(), error);
          syncIter.remove();
        } else {
          break;
        }
      }
    }
  }

  private void consume() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState)) {
        return;
      }
      if (waitingRoll(currentEpochAndState)) {
        if (writer.getLength() > fileLengthAtLastSync) {
          // issue a sync
          sync(writer);
        } else {
          if (unackedAppends.isEmpty()) {
            readyForRolling = true;
            readyForRollingCond.signalAll();
          }
        }
        return;
      }
    } finally {
      consumeLock.unlock();
    }
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound;
      nextCursor++) {
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case APPEND:
          toWriteAppends.addLast(truck.unloadAppend());
          break;
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
          break;
      }
      waitingConsumePayloadsGatingSequence.set(nextCursor);
    }
    if (markerEditOnly()) {
      drainNonMarkerEditsAndFailSyncs();
    }
    appendAndSync();
    if (hasConsumerTask.get()) {
      return;
    }
    if (toWriteAppends.isEmpty()) {
      if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
        consumerScheduled.set(false);
        // recheck here since in appendAndSync we do not hold the consumeLock. Things may happen
        // like:
        // 1. we check the cursor, no new entry
        // 2. someone publishes a new entry to the ringbuffer, sees consumerScheduled is true, and
        // gives up scheduling the consumer task
        // 3. we set consumerScheduled to false and also give up scheduling the consumer task
        if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
          // we will give up consuming, so if there is some unsynced data we need to issue a sync.
          if (writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() &&
            syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) {
            // no new data in the ringbuffer and we have at least one sync request
            sync(writer);
          }
          return;
        } else {
          // maybe someone has grabbed this before us
          if (!consumerScheduled.compareAndSet(false, true)) {
            return;
          }
        }
      }
    }
    // reschedule if we still have something to write.
    consumeExecutor.execute(consumer);
  }

  private boolean shouldScheduleConsumer() {
    int currentEpochAndState = epochAndState;
    if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
      return false;
    }
    return consumerScheduled.compareAndSet(false, true);
  }

  // This is used by sync replication, where we are going to close the WAL soon after we reopen
  // all the regions. Will be overridden by subclasses.
  protected boolean markerEditOnly() {
    return false;
  }
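
  // Illustrative override (hypothetical subclass, shown here only as a sketch):
  //   @Override
  //   protected boolean markerEditOnly() {
  //     return true; // while draining, reject normal edits; only marker/meta edits may append
  //   }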

  @Override
  public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
      throws IOException {
    if (markerEditOnly() && !edits.isMetaEdit()) {
      throw new IOException("WAL is closing, only marker edit is allowed");
    }
    long txid =
      stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    return txid;
  }

  @Override
  public void sync() throws IOException {
    try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
      long txid = waitingConsumePayloads.next();
      SyncFuture future;
      try {
        future = getSyncFuture(txid);
        RingBufferTruck truck = waitingConsumePayloads.get(txid);
        truck.load(future);
      } finally {
        waitingConsumePayloads.publish(txid);
      }
      if (shouldScheduleConsumer()) {
        consumeExecutor.execute(consumer);
      }
      blockOnSync(future);
    }
  }

  @Override
  public void sync(long txid) throws IOException {
    if (highestSyncedTxid.get() >= txid) {
      return;
    }
    try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
      // here we do not use the ring buffer sequence as the txid
      long sequence = waitingConsumePayloads.next();
      SyncFuture future;
      try {
        future = getSyncFuture(txid);
        RingBufferTruck truck = waitingConsumePayloads.get(sequence);
        truck.load(future);
      } finally {
        waitingConsumePayloads.publish(sequence);
      }
      if (shouldScheduleConsumer()) {
        consumeExecutor.execute(consumer);
      }
      blockOnSync(future);
    }
  }

  protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {
    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
      eventLoopGroup, channelClass);
  }

  @Override
  protected AsyncWriter createWriterInstance(Path path) throws IOException {
    return createAsyncWriter(fs, path);
  }

  private void waitForSafePoint() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState) || this.writer == null) {
        return;
      }
      consumerScheduled.set(true);
      epochAndState = currentEpochAndState | 1;
      readyForRolling = false;
      consumeExecutor.execute(consumer);
      while (!readyForRolling) {
        readyForRollingCond.awaitUninterruptibly();
      }
    } finally {
      consumeLock.unlock();
    }
  }

  protected final long closeWriter(AsyncWriter writer) {
    if (writer != null) {
      long fileLength = writer.getLength();
      closeExecutor.execute(() -> {
        try {
          writer.close();
        } catch (IOException e) {
          LOG.warn("close old writer failed", e);
        }
      });
      return fileLength;
    } else {
      return 0L;
    }
  }

  @Override
  protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
      throws IOException {
    Preconditions.checkNotNull(nextWriter);
    waitForSafePoint();
    long oldFileLen = closeWriter(this.writer);
    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
    this.writer = nextWriter;
    if (nextWriter instanceof AsyncProtobufLogWriter) {
      this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
    }
    this.fileLengthAtLastSync = nextWriter.getLength();
    this.highestProcessedAppendTxidAtLastSync = 0L;
    consumeLock.lock();
    try {
      consumerScheduled.set(true);
      int currentEpoch = epochAndState >>> 2;
      int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
      // set a new epoch and also clear waitingRoll and writerBroken
      this.epochAndState = nextEpoch << 2;
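      // e.g. if the old epochAndState was (5 << 2) | 1 (epoch 5, waitingRoll set), the new value
      // is 6 << 2: the epoch is bumped to 6 and both state bits are cleared.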
      // Reset rollRequested status
      rollRequested.set(false);
      consumeExecutor.execute(consumer);
    } finally {
      consumeLock.unlock();
    }
  }

  @Override
  protected void doShutdown() throws IOException {
    waitForSafePoint();
    closeWriter(this.writer);
    this.writer = null;
    closeExecutor.shutdown();
    try {
      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" +
          " the close of the async writer doesn't complete. " +
          "Please check the status of the underlying filesystem" +
          " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS +
          "\"");
      }
    } catch (InterruptedException e) {
      LOG.error("The wait for close of async writer is interrupted");
      Thread.currentThread().interrupt();
    }
    IOException error = new IOException("WAL has been closed");
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    // drain all the pending sync requests
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound;
      nextCursor++) {
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          break;
      }
    }
    // and fail them
    syncFutures.forEach(f -> f.done(f.getTxid(), error));
    if (!(consumeExecutor instanceof EventLoop)) {
      consumeExecutor.shutdown();
    }
  }

  @Override
  protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
    writer.append(entry);
  }

  @Override
  DatanodeInfo[] getPipeline() {
    AsyncFSOutput output = this.fsOut;
    return output != null ? output.getPipeline() : new DatanodeInfo[0];
  }

  @Override
  int getLogReplication() {
    return getPipeline().length;
  }

  @Override
  protected boolean doCheckLogLowReplication() {
    // unlike FSHLog, AsyncFSOutput will fail immediately if there are errors writing to the
    // datanodes, so typically there is no 'low replication' state, only a 'broken' state.
    AsyncFSOutput output = this.fsOut;
    return output != null && output.isBroken();
  }
}