001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.io.OutputStream;
025import java.nio.ByteBuffer;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.TimeoutException;
030import java.util.function.Consumer;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.io.ByteBufferWriter;
036import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
037import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
038import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
039import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
040import org.apache.hadoop.hbase.wal.AbstractWALRoller;
041import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
042import org.apache.hadoop.hbase.wal.WAL.Entry;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
048import org.apache.hbase.thirdparty.io.netty.channel.Channel;
049import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
050
051import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
053
054/**
055 * AsyncWriter for protobuf-based WAL.
056 */
057@InterfaceAudience.Private
058public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
059  implements AsyncFSWALProvider.AsyncWriter {
060
061  private static final Logger LOG = LoggerFactory.getLogger(AsyncProtobufLogWriter.class);
062
063  private final EventLoopGroup eventLoopGroup;
064
065  private final Class<? extends Channel> channelClass;
066
067  private volatile AsyncFSOutput output;
068  /**
069   * Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed.
070   */
071  private volatile long finalSyncedLength = -1;
072
073  private static final class OutputStreamWrapper extends OutputStream implements ByteBufferWriter {
074
075    private final AsyncFSOutput out;
076
077    private final byte[] oneByteBuf = new byte[1];
078
079    @Override
080    public void write(int b) throws IOException {
081      oneByteBuf[0] = (byte) b;
082      write(oneByteBuf);
083    }
084
085    public OutputStreamWrapper(AsyncFSOutput out) {
086      this.out = out;
087    }
088
089    @Override
090    public void write(ByteBuffer b, int off, int len) throws IOException {
091      ByteBuffer bb = b.duplicate();
092      bb.position(off);
093      bb.limit(off + len);
094      out.write(bb);
095    }
096
097    @Override
098    public void writeInt(int i) throws IOException {
099      out.writeInt(i);
100    }
101
102    @Override
103    public void write(byte[] b, int off, int len) throws IOException {
104      out.write(b, off, len);
105    }
106
107    @Override
108    public void close() throws IOException {
109      out.close();
110    }
111  }
112
113  private OutputStream asyncOutputWrapper;
114  private long waitTimeout;
115
116  public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
117    Class<? extends Channel> channelClass) {
118    this.eventLoopGroup = eventLoopGroup;
119    this.channelClass = channelClass;
120    // Reuse WAL_ROLL_WAIT_TIMEOUT here to avoid an infinite wait if somehow a wait on a future
121    // never completes. The objective is the same. We want to propagate an exception to trigger
122    // an abort if we seem to be hung.
123    if (this.conf == null) {
124      this.conf = HBaseConfiguration.create();
125    }
126    this.waitTimeout = this.conf.getLong(AbstractWALRoller.WAL_ROLL_WAIT_TIMEOUT,
127      AbstractWALRoller.DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
128  }
129
130  /*
131   * @return class name which is recognized by hbase-1.x to avoid ProtobufLogReader throwing error:
132   * IOException: Got unknown writer class: AsyncProtobufLogWriter
133   */
134  @Override
135  protected String getWriterClassName() {
136    return "ProtobufLogWriter";
137  }
138
139  @Override
140  public void append(Entry entry) {
141    int buffered = output.buffered();
142    try {
143      entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
144        .writeDelimitedTo(asyncOutputWrapper);
145    } catch (IOException e) {
146      throw new AssertionError("should not happen", e);
147    }
148    try {
149      for (Cell cell : entry.getEdit().getCells()) {
150        cellEncoder.write(cell);
151      }
152    } catch (IOException e) {
153      throw new AssertionError("should not happen", e);
154    }
155    length.addAndGet(output.buffered() - buffered);
156  }
157
158  @Override
159  public CompletableFuture<Long> sync(boolean forceSync) {
160    return output.flush(forceSync);
161  }
162
163  @Override
164  public synchronized void close() throws IOException {
165    if (this.output == null) {
166      return;
167    }
168    try {
169      writeWALTrailer();
170      output.close();
171    } catch (Exception e) {
172      LOG.warn("normal close failed, try recover", e);
173      output.recoverAndClose(null);
174    }
175    /**
176     * We have to call {@link AsyncFSOutput#getSyncedLength()} after {@link AsyncFSOutput#close()}
177     * to get the final length synced to underlying filesystem because {@link AsyncFSOutput#close()}
178     * may also flush some data to underlying filesystem.
179     */
180    this.finalSyncedLength = this.output.getSyncedLength();
181    this.output = null;
182  }
183
184  public AsyncFSOutput getOutput() {
185    return this.output;
186  }
187
188  @Override
189  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
190    short replication, long blockSize, StreamSlowMonitor monitor)
191    throws IOException, StreamLacksCapabilityException {
192    this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
193      blockSize, eventLoopGroup, channelClass, monitor);
194    this.asyncOutputWrapper = new OutputStreamWrapper(output);
195  }
196
197  @Override
198  protected void closeOutputIfNecessary() {
199    if (this.output != null) {
200      try {
201        this.output.close();
202      } catch (IOException e) {
203        LOG.warn("Close output failed", e);
204      }
205    }
206  }
207
208  private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
209    CompletableFuture<Long> future = new CompletableFuture<>();
210    action.accept(future);
211    try {
212      return future.get(waitTimeout, TimeUnit.MILLISECONDS).longValue();
213    } catch (InterruptedException e) {
214      InterruptedIOException ioe = new InterruptedIOException();
215      ioe.initCause(e);
216      throw ioe;
217    } catch (ExecutionException | TimeoutException e) {
218      Throwables.propagateIfPossible(e.getCause(), IOException.class);
219      throw new RuntimeException(e.getCause());
220    }
221  }
222
223  @Override
224  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
225    return writeWALMetadata(future -> {
226      output.write(magic);
227      try {
228        header.writeDelimitedTo(asyncOutputWrapper);
229      } catch (IOException e) {
230        // should not happen
231        throw new AssertionError(e);
232      }
233      addListener(output.flush(false), (len, error) -> {
234        if (error != null) {
235          future.completeExceptionally(error);
236        } else {
237          future.complete(len);
238        }
239      });
240    });
241  }
242
243  @Override
244  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
245    return writeWALMetadata(future -> {
246      try {
247        trailer.writeTo(asyncOutputWrapper);
248      } catch (IOException e) {
249        // should not happen
250        throw new AssertionError(e);
251      }
252      output.writeInt(trailer.getSerializedSize());
253      output.write(magic);
254      addListener(output.flush(false), (len, error) -> {
255        if (error != null) {
256          future.completeExceptionally(error);
257        } else {
258          future.complete(len);
259        }
260      });
261    });
262  }
263
264  @Override
265  protected OutputStream getOutputStreamForCellEncoder() {
266    return asyncOutputWrapper;
267  }
268
269  @Override
270  public long getSyncedLength() {
271    /**
272     * The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close} is a sync point,
273     * if output is null, then finalSyncedLength must set, so we can return finalSyncedLength, else
274     * we return output.getSyncedLength
275     */
276    AsyncFSOutput outputToUse = this.output;
277    if (outputToUse == null) {
278      long finalSyncedLengthToUse = this.finalSyncedLength;
279      assert finalSyncedLengthToUse >= 0;
280      return finalSyncedLengthToUse;
281    }
282    return outputToUse.getSyncedLength();
283  }
284}