001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
021import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
022import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
023import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
024import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
025import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
026import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
027import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
028import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
029import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
030import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
031import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;
032
033import com.google.errorprone.annotations.RestrictedApi;
034import java.io.IOException;
035import java.nio.ByteBuffer;
036import java.util.ArrayList;
037import java.util.Collection;
038import java.util.Collections;
039import java.util.EnumSet;
040import java.util.Iterator;
041import java.util.List;
042import java.util.Map;
043import java.util.Set;
044import java.util.concurrent.CompletableFuture;
045import java.util.concurrent.ConcurrentHashMap;
046import java.util.concurrent.ConcurrentLinkedDeque;
047import java.util.concurrent.TimeUnit;
048import java.util.function.Supplier;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.crypto.Encryptor;
051import org.apache.hadoop.fs.CreateFlag;
052import org.apache.hadoop.fs.Path;
053import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
054import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
055import org.apache.hadoop.hbase.util.CancelableProgressable;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.apache.hadoop.hbase.util.FutureUtils;
058import org.apache.hadoop.hbase.util.NettyFutureUtils;
059import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
060import org.apache.hadoop.hdfs.DFSClient;
061import org.apache.hadoop.hdfs.DistributedFileSystem;
062import org.apache.hadoop.hdfs.DummyDFSOutputStream;
063import org.apache.hadoop.hdfs.protocol.ClientProtocol;
064import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
065import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
066import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
067import org.apache.hadoop.hdfs.protocol.LocatedBlock;
068import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
069import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
070import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
071import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
072import org.apache.hadoop.util.DataChecksum;
073import org.apache.yetus.audience.InterfaceAudience;
074
075import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
076import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
077import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
078import org.apache.hbase.thirdparty.io.netty.channel.Channel;
079import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
080import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
081import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
082import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
083import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
084import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
085import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
086import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
087
088/**
089 * An asynchronous HDFS output stream implementation which fans out data to datanode and only
090 * supports writing file with only one block.
091 * <p>
092 * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create. The main
 * usage of this class is implementing WAL, so we only expose a few HDFS configurations in the
094 * method. And we place it here under io package because we want to make it independent of WAL
095 * implementation thus easier to move it to HDFS project finally.
096 * <p>
097 * Note that, although we support pipelined flush, i.e, write new data and then flush before the
098 * previous flush succeeds, the implementation is not thread safe, so you should not call its
099 * methods concurrently.
100 * <p>
 * Advantages compared to DFSOutputStream:
102 * <ol>
103 * <li>The fan out mechanism. This will reduce the latency.</li>
104 * <li>Fail-fast when connection to datanode error. The WAL implementation could open new writer
105 * ASAP.</li>
106 * <li>We could benefit from netty's ByteBuf management mechanism.</li>
107 * </ol>
108 */
109@InterfaceAudience.Private
110public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
111
112  // The MAX_PACKET_SIZE is 16MB, but it includes the header size and checksum size. So here we set
113  // a smaller limit for data size.
114  private static final int MAX_DATA_LEN = 12 * 1024 * 1024;
115
116  private final Configuration conf;
117
118  private final DistributedFileSystem dfs;
119
120  private final DFSClient client;
121
122  private final ClientProtocol namenode;
123
124  private final String clientName;
125
126  private final String src;
127
128  private final HdfsFileStatus stat;
129
130  private final ExtendedBlock block;
131
132  private final DatanodeInfo[] locations;
133
134  private final Encryptor encryptor;
135
136  private final Map<Channel, DatanodeInfo> datanodeInfoMap;
137
138  private final DataChecksum summer;
139
140  private final int maxDataLen;
141
142  private final ByteBufAllocator alloc;
143
144  // a dummy DFSOutputStream used for lease renewal
145  private final DummyDFSOutputStream dummyStream;
146
147  private static final class Callback {
148
149    private final CompletableFuture<Long> future;
150
151    private final long ackedLength;
152
153    // should be backed by a thread safe collection
154    private final Set<ChannelId> unfinishedReplicas;
155    private final long packetDataLen;
156    private final long flushTimestamp;
157    private long lastAckTimestamp = -1;
158
159    public Callback(CompletableFuture<Long> future, long ackedLength,
160      final Collection<Channel> replicas, long packetDataLen) {
161      this.future = future;
162      this.ackedLength = ackedLength;
163      this.packetDataLen = packetDataLen;
164      this.flushTimestamp = EnvironmentEdgeManager.currentTime();
165      if (replicas.isEmpty()) {
166        this.unfinishedReplicas = Collections.emptySet();
167      } else {
168        this.unfinishedReplicas =
169          Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
170        replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add);
171      }
172    }
173  }
174
175  private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();
176
177  private volatile long ackedBlockLength = 0L;
178
179  // this could be different from acked block length because a packet can not start at the middle of
180  // a chunk.
181  private long nextPacketOffsetInBlock = 0L;
182
183  // the length of the trailing partial chunk, this is because the packet start offset must be
184  // aligned with the length of checksum chunk, so we need to resend the same data.
185  private int trailingPartialChunkLength = 0;
186
187  private long nextPacketSeqno = 0L;
188
189  private ByteBuf buf;
190
191  private final SendBufSizePredictor sendBufSizePRedictor = new SendBufSizePredictor();
192
193  // State for connections to DN
194  private enum State {
195    STREAMING,
196    CLOSING,
197    BROKEN,
198    CLOSED
199  }
200
201  private volatile State state;
202
203  private final StreamSlowMonitor streamSlowMonitor;
204
205  // all lock-free to make it run faster
206  private void completed(Channel channel) {
207    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
208      Callback c = iter.next();
209      // if the current unfinished replicas does not contain us then it means that we have already
210      // acked this one, let's iterate to find the one we have not acked yet.
211      if (c.unfinishedReplicas.remove(channel.id())) {
212        long current = EnvironmentEdgeManager.currentTime();
213        streamSlowMonitor.checkProcessTimeAndSpeed(datanodeInfoMap.get(channel), c.packetDataLen,
214          current - c.flushTimestamp, c.lastAckTimestamp, c.unfinishedReplicas.size());
215        c.lastAckTimestamp = current;
216        if (c.unfinishedReplicas.isEmpty()) {
217          // we need to remove first before complete the future. It is possible that after we
218          // complete the future the upper layer will call close immediately before we remove the
219          // entry from waitingAckQueue and lead to an IllegalStateException. And also set the
220          // ackedBlockLength first otherwise we may use a wrong length to commit the block. This
221          // may lead to multiple remove and assign but is OK. The semantic of iter.remove is
222          // removing the entry returned by calling previous next, so if the entry has already been
223          // removed then it is a no-op, and for the assign, the values are the same so no problem.
224          iter.remove();
225          ackedBlockLength = c.ackedLength;
226          // the future.complete check is to confirm that we are the only one who grabbed the work,
227          // otherwise just give up and return.
228          if (c.future.complete(c.ackedLength)) {
229            // also wake up flush requests which have the same length.
230            while (iter.hasNext()) {
231              Callback maybeDummyCb = iter.next();
232              if (maybeDummyCb.ackedLength == c.ackedLength) {
233                iter.remove();
234                maybeDummyCb.future.complete(c.ackedLength);
235              } else {
236                break;
237              }
238            }
239          }
240        }
241        return;
242      }
243    }
244  }
245
246  // this usually does not happen which means it is not on the critical path so make it synchronized
247  // so that the implementation will not burn up our brain as there are multiple state changes and
248  // checks.
249  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
250    if (state == State.CLOSED) {
251      return;
252    }
253    if (state == State.BROKEN) {
254      failWaitingAckQueue(channel, errorSupplier);
255      return;
256    }
257    if (state == State.CLOSING) {
258      Callback c = waitingAckQueue.peekFirst();
259      if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
260        // nothing, the endBlock request has already finished.
261        return;
262      }
263    }
264    // disable further write, and fail all pending ack.
265    state = State.BROKEN;
266    failWaitingAckQueue(channel, errorSupplier);
267    datanodeInfoMap.keySet().forEach(NettyFutureUtils::safeClose);
268  }
269
270  private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) {
271    Throwable error = errorSupplier.get();
272    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
273      Callback c = iter.next();
274      // find the first sync request which we have not acked yet and fail all the request after it.
275      if (!c.unfinishedReplicas.contains(channel.id())) {
276        continue;
277      }
278      for (;;) {
279        c.future.completeExceptionally(error);
280        if (!iter.hasNext()) {
281          break;
282        }
283        c = iter.next();
284      }
285      break;
286    }
287  }
288
289  @Sharable
290  private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> {
291
292    private final int timeoutMs;
293
294    public AckHandler(int timeoutMs) {
295      this.timeoutMs = timeoutMs;
296    }
297
298    @Override
299    protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
300      Status reply = getStatus(ack);
301      if (reply != Status.SUCCESS) {
302        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " + block
303          + " from datanode " + ctx.channel().remoteAddress()));
304        return;
305      }
306      if (PipelineAck.isRestartOOBStatus(reply)) {
307        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block "
308          + block + " from datanode " + ctx.channel().remoteAddress()));
309        return;
310      }
311      if (ack.getSeqno() == HEART_BEAT_SEQNO) {
312        return;
313      }
314      completed(ctx.channel());
315    }
316
317    @Override
318    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
319      if (state == State.CLOSED) {
320        return;
321      }
322      failed(ctx.channel(),
323        () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
324    }
325
326    @Override
327    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
328      failed(ctx.channel(), () -> cause);
329    }
330
331    @Override
332    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
333      if (evt instanceof IdleStateEvent) {
334        IdleStateEvent e = (IdleStateEvent) evt;
335        if (e.state() == READER_IDLE) {
336          failed(ctx.channel(),
337            () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
338        } else if (e.state() == WRITER_IDLE) {
339          PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
340          int len = heartbeat.getSerializedSize();
341          ByteBuf buf = alloc.buffer(len);
342          heartbeat.putInBuffer(buf.nioBuffer(0, len));
343          buf.writerIndex(len);
344          safeWriteAndFlush(ctx.channel(), buf);
345        }
346        return;
347      }
348      super.userEventTriggered(ctx, evt);
349    }
350  }
351
352  private void setupReceiver(int timeoutMs) {
353    AckHandler ackHandler = new AckHandler(timeoutMs);
354    for (Channel ch : datanodeInfoMap.keySet()) {
355      ch.pipeline().addLast(
356        new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
357        new ProtobufVarint32FrameDecoder(),
358        new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler);
359      ch.config().setAutoRead(true);
360    }
361  }
362
363  FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
364    ClientProtocol namenode, String clientName, String src, HdfsFileStatus stat,
365    EnumSet<CreateFlag> createFlags, LocatedBlock locatedBlock, Encryptor encryptor,
366    Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer, ByteBufAllocator alloc,
367    StreamSlowMonitor streamSlowMonitor) {
368    this.conf = conf;
369    this.dfs = dfs;
370    this.client = client;
371    this.namenode = namenode;
372    this.stat = stat;
373    this.clientName = clientName;
374    this.src = src;
375    this.block = locatedBlock.getBlock();
376    this.locations = getLocatedBlockLocations(locatedBlock);
377    this.encryptor = encryptor;
378    this.datanodeInfoMap = datanodeInfoMap;
379    this.summer = summer;
380    this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
381    this.alloc = alloc;
382    this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize());
383    this.state = State.STREAMING;
384    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
385    this.streamSlowMonitor = streamSlowMonitor;
386    this.dummyStream = new DummyDFSOutputStream(this, client, src, stat, createFlags, summer);
387  }
388
389  @Override
390  public void writeInt(int i) {
391    buf.ensureWritable(4);
392    buf.writeInt(i);
393  }
394
395  @Override
396  public void write(ByteBuffer bb) {
397    buf.ensureWritable(bb.remaining());
398    buf.writeBytes(bb);
399  }
400
401  @Override
402  public void write(byte[] b) {
403    write(b, 0, b.length);
404  }
405
406  @Override
407  public void write(byte[] b, int off, int len) {
408    buf.ensureWritable(len);
409    buf.writeBytes(b, off, len);
410  }
411
412  @Override
413  public int buffered() {
414    return buf.readableBytes();
415  }
416
417  @Override
418  public DatanodeInfo[] getPipeline() {
419    return locations;
420  }
421
422  private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
423    long nextPacketOffsetInBlock, boolean syncBlock) {
424    int dataLen = dataBuf.readableBytes();
425    int chunkLen = summer.getBytesPerChecksum();
426    int trailingPartialChunkLen = dataLen % chunkLen;
427    int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
428    int checksumLen = numChecks * summer.getChecksumSize();
429    ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
430    summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
431    checksumBuf.writerIndex(checksumLen);
432    PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
433      nextPacketSeqno, false, dataLen, syncBlock);
434    int headerLen = header.getSerializedSize();
435    ByteBuf headerBuf = alloc.buffer(headerLen);
436    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
437    headerBuf.writerIndex(headerLen);
438    Callback c =
439      new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeInfoMap.keySet(), dataLen);
440    waitingAckQueue.addLast(c);
441    // recheck again after we pushed the callback to queue
442    if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
443      future.completeExceptionally(new IOException("stream already broken"));
444      // it's the one we have just pushed or just a no-op
445      waitingAckQueue.removeFirst();
446
447      checksumBuf.release();
448      headerBuf.release();
449
450      // This method takes ownership of the dataBuf, so we need release it before returning.
451      dataBuf.release();
452      return;
453    }
454    // TODO: we should perhaps measure time taken per DN here;
455    // we could collect statistics per DN, and/or exclude bad nodes in createOutput.
456    datanodeInfoMap.keySet().forEach(ch -> {
457      safeWrite(ch, headerBuf.retainedDuplicate());
458      safeWrite(ch, checksumBuf.retainedDuplicate());
459      safeWriteAndFlush(ch, dataBuf.retainedDuplicate());
460    });
461    checksumBuf.release();
462    headerBuf.release();
463    dataBuf.release();
464    nextPacketSeqno++;
465  }
466
467  private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
468    if (state != State.STREAMING) {
469      future.completeExceptionally(new IOException("stream already broken"));
470      return;
471    }
472    int dataLen = buf.readableBytes();
473    if (dataLen == trailingPartialChunkLength) {
474      // no new data
475      long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
476      Callback lastFlush = waitingAckQueue.peekLast();
477      if (lastFlush != null) {
478        Callback c = new Callback(future, lengthAfterFlush, Collections.emptySet(), dataLen);
479        waitingAckQueue.addLast(c);
480        // recheck here if we have already removed the previous callback from the queue
481        if (waitingAckQueue.peekFirst() == c) {
482          // all previous callbacks have been removed
483          // notice that this does mean we will always win here because the background thread may
484          // have already started to mark the future here as completed in the completed or failed
485          // methods but haven't removed it from the queue yet. That's also why the removeFirst
486          // call below may be a no-op.
487          if (state != State.STREAMING) {
488            future.completeExceptionally(new IOException("stream already broken"));
489          } else {
490            future.complete(lengthAfterFlush);
491          }
492          // it's the one we have just pushed or just a no-op
493          waitingAckQueue.removeFirst();
494        }
495      } else {
496        // we must have acked all the data so the ackedBlockLength must be same with
497        // lengthAfterFlush
498        future.complete(lengthAfterFlush);
499      }
500      return;
501    }
502
503    if (encryptor != null) {
504      ByteBuf encryptBuf = alloc.directBuffer(dataLen);
505      buf.readBytes(encryptBuf, trailingPartialChunkLength);
506      int toEncryptLength = dataLen - trailingPartialChunkLength;
507      try {
508        encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
509          encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
510      } catch (IOException e) {
511        encryptBuf.release();
512        future.completeExceptionally(e);
513        return;
514      }
515      encryptBuf.writerIndex(dataLen);
516      buf.release();
517      buf = encryptBuf;
518    }
519
520    if (dataLen > maxDataLen) {
521      // We need to write out the data by multiple packets as the max packet allowed is 16M.
522      long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
523      for (int remaining = dataLen;;) {
524        if (remaining < maxDataLen) {
525          flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
526            syncBlock);
527          break;
528        } else {
529          flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
530            nextSubPacketOffsetInBlock, syncBlock);
531          remaining -= maxDataLen;
532          nextSubPacketOffsetInBlock += maxDataLen;
533        }
534      }
535    } else {
536      flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
537    }
538    trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
539    ByteBuf newBuf = alloc.directBuffer(sendBufSizePRedictor.guess(dataLen))
540      .ensureWritable(trailingPartialChunkLength);
541    if (trailingPartialChunkLength != 0) {
542      buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
543        trailingPartialChunkLength);
544    }
545    buf.release();
546    this.buf = newBuf;
547    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
548  }
549
550  /**
551   * Flush the buffer out to datanodes.
552   * @param syncBlock will call hsync if true, otherwise hflush.
553   * @return A CompletableFuture that hold the acked length after flushing.
554   */
555  @Override
556  public CompletableFuture<Long> flush(boolean syncBlock) {
557    CompletableFuture<Long> future = new CompletableFuture<>();
558    flush0(future, syncBlock);
559    return future;
560  }
561
562  private void endBlock() throws IOException {
563    Preconditions.checkState(waitingAckQueue.isEmpty(),
564      "should call flush first before calling close");
565    if (state != State.STREAMING) {
566      throw new IOException("stream already broken");
567    }
568    state = State.CLOSING;
569    long finalizedLength = ackedBlockLength;
570    PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
571    buf.release();
572    buf = null;
573    int headerLen = header.getSerializedSize();
574    ByteBuf headerBuf = alloc.directBuffer(headerLen);
575    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
576    headerBuf.writerIndex(headerLen);
577    CompletableFuture<Long> future = new CompletableFuture<>();
578    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
579    datanodeInfoMap.keySet().forEach(ch -> safeWriteAndFlush(ch, headerBuf.retainedDuplicate()));
580    headerBuf.release();
581    FutureUtils.get(future);
582  }
583
584  private void closeDataNodeChannelsAndAwait() {
585    List<ChannelFuture> futures = new ArrayList<>();
586    for (Channel ch : datanodeInfoMap.keySet()) {
587      futures.add(ch.close());
588    }
589    for (ChannelFuture future : futures) {
590      consume(future.awaitUninterruptibly());
591    }
592  }
593
594  /**
595   * The close method when error occurred. Now we just call recoverFileLease.
596   */
597  @Override
598  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
599    if (buf != null) {
600      buf.release();
601      buf = null;
602    }
603    closeDataNodeChannelsAndAwait();
604    endFileLease(this);
605    RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
606      reporter == null ? new CancelOnClose(client) : reporter);
607  }
608
609  /**
610   * End the current block and complete file at namenode. You should call
611   * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
612   */
613  @Override
614  public void close() throws IOException {
615    endBlock();
616    state = State.CLOSED;
617    closeDataNodeChannelsAndAwait();
618    block.setNumBytes(ackedBlockLength);
619    completeFile(this, client, namenode, src, clientName, block, stat);
620  }
621
622  @Override
623  public boolean isBroken() {
624    return state == State.BROKEN;
625  }
626
627  @Override
628  public long getSyncedLength() {
629    return this.ackedBlockLength;
630  }
631
632  @RestrictedApi(explanation = "Should only be called in tests", link = "",
633      allowedOnPath = ".*/src/test/.*")
634  Map<Channel, DatanodeInfo> getDatanodeInfoMap() {
635    return this.datanodeInfoMap;
636  }
637
638  DFSClient getClient() {
639    return client;
640  }
641
642  DummyDFSOutputStream getDummyStream() {
643    return dummyStream;
644  }
645
646  boolean isClosed() {
647    return state == State.CLOSED;
648  }
649
650  HdfsFileStatus getStat() {
651    return stat;
652  }
653}