001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import com.lmax.disruptor.RingBuffer;
023import com.lmax.disruptor.Sequence;
024import com.lmax.disruptor.Sequencer;
025import java.io.IOException;
026import java.lang.reflect.Field;
027import java.util.ArrayDeque;
028import java.util.Comparator;
029import java.util.Deque;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Queue;
033import java.util.SortedSet;
034import java.util.TreeSet;
035import java.util.concurrent.ExecutorService;
036import java.util.concurrent.Executors;
037import java.util.concurrent.LinkedBlockingQueue;
038import java.util.concurrent.ThreadPoolExecutor;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicBoolean;
041import java.util.concurrent.locks.Condition;
042import java.util.concurrent.locks.Lock;
043import java.util.concurrent.locks.ReentrantLock;
044import java.util.function.Supplier;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.fs.FileSystem;
047import org.apache.hadoop.fs.Path;
048import org.apache.hadoop.hbase.HBaseInterfaceAudience;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
051import org.apache.hadoop.hbase.trace.TraceUtil;
052import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
053import org.apache.hadoop.hbase.wal.WALEdit;
054import org.apache.hadoop.hbase.wal.WALKeyImpl;
055import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
056import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
057import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
058import org.apache.htrace.core.TraceScope;
059import org.apache.yetus.audience.InterfaceAudience;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
064import org.apache.hbase.thirdparty.io.netty.channel.Channel;
065import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
066import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
067import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;
068
069/**
070 * An asynchronous implementation of FSWAL.
071 * <p>
072 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
073 * <p>
074 * For append, we process it as follow:
075 * <ol>
076 * <li>In the caller thread(typically, in the rpc handler thread):
077 * <ol>
078 * <li>Insert the entry into 'waitingConsumePayloads'. Use ringbuffer sequence as txid.</li>
079 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details.
080 * </li>
081 * </ol>
082 * </li>
083 * <li>In the consumer task(executed in a single threaded thread pool)
084 * <ol>
085 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
086 * {@link #toWriteAppends}</li>
087 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into
088 * {@link #unackedAppends}</li>
089 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call
090 * sync on the AsyncWriter.</li>
091 * <li>In the callback methods:
092 * <ul>
093 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li>
094 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and
095 * wait for writing them again.</li>
096 * </ul>
097 * </li>
098 * </ol>
099 * </li>
100 * </ol>
101 * For sync, the processing stages are almost same. And different from FSHLog, we will open a new
102 * writer and rewrite unacked entries to the new writer and sync again if we hit a sync error.
103 * <p>
104 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is same with
105 * FSHLog.<br>
106 * For a normal roll request(for example, we have reached the log roll size):
107 * <ol>
108 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and
109 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling}(see
110 * {@link #waitForSafePoint()}).</li>
111 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
112 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out.
113 * </li>
 * <li>If there is unflushed data in the writer, sync it.</li>
 * <li>When all outgoing sync requests are finished, i.e., {@link #unackedAppends} is empty,
 * signal the {@link #readyForRollingCond}.</li>
117 * <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e.,
118 * we reach a safe point. So it is safe to replace old writer with new writer now.</li>
119 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
120 * <li>Schedule the consumer task.</li>
121 * <li>Schedule a background task to close the old writer.</li>
122 * </ol>
123 * For a broken writer roll request, the only difference is that we can bypass the wait for safe
124 * point stage.
125 */
126@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
127public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
128
  private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);

  // Orders sync futures by txid first. Futures that share a txid are ordered by their identity
  // hash code so that more than one of them can be kept in the same sorted set.
  // NOTE(review): identity hash codes are not guaranteed to be unique, so two distinct futures
  // with the same txid could in theory compare as equal -- confirm this is acceptable.
  private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
    int c = Long.compare(o1.getTxid(), o2.getTxid());
    return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
  };

  // Config key and default (64 KB) for batchSize: once the writer has grown by at least this many
  // bytes since the last sync, appendAndSync() issues a sync.
  public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
  public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

  // Config key and default for running the consumer task on an event loop taken from the supplied
  // EventLoopGroup. When false the constructor creates a dedicated single-thread pool instead.
  public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
    "hbase.wal.async.use-shared-event-loop";
  public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;

  // Config key and default for how many seconds doShutdown() waits for the background close of
  // the writer to terminate.
  public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS =
    "hbase.wal.async.wait.on.shutdown.seconds";
  public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;
146
  // Passed to AsyncFSWALProvider when creating writers; also the source of the consumer executor
  // when the shared event loop option is enabled.
  private final EventLoopGroup eventLoopGroup;

  // Runs the consumer task; also handed to addListener as the executor for sync callbacks.
  private final ExecutorService consumeExecutor;

  // Passed to AsyncFSWALProvider when creating writers.
  private final Class<? extends Channel> channelClass;

  // Protects modifications of epochAndState and the readyForRolling hand-off with the roller.
  private final Lock consumeLock = new ReentrantLock();

  // The one and only consumer task instance. The hasConsumerTask probes compare against this
  // exact reference, so it must not be re-created per submission.
  private final Runnable consumer = this::consume;

  // check if there is already a consumer task in the event loop's task queue
  private final Supplier<Boolean> hasConsumerTask;

  // Largest epoch that fits in the 30 bits of epochAndState above the two state bits.
  private static final int MAX_EPOCH = 0x3FFFFFFF;
  // the lowest bit is waitingRoll, which means new writer is created and we are waiting for old
  // writer to be closed.
  // the second lowest bit is writerBroken which means the current writer is broken and rollWriter
  // is needed.
  // all other bits are the epoch number of the current writer, this is used to detect whether the
  // writer is still the one when you issue the sync.
  // notice that, modification to this field is only allowed under the protection of consumeLock.
  private volatile int epochAndState;

  // Set to true (under consumeLock) to tell the thread blocked in waitForSafePoint() that it may
  // continue.
  private boolean readyForRolling;

  // Signalled whenever readyForRolling is set to true.
  private final Condition readyForRollingCond = consumeLock.newCondition();

  // Ring buffer that callers publish append and sync requests into; drained by consume().
  private final RingBuffer<RingBufferTruck> waitingConsumePayloads;

  // Sequence of the last ring buffer slot consumed; registered as the buffer's gating sequence.
  private final Sequence waitingConsumePayloadsGatingSequence;

  // True while a consumer task is considered scheduled; used to avoid submitting duplicates.
  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);

  // Number of bytes written since the last sync that triggers a new sync, see WAL_BATCH_SIZE.
  private final long batchSize;

  // Daemon thread pool used by closeWriter() to close replaced writers in the background.
  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

  // Output of the current writer when it is an AsyncProtobufLogWriter; read by getPipeline() and
  // doCheckLogLowReplication().
  private volatile AsyncFSOutput fsOut;

  // Entries taken from the ring buffer that have not been appended to the writer yet.
  private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();

  // Entries appended to the writer that have not been confirmed by a completed sync yet.
  private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();

  // Pending sync requests, ordered by SEQ_COMPARATOR.
  private final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);

  // the highest txid of WAL entries being processed
  private long highestProcessedAppendTxid;

  // file length when we issue last sync request on the writer
  private long fileLengthAtLastSync;

  // Value of highestProcessedAppendTxid when the last sync request was issued on the writer.
  private long highestProcessedAppendTxidAtLastSync;

  // How long doShutdown() waits for closeExecutor to terminate.
  private final int waitOnShutdownInSeconds;
202
203  public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
204      Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
205      String prefix, String suffix, EventLoopGroup eventLoopGroup,
206      Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
207    super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
208    this.eventLoopGroup = eventLoopGroup;
209    this.channelClass = channelClass;
210    Supplier<Boolean> hasConsumerTask;
211    if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
212      this.consumeExecutor = eventLoopGroup.next();
213      if (consumeExecutor instanceof SingleThreadEventExecutor) {
214        try {
215          Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
216          field.setAccessible(true);
217          Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
218          hasConsumerTask = () -> queue.peek() == consumer;
219        } catch (Exception e) {
220          LOG.warn("Can not get task queue of " + consumeExecutor +
221            ", this is not necessary, just give up", e);
222          hasConsumerTask = () -> false;
223        }
224      } else {
225        hasConsumerTask = () -> false;
226      }
227    } else {
228      ThreadPoolExecutor threadPool =
229        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
230            new ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d").setDaemon(true).build());
231      hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
232      this.consumeExecutor = threadPool;
233    }
234
235    this.hasConsumerTask = hasConsumerTask;
236    int preallocatedEventCount =
237      conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
238    waitingConsumePayloads =
239      RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
240    waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
241    waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);
242
243    // inrease the ringbuffer sequence so our txid is start from 1
244    waitingConsumePayloads.publish(waitingConsumePayloads.next());
245    waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());
246
247    batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
248    waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
249      DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
250  }
251
252  private static boolean waitingRoll(int epochAndState) {
253    return (epochAndState & 1) != 0;
254  }
255
256  private static boolean writerBroken(int epochAndState) {
257    return ((epochAndState >>> 1) & 1) != 0;
258  }
259
260  private static int epoch(int epochAndState) {
261    return epochAndState >>> 2;
262  }
263
264  // return whether we have successfully set readyForRolling to true.
265  private boolean trySetReadyForRolling() {
266    // Check without holding lock first. Usually we will just return here.
267    // waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to
268    // check them outside the consumeLock.
269    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
270      return false;
271    }
272    consumeLock.lock();
273    try {
274      // 1. a roll is requested
275      // 2. all out-going entries have been acked(we have confirmed above).
276      if (waitingRoll(epochAndState)) {
277        readyForRolling = true;
278        readyForRollingCond.signalAll();
279        return true;
280      } else {
281        return false;
282      }
283    } finally {
284      consumeLock.unlock();
285    }
286  }
287
288  private void syncFailed(long epochWhenSync, Throwable error) {
289    LOG.warn("sync failed", error);
290    boolean shouldRequestLogRoll = true;
291    consumeLock.lock();
292    try {
293      int currentEpochAndState = epochAndState;
294      if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
295        // this is not the previous writer which means we have already rolled the writer.
296        // or this is still the current writer, but we have already marked it as broken and request
297        // a roll.
298        return;
299      }
300      this.epochAndState = currentEpochAndState | 0b10;
301      if (waitingRoll(currentEpochAndState)) {
302        readyForRolling = true;
303        readyForRollingCond.signalAll();
304        // this means we have already in the middle of a rollWriter so just tell the roller thread
305        // that you can continue without requesting an extra log roll.
306        shouldRequestLogRoll = false;
307      }
308    } finally {
309      consumeLock.unlock();
310    }
311    for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
312      toWriteAppends.addFirst(iter.next());
313    }
314    highestUnsyncedTxid = highestSyncedTxid.get();
315    if (shouldRequestLogRoll) {
316      // request a roll.
317      requestLogRoll();
318    }
319  }
320
321  private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
322    highestSyncedTxid.set(processedTxid);
323    for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
324      FSWALEntry entry = iter.next();
325      if (entry.getTxid() <= processedTxid) {
326        entry.release();
327        iter.remove();
328      } else {
329        break;
330      }
331    }
332    postSync(System.nanoTime() - startTimeNs, finishSync(true));
333    if (trySetReadyForRolling()) {
334      // we have just finished a roll, then do not need to check for log rolling, the writer will be
335      // closed soon.
336      return;
337    }
338    if (writer.getLength() < logrollsize || isLogRollRequested()) {
339      return;
340    }
341    requestLogRoll();
342  }
343
344  // find all the sync futures between these two txids to see if we need to issue a hsync, if no
345  // sync futures then just use the default one.
346  private boolean isHsync(long beginTxid, long endTxid) {
347    SortedSet<SyncFuture> futures =
348      syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1));
349    if (futures.isEmpty()) {
350      return useHsync;
351    }
352    for (SyncFuture future : futures) {
353      if (future.isForceSync()) {
354        return true;
355      }
356    }
357    return false;
358  }
359
360  private void sync(AsyncWriter writer) {
361    fileLengthAtLastSync = writer.getLength();
362    long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
363    boolean shouldUseHsync =
364      isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
365    highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
366    final long startTimeNs = System.nanoTime();
367    final long epoch = (long) epochAndState >>> 2L;
368    addListener(writer.sync(shouldUseHsync), (result, error) -> {
369      if (error != null) {
370        syncFailed(epoch, error);
371      } else {
372        syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs);
373      }
374    }, consumeExecutor);
375  }
376
377  private void addTimeAnnotation(SyncFuture future, String annotation) {
378    TraceUtil.addTimelineAnnotation(annotation);
379    // TODO handle htrace API change, see HBASE-18895
380    // future.setSpan(scope.getSpan());
381  }
382
383  private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
384    int finished = 0;
385    for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
386      SyncFuture sync = iter.next();
387      if (sync.getTxid() <= txid) {
388        sync.done(txid, null);
389        iter.remove();
390        finished++;
391        if (addSyncTrace) {
392          addTimeAnnotation(sync, "writer synced");
393        }
394      } else {
395        break;
396      }
397    }
398    return finished;
399  }
400
401  // try advancing the highestSyncedTxid as much as possible
402  private int finishSync(boolean addSyncTrace) {
403    if (unackedAppends.isEmpty()) {
404      // All outstanding appends have been acked.
405      if (toWriteAppends.isEmpty()) {
406        // Also no appends that wait to be written out, then just finished all pending syncs.
407        long maxSyncTxid = highestSyncedTxid.get();
408        for (SyncFuture sync : syncFutures) {
409          maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
410          sync.done(maxSyncTxid, null);
411          if (addSyncTrace) {
412            addTimeAnnotation(sync, "writer synced");
413          }
414        }
415        highestSyncedTxid.set(maxSyncTxid);
416        int finished = syncFutures.size();
417        syncFutures.clear();
418        return finished;
419      } else {
420        // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so
421        // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between
422        // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
423        long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
424        assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
425        long doneTxid = lowestUnprocessedAppendTxid - 1;
426        highestSyncedTxid.set(doneTxid);
427        return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
428      }
429    } else {
430      // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
431      // first unacked append minus 1.
432      long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
433      long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
434      highestSyncedTxid.set(doneTxid);
435      return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
436    }
437  }
438
439  private void appendAndSync() {
440    final AsyncWriter writer = this.writer;
441    // maybe a sync request is not queued when we issue a sync, so check here to see if we could
442    // finish some.
443    finishSync(false);
444    long newHighestProcessedAppendTxid = -1L;
445    for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
446      FSWALEntry entry = iter.next();
447      boolean appended;
448      try {
449        appended = appendEntry(writer, entry);
450      } catch (IOException e) {
451        throw new AssertionError("should not happen", e);
452      }
453      newHighestProcessedAppendTxid = entry.getTxid();
454      iter.remove();
455      if (appended) {
456        // This is possible, when we fail to sync, we will add the unackedAppends back to
457        // toWriteAppends, so here we may get an entry which is already in the unackedAppends.
458        if (unackedAppends.isEmpty() || unackedAppends.peekLast().getTxid() < entry.getTxid()) {
459          unackedAppends.addLast(entry);
460        }
461        if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
462          break;
463        }
464      }
465    }
466    // if we have a newer transaction id, update it.
467    // otherwise, use the previous transaction id.
468    if (newHighestProcessedAppendTxid > 0) {
469      highestProcessedAppendTxid = newHighestProcessedAppendTxid;
470    } else {
471      newHighestProcessedAppendTxid = highestProcessedAppendTxid;
472    }
473
474    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
475      // sync because buffer size limit.
476      sync(writer);
477      return;
478    }
479    if (writer.getLength() == fileLengthAtLastSync) {
480      // we haven't written anything out, just advance the highestSyncedSequence since we may only
481      // stamped some region sequence id.
482      if (unackedAppends.isEmpty()) {
483        highestSyncedTxid.set(highestProcessedAppendTxid);
484        finishSync(false);
485        trySetReadyForRolling();
486      }
487      return;
488    }
489    // reach here means that we have some unsynced data but haven't reached the batch size yet
490    // but we will not issue a sync directly here even if there are sync requests because we may
491    // have some new data in the ringbuffer, so let's just return here and delay the decision of
492    // whether to issue a sync in the caller method.
493  }
494
  /**
   * The consumer task. Moves published requests from the ring buffer into toWriteAppends and
   * syncFutures, writes appends out through {@link #appendAndSync()} and then decides whether to
   * reschedule itself, issue a final sync, or stop.
   * <p>
   * Does nothing if the writer is marked as broken. While a roll is waiting it only syncs the
   * remaining unsynced data, or signals the roller when there is nothing left to sync and nothing
   * unacked.
   */
  private void consume() {
    consumeLock.lock();
    try {
      // Take one snapshot of the packed state so that both checks below see the same value.
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState)) {
        return;
      }
      if (waitingRoll(currentEpochAndState)) {
        if (writer.getLength() > fileLengthAtLastSync) {
          // issue a sync
          sync(writer);
        } else {
          // Nothing unsynced in the writer; if nothing is unacked either, the roller can go on.
          if (unackedAppends.isEmpty()) {
            readyForRolling = true;
            readyForRollingCond.signalAll();
          }
        }
        return;
      }
    } finally {
      consumeLock.unlock();
    }
    // Drain every published slot after the gating sequence, up to the cursor observed now.
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound;
      nextCursor++) {
      // Stop at the first slot that has been claimed but not published yet.
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case APPEND:
          toWriteAppends.addLast(truck.unloadAppend());
          break;
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
          break;
      }
      // Advance the gating sequence to mark the slot as consumed.
      waitingConsumePayloadsGatingSequence.set(nextCursor);
    }
    appendAndSync();
    // A consumer task is already at the head of the executor's queue, no need to reschedule.
    if (hasConsumerTask.get()) {
      return;
    }
    if (toWriteAppends.isEmpty()) {
      if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
        consumerScheduled.set(false);
        // Recheck here since we do not hold the consumeLock in append and sync. Things may
        // happen like this:
        // 1. we check the cursor, there is no new entry
        // 2. someone publishes a new entry to the ring buffer, sees that consumerScheduled is
        // true and gives up scheduling the consumer task.
        // 3. we set consumerScheduled to false and also give up scheduling the consumer task.
        if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
          // we will give up consuming so if there are some unsynced data we need to issue a sync.
          if (writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() &&
            syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) {
            // no new data in the ringbuffer and we have at least one sync request
            sync(writer);
          }
          return;
        } else {
          // maybe someone has grabbed this before us
          if (!consumerScheduled.compareAndSet(false, true)) {
            return;
          }
        }
      }
    }
    // reschedule if we still have something to write.
    consumeExecutor.execute(consumer);
  }
569
570  private boolean shouldScheduleConsumer() {
571    int currentEpochAndState = epochAndState;
572    if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
573      return false;
574    }
575    return consumerScheduled.compareAndSet(false, true);
576  }
577
578  @Override
579  protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
580      throws IOException {
581    long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
582      waitingConsumePayloads);
583    if (shouldScheduleConsumer()) {
584      consumeExecutor.execute(consumer);
585    }
586    return txid;
587  }
588
589  @Override
590  public void sync() throws IOException {
591    sync(useHsync);
592  }
593
594  @Override
595  public void sync(long txid) throws IOException {
596    sync(txid, useHsync);
597  }
598
599  @Override
600  public void sync(boolean forceSync) throws IOException {
601    try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
602      long txid = waitingConsumePayloads.next();
603      SyncFuture future;
604      try {
605        future = getSyncFuture(txid, forceSync);
606        RingBufferTruck truck = waitingConsumePayloads.get(txid);
607        truck.load(future);
608      } finally {
609        waitingConsumePayloads.publish(txid);
610      }
611      if (shouldScheduleConsumer()) {
612        consumeExecutor.execute(consumer);
613      }
614      blockOnSync(future);
615    }
616  }
617
618  @Override
619  public void sync(long txid, boolean forceSync) throws IOException {
620    if (highestSyncedTxid.get() >= txid) {
621      return;
622    }
623    try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
624      // here we do not use ring buffer sequence as txid
625      long sequence = waitingConsumePayloads.next();
626      SyncFuture future;
627      try {
628        future = getSyncFuture(txid, forceSync);
629        RingBufferTruck truck = waitingConsumePayloads.get(sequence);
630        truck.load(future);
631      } finally {
632        waitingConsumePayloads.publish(sequence);
633      }
634      if (shouldScheduleConsumer()) {
635        consumeExecutor.execute(consumer);
636      }
637      blockOnSync(future);
638    }
639  }
640
641  @Override
642  protected AsyncWriter createWriterInstance(Path path) throws IOException {
643    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false,
644        this.blocksize, eventLoopGroup, channelClass);
645  }
646
647  private void waitForSafePoint() {
648    consumeLock.lock();
649    try {
650      int currentEpochAndState = epochAndState;
651      if (writerBroken(currentEpochAndState) || this.writer == null) {
652        return;
653      }
654      consumerScheduled.set(true);
655      epochAndState = currentEpochAndState | 1;
656      readyForRolling = false;
657      consumeExecutor.execute(consumer);
658      while (!readyForRolling) {
659        readyForRollingCond.awaitUninterruptibly();
660      }
661    } finally {
662      consumeLock.unlock();
663    }
664  }
665
666  private long closeWriter() {
667    AsyncWriter oldWriter = this.writer;
668    if (oldWriter != null) {
669      long fileLength = oldWriter.getLength();
670      closeExecutor.execute(() -> {
671        try {
672          oldWriter.close();
673        } catch (IOException e) {
674          LOG.warn("close old writer failed", e);
675        }
676      });
677      return fileLength;
678    } else {
679      return 0L;
680    }
681  }
682
683  @Override
684  protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
685      throws IOException {
686    Preconditions.checkNotNull(nextWriter);
687    waitForSafePoint();
688    long oldFileLen = closeWriter();
689    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
690    this.writer = nextWriter;
691    if (nextWriter instanceof AsyncProtobufLogWriter) {
692      this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
693    }
694    this.fileLengthAtLastSync = nextWriter.getLength();
695    this.highestProcessedAppendTxidAtLastSync = 0L;
696    consumeLock.lock();
697    try {
698      consumerScheduled.set(true);
699      int currentEpoch = epochAndState >>> 2;
700      int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
701      // set a new epoch and also clear waitingRoll and writerBroken
702      this.epochAndState = nextEpoch << 2;
703      // Reset rollRequested status
704      rollRequested.set(false);
705      consumeExecutor.execute(consumer);
706    } finally {
707      consumeLock.unlock();
708    }
709  }
710
711  @Override
712  protected void doShutdown() throws IOException {
713    waitForSafePoint();
714    closeWriter();
715    closeExecutor.shutdown();
716    try {
717      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
718        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" +
719          " the close of async writer doesn't complete." +
720          "Please check the status of underlying filesystem" +
721          " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS +
722          "\"");
723      }
724    } catch (InterruptedException e) {
725      LOG.error("The wait for close of async writer is interrupted");
726      Thread.currentThread().interrupt();
727    }
728    IOException error = new IOException("WAL has been closed");
729    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
730    // drain all the pending sync requests
731    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound;
732      nextCursor++) {
733      if (!waitingConsumePayloads.isPublished(nextCursor)) {
734        break;
735      }
736      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
737      switch (truck.type()) {
738        case SYNC:
739          syncFutures.add(truck.unloadSync());
740          break;
741        default:
742          break;
743      }
744    }
745    // and fail them
746    syncFutures.forEach(f -> f.done(f.getTxid(), error));
747    if (!(consumeExecutor instanceof EventLoop)) {
748      consumeExecutor.shutdown();
749    }
750  }
751
752  @Override
753  protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
754    writer.append(entry);
755  }
756
757  @Override
758  DatanodeInfo[] getPipeline() {
759    AsyncFSOutput output = this.fsOut;
760    return output != null ? output.getPipeline() : new DatanodeInfo[0];
761  }
762
763  @Override
764  int getLogReplication() {
765    return getPipeline().length;
766  }
767
768  @Override
769  protected boolean doCheckLogLowReplication() {
770    // not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so
771    // typically there is no 'low replication' state, only a 'broken' state.
772    AsyncFSOutput output = this.fsOut;
773    return output != null && output.isBroken();
774  }
775}