/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AbstractWALRoller;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

/**
 * AsyncWriter for protobuf-based WAL.
 */
@InterfaceAudience.Private
public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
  implements AsyncFSWALProvider.AsyncWriter {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncProtobufLogWriter.class);

  private final EventLoopGroup eventLoopGroup;

  private final Class<? extends Channel> channelClass;

  private volatile AsyncFSOutput output;
  /**
   * Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed.
   */
  private volatile long finalSyncedLength = -1;

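  /**
   * Adapter that exposes the underlying {@link AsyncFSOutput} as a blocking {@link OutputStream}
   * and {@link ByteBufferWriter}, so that protobuf's {@code writeDelimitedTo} and the cell encoder
   * can write straight into the async output. Data written through this wrapper is only buffered;
   * it is not persisted until the output is flushed.
   */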
  private static final class OutputStreamWrapper extends OutputStream implements ByteBufferWriter {

    private final AsyncFSOutput out;

    private final byte[] oneByteBuf = new byte[1];

    @Override
    public void write(int b) throws IOException {
      oneByteBuf[0] = (byte) b;
      write(oneByteBuf);
    }

    public OutputStreamWrapper(AsyncFSOutput out) {
      this.out = out;
    }

    @Override
    public void write(ByteBuffer b, int off, int len) throws IOException {
      ByteBuffer bb = b.duplicate();
      bb.position(off);
      bb.limit(off + len);
      out.write(bb);
    }

    @Override
    public void writeInt(int i) throws IOException {
      out.writeInt(i);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
    }

    @Override
    public void close() throws IOException {
      out.close();
    }
  }

  private OutputStream asyncOutputWrapper;
  private long waitTimeout;

  public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass) {
    this.eventLoopGroup = eventLoopGroup;
    this.channelClass = channelClass;
    // Reuse WAL_ROLL_WAIT_TIMEOUT here to avoid an infinite wait if somehow a wait on a future
    // never completes. The objective is the same. We want to propagate an exception to trigger
    // an abort if we seem to be hung.
    if (this.conf == null) {
      this.conf = HBaseConfiguration.create();
    }
    this.waitTimeout = this.conf.getLong(AbstractWALRoller.WAL_ROLL_WAIT_TIMEOUT,
      AbstractWALRoller.DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
  }

  @Override
  public void append(Entry entry) {
    int buffered = output.buffered();
    try {
      entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
        .writeDelimitedTo(asyncOutputWrapper);
    } catch (IOException e) {
      throw new AssertionError("should not happen", e);
    }
    try {
      for (Cell cell : entry.getEdit().getCells()) {
        cellEncoder.write((ExtendedCell) cell);
      }
    } catch (IOException e) {
      throw new AssertionError("should not happen", e);
    }
    length.addAndGet(output.buffered() - buffered);
  }

  @Override
  public CompletableFuture<Long> sync(boolean forceSync) {
    return output.flush(forceSync);
  }

  @Override
  public synchronized void close() throws IOException {
    if (this.output == null) {
      return;
    }
    try {
      writeWALTrailer();
      output.close();
    } catch (Exception e) {
      LOG.warn("normal close failed, try recover", e);
      output.recoverAndClose(null);
    }
    /**
     * We have to call {@link AsyncFSOutput#getSyncedLength()} after {@link AsyncFSOutput#close()}
     * to get the final length synced to the underlying filesystem, because
     * {@link AsyncFSOutput#close()} may also flush some data to the underlying filesystem.
     */
    this.finalSyncedLength = this.output.getSyncedLength();
    this.output = null;
  }

  public AsyncFSOutput getOutput() {
    return this.output;
  }

  @Override
  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
    throws IOException, StreamLacksCapabilityException {
    this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
      blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
    this.asyncOutputWrapper = new OutputStreamWrapper(output);
  }

  @Override
  protected void closeOutputIfNecessary() {
    if (this.output != null) {
      try {
        this.output.close();
      } catch (IOException e) {
        LOG.warn("Close output failed", e);
      }
    }
  }

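  /**
   * Runs the given asynchronous write action and blocks until its future completes, waiting at
   * most {@link #waitTimeout} milliseconds. Interruption and timeouts are rethrown as
   * IOExceptions; execution failures are unwrapped and rethrown, as an IOException where
   * possible. This gives the header and trailer writers below a synchronous API.
   */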
  private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
    CompletableFuture<Long> future = new CompletableFuture<>();
    action.accept(future);
    try {
      return future.get(waitTimeout, TimeUnit.MILLISECONDS).longValue();
    } catch (InterruptedException e) {
      InterruptedIOException ioe = new InterruptedIOException();
      ioe.initCause(e);
      throw ioe;
    } catch (ExecutionException e) {
      Throwables.propagateIfPossible(e.getCause(), IOException.class);
      throw new RuntimeException(e.getCause());
    } catch (TimeoutException e) {
      throw new TimeoutIOException(e);
    }
  }

  @Override
  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
    return writeWALMetadata(future -> {
      output.write(magic);
      try {
        header.writeDelimitedTo(asyncOutputWrapper);
      } catch (IOException e) {
        // should not happen
        throw new AssertionError(e);
      }
      addListener(output.flush(false), (len, error) -> {
        if (error != null) {
          future.completeExceptionally(error);
        } else {
          future.complete(len);
        }
      });
    });
  }

  @Override
  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
    return writeWALMetadata(future -> {
      try {
        trailer.writeTo(asyncOutputWrapper);
      } catch (IOException e) {
        // should not happen
        throw new AssertionError(e);
      }
      output.writeInt(trailer.getSerializedSize());
      output.write(magic);
      addListener(output.flush(false), (len, error) -> {
        if (error != null) {
          future.completeExceptionally(error);
        } else {
          future.complete(len);
        }
      });
    });
  }

  @Override
  protected OutputStream getOutputStreamForCellEncoder() {
    return asyncOutputWrapper;
  }

  @Override
  public long getSyncedLength() {
    /**
     * The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close} is a sync point:
     * if output is null, then finalSyncedLength must have been set, so we can return
     * finalSyncedLength; otherwise we return output.getSyncedLength().
     */
    AsyncFSOutput outputToUse = this.output;
    if (outputToUse == null) {
      long finalSyncedLengthToUse = this.finalSyncedLength;
      assert finalSyncedLengthToUse >= 0;
      return finalSyncedLengthToUse;
    }
    return outputToUse.getSyncedLength();
  }
}