001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.io.OutputStream;
025import java.nio.ByteBuffer;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ExecutionException;
028import java.util.function.Consumer;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.io.ByteBufferWriter;
033import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
034import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
035import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
036import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
037import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
038import org.apache.hadoop.hbase.wal.WAL.Entry;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
044import org.apache.hbase.thirdparty.io.netty.channel.Channel;
045import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
046
047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
049
050/**
051 * AsyncWriter for protobuf-based WAL.
052 */
053@InterfaceAudience.Private
054public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
055    implements AsyncFSWALProvider.AsyncWriter {
056
057  private static final Logger LOG = LoggerFactory.getLogger(AsyncProtobufLogWriter.class);
058
059  private final EventLoopGroup eventLoopGroup;
060
061  private final Class<? extends Channel> channelClass;
062
063  private volatile AsyncFSOutput output;
064  /**
065   * Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed.
066   */
067  private volatile long finalSyncedLength = -1;
068
069  private static final class OutputStreamWrapper extends OutputStream
070      implements ByteBufferWriter {
071
072    private final AsyncFSOutput out;
073
074    private final byte[] oneByteBuf = new byte[1];
075
076    @Override
077    public void write(int b) throws IOException {
078      oneByteBuf[0] = (byte) b;
079      write(oneByteBuf);
080    }
081
082    public OutputStreamWrapper(AsyncFSOutput out) {
083      this.out = out;
084    }
085
086    @Override
087    public void write(ByteBuffer b, int off, int len) throws IOException {
088      ByteBuffer bb = b.duplicate();
089      bb.position(off);
090      bb.limit(off + len);
091      out.write(bb);
092    }
093
094    @Override
095    public void writeInt(int i) throws IOException {
096      out.writeInt(i);
097    }
098
099    @Override
100    public void write(byte[] b, int off, int len) throws IOException {
101      out.write(b, off, len);
102    }
103
104    @Override
105    public void close() throws IOException {
106      out.close();
107    }
108  }
109
110  private OutputStream asyncOutputWrapper;
111
112  public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
113      Class<? extends Channel> channelClass) {
114    this.eventLoopGroup = eventLoopGroup;
115    this.channelClass = channelClass;
116  }
117
118  /*
119   * @return class name which is recognized by hbase-1.x to avoid ProtobufLogReader throwing error:
120   *   IOException: Got unknown writer class: AsyncProtobufLogWriter
121   */
122  @Override
123  protected String getWriterClassName() {
124    return "ProtobufLogWriter";
125  }
126
127  @Override
128  public void append(Entry entry) {
129    int buffered = output.buffered();
130    try {
131      entry.getKey().
132        getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
133          .writeDelimitedTo(asyncOutputWrapper);
134    } catch (IOException e) {
135      throw new AssertionError("should not happen", e);
136    }
137    try {
138      for (Cell cell : entry.getEdit().getCells()) {
139        cellEncoder.write(cell);
140      }
141    } catch (IOException e) {
142      throw new AssertionError("should not happen", e);
143    }
144    length.addAndGet(output.buffered() - buffered);
145  }
146
147  @Override
148  public CompletableFuture<Long> sync(boolean forceSync) {
149    return output.flush(forceSync);
150  }
151
152  @Override
153  public synchronized void close() throws IOException {
154    if (this.output == null) {
155      return;
156    }
157    try {
158      writeWALTrailer();
159      output.close();
160    } catch (Exception e) {
161      LOG.warn("normal close failed, try recover", e);
162      output.recoverAndClose(null);
163    }
164    /**
165     * We have to call {@link AsyncFSOutput#getSyncedLength()}
166     * after {@link AsyncFSOutput#close()} to get the final length
167     * synced to underlying filesystem because {@link AsyncFSOutput#close()}
168     * may also flush some data to underlying filesystem.
169     */
170    this.finalSyncedLength = this.output.getSyncedLength();
171    this.output = null;
172  }
173
174  public AsyncFSOutput getOutput() {
175    return this.output;
176  }
177
178  @Override
179  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
180      short replication, long blockSize, StreamSlowMonitor monitor) throws IOException,
181      StreamLacksCapabilityException {
182    this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
183        blockSize, eventLoopGroup, channelClass, monitor);
184    this.asyncOutputWrapper = new OutputStreamWrapper(output);
185  }
186
187  private long write(Consumer<CompletableFuture<Long>> action) throws IOException {
188    CompletableFuture<Long> future = new CompletableFuture<>();
189    action.accept(future);
190    try {
191      return future.get().longValue();
192    } catch (InterruptedException e) {
193      InterruptedIOException ioe = new InterruptedIOException();
194      ioe.initCause(e);
195      throw ioe;
196    } catch (ExecutionException e) {
197      Throwables.propagateIfPossible(e.getCause(), IOException.class);
198      throw new RuntimeException(e.getCause());
199    }
200  }
201
202  @Override
203  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
204    return write(future -> {
205      output.write(magic);
206      try {
207        header.writeDelimitedTo(asyncOutputWrapper);
208      } catch (IOException e) {
209        // should not happen
210        throw new AssertionError(e);
211      }
212      addListener(output.flush(false), (len, error) -> {
213        if (error != null) {
214          future.completeExceptionally(error);
215        } else {
216          future.complete(len);
217        }
218      });
219    });
220  }
221
222  @Override
223  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
224    return write(future -> {
225      try {
226        trailer.writeTo(asyncOutputWrapper);
227      } catch (IOException e) {
228        // should not happen
229        throw new AssertionError(e);
230      }
231      output.writeInt(trailer.getSerializedSize());
232      output.write(magic);
233      addListener(output.flush(false), (len, error) -> {
234        if (error != null) {
235          future.completeExceptionally(error);
236        } else {
237          future.complete(len);
238        }
239      });
240    });
241  }
242
243  @Override
244  protected OutputStream getOutputStreamForCellEncoder() {
245    return asyncOutputWrapper;
246  }
247
248  @Override
249  public long getSyncedLength() {
250   /**
251    * The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close}
252    * is a sync point, if output is null, then finalSyncedLength must set,
253    * so we can return finalSyncedLength, else we return output.getSyncedLength
254    */
255    AsyncFSOutput outputToUse = this.output;
256    if(outputToUse == null) {
257        long finalSyncedLengthToUse = this.finalSyncedLength;
258        assert finalSyncedLengthToUse >= 0;
259        return finalSyncedLengthToUse;
260    }
261    return outputToUse.getSyncedLength();
262  }
263}