/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;

/**
 * An asynchronous HDFS output stream implementation which fans out data to all datanodes and only
 * supports writing files with a single block.
 * <p>
 * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create. The main
 * usage of this class is implementing WAL, so we only expose a few HDFS configurations in the
 * method. And we place it here under the io package because we want to make it independent of the
 * WAL implementation and thus easier to move it to the HDFS project eventually.
 * <p>
 * Note that, although we support pipelined flush, i.e., writing new data and then flushing before
 * the previous flush succeeds, the implementation is not thread safe, so you should not call its
 * methods concurrently.
 * <p>
 * Advantages compared to DFSOutputStream:
 * <ol>
 * <li>The fan-out mechanism. This reduces latency.</li>
 * <li>Fail fast when the connection to a datanode errors out. The WAL implementation can open a
 * new writer ASAP.</li>
 * <li>We benefit from netty's ByteBuf management mechanism.</li>
 * </ol>
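 * <p>
 * A minimal usage sketch, shown below with hypothetical variable names; the exact
 * {@code createOutput} parameter list may differ between versions, and error handling and event
 * loop setup are elided:
 *
 * <pre>
 * FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs, path,
 *   true, false, (short) 3, blockSize, eventLoopGroup, channelClass, monitor);
 * out.write(data);
 * long ackedLength = FutureUtils.get(out.flush(false));
 * out.close();
 * </pre>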
 */
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

  // The MAX_PACKET_SIZE is 16MB, but it includes the header size and checksum size. So here we set
  // a smaller limit for the data size.
  private static final int MAX_DATA_LEN = 12 * 1024 * 1024;

  private final Configuration conf;

  private final DistributedFileSystem dfs;

  private final DFSClient client;

  private final ClientProtocol namenode;

  private final String clientName;

  private final String src;

  private final long fileId;

  private final ExtendedBlock block;

  private final DatanodeInfo[] locations;

  private final Encryptor encryptor;

  private final Map<Channel, DatanodeInfo> datanodeInfoMap;

  private final DataChecksum summer;

  private final int maxDataLen;

  private final ByteBufAllocator alloc;

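  /**
   * A pending flush request: the future to complete, the block length that this flush acks, and
   * the ids of the datanode channels that have not yet acked the corresponding packet.
   */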
  private static final class Callback {

    private final CompletableFuture<Long> future;

    private final long ackedLength;

    // should be backed by a thread-safe collection
    private final Set<ChannelId> unfinishedReplicas;
    private final long packetDataLen;
    private final long flushTimestamp;
    private long lastAckTimestamp = -1;

    public Callback(CompletableFuture<Long> future, long ackedLength,
      final Collection<Channel> replicas, long packetDataLen) {
      this.future = future;
      this.ackedLength = ackedLength;
      this.packetDataLen = packetDataLen;
      this.flushTimestamp = EnvironmentEdgeManager.currentTime();
      if (replicas.isEmpty()) {
        this.unfinishedReplicas = Collections.emptySet();
      } else {
        this.unfinishedReplicas =
          Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
        replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add);
      }
    }
  }

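  // Pending flushes in submission order. Acks from datanodes remove and complete entries via
  // completed(); failed() completes the still-pending entries exceptionally.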
  private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();

  private volatile long ackedBlockLength = 0L;

  // this could be different from the acked block length because a packet can not start in the
  // middle of a chunk.
  private long nextPacketOffsetInBlock = 0L;

  // the length of the trailing partial chunk; the packet start offset must be aligned with the
  // length of a checksum chunk, so we need to resend the same data. For example, with 512-byte
  // chunks, after flushing 1000 bytes the next packet starts at offset 512 and resends the
  // trailing 488 bytes.
  private int trailingPartialChunkLength = 0;

  private long nextPacketSeqno = 0L;

  private ByteBuf buf;

  private final SendBufSizePredictor sendBufSizePredictor = new SendBufSizePredictor();

  // State for connections to DN
  private enum State {
    STREAMING,
    CLOSING,
    BROKEN,
    CLOSED
  }

  private volatile State state;

  private final StreamSlowMonitor streamSlowMonitor;

  // all lock-free to make it run faster
  private void completed(Channel channel) {
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // if the current callback's unfinished replicas do not contain us then we have already acked
      // this one; keep iterating to find the first one we have not acked yet.
      if (c.unfinishedReplicas.remove(channel.id())) {
        long current = EnvironmentEdgeManager.currentTime();
        streamSlowMonitor.checkProcessTimeAndSpeed(datanodeInfoMap.get(channel), c.packetDataLen,
          current - c.flushTimestamp, c.lastAckTimestamp, c.unfinishedReplicas.size());
        c.lastAckTimestamp = current;
        if (c.unfinishedReplicas.isEmpty()) {
          // We need to remove the entry before completing the future. It is possible that after we
          // complete the future the upper layer will call close immediately, before we remove the
          // entry from waitingAckQueue, which would lead to an IllegalStateException. Also set the
          // ackedBlockLength first, otherwise we may use a wrong length to commit the block. This
          // may lead to multiple removes and assigns, but that is OK: the semantic of iter.remove
          // is removing the entry returned by the previous next, so if the entry has already been
          // removed it is a no-op, and for the assign, the values are the same so no problem.
          iter.remove();
          ackedBlockLength = c.ackedLength;
          // the future.complete check is to confirm that we are the only one who grabbed the work,
          // otherwise just give up and return.
          if (c.future.complete(c.ackedLength)) {
            // also wake up flush requests which have the same length.
            while (iter.hasNext()) {
              Callback maybeDummyCb = iter.next();
              if (maybeDummyCb.ackedLength == c.ackedLength) {
                iter.remove();
                maybeDummyCb.future.complete(c.ackedLength);
              } else {
                break;
              }
            }
          }
        }
        return;
      }
    }
  }

  // This usually does not happen, which means it is not on the critical path, so make it
  // synchronized so that the implementation stays easy to reason about despite the multiple state
  // changes and checks.
  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
    if (state == State.CLOSED) {
      return;
    }
    if (state == State.BROKEN) {
      failWaitingAckQueue(channel, errorSupplier);
      return;
    }
    if (state == State.CLOSING) {
      Callback c = waitingAckQueue.peekFirst();
      if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
        // nothing to do, the endBlock request has already finished.
        return;
      }
    }
    // disable further writes, and fail all pending acks.
    state = State.BROKEN;
    failWaitingAckQueue(channel, errorSupplier);
    datanodeInfoMap.keySet().forEach(NettyFutureUtils::safeClose);
  }

  private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) {
    Throwable error = errorSupplier.get();
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // find the first sync request which we have not acked yet, and fail it and all the requests
      // after it.
      if (!c.unfinishedReplicas.contains(channel.id())) {
        continue;
      }
      for (;;) {
        c.future.completeExceptionally(error);
        if (!iter.hasNext()) {
          break;
        }
        c = iter.next();
      }
      break;
    }
  }

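  /**
   * Handles pipeline acks from one datanode connection. A SUCCESS ack for a normal packet goes to
   * completed(), heartbeat acks are ignored, and bad responses, exceptions, disconnects and read
   * timeouts all go to failed(). When the connection is write-idle, a heartbeat packet is sent to
   * keep the connection alive.
   */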
  @Sharable
  private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> {

    private final int timeoutMs;

    public AckHandler(int timeoutMs) {
      this.timeoutMs = timeoutMs;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
      Status reply = getStatus(ack);
      if (reply != Status.SUCCESS) {
        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " + block
          + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (PipelineAck.isRestartOOBStatus(reply)) {
        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block "
          + block + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (ack.getSeqno() == HEART_BEAT_SEQNO) {
        return;
      }
      completed(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      if (state == State.CLOSED) {
        return;
      }
      failed(ctx.channel(),
        () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      failed(ctx.channel(), () -> cause);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        if (e.state() == READER_IDLE) {
          failed(ctx.channel(),
            () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
        } else if (e.state() == WRITER_IDLE) {
          PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
          int len = heartbeat.getSerializedSize();
          ByteBuf buf = alloc.buffer(len);
          heartbeat.putInBuffer(buf.nioBuffer(0, len));
          buf.writerIndex(len);
          safeWriteAndFlush(ctx.channel(), buf);
        }
        return;
      }
      super.userEventTriggered(ctx, evt);
    }
  }

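  // Install the ack-processing pipeline on each datanode channel: an IdleStateHandler for read
  // timeouts and write-idle heartbeats, varint32-framed protobuf decoding of PipelineAckProto, and
  // finally the shared AckHandler.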
  private void setupReceiver(int timeoutMs) {
    AckHandler ackHandler = new AckHandler(timeoutMs);
    for (Channel ch : datanodeInfoMap.keySet()) {
      ch.pipeline().addLast(
        new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
        new ProtobufVarint32FrameDecoder(),
        new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler);
      ch.config().setAutoRead(true);
    }
  }

  FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
    ClientProtocol namenode, String clientName, String src, long fileId, LocatedBlock locatedBlock,
    Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer,
    ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
    this.conf = conf;
    this.dfs = dfs;
    this.client = client;
    this.namenode = namenode;
    this.fileId = fileId;
    this.clientName = clientName;
    this.src = src;
    this.block = locatedBlock.getBlock();
    this.locations = locatedBlock.getLocations();
    this.encryptor = encryptor;
    this.datanodeInfoMap = datanodeInfoMap;
    this.summer = summer;
    this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
    this.alloc = alloc;
    this.buf = alloc.directBuffer(sendBufSizePredictor.initialSize());
    this.state = State.STREAMING;
    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
    this.streamSlowMonitor = streamSlowMonitor;
  }

  @Override
  public void writeInt(int i) {
    buf.ensureWritable(4);
    buf.writeInt(i);
  }

  @Override
  public void write(ByteBuffer bb) {
    buf.ensureWritable(bb.remaining());
    buf.writeBytes(bb);
  }

  @Override
  public void write(byte[] b) {
    write(b, 0, b.length);
  }

  @Override
  public void write(byte[] b, int off, int len) {
    buf.ensureWritable(len);
    buf.writeBytes(b, off, len);
  }

  @Override
  public int buffered() {
    return buf.readableBytes();
  }

  @Override
  public DatanodeInfo[] getPipeline() {
    return locations;
  }

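  // Build one data transfer packet (header, chunked checksums, then data), register a Callback for
  // it in waitingAckQueue, and fan the packet out to every datanode channel. This method takes
  // ownership of dataBuf.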
  private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
    long nextPacketOffsetInBlock, boolean syncBlock) {
    int dataLen = dataBuf.readableBytes();
    int chunkLen = summer.getBytesPerChecksum();
    int trailingPartialChunkLen = dataLen % chunkLen;
    int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
    int checksumLen = numChecks * summer.getChecksumSize();
    ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
    summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
    checksumBuf.writerIndex(checksumLen);
    PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
      nextPacketSeqno, false, dataLen, syncBlock);
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.buffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    Callback c =
      new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeInfoMap.keySet(), dataLen);
    waitingAckQueue.addLast(c);
    // recheck again after we have pushed the callback to the queue
    if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
      future.completeExceptionally(new IOException("stream already broken"));
      // remove the one we have just pushed, or it is just a no-op
      waitingAckQueue.removeFirst();

      checksumBuf.release();
      headerBuf.release();

      // This method takes ownership of the dataBuf, so we need to release it before returning.
      dataBuf.release();
      return;
    }
    // TODO: we should perhaps measure time taken per DN here;
    // we could collect statistics per DN, and/or exclude bad nodes in createOutput.
    datanodeInfoMap.keySet().forEach(ch -> {
      safeWrite(ch, headerBuf.retainedDuplicate());
      safeWrite(ch, checksumBuf.retainedDuplicate());
      safeWriteAndFlush(ch, dataBuf.retainedDuplicate());
    });
    checksumBuf.release();
    headerBuf.release();
    dataBuf.release();
    nextPacketSeqno++;
  }

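  // The core flush logic. If there is no new data since the last flush, complete the future with
  // the current length directly, or queue a dummy callback behind the pending flushes. Otherwise,
  // encrypt the new data if an encryptor is configured, send it out as one packet, or several if
  // it exceeds maxDataLen, and carry the trailing partial checksum chunk over into a fresh send
  // buffer, since the next packet must start on a chunk boundary.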
  private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
    if (state != State.STREAMING) {
      future.completeExceptionally(new IOException("stream already broken"));
      return;
    }
    int dataLen = buf.readableBytes();
    if (dataLen == trailingPartialChunkLength) {
      // no new data
      long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
      Callback lastFlush = waitingAckQueue.peekLast();
      if (lastFlush != null) {
        Callback c = new Callback(future, lengthAfterFlush, Collections.emptySet(), dataLen);
        waitingAckQueue.addLast(c);
        // recheck here if we have already removed the previous callback from the queue
        if (waitingAckQueue.peekFirst() == c) {
          // all previous callbacks have been removed
          // notice that this does not mean we will always win here, because the background thread
          // may have already started to mark the future here as completed in the completed or
          // failed methods, but has not removed it from the queue yet. That's also why the
          // removeFirst call below may be a no-op.
          if (state != State.STREAMING) {
            future.completeExceptionally(new IOException("stream already broken"));
          } else {
            future.complete(lengthAfterFlush);
          }
          // it's the one we have just pushed or just a no-op
          waitingAckQueue.removeFirst();
        }
      } else {
        // we must have acked all the data, so the ackedBlockLength must be the same as
        // lengthAfterFlush
        future.complete(lengthAfterFlush);
      }
      return;
    }

    if (encryptor != null) {
      ByteBuf encryptBuf = alloc.directBuffer(dataLen);
      buf.readBytes(encryptBuf, trailingPartialChunkLength);
      int toEncryptLength = dataLen - trailingPartialChunkLength;
      try {
        encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
          encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
      } catch (IOException e) {
        encryptBuf.release();
        future.completeExceptionally(e);
        return;
      }
      encryptBuf.writerIndex(dataLen);
      buf.release();
      buf = encryptBuf;
    }

    if (dataLen > maxDataLen) {
      // We need to write out the data in multiple packets as the maximum packet size allowed is
      // 16MB.
      long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
      for (int remaining = dataLen;;) {
        if (remaining < maxDataLen) {
          flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
            syncBlock);
          break;
        } else {
          flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
            nextSubPacketOffsetInBlock, syncBlock);
          remaining -= maxDataLen;
          nextSubPacketOffsetInBlock += maxDataLen;
        }
      }
    } else {
      flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
    }
    trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
    ByteBuf newBuf = alloc.directBuffer(sendBufSizePredictor.guess(dataLen))
      .ensureWritable(trailingPartialChunkLength);
    if (trailingPartialChunkLength != 0) {
      buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
        trailingPartialChunkLength);
    }
    buf.release();
    this.buf = newBuf;
    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
  }

  /**
   * Flush the buffer out to datanodes.
   * @param syncBlock will call hsync if true, otherwise hflush.
   * @return A CompletableFuture that holds the acked length after flushing.
   */
  @Override
  public CompletableFuture<Long> flush(boolean syncBlock) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    flush0(future, syncBlock);
    return future;
  }

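  // Finish the block by sending an empty packet with lastPacketInBlock set, then block until every
  // datanode has acked it. All previous flushes must already have been acked.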
  private void endBlock() throws IOException {
    Preconditions.checkState(waitingAckQueue.isEmpty(),
      "should call flush first before calling close");
    if (state != State.STREAMING) {
      throw new IOException("stream already broken");
    }
    state = State.CLOSING;
    long finalizedLength = ackedBlockLength;
    PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
    buf.release();
    buf = null;
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.directBuffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    CompletableFuture<Long> future = new CompletableFuture<>();
    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
    datanodeInfoMap.keySet().forEach(ch -> safeWriteAndFlush(ch, headerBuf.retainedDuplicate()));
    headerBuf.release();
    FutureUtils.get(future);
  }

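  // Close all datanode channels and wait uninterruptibly until every close completes.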
  private void closeDataNodeChannelsAndAwait() {
    List<ChannelFuture> futures = new ArrayList<>();
    for (Channel ch : datanodeInfoMap.keySet()) {
      futures.add(ch.close());
    }
    for (ChannelFuture future : futures) {
      consume(future.awaitUninterruptibly());
    }
  }

  /**
   * The close method used when an error has occurred. Currently we just call recoverFileLease.
   */
  @Override
  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
    if (buf != null) {
      buf.release();
      buf = null;
    }
    closeDataNodeChannelsAndAwait();
    endFileLease(client, fileId);
    RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
      reporter == null ? new CancelOnClose(client) : reporter);
  }

  /**
   * End the current block and complete the file at the namenode. You should call
   * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
   */
  @Override
  public void close() throws IOException {
    endBlock();
    state = State.CLOSED;
    closeDataNodeChannelsAndAwait();
    block.setNumBytes(ackedBlockLength);
    completeFile(client, namenode, src, clientName, block, fileId);
  }

  @Override
  public boolean isBroken() {
    return state == State.BROKEN;
  }

  @Override
  public long getSyncedLength() {
    return this.ackedBlockLength;
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  Map<Channel, DatanodeInfo> getDatanodeInfoMap() {
    return this.datanodeInfoMap;
  }
}