001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.IOException; 021import java.lang.reflect.Field; 022import java.util.List; 023import java.util.Queue; 024import java.util.concurrent.CompletableFuture; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Abortable; 029import org.apache.hadoop.hbase.HBaseInterfaceAudience; 030import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; 031import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 032import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; 033import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; 034import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.io.netty.channel.Channel; 040import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 041import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor; 042 043/** 044 * An asynchronous implementation of FSWAL. 045 * <p> 046 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog. 047 * <p> 048 * For append, we process it as follow: 049 * <ol> 050 * <li>In the caller thread(typically, in the rpc handler thread): 051 * <ol> 052 * <li>Insert the entry into 'waitingConsumePayloads'. Use ringbuffer sequence as txid.</li> 053 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details. 054 * </li> 055 * </ol> 056 * </li> 057 * <li>In the consumer task(executed in a single threaded thread pool) 058 * <ol> 059 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into 060 * {@link #toWriteAppends}</li> 061 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into 062 * {@link #unackedAppends}</li> 063 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call 064 * sync on the AsyncWriter.</li> 065 * <li>In the callback methods: 066 * <ul> 067 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li> 068 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and 069 * wait for writing them again.</li> 070 * </ul> 071 * </li> 072 * </ol> 073 * </li> 074 * </ol> 075 * For sync, the processing stages are almost same. And different from FSHLog, we will open a new 076 * writer and rewrite unacked entries to the new writer and sync again if we hit a sync error. 077 * <p> 078 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is same with 079 * FSHLog.<br> 080 * For a normal roll request(for example, we have reached the log roll size): 081 * <ol> 082 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and 083 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling}(see 084 * {@link #waitForSafePoint()}).</li> 085 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if 086 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out. 087 * </li> 088 * <li>If there are unflush data in the writer, sync them.</li> 089 * <li>When all out-going sync request is finished, i.e, the {@link #unackedAppends} is empty, 090 * signal the {@link #readyForRollingCond}.</li> 091 * <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e., 092 * we reach a safe point. So it is safe to replace old writer with new writer now.</li> 093 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li> 094 * <li>Schedule the consumer task.</li> 095 * <li>Schedule a background task to close the old writer.</li> 096 * </ol> 097 * For a broken writer roll request, the only difference is that we can bypass the wait for safe 098 * point stage. 099 */ 100@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 101public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { 102 103 private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class); 104 105 public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP = 106 "hbase.wal.async.use-shared-event-loop"; 107 public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false; 108 109 public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 110 "hbase.wal.async.wait.on.shutdown.seconds"; 111 public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5; 112 113 private final EventLoopGroup eventLoopGroup; 114 115 private final Class<? extends Channel> channelClass; 116 117 private volatile AsyncFSOutput fsOut; 118 119 private final StreamSlowMonitor streamSlowMonitor; 120 121 public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir, 122 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 123 boolean failIfWALExists, String prefix, String suffix, FileSystem remoteFs, Path remoteWALDir, 124 EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor) 125 throws FailedLogCloseException, IOException { 126 super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, 127 suffix, remoteFs, remoteWALDir); 128 this.eventLoopGroup = eventLoopGroup; 129 this.channelClass = channelClass; 130 this.streamSlowMonitor = monitor; 131 if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) { 132 this.consumeExecutor = eventLoopGroup.next(); 133 this.shouldShutDownConsumeExecutorWhenClose = false; 134 if (consumeExecutor instanceof SingleThreadEventExecutor) { 135 try { 136 Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue"); 137 field.setAccessible(true); 138 Queue<?> queue = (Queue<?>) field.get(consumeExecutor); 139 this.hasConsumerTask = () -> queue.peek() == consumer; 140 } catch (Exception e) { 141 LOG.warn("Can not get task queue of " + consumeExecutor 142 + ", this is not necessary, just give up", e); 143 this.hasConsumerTask = () -> false; 144 } 145 } else { 146 this.hasConsumerTask = () -> false; 147 } 148 } else { 149 this.createSingleThreadPoolConsumeExecutor("AsyncFSWAL", rootDir, prefix); 150 } 151 152 this.setWaitOnShutdownInSeconds(conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS, 153 DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS), ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS); 154 } 155 156 @Override 157 protected CompletableFuture<Long> doWriterSync(AsyncWriter writer, boolean shouldUseHsync, 158 long txidWhenSyn) { 159 return writer.sync(shouldUseHsync); 160 } 161 162 protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException { 163 return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize, 164 eventLoopGroup, channelClass, streamSlowMonitor); 165 } 166 167 @Override 168 protected AsyncWriter createWriterInstance(FileSystem fs, Path path) throws IOException { 169 return createAsyncWriter(fs, path); 170 } 171 172 @Override 173 protected void onWriterReplaced(AsyncWriter nextWriter) { 174 if (nextWriter instanceof AsyncProtobufLogWriter) { 175 this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); 176 } 177 } 178 179 @Override 180 protected void doAppend(AsyncWriter writer, FSWALEntry entry) { 181 writer.append(entry); 182 } 183 184 @Override 185 DatanodeInfo[] getPipeline() { 186 AsyncFSOutput output = this.fsOut; 187 return output != null ? output.getPipeline() : new DatanodeInfo[0]; 188 } 189 190 @Override 191 int getLogReplication() { 192 return getPipeline().length; 193 } 194 195 @Override 196 protected boolean doCheckLogLowReplication() { 197 // not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so 198 // typically there is no 'low replication' state, only a 'broken' state. 199 AsyncFSOutput output = this.fsOut; 200 return output != null && output.isBroken(); 201 } 202 203 @Override 204 protected AsyncWriter createCombinedWriter(AsyncWriter localWriter, AsyncWriter remoteWriter) { 205 // put remote writer first as usually it will cost more time to finish, so we write to it first 206 return CombinedAsyncWriter.create(remoteWriter, localWriter); 207 } 208}