001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.IOException; 021import java.lang.reflect.Field; 022import java.util.List; 023import java.util.Queue; 024import java.util.concurrent.CompletableFuture; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Abortable; 029import org.apache.hadoop.hbase.HBaseInterfaceAudience; 030import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; 031import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 032import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; 033import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; 034import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.io.netty.channel.Channel; 040import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 041import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor; 042 043/** 044 * An asynchronous implementation of FSWAL. 045 * <p> 046 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog. 047 * <p> 048 * For append, we process it as follow: 049 * <ol> 050 * <li>In the caller thread(typically, in the rpc handler thread): 051 * <ol> 052 * <li>Insert the entry into 'waitingConsumePayloads'. Use ringbuffer sequence as txid.</li> 053 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details. 054 * </li> 055 * </ol> 056 * </li> 057 * <li>In the consumer task(executed in a single threaded thread pool) 058 * <ol> 059 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into 060 * {@link #toWriteAppends}</li> 061 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into 062 * {@link #unackedAppends}</li> 063 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call 064 * sync on the AsyncWriter.</li> 065 * <li>In the callback methods: 066 * <ul> 067 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li> 068 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and 069 * wait for writing them again.</li> 070 * </ul> 071 * </li> 072 * </ol> 073 * </li> 074 * </ol> 075 * For sync, the processing stages are almost same. And different from FSHLog, we will open a new 076 * writer and rewrite unacked entries to the new writer and sync again if we hit a sync error. 077 * <p> 078 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is same with 079 * FSHLog.<br> 080 * For a normal roll request(for example, we have reached the log roll size): 081 * <ol> 082 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and 083 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling}(see 084 * {@link #waitForSafePoint()}).</li> 085 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if 086 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out. 087 * </li> 088 * <li>If there are unflush data in the writer, sync them.</li> 089 * <li>When all out-going sync request is finished, i.e, the {@link #unackedAppends} is empty, 090 * signal the {@link #readyForRollingCond}.</li> 091 * <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e., 092 * we reach a safe point. So it is safe to replace old writer with new writer now.</li> 093 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li> 094 * <li>Schedule the consumer task.</li> 095 * <li>Schedule a background task to close the old writer.</li> 096 * </ol> 097 * For a broken writer roll request, the only difference is that we can bypass the wait for safe 098 * point stage. 099 */ 100@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 101public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { 102 103 private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class); 104 105 public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; 106 public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; 107 108 public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP = 109 "hbase.wal.async.use-shared-event-loop"; 110 public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false; 111 112 public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 113 "hbase.wal.async.wait.on.shutdown.seconds"; 114 public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5; 115 116 private final EventLoopGroup eventLoopGroup; 117 118 private final Class<? extends Channel> channelClass; 119 120 private volatile AsyncFSOutput fsOut; 121 122 private final StreamSlowMonitor streamSlowMonitor; 123 124 public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir, 125 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 126 boolean failIfWALExists, String prefix, String suffix, FileSystem remoteFs, Path remoteWALDir, 127 EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor) 128 throws FailedLogCloseException, IOException { 129 super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, 130 suffix, remoteFs, remoteWALDir); 131 this.eventLoopGroup = eventLoopGroup; 132 this.channelClass = channelClass; 133 this.streamSlowMonitor = monitor; 134 if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) { 135 this.consumeExecutor = eventLoopGroup.next(); 136 this.shouldShutDownConsumeExecutorWhenClose = false; 137 if (consumeExecutor instanceof SingleThreadEventExecutor) { 138 try { 139 Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue"); 140 field.setAccessible(true); 141 Queue<?> queue = (Queue<?>) field.get(consumeExecutor); 142 this.hasConsumerTask = () -> queue.peek() == consumer; 143 } catch (Exception e) { 144 LOG.warn("Can not get task queue of " + consumeExecutor 145 + ", this is not necessary, just give up", e); 146 this.hasConsumerTask = () -> false; 147 } 148 } else { 149 this.hasConsumerTask = () -> false; 150 } 151 } else { 152 this.createSingleThreadPoolConsumeExecutor("AsyncFSWAL", rootDir, prefix); 153 } 154 155 this.setWaitOnShutdownInSeconds(conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS, 156 DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS), ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS); 157 } 158 159 @Override 160 protected CompletableFuture<Long> doWriterSync(AsyncWriter writer, boolean shouldUseHsync, 161 long txidWhenSyn) { 162 return writer.sync(shouldUseHsync); 163 } 164 165 protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException { 166 return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize, 167 eventLoopGroup, channelClass, streamSlowMonitor); 168 } 169 170 @Override 171 protected AsyncWriter createWriterInstance(FileSystem fs, Path path) throws IOException { 172 return createAsyncWriter(fs, path); 173 } 174 175 @Override 176 protected void onWriterReplaced(AsyncWriter nextWriter) { 177 if (nextWriter instanceof AsyncProtobufLogWriter) { 178 this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); 179 } 180 } 181 182 @Override 183 protected void doAppend(AsyncWriter writer, FSWALEntry entry) { 184 writer.append(entry); 185 } 186 187 @Override 188 DatanodeInfo[] getPipeline() { 189 AsyncFSOutput output = this.fsOut; 190 return output != null ? output.getPipeline() : new DatanodeInfo[0]; 191 } 192 193 @Override 194 int getLogReplication() { 195 return getPipeline().length; 196 } 197 198 @Override 199 protected boolean doCheckLogLowReplication() { 200 // not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so 201 // typically there is no 'low replication' state, only a 'broken' state. 202 AsyncFSOutput output = this.fsOut; 203 return output != null && output.isBroken(); 204 } 205 206 @Override 207 protected AsyncWriter createCombinedWriter(AsyncWriter localWriter, AsyncWriter remoteWriter) { 208 // put remote writer first as usually it will cost more time to finish, so we write to it first 209 return CombinedAsyncWriter.create(remoteWriter, localWriter); 210 } 211}