001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.io.OutputStream;
025import java.nio.ByteBuffer;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.TimeoutException;
030import java.util.function.Consumer;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.io.ByteBufferWriter;
036import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
037import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
038import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
039import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
040import org.apache.hadoop.hbase.wal.AbstractWALRoller;
041import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
042import org.apache.hadoop.hbase.wal.WAL.Entry;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
048import org.apache.hbase.thirdparty.io.netty.channel.Channel;
049import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
050
051import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
053
054/**
055 * AsyncWriter for protobuf-based WAL.
056 */
057@InterfaceAudience.Private
058public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
059  implements AsyncFSWALProvider.AsyncWriter {
060
061  private static final Logger LOG = LoggerFactory.getLogger(AsyncProtobufLogWriter.class);
062
063  private final EventLoopGroup eventLoopGroup;
064
065  private final Class<? extends Channel> channelClass;
066
067  private volatile AsyncFSOutput output;
068  /**
069   * Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed.
070   */
071  private volatile long finalSyncedLength = -1;
072
073  private static final class OutputStreamWrapper extends OutputStream implements ByteBufferWriter {
074
075    private final AsyncFSOutput out;
076
077    private final byte[] oneByteBuf = new byte[1];
078
079    @Override
080    public void write(int b) throws IOException {
081      oneByteBuf[0] = (byte) b;
082      write(oneByteBuf);
083    }
084
085    public OutputStreamWrapper(AsyncFSOutput out) {
086      this.out = out;
087    }
088
089    @Override
090    public void write(ByteBuffer b, int off, int len) throws IOException {
091      ByteBuffer bb = b.duplicate();
092      bb.position(off);
093      bb.limit(off + len);
094      out.write(bb);
095    }
096
097    @Override
098    public void writeInt(int i) throws IOException {
099      out.writeInt(i);
100    }
101
102    @Override
103    public void write(byte[] b, int off, int len) throws IOException {
104      out.write(b, off, len);
105    }
106
107    @Override
108    public void close() throws IOException {
109      out.close();
110    }
111  }
112
113  private OutputStream asyncOutputWrapper;
114  private long waitTimeout;
115
116  public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
117    Class<? extends Channel> channelClass) {
118    this.eventLoopGroup = eventLoopGroup;
119    this.channelClass = channelClass;
120    // Reuse WAL_ROLL_WAIT_TIMEOUT here to avoid an infinite wait if somehow a wait on a future
121    // never completes. The objective is the same. We want to propagate an exception to trigger
122    // an abort if we seem to be hung.
123    if (this.conf == null) {
124      this.conf = HBaseConfiguration.create();
125    }
126    this.waitTimeout = this.conf.getLong(AbstractWALRoller.WAL_ROLL_WAIT_TIMEOUT,
127      AbstractWALRoller.DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
128  }
129
130  @Override
131  public void append(Entry entry) {
132    int buffered = output.buffered();
133    try {
134      entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
135        .writeDelimitedTo(asyncOutputWrapper);
136    } catch (IOException e) {
137      throw new AssertionError("should not happen", e);
138    }
139    try {
140      for (Cell cell : entry.getEdit().getCells()) {
141        cellEncoder.write(cell);
142      }
143    } catch (IOException e) {
144      throw new AssertionError("should not happen", e);
145    }
146    length.addAndGet(output.buffered() - buffered);
147  }
148
149  @Override
150  public CompletableFuture<Long> sync(boolean forceSync) {
151    return output.flush(forceSync);
152  }
153
154  @Override
155  public synchronized void close() throws IOException {
156    if (this.output == null) {
157      return;
158    }
159    try {
160      writeWALTrailer();
161      output.close();
162    } catch (Exception e) {
163      LOG.warn("normal close failed, try recover", e);
164      output.recoverAndClose(null);
165    }
166    /**
167     * We have to call {@link AsyncFSOutput#getSyncedLength()} after {@link AsyncFSOutput#close()}
168     * to get the final length synced to underlying filesystem because {@link AsyncFSOutput#close()}
169     * may also flush some data to underlying filesystem.
170     */
171    this.finalSyncedLength = this.output.getSyncedLength();
172    this.output = null;
173  }
174
175  public AsyncFSOutput getOutput() {
176    return this.output;
177  }
178
179  @Override
180  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
181    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
182    throws IOException, StreamLacksCapabilityException {
183    this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
184      blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
185    this.asyncOutputWrapper = new OutputStreamWrapper(output);
186  }
187
188  @Override
189  protected void closeOutputIfNecessary() {
190    if (this.output != null) {
191      try {
192        this.output.close();
193      } catch (IOException e) {
194        LOG.warn("Close output failed", e);
195      }
196    }
197  }
198
199  private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
200    CompletableFuture<Long> future = new CompletableFuture<>();
201    action.accept(future);
202    try {
203      return future.get(waitTimeout, TimeUnit.MILLISECONDS).longValue();
204    } catch (InterruptedException e) {
205      InterruptedIOException ioe = new InterruptedIOException();
206      ioe.initCause(e);
207      throw ioe;
208    } catch (ExecutionException | TimeoutException e) {
209      Throwables.propagateIfPossible(e.getCause(), IOException.class);
210      throw new RuntimeException(e.getCause());
211    }
212  }
213
214  @Override
215  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
216    return writeWALMetadata(future -> {
217      output.write(magic);
218      try {
219        header.writeDelimitedTo(asyncOutputWrapper);
220      } catch (IOException e) {
221        // should not happen
222        throw new AssertionError(e);
223      }
224      addListener(output.flush(false), (len, error) -> {
225        if (error != null) {
226          future.completeExceptionally(error);
227        } else {
228          future.complete(len);
229        }
230      });
231    });
232  }
233
234  @Override
235  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
236    return writeWALMetadata(future -> {
237      try {
238        trailer.writeTo(asyncOutputWrapper);
239      } catch (IOException e) {
240        // should not happen
241        throw new AssertionError(e);
242      }
243      output.writeInt(trailer.getSerializedSize());
244      output.write(magic);
245      addListener(output.flush(false), (len, error) -> {
246        if (error != null) {
247          future.completeExceptionally(error);
248        } else {
249          future.complete(len);
250        }
251      });
252    });
253  }
254
255  @Override
256  protected OutputStream getOutputStreamForCellEncoder() {
257    return asyncOutputWrapper;
258  }
259
260  @Override
261  public long getSyncedLength() {
262    /**
263     * The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close} is a sync point,
264     * if output is null, then finalSyncedLength must set, so we can return finalSyncedLength, else
265     * we return output.getSyncedLength
266     */
267    AsyncFSOutput outputToUse = this.output;
268    if (outputToUse == null) {
269      long finalSyncedLengthToUse = this.finalSyncedLength;
270      assert finalSyncedLengthToUse >= 0;
271      return finalSyncedLengthToUse;
272    }
273    return outputToUse.getSyncedLength();
274  }
275}