001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.io.OutputStream; 025import java.nio.ByteBuffer; 026import java.util.concurrent.CompletableFuture; 027import java.util.concurrent.ExecutionException; 028import java.util.function.Consumer; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.io.ByteBufferWriter; 033import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; 034import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; 035import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 036import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; 037import org.apache.hadoop.hbase.wal.WAL.Entry; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 043import org.apache.hbase.thirdparty.io.netty.channel.Channel; 044import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 045 046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; 048 049/** 050 * AsyncWriter for protobuf-based WAL. 051 */ 052@InterfaceAudience.Private 053public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter 054 implements AsyncFSWALProvider.AsyncWriter { 055 056 private static final Logger LOG = LoggerFactory.getLogger(AsyncProtobufLogWriter.class); 057 058 private final EventLoopGroup eventLoopGroup; 059 060 private final Class<? extends Channel> channelClass; 061 062 private volatile AsyncFSOutput output; 063 /** 064 * Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed. 065 */ 066 private volatile long finalSyncedLength = -1; 067 068 private static final class OutputStreamWrapper extends OutputStream 069 implements ByteBufferWriter { 070 071 private final AsyncFSOutput out; 072 073 private final byte[] oneByteBuf = new byte[1]; 074 075 @Override 076 public void write(int b) throws IOException { 077 oneByteBuf[0] = (byte) b; 078 write(oneByteBuf); 079 } 080 081 public OutputStreamWrapper(AsyncFSOutput out) { 082 this.out = out; 083 } 084 085 @Override 086 public void write(ByteBuffer b, int off, int len) throws IOException { 087 ByteBuffer bb = b.duplicate(); 088 bb.position(off); 089 bb.limit(off + len); 090 out.write(bb); 091 } 092 093 @Override 094 public void writeInt(int i) throws IOException { 095 out.writeInt(i); 096 } 097 098 @Override 099 public void write(byte[] b, int off, int len) throws IOException { 100 out.write(b, off, len); 101 } 102 103 @Override 104 public void close() throws IOException { 105 out.close(); 106 } 107 } 108 109 private OutputStream asyncOutputWrapper; 110 111 public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup, 112 Class<? extends Channel> channelClass) { 113 this.eventLoopGroup = eventLoopGroup; 114 this.channelClass = channelClass; 115 } 116 117 /* 118 * @return class name which is recognized by hbase-1.x to avoid ProtobufLogReader throwing error: 119 * IOException: Got unknown writer class: AsyncProtobufLogWriter 120 */ 121 @Override 122 protected String getWriterClassName() { 123 return "ProtobufLogWriter"; 124 } 125 126 @Override 127 public void append(Entry entry) { 128 int buffered = output.buffered(); 129 try { 130 entry.getKey(). 131 getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build() 132 .writeDelimitedTo(asyncOutputWrapper); 133 } catch (IOException e) { 134 throw new AssertionError("should not happen", e); 135 } 136 try { 137 for (Cell cell : entry.getEdit().getCells()) { 138 cellEncoder.write(cell); 139 } 140 } catch (IOException e) { 141 throw new AssertionError("should not happen", e); 142 } 143 length.addAndGet(output.buffered() - buffered); 144 } 145 146 @Override 147 public CompletableFuture<Long> sync(boolean forceSync) { 148 return output.flush(forceSync); 149 } 150 151 @Override 152 public synchronized void close() throws IOException { 153 if (this.output == null) { 154 return; 155 } 156 try { 157 writeWALTrailer(); 158 output.close(); 159 } catch (Exception e) { 160 LOG.warn("normal close failed, try recover", e); 161 output.recoverAndClose(null); 162 } 163 /** 164 * We have to call {@link AsyncFSOutput#getSyncedLength()} 165 * after {@link AsyncFSOutput#close()} to get the final length 166 * synced to underlying filesystem because {@link AsyncFSOutput#close()} 167 * may also flush some data to underlying filesystem. 168 */ 169 this.finalSyncedLength = this.output.getSyncedLength(); 170 this.output = null; 171 } 172 173 public AsyncFSOutput getOutput() { 174 return this.output; 175 } 176 177 @Override 178 protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, 179 short replication, long blockSize) throws IOException, StreamLacksCapabilityException { 180 this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, 181 blockSize, eventLoopGroup, channelClass); 182 this.asyncOutputWrapper = new OutputStreamWrapper(output); 183 } 184 185 private long write(Consumer<CompletableFuture<Long>> action) throws IOException { 186 CompletableFuture<Long> future = new CompletableFuture<>(); 187 action.accept(future); 188 try { 189 return future.get().longValue(); 190 } catch (InterruptedException e) { 191 InterruptedIOException ioe = new InterruptedIOException(); 192 ioe.initCause(e); 193 throw ioe; 194 } catch (ExecutionException e) { 195 Throwables.propagateIfPossible(e.getCause(), IOException.class); 196 throw new RuntimeException(e.getCause()); 197 } 198 } 199 200 @Override 201 protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException { 202 return write(future -> { 203 output.write(magic); 204 try { 205 header.writeDelimitedTo(asyncOutputWrapper); 206 } catch (IOException e) { 207 // should not happen 208 throw new AssertionError(e); 209 } 210 addListener(output.flush(false), (len, error) -> { 211 if (error != null) { 212 future.completeExceptionally(error); 213 } else { 214 future.complete(len); 215 } 216 }); 217 }); 218 } 219 220 @Override 221 protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException { 222 return write(future -> { 223 try { 224 trailer.writeTo(asyncOutputWrapper); 225 } catch (IOException e) { 226 // should not happen 227 throw new AssertionError(e); 228 } 229 output.writeInt(trailer.getSerializedSize()); 230 output.write(magic); 231 addListener(output.flush(false), (len, error) -> { 232 if (error != null) { 233 future.completeExceptionally(error); 234 } else { 235 future.complete(len); 236 } 237 }); 238 }); 239 } 240 241 @Override 242 protected OutputStream getOutputStreamForCellEncoder() { 243 return asyncOutputWrapper; 244 } 245 246 @Override 247 public long getSyncedLength() { 248 /** 249 * The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close} 250 * is a sync point, if output is null, then finalSyncedLength must set, 251 * so we can return finalSyncedLength, else we return output.getSyncedLength 252 */ 253 AsyncFSOutput outputToUse = this.output; 254 if(outputToUse == null) { 255 long finalSyncedLengthToUse = this.finalSyncedLength; 256 assert finalSyncedLengthToUse >= 0; 257 return finalSyncedLengthToUse; 258 } 259 return outputToUse.getSyncedLength(); 260 } 261}