/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AbstractWALRoller;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

/**
 * AsyncWriter for protobuf-based WAL.
 */
@InterfaceAudience.Private
public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
  implements AsyncFSWALProvider.AsyncWriter {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncProtobufLogWriter.class);

  private final EventLoopGroup eventLoopGroup;

  private final Class<? extends Channel> channelClass;

  private volatile AsyncFSOutput output;
  /**
   * Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed.
   */
  private volatile long finalSyncedLength = -1;

  private static final class OutputStreamWrapper extends OutputStream implements ByteBufferWriter {

    private final AsyncFSOutput out;

    private final byte[] oneByteBuf = new byte[1];

    public OutputStreamWrapper(AsyncFSOutput out) {
      this.out = out;
    }

    @Override
    public void write(int b) throws IOException {
      oneByteBuf[0] = (byte) b;
      write(oneByteBuf);
    }

    @Override
    public void write(ByteBuffer b, int off, int len) throws IOException {
      ByteBuffer bb = b.duplicate();
      bb.position(off);
      bb.limit(off + len);
      out.write(bb);
    }

    @Override
    public void writeInt(int i) throws IOException {
      out.writeInt(i);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
    }

    @Override
    public void close() throws IOException {
      out.close();
    }
  }

  private OutputStream asyncOutputWrapper;
  private long waitTimeout;

  public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass) {
    this.eventLoopGroup = eventLoopGroup;
    this.channelClass = channelClass;
    // Reuse WAL_ROLL_WAIT_TIMEOUT here to avoid an infinite wait if somehow a wait on a future
    // never completes. The objective is the same. We want to propagate an exception to trigger
    // an abort if we seem to be hung.
    if (this.conf == null) {
      this.conf = HBaseConfiguration.create();
    }
    this.waitTimeout = this.conf.getLong(AbstractWALRoller.WAL_ROLL_WAIT_TIMEOUT,
      AbstractWALRoller.DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
  }

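  /**
   * Appends the entry's key as a delimited protobuf WALKey (carrying the count of cells that
   * follow) and then writes the edit's cells through the cell encoder. The tracked writer length
   * is advanced by the number of bytes this entry added to the output buffer.
   */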
  @Override
  public void append(Entry entry) {
    int buffered = output.buffered();
    try {
      entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
        .writeDelimitedTo(asyncOutputWrapper);
    } catch (IOException e) {
      throw new AssertionError("should not happen", e);
    }
    try {
      for (Cell cell : entry.getEdit().getCells()) {
        cellEncoder.write((ExtendedCell) cell);
      }
    } catch (IOException e) {
      throw new AssertionError("should not happen", e);
    }
    length.addAndGet(output.buffered() - buffered);
  }

  @Override
  public CompletableFuture<Long> sync(boolean forceSync) {
    return output.flush(forceSync);
  }

  @Override
  public synchronized void close() throws IOException {
    if (this.output == null) {
      return;
    }
    try {
      writeWALTrailer();
      output.close();
    } catch (Exception e) {
      LOG.warn("normal close failed, trying to recover", e);
      output.recoverAndClose(null);
    }
    /**
     * We have to call {@link AsyncFSOutput#getSyncedLength()} after {@link AsyncFSOutput#close()}
     * to get the final length synced to the underlying filesystem, because
     * {@link AsyncFSOutput#close()} may also flush some data to the underlying filesystem.
     */
    this.finalSyncedLength = this.output.getSyncedLength();
    this.output = null;
  }

  public AsyncFSOutput getOutput() {
    return this.output;
  }

  @Override
  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
    throws IOException, StreamLacksCapabilityException {
    this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
      blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
    this.asyncOutputWrapper = new OutputStreamWrapper(output);
  }

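  /**
   * Best-effort close of the underlying output; an {@link IOException} here is logged rather than
   * rethrown.
   */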
  @Override
  protected void closeOutputIfNecessary() {
    if (this.output != null) {
      try {
        this.output.close();
      } catch (IOException e) {
        LOG.warn("Close output failed", e);
      }
    }
  }

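  /**
   * Runs the given write action and blocks until its future completes. The wait is bounded by
   * {@link #waitTimeout}; a timeout surfaces as a {@link TimeoutIOException}, interruption as an
   * {@link InterruptedIOException}, and an execution failure is rethrown as its
   * {@link IOException} cause where possible.
   */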
  private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
    CompletableFuture<Long> future = new CompletableFuture<>();
    action.accept(future);
    try {
      return future.get(waitTimeout, TimeUnit.MILLISECONDS).longValue();
    } catch (InterruptedException e) {
      InterruptedIOException ioe = new InterruptedIOException();
      ioe.initCause(e);
      throw ioe;
    } catch (ExecutionException e) {
      Throwables.propagateIfPossible(e.getCause(), IOException.class);
      throw new RuntimeException(e.getCause());
    } catch (TimeoutException e) {
      throw new TimeoutIOException(e);
    }
  }

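  /**
   * Writes the WAL magic bytes and the delimited protobuf header, then flushes and returns the
   * length reported by the flush.
   */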
  @Override
  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
    return writeWALMetadata(future -> {
      output.write(magic);
      try {
        header.writeDelimitedTo(asyncOutputWrapper);
      } catch (IOException e) {
        // should not happen
        throw new AssertionError(e);
      }
      addListener(output.flush(false), (len, error) -> {
        if (error != null) {
          future.completeExceptionally(error);
        } else {
          future.complete(len);
        }
      });
    });
  }

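  /**
   * Writes the protobuf trailer followed by its serialized size and the WAL magic bytes, then
   * flushes and returns the length reported by the flush. Writing the size after the trailer lets
   * a reader locate the trailer from the end of the file.
   */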
  @Override
  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
    return writeWALMetadata(future -> {
      try {
        trailer.writeTo(asyncOutputWrapper);
      } catch (IOException e) {
        // should not happen
        throw new AssertionError(e);
      }
      output.writeInt(trailer.getSerializedSize());
      output.write(magic);
      addListener(output.flush(false), (len, error) -> {
        if (error != null) {
          future.completeExceptionally(error);
        } else {
          future.complete(len);
        }
      });
    });
  }

  @Override
  protected OutputStream getOutputStreamForCellEncoder() {
    return asyncOutputWrapper;
  }

  @Override
  public long getSyncedLength() {
    /**
     * The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close} is a sync point:
     * if output is null, then finalSyncedLength must already have been set, so we return
     * finalSyncedLength; otherwise we return output.getSyncedLength().
     */
    AsyncFSOutput outputToUse = this.output;
    if (outputToUse == null) {
      long finalSyncedLengthToUse = this.finalSyncedLength;
      assert finalSyncedLengthToUse >= 0;
      return finalSyncedLengthToUse;
    }
    return outputToUse.getSyncedLength();
  }
}