001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.io.OutputStream;
025import java.nio.ByteBuffer;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ExecutionException;
028import java.util.function.Consumer;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.io.ByteBufferWriter;
033import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
034import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
035import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
036import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
037import org.apache.hadoop.hbase.wal.WAL.Entry;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
043import org.apache.hbase.thirdparty.io.netty.channel.Channel;
044import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
048
049/**
050 * AsyncWriter for protobuf-based WAL.
051 */
052@InterfaceAudience.Private
053public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
054    implements AsyncFSWALProvider.AsyncWriter {
055
056  private static final Logger LOG = LoggerFactory.getLogger(AsyncProtobufLogWriter.class);
057
058  private final EventLoopGroup eventLoopGroup;
059
060  private final Class<? extends Channel> channelClass;
061
062  private AsyncFSOutput output;
063
064  private static final class OutputStreamWrapper extends OutputStream
065      implements ByteBufferWriter {
066
067    private final AsyncFSOutput out;
068
069    private final byte[] oneByteBuf = new byte[1];
070
071    @Override
072    public void write(int b) throws IOException {
073      oneByteBuf[0] = (byte) b;
074      write(oneByteBuf);
075    }
076
077    public OutputStreamWrapper(AsyncFSOutput out) {
078      this.out = out;
079    }
080
081    @Override
082    public void write(ByteBuffer b, int off, int len) throws IOException {
083      ByteBuffer bb = b.duplicate();
084      bb.position(off);
085      bb.limit(off + len);
086      out.write(bb);
087    }
088
089    @Override
090    public void writeInt(int i) throws IOException {
091      out.writeInt(i);
092    }
093
094    @Override
095    public void write(byte[] b, int off, int len) throws IOException {
096      out.write(b, off, len);
097    }
098
099    @Override
100    public void close() throws IOException {
101      out.close();
102    }
103  }
104
105  private OutputStream asyncOutputWrapper;
106
107  public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
108      Class<? extends Channel> channelClass) {
109    this.eventLoopGroup = eventLoopGroup;
110    this.channelClass = channelClass;
111  }
112
113  /*
114   * @return class name which is recognized by hbase-1.x to avoid ProtobufLogReader throwing error:
115   *   IOException: Got unknown writer class: AsyncProtobufLogWriter
116   */
117  @Override
118  protected String getWriterClassName() {
119    return "ProtobufLogWriter";
120  }
121
122  @Override
123  public void append(Entry entry) {
124    int buffered = output.buffered();
125    try {
126      entry.getKey().
127        getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
128          .writeDelimitedTo(asyncOutputWrapper);
129    } catch (IOException e) {
130      throw new AssertionError("should not happen", e);
131    }
132    try {
133      for (Cell cell : entry.getEdit().getCells()) {
134        cellEncoder.write(cell);
135      }
136    } catch (IOException e) {
137      throw new AssertionError("should not happen", e);
138    }
139    length.addAndGet(output.buffered() - buffered);
140  }
141
142  @Override
143  public CompletableFuture<Long> sync(boolean forceSync) {
144    return output.flush(forceSync);
145  }
146
147  @Override
148  public synchronized void close() throws IOException {
149    if (this.output == null) {
150      return;
151    }
152    try {
153      writeWALTrailer();
154      output.close();
155    } catch (Exception e) {
156      LOG.warn("normal close failed, try recover", e);
157      output.recoverAndClose(null);
158    }
159    this.output = null;
160  }
161
162  public AsyncFSOutput getOutput() {
163    return this.output;
164  }
165
166  @Override
167  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
168      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
169    this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
170        blockSize, eventLoopGroup, channelClass);
171    this.asyncOutputWrapper = new OutputStreamWrapper(output);
172  }
173
174  private long write(Consumer<CompletableFuture<Long>> action) throws IOException {
175    CompletableFuture<Long> future = new CompletableFuture<>();
176    action.accept(future);
177    try {
178      return future.get().longValue();
179    } catch (InterruptedException e) {
180      InterruptedIOException ioe = new InterruptedIOException();
181      ioe.initCause(e);
182      throw ioe;
183    } catch (ExecutionException e) {
184      Throwables.propagateIfPossible(e.getCause(), IOException.class);
185      throw new RuntimeException(e.getCause());
186    }
187  }
188
189  @Override
190  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
191    return write(future -> {
192      output.write(magic);
193      try {
194        header.writeDelimitedTo(asyncOutputWrapper);
195      } catch (IOException e) {
196        // should not happen
197        throw new AssertionError(e);
198      }
199      addListener(output.flush(false), (len, error) -> {
200        if (error != null) {
201          future.completeExceptionally(error);
202        } else {
203          future.complete(len);
204        }
205      });
206    });
207  }
208
209  @Override
210  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
211    return write(future -> {
212      try {
213        trailer.writeTo(asyncOutputWrapper);
214      } catch (IOException e) {
215        // should not happen
216        throw new AssertionError(e);
217      }
218      output.writeInt(trailer.getSerializedSize());
219      output.write(magic);
220      addListener(output.flush(false), (len, error) -> {
221        if (error != null) {
222          future.completeExceptionally(error);
223        } else {
224          future.complete(len);
225        }
226      });
227    });
228  }
229
230  @Override
231  protected OutputStream getOutputStreamForCellEncoder() {
232    return asyncOutputWrapper;
233  }
234}