/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;

/**
 * An asynchronous implementation of FSWAL.
 * <p>
 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
 * <p>
 * For append, we process it as follows:
 * <ol>
 * <li>In the caller thread (typically, the rpc handler thread):
 * <ol>
 * <li>Insert the entry into 'waitingConsumePayloads'. Use the ringbuffer sequence as txid.</li>
 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more
 * details.</li>
 * </ol>
 * </li>
 * <li>In the consumer task (executed in a single-threaded thread pool):
 * <ol>
 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
 * {@link #toWriteAppends}</li>
 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into
 * {@link #unackedAppends}</li>
 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call
 * sync on the AsyncWriter.</li>
 * <li>In the callback methods:
 * <ul>
 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li>
 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and
 * wait for writing them again.</li>
 * </ul>
 * </li>
 * </ol>
 * </li>
 * </ol>
 * For sync, the processing stages are almost the same. Unlike FSHLog, if we hit a sync error we
 * will open a new writer, rewrite the unacked entries to the new writer, and sync again.
 * <p>
 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as
 * in FSHLog.<br>
 * For a normal roll request (for example, we have reached the log roll size):
 * <ol>
 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and
 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling} (see
 * {@link #waitForSafePoint()}).</li>
 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out.
 * </li>
 * <li>If there is unflushed data in the writer, sync it.</li>
 * <li>When all outstanding sync requests are finished, i.e., {@link #unackedAppends} is empty,
 * signal the {@link #readyForRollingCond}.</li>
 * <li>Back in the log roller thread, we can now confirm that there are no outstanding entries,
 * i.e., we have reached a safe point, so it is safe to replace the old writer with the new
 * writer.</li>
 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
 * <li>Schedule the consumer task.</li>
 * <li>Schedule a background task to close the old writer.</li>
 * </ol>
 * For a broken writer roll request, the only difference is that we can bypass the wait for safe
 * point stage.
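 * <p>
 * A rough usage sketch (the factory id and variable names below are only illustrative): callers
 * normally obtain a WAL through the WAL factory, which resolves to this class when
 * "hbase.wal.provider" is set to "asyncfs", rather than constructing it directly, e.g.
 *
 * <pre>
 * // assumes 'conf' is a Configuration and 'regionInfo' is a RegionInfo
 * WALFactory walFactory = new WALFactory(conf, "example-wal-factory");
 * WAL wal = walFactory.getWAL(regionInfo);
 * </pre>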
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);

  public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
  public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

  public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
    "hbase.wal.async.use-shared-event-loop";
  public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;

  public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS =
    "hbase.wal.async.wait.on.shutdown.seconds";
  public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;

  private final EventLoopGroup eventLoopGroup;

  private final Class<? extends Channel> channelClass;

  private volatile AsyncFSOutput fsOut;

  private final StreamSlowMonitor streamSlowMonitor;

  public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix, FileSystem remoteFs, Path remoteWALDir,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
    throws FailedLogCloseException, IOException {
    super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
      suffix, remoteFs, remoteWALDir);
    this.eventLoopGroup = eventLoopGroup;
    this.channelClass = channelClass;
    this.streamSlowMonitor = monitor;
    if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
      this.consumeExecutor = eventLoopGroup.next();
      this.shouldShutDownConsumeExecutorWhenClose = false;
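      // A note on the reflection below: we peek into the shared event executor's internal task
      // queue so that hasConsumerTask can cheaply check whether the consumer task is already the
      // next task to run. If the queue is not accessible we fall back to always returning false,
      // which only costs an occasional redundant scheduling of the consumer task.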
      if (consumeExecutor instanceof SingleThreadEventExecutor) {
        try {
          Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
          field.setAccessible(true);
          Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
          this.hasConsumerTask = () -> queue.peek() == consumer;
        } catch (Exception e) {
          LOG.warn("Cannot get task queue of " + consumeExecutor
            + ", this is not critical, just give up the optimization", e);
          this.hasConsumerTask = () -> false;
        }
      } else {
        this.hasConsumerTask = () -> false;
      }
    } else {
      this.createSingleThreadPoolConsumeExecutor("AsyncFSWAL", rootDir, prefix);
    }

    this.setWaitOnShutdownInSeconds(conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
      DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS), ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
  }

  @Override
  protected CompletableFuture<Long> doWriterSync(AsyncWriter writer, boolean shouldUseHsync,
    long txidWhenSync) {
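    // Simply delegate the sync to the AsyncWriter; the txid parameter is not used by this
    // implementation.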
    return writer.sync(shouldUseHsync);
  }

  protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {
    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
      eventLoopGroup, channelClass, streamSlowMonitor);
  }

  @Override
  protected AsyncWriter createWriterInstance(FileSystem fs, Path path) throws IOException {
    return createAsyncWriter(fs, path);
  }

  @Override
  protected void onWriterReplaced(AsyncWriter nextWriter) {
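    // Cache the new writer's underlying AsyncFSOutput so that getPipeline and
    // doCheckLogLowReplication below can report on the current output stream.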
    if (nextWriter instanceof AsyncProtobufLogWriter) {
      this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
    }
  }

  @Override
  protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
    writer.append(entry);
  }

  @Override
  DatanodeInfo[] getPipeline() {
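    // fsOut is volatile and may be swapped by a concurrent log roll, so read it only once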
    AsyncFSOutput output = this.fsOut;
    return output != null ? output.getPipeline() : new DatanodeInfo[0];
  }

  @Override
  int getLogReplication() {
    return getPipeline().length;
  }

  @Override
  protected boolean doCheckLogLowReplication() {
    // Unlike FSHLog, AsyncFSOutput fails immediately if there are errors writing to the DNs, so
    // typically there is no 'low replication' state, only a 'broken' state.
    AsyncFSOutput output = this.fsOut;
    return output != null && output.isBroken();
  }

  @Override
  protected AsyncWriter createCombinedWriter(AsyncWriter localWriter, AsyncWriter remoteWriter) {
    // Put the remote writer first as it usually takes longer to finish, so we write to it first.
    return CombinedAsyncWriter.create(remoteWriter, localWriter);
  }
}