/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AbstractWALRoller;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

/**
 * AsyncWriter for protobuf-based WAL.
 */
@InterfaceAudience.Private
public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
  implements AsyncFSWALProvider.AsyncWriter {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncProtobufLogWriter.class);

  private final EventLoopGroup eventLoopGroup;

  private final Class<? extends Channel> channelClass;

  private volatile AsyncFSOutput output;
  /**
   * Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed.
   */
  private volatile long finalSyncedLength = -1;

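  /**
   * Adapts {@link AsyncFSOutput} to the {@link OutputStream} and {@link ByteBufferWriter}
   * interfaces so that protobuf's {@code writeDelimitedTo} and the cell encoder can write to it.
   * Writes land in the output's buffer and are only persisted when the output is flushed.
   */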
  private static final class OutputStreamWrapper extends OutputStream implements ByteBufferWriter {

    private final AsyncFSOutput out;

    private final byte[] oneByteBuf = new byte[1];

    @Override
    public void write(int b) throws IOException {
      oneByteBuf[0] = (byte) b;
      write(oneByteBuf);
    }

    public OutputStreamWrapper(AsyncFSOutput out) {
      this.out = out;
    }

    @Override
    public void write(ByteBuffer b, int off, int len) throws IOException {
      ByteBuffer bb = b.duplicate();
      bb.position(off);
      bb.limit(off + len);
      out.write(bb);
    }

    @Override
    public void writeInt(int i) throws IOException {
      out.writeInt(i);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
    }

    @Override
    public void close() throws IOException {
      out.close();
    }
  }

  private OutputStream asyncOutputWrapper;
  private long waitTimeout;

  public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass) {
    this.eventLoopGroup = eventLoopGroup;
    this.channelClass = channelClass;
    // Reuse WAL_ROLL_WAIT_TIMEOUT here to avoid an infinite wait if somehow a wait on a future
    // never completes. The objective is the same. We want to propagate an exception to trigger
    // an abort if we seem to be hung.
    if (this.conf == null) {
      this.conf = HBaseConfiguration.create();
    }
    this.waitTimeout = this.conf.getLong(AbstractWALRoller.WAL_ROLL_WAIT_TIMEOUT,
      AbstractWALRoller.DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
  }

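  /**
   * Appends the entry's key (as a length-delimited protobuf) and the edit's cells to the output's
   * in-memory buffer; nothing is durable until a later {@link #sync(boolean)} completes. The
   * {@code length} counter only tracks how many bytes were added to the buffer. IOExceptions are
   * converted to AssertionErrors because writes here should only touch the in-memory buffer.
   */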
  @Override
  public void append(Entry entry) {
    int buffered = output.buffered();
    try {
      entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
        .writeDelimitedTo(asyncOutputWrapper);
    } catch (IOException e) {
      throw new AssertionError("should not happen", e);
    }
    try {
      for (Cell cell : entry.getEdit().getCells()) {
        cellEncoder.write(cell);
      }
    } catch (IOException e) {
      throw new AssertionError("should not happen", e);
    }
    length.addAndGet(output.buffered() - buffered);
  }

  @Override
  public CompletableFuture<Long> sync(boolean forceSync) {
    return output.flush(forceSync);
  }

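  /**
   * Writes the WAL trailer and closes the output, falling back to
   * {@code output.recoverAndClose(null)} if the normal close fails. In both cases the final
   * synced length is captured before dropping the reference, so {@link #getSyncedLength()} keeps
   * working after close.
   */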
  @Override
  public synchronized void close() throws IOException {
    if (this.output == null) {
      return;
    }
    try {
      writeWALTrailer();
      output.close();
    } catch (Exception e) {
      LOG.warn("normal close failed, try recover", e);
      output.recoverAndClose(null);
    }
    /**
     * We have to call {@link AsyncFSOutput#getSyncedLength()} after {@link AsyncFSOutput#close()}
     * to get the final length synced to the underlying filesystem, because
     * {@link AsyncFSOutput#close()} may flush some remaining data to the filesystem as well.
     */
    this.finalSyncedLength = this.output.getSyncedLength();
    this.output = null;
  }

  public AsyncFSOutput getOutput() {
    return this.output;
  }

  @Override
  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
    throws IOException, StreamLacksCapabilityException {
    this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
      blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
    this.asyncOutputWrapper = new OutputStreamWrapper(output);
  }

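  /**
   * Best-effort close of {@link #output}: a failure to close is only logged, not rethrown.
   */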
  @Override
  protected void closeOutputIfNecessary() {
    if (this.output != null) {
      try {
        this.output.close();
      } catch (IOException e) {
        LOG.warn("Close output failed", e);
      }
    }
  }

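  /**
   * Runs the given write action and blocks until its future completes, waiting at most
   * {@link #waitTimeout} milliseconds. Interruption and timeouts surface as
   * {@link InterruptedIOException} and {@link TimeoutIOException}; execution failures are
   * rethrown as {@link IOException} where possible, otherwise wrapped in a RuntimeException.
   */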
  private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
    CompletableFuture<Long> future = new CompletableFuture<>();
    action.accept(future);
    try {
      return future.get(waitTimeout, TimeUnit.MILLISECONDS).longValue();
    } catch (InterruptedException e) {
      InterruptedIOException ioe = new InterruptedIOException();
      ioe.initCause(e);
      throw ioe;
    } catch (ExecutionException e) {
      Throwables.propagateIfPossible(e.getCause(), IOException.class);
      throw new RuntimeException(e.getCause());
    } catch (TimeoutException e) {
      throw new TimeoutIOException(e);
    }
  }

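  /**
   * Writes the magic bytes and the length-delimited {@link WALHeader}, then flushes and completes
   * the metadata future with the flushed length, which becomes the return value.
   */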
  @Override
  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
    return writeWALMetadata(future -> {
      output.write(magic);
      try {
        header.writeDelimitedTo(asyncOutputWrapper);
      } catch (IOException e) {
        // should not happen
        throw new AssertionError(e);
      }
      addListener(output.flush(false), (len, error) -> {
        if (error != null) {
          future.completeExceptionally(error);
        } else {
          future.complete(len);
        }
      });
    });
  }

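  /**
   * Writes the {@link WALTrailer}, its serialized size and the magic bytes, then flushes and
   * completes the metadata future with the flushed length, mirroring
   * {@link #writeMagicAndWALHeader(byte[], WALHeader)}.
   */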
  @Override
  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
    return writeWALMetadata(future -> {
      try {
        trailer.writeTo(asyncOutputWrapper);
      } catch (IOException e) {
        // should not happen
        throw new AssertionError(e);
      }
      output.writeInt(trailer.getSerializedSize());
      output.write(magic);
      addListener(output.flush(false), (len, error) -> {
        if (error != null) {
          future.completeExceptionally(error);
        } else {
          future.complete(len);
        }
      });
    });
  }

  @Override
  protected OutputStream getOutputStreamForCellEncoder() {
    return asyncOutputWrapper;
  }

  @Override
  public long getSyncedLength() {
    /**
     * The assignment {@code this.output = null} in {@link AsyncProtobufLogWriter#close} acts as a
     * synchronization point: if {@link #output} is null here, then {@link #finalSyncedLength}
     * must already have been set, so we return it; otherwise we return
     * {@code output.getSyncedLength()}.
     */
    AsyncFSOutput outputToUse = this.output;
    if (outputToUse == null) {
      long finalSyncedLengthToUse = this.finalSyncedLength;
      assert finalSyncedLengthToUse >= 0;
      return finalSyncedLengthToUse;
    }
    return outputToUse.getSyncedLength();
  }
}