001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.IOException;
021import java.lang.reflect.Field;
022import java.util.List;
023import java.util.Queue;
024import java.util.concurrent.CompletableFuture;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Abortable;
029import org.apache.hadoop.hbase.HBaseInterfaceAudience;
030import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
031import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
032import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
033import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
034import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import org.apache.hbase.thirdparty.io.netty.channel.Channel;
040import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
041import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;
042
043/**
044 * An asynchronous implementation of FSWAL.
045 * <p>
046 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
047 * <p>
048 * For append, we process it as follow:
049 * <ol>
050 * <li>In the caller thread(typically, in the rpc handler thread):
051 * <ol>
052 * <li>Insert the entry into 'waitingConsumePayloads'. Use ringbuffer sequence as txid.</li>
053 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details.
054 * </li>
055 * </ol>
056 * </li>
057 * <li>In the consumer task(executed in a single threaded thread pool)
058 * <ol>
059 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
060 * {@link #toWriteAppends}</li>
061 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into
062 * {@link #unackedAppends}</li>
063 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call
064 * sync on the AsyncWriter.</li>
065 * <li>In the callback methods:
066 * <ul>
067 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li>
068 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and
069 * wait for writing them again.</li>
070 * </ul>
071 * </li>
072 * </ol>
073 * </li>
074 * </ol>
075 * For sync, the processing stages are almost same. And different from FSHLog, we will open a new
076 * writer and rewrite unacked entries to the new writer and sync again if we hit a sync error.
077 * <p>
078 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is same with
079 * FSHLog.<br>
080 * For a normal roll request(for example, we have reached the log roll size):
081 * <ol>
082 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and
083 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling}(see
084 * {@link #waitForSafePoint()}).</li>
085 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
086 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out.
087 * </li>
088 * <li>If there are unflush data in the writer, sync them.</li>
089 * <li>When all out-going sync request is finished, i.e, the {@link #unackedAppends} is empty,
090 * signal the {@link #readyForRollingCond}.</li>
091 * <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e.,
092 * we reach a safe point. So it is safe to replace old writer with new writer now.</li>
093 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
094 * <li>Schedule the consumer task.</li>
095 * <li>Schedule a background task to close the old writer.</li>
096 * </ol>
097 * For a broken writer roll request, the only difference is that we can bypass the wait for safe
098 * point stage.
099 */
100@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
101public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
102
103  private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);
104
105  public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
106    "hbase.wal.async.use-shared-event-loop";
107  public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;
108
109  public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS =
110    "hbase.wal.async.wait.on.shutdown.seconds";
111  public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;
112
113  private final EventLoopGroup eventLoopGroup;
114
115  private final Class<? extends Channel> channelClass;
116
117  private volatile AsyncFSOutput fsOut;
118
119  private final StreamSlowMonitor streamSlowMonitor;
120
121  public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
122    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
123    boolean failIfWALExists, String prefix, String suffix, FileSystem remoteFs, Path remoteWALDir,
124    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
125    throws FailedLogCloseException, IOException {
126    super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
127      suffix, remoteFs, remoteWALDir);
128    this.eventLoopGroup = eventLoopGroup;
129    this.channelClass = channelClass;
130    this.streamSlowMonitor = monitor;
131    if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
132      this.consumeExecutor = eventLoopGroup.next();
133      this.shouldShutDownConsumeExecutorWhenClose = false;
134      if (consumeExecutor instanceof SingleThreadEventExecutor) {
135        try {
136          Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
137          field.setAccessible(true);
138          Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
139          this.hasConsumerTask = () -> queue.peek() == consumer;
140        } catch (Exception e) {
141          LOG.warn("Can not get task queue of " + consumeExecutor
142            + ", this is not necessary, just give up", e);
143          this.hasConsumerTask = () -> false;
144        }
145      } else {
146        this.hasConsumerTask = () -> false;
147      }
148    } else {
149      this.createSingleThreadPoolConsumeExecutor("AsyncFSWAL", rootDir, prefix);
150    }
151
152    this.setWaitOnShutdownInSeconds(conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
153      DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS), ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
154  }
155
156  @Override
157  protected CompletableFuture<Long> doWriterSync(AsyncWriter writer, boolean shouldUseHsync,
158    long txidWhenSyn) {
159    return writer.sync(shouldUseHsync);
160  }
161
162  protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {
163    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
164      eventLoopGroup, channelClass, streamSlowMonitor);
165  }
166
167  @Override
168  protected AsyncWriter createWriterInstance(FileSystem fs, Path path) throws IOException {
169    return createAsyncWriter(fs, path);
170  }
171
172  @Override
173  protected void onWriterReplaced(AsyncWriter nextWriter) {
174    if (nextWriter instanceof AsyncProtobufLogWriter) {
175      this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
176    }
177  }
178
179  @Override
180  protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
181    writer.append(entry);
182  }
183
184  @Override
185  DatanodeInfo[] getPipeline() {
186    AsyncFSOutput output = this.fsOut;
187    return output != null ? output.getPipeline() : new DatanodeInfo[0];
188  }
189
190  @Override
191  int getLogReplication() {
192    return getPipeline().length;
193  }
194
195  @Override
196  protected boolean doCheckLogLowReplication() {
197    // not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so
198    // typically there is no 'low replication' state, only a 'broken' state.
199    AsyncFSOutput output = this.fsOut;
200    return output != null && output.isBroken();
201  }
202
203  @Override
204  protected AsyncWriter createCombinedWriter(AsyncWriter localWriter, AsyncWriter remoteWriter) {
205    // put remote writer first as usually it will cost more time to finish, so we write to it first
206    return CombinedAsyncWriter.create(remoteWriter, localWriter);
207  }
208}