001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.io.OutputStream;
025import java.nio.ByteBuffer;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ExecutionException;
028import java.util.function.Consumer;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.io.ByteBufferWriter;
033import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
034import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
035import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
036import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
037import org.apache.hadoop.hbase.wal.WAL.Entry;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
043import org.apache.hbase.thirdparty.io.netty.channel.Channel;
044import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
048
049/**
050 * AsyncWriter for protobuf-based WAL.
051 */
052@InterfaceAudience.Private
053public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
054    implements AsyncFSWALProvider.AsyncWriter {
055
056  private static final Logger LOG = LoggerFactory.getLogger(AsyncProtobufLogWriter.class);
057
058  private final EventLoopGroup eventLoopGroup;
059
060  private final Class<? extends Channel> channelClass;
061
062  private AsyncFSOutput output;
063
064  private static final class OutputStreamWrapper extends OutputStream
065      implements ByteBufferWriter {
066
067    private final AsyncFSOutput out;
068
069    private final byte[] oneByteBuf = new byte[1];
070
071    @Override
072    public void write(int b) throws IOException {
073      oneByteBuf[0] = (byte) b;
074      write(oneByteBuf);
075    }
076
077    public OutputStreamWrapper(AsyncFSOutput out) {
078      this.out = out;
079    }
080
081    @Override
082    public void write(ByteBuffer b, int off, int len) throws IOException {
083      ByteBuffer bb = b.duplicate();
084      bb.position(off);
085      bb.limit(off + len);
086      out.write(bb);
087    }
088
089    @Override
090    public void writeInt(int i) throws IOException {
091      out.writeInt(i);
092    }
093
094    @Override
095    public void write(byte[] b, int off, int len) throws IOException {
096      out.write(b, off, len);
097    }
098
099    @Override
100    public void close() throws IOException {
101      out.close();
102    }
103  }
104
105  private OutputStream asyncOutputWrapper;
106
107  public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
108      Class<? extends Channel> channelClass) {
109    this.eventLoopGroup = eventLoopGroup;
110    this.channelClass = channelClass;
111  }
112
113  /*
114   * @return class name which is recognized by hbase-1.x to avoid ProtobufLogReader throwing error:
115   *   IOException: Got unknown writer class: AsyncProtobufLogWriter
116   */
117  @Override
118  protected String getWriterClassName() {
119    return "ProtobufLogWriter";
120  }
121
122  @Override
123  public void append(Entry entry) {
124    int buffered = output.buffered();
125    entry.setCompressionContext(compressionContext);
126    try {
127      entry.getKey().
128        getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
129          .writeDelimitedTo(asyncOutputWrapper);
130    } catch (IOException e) {
131      throw new AssertionError("should not happen", e);
132    }
133    try {
134      for (Cell cell : entry.getEdit().getCells()) {
135        cellEncoder.write(cell);
136      }
137    } catch (IOException e) {
138      throw new AssertionError("should not happen", e);
139    }
140    length.addAndGet(output.buffered() - buffered);
141  }
142
143  @Override
144  public CompletableFuture<Long> sync() {
145    return output.flush(false);
146  }
147
148  @Override
149  public synchronized void close() throws IOException {
150    if (this.output == null) {
151      return;
152    }
153    try {
154      writeWALTrailer();
155      output.close();
156    } catch (Exception e) {
157      LOG.warn("normal close failed, try recover", e);
158      output.recoverAndClose(null);
159    }
160    this.output = null;
161  }
162
163  public AsyncFSOutput getOutput() {
164    return this.output;
165  }
166
167  @Override
168  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
169      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
170    this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
171        blockSize, eventLoopGroup, channelClass);
172    this.asyncOutputWrapper = new OutputStreamWrapper(output);
173  }
174
175  private long write(Consumer<CompletableFuture<Long>> action) throws IOException {
176    CompletableFuture<Long> future = new CompletableFuture<>();
177    action.accept(future);
178    try {
179      return future.get().longValue();
180    } catch (InterruptedException e) {
181      InterruptedIOException ioe = new InterruptedIOException();
182      ioe.initCause(e);
183      throw ioe;
184    } catch (ExecutionException e) {
185      Throwables.propagateIfPossible(e.getCause(), IOException.class);
186      throw new RuntimeException(e.getCause());
187    }
188  }
189
190  @Override
191  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
192    return write(future -> {
193      output.write(magic);
194      try {
195        header.writeDelimitedTo(asyncOutputWrapper);
196      } catch (IOException e) {
197        // should not happen
198        throw new AssertionError(e);
199      }
200      addListener(output.flush(false), (len, error) -> {
201        if (error != null) {
202          future.completeExceptionally(error);
203        } else {
204          future.complete(len);
205        }
206      });
207    });
208  }
209
210  @Override
211  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
212    return write(future -> {
213      try {
214        trailer.writeTo(asyncOutputWrapper);
215      } catch (IOException e) {
216        // should not happen
217        throw new AssertionError(e);
218      }
219      output.writeInt(trailer.getSerializedSize());
220      output.write(magic);
221      addListener(output.flush(false), (len, error) -> {
222        if (error != null) {
223          future.completeExceptionally(error);
224        } else {
225          future.complete(len);
226        }
227      });
228    });
229  }
230
231  @Override
232  protected OutputStream getOutputStreamForCellEncoder() {
233    return asyncOutputWrapper;
234  }
235}