001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.io.OutputStream; 025import java.nio.ByteBuffer; 026import java.util.concurrent.CompletableFuture; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.TimeoutException; 030import java.util.function.Consumer; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.HBaseConfiguration; 035import org.apache.hadoop.hbase.io.ByteBufferWriter; 036import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; 037import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; 038import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 039import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 040import org.apache.hadoop.hbase.wal.AbstractWALRoller; 041import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; 042import org.apache.hadoop.hbase.wal.WAL.Entry; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 048import org.apache.hbase.thirdparty.io.netty.channel.Channel; 049import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 050 051import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; 053 054/** 055 * AsyncWriter for protobuf-based WAL. 056 */ 057@InterfaceAudience.Private 058public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter 059 implements AsyncFSWALProvider.AsyncWriter { 060 061 private static final Logger LOG = LoggerFactory.getLogger(AsyncProtobufLogWriter.class); 062 063 private final EventLoopGroup eventLoopGroup; 064 065 private final Class<? extends Channel> channelClass; 066 067 private volatile AsyncFSOutput output; 068 /** 069 * Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed. 070 */ 071 private volatile long finalSyncedLength = -1; 072 073 private static final class OutputStreamWrapper extends OutputStream implements ByteBufferWriter { 074 075 private final AsyncFSOutput out; 076 077 private final byte[] oneByteBuf = new byte[1]; 078 079 @Override 080 public void write(int b) throws IOException { 081 oneByteBuf[0] = (byte) b; 082 write(oneByteBuf); 083 } 084 085 public OutputStreamWrapper(AsyncFSOutput out) { 086 this.out = out; 087 } 088 089 @Override 090 public void write(ByteBuffer b, int off, int len) throws IOException { 091 ByteBuffer bb = b.duplicate(); 092 bb.position(off); 093 bb.limit(off + len); 094 out.write(bb); 095 } 096 097 @Override 098 public void writeInt(int i) throws IOException { 099 out.writeInt(i); 100 } 101 102 @Override 103 public void write(byte[] b, int off, int len) throws IOException { 104 out.write(b, off, len); 105 } 106 107 @Override 108 public void close() throws IOException { 109 out.close(); 110 } 111 } 112 113 private OutputStream asyncOutputWrapper; 114 private long waitTimeout; 115 116 public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup, 117 Class<? extends Channel> channelClass) { 118 this.eventLoopGroup = eventLoopGroup; 119 this.channelClass = channelClass; 120 // Reuse WAL_ROLL_WAIT_TIMEOUT here to avoid an infinite wait if somehow a wait on a future 121 // never completes. The objective is the same. We want to propagate an exception to trigger 122 // an abort if we seem to be hung. 123 if (this.conf == null) { 124 this.conf = HBaseConfiguration.create(); 125 } 126 this.waitTimeout = this.conf.getLong(AbstractWALRoller.WAL_ROLL_WAIT_TIMEOUT, 127 AbstractWALRoller.DEFAULT_WAL_ROLL_WAIT_TIMEOUT); 128 } 129 130 /* 131 * @return class name which is recognized by hbase-1.x to avoid ProtobufLogReader throwing error: 132 * IOException: Got unknown writer class: AsyncProtobufLogWriter 133 */ 134 @Override 135 protected String getWriterClassName() { 136 return "ProtobufLogWriter"; 137 } 138 139 @Override 140 public void append(Entry entry) { 141 int buffered = output.buffered(); 142 try { 143 entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build() 144 .writeDelimitedTo(asyncOutputWrapper); 145 } catch (IOException e) { 146 throw new AssertionError("should not happen", e); 147 } 148 try { 149 for (Cell cell : entry.getEdit().getCells()) { 150 cellEncoder.write(cell); 151 } 152 } catch (IOException e) { 153 throw new AssertionError("should not happen", e); 154 } 155 length.addAndGet(output.buffered() - buffered); 156 } 157 158 @Override 159 public CompletableFuture<Long> sync(boolean forceSync) { 160 return output.flush(forceSync); 161 } 162 163 @Override 164 public synchronized void close() throws IOException { 165 if (this.output == null) { 166 return; 167 } 168 try { 169 writeWALTrailer(); 170 output.close(); 171 } catch (Exception e) { 172 LOG.warn("normal close failed, try recover", e); 173 output.recoverAndClose(null); 174 } 175 /** 176 * We have to call {@link AsyncFSOutput#getSyncedLength()} after {@link AsyncFSOutput#close()} 177 * to get the final length synced to underlying filesystem because {@link AsyncFSOutput#close()} 178 * may also flush some data to underlying filesystem. 179 */ 180 this.finalSyncedLength = this.output.getSyncedLength(); 181 this.output = null; 182 } 183 184 public AsyncFSOutput getOutput() { 185 return this.output; 186 } 187 188 @Override 189 protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, 190 short replication, long blockSize, StreamSlowMonitor monitor) 191 throws IOException, StreamLacksCapabilityException { 192 this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, 193 blockSize, eventLoopGroup, channelClass, monitor); 194 this.asyncOutputWrapper = new OutputStreamWrapper(output); 195 } 196 197 @Override 198 protected void closeOutputIfNecessary() { 199 if (this.output != null) { 200 try { 201 this.output.close(); 202 } catch (IOException e) { 203 LOG.warn("Close output failed", e); 204 } 205 } 206 } 207 208 private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException { 209 CompletableFuture<Long> future = new CompletableFuture<>(); 210 action.accept(future); 211 try { 212 return future.get(waitTimeout, TimeUnit.MILLISECONDS).longValue(); 213 } catch (InterruptedException e) { 214 InterruptedIOException ioe = new InterruptedIOException(); 215 ioe.initCause(e); 216 throw ioe; 217 } catch (ExecutionException | TimeoutException e) { 218 Throwables.propagateIfPossible(e.getCause(), IOException.class); 219 throw new RuntimeException(e.getCause()); 220 } 221 } 222 223 @Override 224 protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException { 225 return writeWALMetadata(future -> { 226 output.write(magic); 227 try { 228 header.writeDelimitedTo(asyncOutputWrapper); 229 } catch (IOException e) { 230 // should not happen 231 throw new AssertionError(e); 232 } 233 addListener(output.flush(false), (len, error) -> { 234 if (error != null) { 235 future.completeExceptionally(error); 236 } else { 237 future.complete(len); 238 } 239 }); 240 }); 241 } 242 243 @Override 244 protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException { 245 return writeWALMetadata(future -> { 246 try { 247 trailer.writeTo(asyncOutputWrapper); 248 } catch (IOException e) { 249 // should not happen 250 throw new AssertionError(e); 251 } 252 output.writeInt(trailer.getSerializedSize()); 253 output.write(magic); 254 addListener(output.flush(false), (len, error) -> { 255 if (error != null) { 256 future.completeExceptionally(error); 257 } else { 258 future.complete(len); 259 } 260 }); 261 }); 262 } 263 264 @Override 265 protected OutputStream getOutputStreamForCellEncoder() { 266 return asyncOutputWrapper; 267 } 268 269 @Override 270 public long getSyncedLength() { 271 /** 272 * The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close} is a sync point, 273 * if output is null, then finalSyncedLength must set, so we can return finalSyncedLength, else 274 * we return output.getSyncedLength 275 */ 276 AsyncFSOutput outputToUse = this.output; 277 if (outputToUse == null) { 278 long finalSyncedLengthToUse = this.finalSyncedLength; 279 assert finalSyncedLengthToUse >= 0; 280 return finalSyncedLengthToUse; 281 } 282 return outputToUse.getSyncedLength(); 283 } 284}