001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
021import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
022import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
023import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
024import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
025import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
026import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;
027import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
028
029import java.io.IOException;
030import java.io.InterruptedIOException;
031import java.nio.ByteBuffer;
032import java.util.Collection;
033import java.util.Collections;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Set;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedDeque;
040import java.util.concurrent.ExecutionException;
041import java.util.concurrent.TimeUnit;
042import java.util.function.Supplier;
043
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.crypto.Encryptor;
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
048import org.apache.hadoop.hbase.util.CancelableProgressable;
049import org.apache.hadoop.hbase.util.FSUtils;
050import org.apache.hadoop.hdfs.DFSClient;
051import org.apache.hadoop.hdfs.DistributedFileSystem;
052import org.apache.hadoop.hdfs.protocol.ClientProtocol;
053import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
054import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
055import org.apache.hadoop.hdfs.protocol.LocatedBlock;
056import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
057import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
058import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
059import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
060import org.apache.hadoop.util.DataChecksum;
061import org.apache.yetus.audience.InterfaceAudience;
062
063import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
064import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
065import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
066import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
067import org.apache.hbase.thirdparty.io.netty.channel.Channel;
068import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
069import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
070import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
071import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
072import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
073import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
074import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
075import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
076
077/**
078 * An asynchronous HDFS output stream implementation which fans out data to datanode and only
079 * supports writing file with only one block.
080 * <p>
081 * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create. The mainly
082 * usage of this class is implementing WAL, so we only expose a little HDFS configurations in the
083 * method. And we place it here under io package because we want to make it independent of WAL
084 * implementation thus easier to move it to HDFS project finally.
085 * <p>
086 * Note that, although we support pipelined flush, i.e, write new data and then flush before the
087 * previous flush succeeds, the implementation is not thread safe, so you should not call its
088 * methods concurrently.
089 * <p>
090 * Advantages compare to DFSOutputStream:
091 * <ol>
092 * <li>The fan out mechanism. This will reduce the latency.</li>
093 * <li>Fail-fast when connection to datanode error. The WAL implementation could open new writer
094 * ASAP.</li>
095 * <li>We could benefit from netty's ByteBuf management mechanism.</li>
096 * </ol>
097 */
098@InterfaceAudience.Private
099public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
100
101  // The MAX_PACKET_SIZE is 16MB but it include the header size and checksum size. So here we set a
102  // smaller limit for data size.
103  private static final int MAX_DATA_LEN = 12 * 1024 * 1024;
104
105  private final Configuration conf;
106
107  private final FSUtils fsUtils;
108
109  private final DistributedFileSystem dfs;
110
111  private final DFSClient client;
112
113  private final ClientProtocol namenode;
114
115  private final String clientName;
116
117  private final String src;
118
119  private final long fileId;
120
121  private final ExtendedBlock block;
122
123  private final DatanodeInfo[] locations;
124
125  private final Encryptor encryptor;
126
127  private final List<Channel> datanodeList;
128
129  private final DataChecksum summer;
130
131  private final int maxDataLen;
132
133  private final ByteBufAllocator alloc;
134
135  private static final class Callback {
136
137    private final CompletableFuture<Long> future;
138
139    private final long ackedLength;
140
141    // should be backed by a thread safe collection
142    private final Set<ChannelId> unfinishedReplicas;
143
144    public Callback(CompletableFuture<Long> future, long ackedLength,
145        Collection<Channel> replicas) {
146      this.future = future;
147      this.ackedLength = ackedLength;
148      if (replicas.isEmpty()) {
149        this.unfinishedReplicas = Collections.emptySet();
150      } else {
151        this.unfinishedReplicas =
152          Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
153        replicas.stream().map(c -> c.id()).forEachOrdered(unfinishedReplicas::add);
154      }
155    }
156  }
157
158  private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();
159
160  private volatile long ackedBlockLength = 0L;
161
162  // this could be different from acked block length because a packet can not start at the middle of
163  // a chunk.
164  private long nextPacketOffsetInBlock = 0L;
165
166  // the length of the trailing partial chunk, this is because the packet start offset must be
167  // aligned with the length of checksum chunk so we need to resend the same data.
168  private int trailingPartialChunkLength = 0;
169
170  private long nextPacketSeqno = 0L;
171
172  private ByteBuf buf;
173
174  private final SendBufSizePredictor sendBufSizePRedictor = new SendBufSizePredictor();
175
176  // State for connections to DN
177  private enum State {
178    STREAMING, CLOSING, BROKEN, CLOSED
179  }
180
181  private volatile State state;
182
183  // all lock-free to make it run faster
184  private void completed(Channel channel) {
185    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
186      Callback c = iter.next();
187      // if the current unfinished replicas does not contain us then it means that we have already
188      // acked this one, let's iterate to find the one we have not acked yet.
189      if (c.unfinishedReplicas.remove(channel.id())) {
190        if (c.unfinishedReplicas.isEmpty()) {
191          // we need to remove first before complete the future. It is possible that after we
192          // complete the future the upper layer will call close immediately before we remove the
193          // entry from waitingAckQueue and lead to an IllegalStateException. And also set the
194          // ackedBlockLength first otherwise we may use a wrong length to commit the block. This
195          // may lead to multiple remove and assign but is OK. The semantic of iter.remove is
196          // removing the entry returned by calling previous next, so if the entry has already been
197          // removed then it is a no-op, and for the assign, the values are the same so no problem.
198          iter.remove();
199          ackedBlockLength = c.ackedLength;
200          // the future.complete check is to confirm that we are the only one who grabbed the work,
201          // otherwise just give up and return.
202          if (c.future.complete(c.ackedLength)) {
203            // also wake up flush requests which have the same length.
204            while (iter.hasNext()) {
205              Callback maybeDummyCb = iter.next();
206              if (maybeDummyCb.ackedLength == c.ackedLength) {
207                iter.remove();
208                maybeDummyCb.future.complete(c.ackedLength);
209              } else {
210                break;
211              }
212            }
213          }
214        }
215        return;
216      }
217    }
218  }
219
220  // this usually does not happen which means it is not on the critical path so make it synchronized
221  // so that the implementation will not burn up our brain as there are multiple state changes and
222  // checks.
223  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
224    if (state == State.BROKEN || state == State.CLOSED) {
225      return;
226    }
227    if (state == State.CLOSING) {
228      Callback c = waitingAckQueue.peekFirst();
229      if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
230        // nothing, the endBlock request has already finished.
231        return;
232      }
233    }
234    // disable further write, and fail all pending ack.
235    state = State.BROKEN;
236    Throwable error = errorSupplier.get();
237    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
238      Callback c = iter.next();
239      // find the first sync request which we have not acked yet and fail all the request after it.
240      if (!c.unfinishedReplicas.contains(channel.id())) {
241        continue;
242      }
243      for (;;) {
244        c.future.completeExceptionally(error);
245        if (!iter.hasNext()) {
246          break;
247        }
248        c = iter.next();
249      }
250      break;
251    }
252    datanodeList.forEach(ch -> ch.close());
253  }
254
255  @Sharable
256  private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> {
257
258    private final int timeoutMs;
259
260    public AckHandler(int timeoutMs) {
261      this.timeoutMs = timeoutMs;
262    }
263
264    @Override
265    protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
266      Status reply = getStatus(ack);
267      if (reply != Status.SUCCESS) {
268        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " +
269          block + " from datanode " + ctx.channel().remoteAddress()));
270        return;
271      }
272      if (PipelineAck.isRestartOOBStatus(reply)) {
273        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block " +
274          block + " from datanode " + ctx.channel().remoteAddress()));
275        return;
276      }
277      if (ack.getSeqno() == HEART_BEAT_SEQNO) {
278        return;
279      }
280      completed(ctx.channel());
281    }
282
283    @Override
284    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
285      if (state == State.CLOSED) {
286        return;
287      }
288      failed(ctx.channel(),
289        () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
290    }
291
292    @Override
293    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
294      failed(ctx.channel(), () -> cause);
295    }
296
297    @Override
298    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
299      if (evt instanceof IdleStateEvent) {
300        IdleStateEvent e = (IdleStateEvent) evt;
301        if (e.state() == READER_IDLE) {
302          failed(ctx.channel(),
303            () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
304        } else if (e.state() == WRITER_IDLE) {
305          PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
306          int len = heartbeat.getSerializedSize();
307          ByteBuf buf = alloc.buffer(len);
308          heartbeat.putInBuffer(buf.nioBuffer(0, len));
309          buf.writerIndex(len);
310          ctx.channel().writeAndFlush(buf);
311        }
312        return;
313      }
314      super.userEventTriggered(ctx, evt);
315    }
316  }
317
318  private void setupReceiver(int timeoutMs) {
319    AckHandler ackHandler = new AckHandler(timeoutMs);
320    for (Channel ch : datanodeList) {
321      ch.pipeline().addLast(
322        new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
323        new ProtobufVarint32FrameDecoder(),
324        new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler);
325      ch.config().setAutoRead(true);
326    }
327  }
328
329  FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs,
330      DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
331      LocatedBlock locatedBlock, Encryptor encryptor, List<Channel> datanodeList,
332      DataChecksum summer, ByteBufAllocator alloc) {
333    this.conf = conf;
334    this.fsUtils = fsUtils;
335    this.dfs = dfs;
336    this.client = client;
337    this.namenode = namenode;
338    this.fileId = fileId;
339    this.clientName = clientName;
340    this.src = src;
341    this.block = locatedBlock.getBlock();
342    this.locations = locatedBlock.getLocations();
343    this.encryptor = encryptor;
344    this.datanodeList = datanodeList;
345    this.summer = summer;
346    this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
347    this.alloc = alloc;
348    this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize());
349    this.state = State.STREAMING;
350    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
351  }
352
353  @Override
354  public void writeInt(int i) {
355    buf.ensureWritable(4);
356    buf.writeInt(i);
357  }
358
359  @Override
360  public void write(ByteBuffer bb) {
361    buf.ensureWritable(bb.remaining());
362    buf.writeBytes(bb);
363  }
364
365  @Override
366  public void write(byte[] b) {
367    write(b, 0, b.length);
368  }
369
370  @Override
371  public void write(byte[] b, int off, int len) {
372    buf.ensureWritable(len);
373    buf.writeBytes(b, off, len);
374  }
375
376  @Override
377  public int buffered() {
378    return buf.readableBytes();
379  }
380
381  @Override
382  public DatanodeInfo[] getPipeline() {
383    return locations;
384  }
385
386  private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
387      long nextPacketOffsetInBlock, boolean syncBlock) {
388    int dataLen = dataBuf.readableBytes();
389    int chunkLen = summer.getBytesPerChecksum();
390    int trailingPartialChunkLen = dataLen % chunkLen;
391    int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
392    int checksumLen = numChecks * summer.getChecksumSize();
393    ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
394    summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
395    checksumBuf.writerIndex(checksumLen);
396    PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
397        nextPacketSeqno, false, dataLen, syncBlock);
398    int headerLen = header.getSerializedSize();
399    ByteBuf headerBuf = alloc.buffer(headerLen);
400    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
401    headerBuf.writerIndex(headerLen);
402    Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList);
403    waitingAckQueue.addLast(c);
404    // recheck again after we pushed the callback to queue
405    if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
406      future.completeExceptionally(new IOException("stream already broken"));
407      // it's the one we have just pushed or just a no-op
408      waitingAckQueue.removeFirst();
409      return;
410    }
411    datanodeList.forEach(ch -> {
412      ch.write(headerBuf.retainedDuplicate());
413      ch.write(checksumBuf.retainedDuplicate());
414      ch.writeAndFlush(dataBuf.retainedDuplicate());
415    });
416    checksumBuf.release();
417    headerBuf.release();
418    dataBuf.release();
419    nextPacketSeqno++;
420  }
421
422  private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
423    if (state != State.STREAMING) {
424      future.completeExceptionally(new IOException("stream already broken"));
425      return;
426    }
427    int dataLen = buf.readableBytes();
428    if (dataLen == trailingPartialChunkLength) {
429      // no new data
430      long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
431      Callback lastFlush = waitingAckQueue.peekLast();
432      if (lastFlush != null) {
433        Callback c = new Callback(future, lengthAfterFlush, Collections.emptyList());
434        waitingAckQueue.addLast(c);
435        // recheck here if we have already removed the previous callback from the queue
436        if (waitingAckQueue.peekFirst() == c) {
437          // all previous callbacks have been removed
438          // notice that this does mean we will always win here because the background thread may
439          // have already started to mark the future here as completed in the completed or failed
440          // methods but haven't removed it from the queue yet. That's also why the removeFirst
441          // call below may be a no-op.
442          if (state != State.STREAMING) {
443            future.completeExceptionally(new IOException("stream already broken"));
444          } else {
445            future.complete(lengthAfterFlush);
446          }
447          // it's the one we have just pushed or just a no-op
448          waitingAckQueue.removeFirst();
449        }
450      } else {
451        // we must have acked all the data so the ackedBlockLength must be same with
452        // lengthAfterFlush
453        future.complete(lengthAfterFlush);
454      }
455      return;
456    }
457
458    if (encryptor != null) {
459      ByteBuf encryptBuf = alloc.directBuffer(dataLen);
460      buf.readBytes(encryptBuf, trailingPartialChunkLength);
461      int toEncryptLength = dataLen - trailingPartialChunkLength;
462      try {
463        encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
464          encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
465      } catch (IOException e) {
466        encryptBuf.release();
467        future.completeExceptionally(e);
468        return;
469      }
470      encryptBuf.writerIndex(dataLen);
471      buf.release();
472      buf = encryptBuf;
473    }
474
475    if (dataLen > maxDataLen) {
476      // We need to write out the data by multiple packets as the max packet allowed is 16M.
477      long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
478      for (int remaining = dataLen;;) {
479        if (remaining < maxDataLen) {
480          flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
481            syncBlock);
482          break;
483        } else {
484          flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
485            nextSubPacketOffsetInBlock, syncBlock);
486          remaining -= maxDataLen;
487          nextSubPacketOffsetInBlock += maxDataLen;
488        }
489      }
490    } else {
491      flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
492    }
493    trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
494    ByteBuf newBuf = alloc.directBuffer(sendBufSizePRedictor.guess(dataLen))
495        .ensureWritable(trailingPartialChunkLength);
496    if (trailingPartialChunkLength != 0) {
497      buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
498        trailingPartialChunkLength);
499    }
500    buf.release();
501    this.buf = newBuf;
502    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
503  }
504
505  /**
506   * Flush the buffer out to datanodes.
507   * @param syncBlock will call hsync if true, otherwise hflush.
508   * @return A CompletableFuture that hold the acked length after flushing.
509   */
510  @Override
511  public CompletableFuture<Long> flush(boolean syncBlock) {
512    CompletableFuture<Long> future = new CompletableFuture<>();
513    flush0(future, syncBlock);
514    return future;
515  }
516
517  private void endBlock() throws IOException {
518    Preconditions.checkState(waitingAckQueue.isEmpty(),
519      "should call flush first before calling close");
520    if (state != State.STREAMING) {
521      throw new IOException("stream already broken");
522    }
523    state = State.CLOSING;
524    long finalizedLength = ackedBlockLength;
525    PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
526    buf.release();
527    buf = null;
528    int headerLen = header.getSerializedSize();
529    ByteBuf headerBuf = alloc.directBuffer(headerLen);
530    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
531    headerBuf.writerIndex(headerLen);
532    CompletableFuture<Long> future = new CompletableFuture<>();
533    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeList));
534    datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
535    headerBuf.release();
536    try {
537      future.get();
538    } catch (InterruptedException e) {
539      throw (IOException) new InterruptedIOException().initCause(e);
540    } catch (ExecutionException e) {
541      Throwable cause = e.getCause();
542      Throwables.propagateIfPossible(cause, IOException.class);
543      throw new IOException(cause);
544    }
545  }
546
547  /**
548   * The close method when error occurred. Now we just call recoverFileLease.
549   */
550  @Override
551  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
552    datanodeList.forEach(ch -> ch.close());
553    datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
554    endFileLease(client, fileId);
555    fsUtils.recoverFileLease(dfs, new Path(src), conf,
556      reporter == null ? new CancelOnClose(client) : reporter);
557  }
558
559  /**
560   * End the current block and complete file at namenode. You should call
561   * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
562   */
563  @Override
564  public void close() throws IOException {
565    endBlock();
566    state = State.CLOSED;
567    datanodeList.forEach(ch -> ch.close());
568    datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
569    block.setNumBytes(ackedBlockLength);
570    completeFile(client, namenode, src, clientName, block, fileId);
571  }
572
573  @Override
574  public boolean isBroken() {
575    return state == State.BROKEN;
576  }
577}