/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

/**
 * AsyncWriter for protobuf-based WAL.
 */
@InterfaceAudience.Private
public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
    implements AsyncFSWALProvider.AsyncWriter {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncProtobufLogWriter.class);

  private final EventLoopGroup eventLoopGroup;

  private final Class<? extends Channel> channelClass;

  private AsyncFSOutput output;

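  /**
   * Adapts the {@link AsyncFSOutput} to the {@link OutputStream} and {@link ByteBufferWriter}
   * contracts so that protobuf messages and the cell encoder can write directly into the
   * asynchronous output buffer.
   */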
  private static final class OutputStreamWrapper extends OutputStream
      implements ByteBufferWriter {

    private final AsyncFSOutput out;

    private final byte[] oneByteBuf = new byte[1];

    @Override
    public void write(int b) throws IOException {
      oneByteBuf[0] = (byte) b;
      write(oneByteBuf);
    }

    public OutputStreamWrapper(AsyncFSOutput out) {
      this.out = out;
    }

    @Override
    public void write(ByteBuffer b, int off, int len) throws IOException {
      ByteBuffer bb = b.duplicate();
      bb.position(off);
      bb.limit(off + len);
      out.write(bb);
    }

    @Override
    public void writeInt(int i) throws IOException {
      out.writeInt(i);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
    }

    @Override
    public void close() throws IOException {
      out.close();
    }
  }

  private OutputStream asyncOutputWrapper;

  public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
      Class<? extends Channel> channelClass) {
    this.eventLoopGroup = eventLoopGroup;
    this.channelClass = channelClass;
  }

  /*
   * @return class name which is recognized by hbase-1.x to avoid ProtobufLogReader throwing error:
   *   IOException: Got unknown writer class: AsyncProtobufLogWriter
   */
  @Override
  protected String getWriterClassName() {
    return "ProtobufLogWriter";
  }

  @Override
  public void append(Entry entry) {
    int buffered = output.buffered();
    entry.setCompressionContext(compressionContext);
    try {
      entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
        .writeDelimitedTo(asyncOutputWrapper);
    } catch (IOException e) {
      throw new AssertionError("should not happen", e);
    }
    try {
      for (Cell cell : entry.getEdit().getCells()) {
        cellEncoder.write(cell);
      }
    } catch (IOException e) {
      throw new AssertionError("should not happen", e);
    }
    length.addAndGet(output.buffered() - buffered);
  }

  @Override
  public CompletableFuture<Long> sync() {
    return output.flush(false);
  }

  @Override
  public synchronized void close() throws IOException {
    if (this.output == null) {
      return;
    }
    try {
      writeWALTrailer();
      output.close();
    } catch (Exception e) {
      LOG.warn("normal close failed, try recover", e);
      output.recoverAndClose(null);
    }
    this.output = null;
  }

  public AsyncFSOutput getOutput() {
    return this.output;
  }

  @Override
  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
    this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
      blockSize, eventLoopGroup, channelClass);
    this.asyncOutputWrapper = new OutputStreamWrapper(output);
  }

  private long write(Consumer<CompletableFuture<Long>> action) throws IOException {
    CompletableFuture<Long> future = new CompletableFuture<>();
    action.accept(future);
    try {
      return future.get().longValue();
    } catch (InterruptedException e) {
      InterruptedIOException ioe = new InterruptedIOException();
      ioe.initCause(e);
      throw ioe;
    } catch (ExecutionException e) {
      Throwables.propagateIfPossible(e.getCause(), IOException.class);
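      // propagateIfPossible has already rethrown the cause if it was an IOException (or an
      // unchecked exception); anything left over is unexpected, so wrap it.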
      throw new RuntimeException(e.getCause());
    }
  }

  @Override
  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
    return write(future -> {
      output.write(magic);
      try {
        header.writeDelimitedTo(asyncOutputWrapper);
      } catch (IOException e) {
        // should not happen
        throw new AssertionError(e);
      }
      addListener(output.flush(false), (len, error) -> {
        if (error != null) {
          future.completeExceptionally(error);
        } else {
          future.complete(len);
        }
      });
    });
  }

  @Override
  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
    return write(future -> {
      try {
        trailer.writeTo(asyncOutputWrapper);
      } catch (IOException e) {
        // should not happen
        throw new AssertionError(e);
      }
      output.writeInt(trailer.getSerializedSize());
      output.write(magic);
      addListener(output.flush(false), (len, error) -> {
        if (error != null) {
          future.completeExceptionally(error);
        } else {
          future.complete(len);
        }
      });
    });
  }

  @Override
  protected OutputStream getOutputStreamForCellEncoder() {
    return asyncOutputWrapper;
  }
}