001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.io.OutputStream;
025import java.nio.ByteBuffer;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ExecutionException;
028import java.util.function.Consumer;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.io.ByteBufferWriter;
033import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
034import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
035import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
036import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
037import org.apache.hadoop.hbase.wal.WAL.Entry;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
043import org.apache.hbase.thirdparty.io.netty.channel.Channel;
044import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
048
049/**
050 * AsyncWriter for protobuf-based WAL.
051 */
052@InterfaceAudience.Private
053public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
054    implements AsyncFSWALProvider.AsyncWriter {
055
056  private static final Logger LOG = LoggerFactory.getLogger(AsyncProtobufLogWriter.class);
057
058  private final EventLoopGroup eventLoopGroup;
059
060  private final Class<? extends Channel> channelClass;
061
062  private volatile AsyncFSOutput output;
063  /**
064   * Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed.
065   */
066  private volatile long finalSyncedLength = -1;
067
068  private static final class OutputStreamWrapper extends OutputStream
069      implements ByteBufferWriter {
070
071    private final AsyncFSOutput out;
072
073    private final byte[] oneByteBuf = new byte[1];
074
075    @Override
076    public void write(int b) throws IOException {
077      oneByteBuf[0] = (byte) b;
078      write(oneByteBuf);
079    }
080
081    public OutputStreamWrapper(AsyncFSOutput out) {
082      this.out = out;
083    }
084
085    @Override
086    public void write(ByteBuffer b, int off, int len) throws IOException {
087      ByteBuffer bb = b.duplicate();
088      bb.position(off);
089      bb.limit(off + len);
090      out.write(bb);
091    }
092
093    @Override
094    public void writeInt(int i) throws IOException {
095      out.writeInt(i);
096    }
097
098    @Override
099    public void write(byte[] b, int off, int len) throws IOException {
100      out.write(b, off, len);
101    }
102
103    @Override
104    public void close() throws IOException {
105      out.close();
106    }
107  }
108
109  private OutputStream asyncOutputWrapper;
110
111  public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
112      Class<? extends Channel> channelClass) {
113    this.eventLoopGroup = eventLoopGroup;
114    this.channelClass = channelClass;
115  }
116
117  /*
118   * @return class name which is recognized by hbase-1.x to avoid ProtobufLogReader throwing error:
119   *   IOException: Got unknown writer class: AsyncProtobufLogWriter
120   */
121  @Override
122  protected String getWriterClassName() {
123    return "ProtobufLogWriter";
124  }
125
126  @Override
127  public void append(Entry entry) {
128    int buffered = output.buffered();
129    try {
130      entry.getKey().
131        getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
132          .writeDelimitedTo(asyncOutputWrapper);
133    } catch (IOException e) {
134      throw new AssertionError("should not happen", e);
135    }
136    try {
137      for (Cell cell : entry.getEdit().getCells()) {
138        cellEncoder.write(cell);
139      }
140    } catch (IOException e) {
141      throw new AssertionError("should not happen", e);
142    }
143    length.addAndGet(output.buffered() - buffered);
144  }
145
146  @Override
147  public CompletableFuture<Long> sync(boolean forceSync) {
148    return output.flush(forceSync);
149  }
150
151  @Override
152  public synchronized void close() throws IOException {
153    if (this.output == null) {
154      return;
155    }
156    try {
157      writeWALTrailer();
158      output.close();
159    } catch (Exception e) {
160      LOG.warn("normal close failed, try recover", e);
161      output.recoverAndClose(null);
162    }
163    /**
164     * We have to call {@link AsyncFSOutput#getSyncedLength()}
165     * after {@link AsyncFSOutput#close()} to get the final length
166     * synced to underlying filesystem because {@link AsyncFSOutput#close()}
167     * may also flush some data to underlying filesystem.
168     */
169    this.finalSyncedLength = this.output.getSyncedLength();
170    this.output = null;
171  }
172
173  public AsyncFSOutput getOutput() {
174    return this.output;
175  }
176
177  @Override
178  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
179      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
180    this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
181        blockSize, eventLoopGroup, channelClass);
182    this.asyncOutputWrapper = new OutputStreamWrapper(output);
183  }
184
185  private long write(Consumer<CompletableFuture<Long>> action) throws IOException {
186    CompletableFuture<Long> future = new CompletableFuture<>();
187    action.accept(future);
188    try {
189      return future.get().longValue();
190    } catch (InterruptedException e) {
191      InterruptedIOException ioe = new InterruptedIOException();
192      ioe.initCause(e);
193      throw ioe;
194    } catch (ExecutionException e) {
195      Throwables.propagateIfPossible(e.getCause(), IOException.class);
196      throw new RuntimeException(e.getCause());
197    }
198  }
199
200  @Override
201  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
202    return write(future -> {
203      output.write(magic);
204      try {
205        header.writeDelimitedTo(asyncOutputWrapper);
206      } catch (IOException e) {
207        // should not happen
208        throw new AssertionError(e);
209      }
210      addListener(output.flush(false), (len, error) -> {
211        if (error != null) {
212          future.completeExceptionally(error);
213        } else {
214          future.complete(len);
215        }
216      });
217    });
218  }
219
220  @Override
221  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
222    return write(future -> {
223      try {
224        trailer.writeTo(asyncOutputWrapper);
225      } catch (IOException e) {
226        // should not happen
227        throw new AssertionError(e);
228      }
229      output.writeInt(trailer.getSerializedSize());
230      output.write(magic);
231      addListener(output.flush(false), (len, error) -> {
232        if (error != null) {
233          future.completeExceptionally(error);
234        } else {
235          future.complete(len);
236        }
237      });
238    });
239  }
240
241  @Override
242  protected OutputStream getOutputStreamForCellEncoder() {
243    return asyncOutputWrapper;
244  }
245
246  @Override
247  public long getSyncedLength() {
248   /**
249    * The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close}
250    * is a sync point, if output is null, then finalSyncedLength must set,
251    * so we can return finalSyncedLength, else we return output.getSyncedLength
252    */
253    AsyncFSOutput outputToUse = this.output;
254    if(outputToUse == null) {
255        long finalSyncedLengthToUse = this.finalSyncedLength;
256        assert finalSyncedLengthToUse >= 0;
257        return finalSyncedLengthToUse;
258    }
259    return outputToUse.getSyncedLength();
260  }
261}