/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;

/**
 * An asynchronous HDFS output stream implementation which fans out data to all datanodes and only
 * supports writing files with a single block.
 * <p>
 * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create instances.
 * The main usage of this class is implementing the WAL, so we only expose a few HDFS
 * configurations in that method. And we place it here under the io package because we want to
 * keep it independent of the WAL implementation, making it easier to move to the HDFS project
 * eventually.
 * <p>
 * Note that, although we support pipelined flush, i.e., writing new data and then flushing before
 * the previous flush succeeds, the implementation is not thread safe, so you should not call its
 * methods concurrently.
 * <p>
 * Advantages compared to DFSOutputStream:
 * <ol>
 * <li>The fan out mechanism, which reduces latency.</li>
 * <li>Fail-fast when there is an error on a connection to a datanode, so the WAL implementation
 * can open a new writer ASAP.</li>
 * <li>We can benefit from netty's ByteBuf management mechanism.</li>
 * </ol>
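 * <p>
 * A minimal usage sketch (the parameters to createOutput are elided here; see
 * {@link FanOutOneBlockAsyncDFSOutputHelper} for the exact signature):
 *
 * <pre>
 * FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(...);
 * out.write(data); // buffer some bytes locally
 * // flush(false) behaves like hflush, flush(true) like hsync; the future yields the acked length
 * long ackedLength = FutureUtils.get(out.flush(false));
 * out.close(); // or recoverAndClose(null) if the stream is broken
 * </pre>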
 */
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

  // The MAX_PACKET_SIZE is 16MB, but it includes the header size and checksum size, so here we
  // set a smaller limit for the data size.
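  // For example, with the default 512-byte checksum chunks and 4-byte checksums, 12MB of data
  // adds about 96KB of checksum bytes plus a small packet header, staying well below 16MB.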
  private static final int MAX_DATA_LEN = 12 * 1024 * 1024;

  private final Configuration conf;

  private final DistributedFileSystem dfs;

  private final DFSClient client;

  private final ClientProtocol namenode;

  private final String clientName;

  private final String src;

  private final long fileId;

  private final ExtendedBlock block;

  private final DatanodeInfo[] locations;

  private final Encryptor encryptor;

  private final Map<Channel, DatanodeInfo> datanodeInfoMap;

  private final DataChecksum summer;

  private final int maxDataLen;

  private final ByteBufAllocator alloc;

  private static final class Callback {

    private final CompletableFuture<Long> future;

    private final long ackedLength;

    // should be backed by a thread-safe collection
    private final Set<ChannelId> unfinishedReplicas;
    private final long packetDataLen;
    private final long flushTimestamp;
    private long lastAckTimestamp = -1;

    public Callback(CompletableFuture<Long> future, long ackedLength,
      final Collection<Channel> replicas, long packetDataLen) {
      this.future = future;
      this.ackedLength = ackedLength;
      this.packetDataLen = packetDataLen;
      this.flushTimestamp = EnvironmentEdgeManager.currentTime();
      if (replicas.isEmpty()) {
        this.unfinishedReplicas = Collections.emptySet();
      } else {
        this.unfinishedReplicas =
          Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
        replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add);
      }
    }
  }

  private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();

  private volatile long ackedBlockLength = 0L;

  // this could be different from the acked block length because a packet cannot start in the
  // middle of a chunk.
  private long nextPacketOffsetInBlock = 0L;

  // the length of the trailing partial chunk; the packet start offset must be aligned with the
  // checksum chunk length, so we need to resend the same data.
  private int trailingPartialChunkLength = 0;
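  // For example, with 512-byte chunks, after flushing 1000 bytes the trailing 488 bytes
  // (1000 % 512) stay in the buffer and are resent at the start of the next packet.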

  private long nextPacketSeqno = 0L;

  private ByteBuf buf;

  private final SendBufSizePredictor sendBufSizePredictor = new SendBufSizePredictor();

  // State for connections to DN
  private enum State {
    STREAMING,
    CLOSING,
    BROKEN,
    CLOSED
  }

  private volatile State state;

  private final StreamSlowMonitor streamSlowMonitor;

  // all lock-free to make it run faster
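  // Callbacks sit in waitingAckQueue in flush order, so an ack from a datanode is always counted
  // against the first callback in the queue that still lists that channel as unfinished.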
  private void completed(Channel channel) {
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // if the current unfinished replicas do not contain us, it means that we have already acked
      // this one, so let's iterate to find the one we have not acked yet.
      if (c.unfinishedReplicas.remove(channel.id())) {
        long current = EnvironmentEdgeManager.currentTime();
        streamSlowMonitor.checkProcessTimeAndSpeed(datanodeInfoMap.get(channel), c.packetDataLen,
          current - c.flushTimestamp, c.lastAckTimestamp, c.unfinishedReplicas.size());
        c.lastAckTimestamp = current;
        if (c.unfinishedReplicas.isEmpty()) {
          // We need to remove the entry before completing the future. It is possible that after
          // we complete the future the upper layer will call close immediately, before we remove
          // the entry from waitingAckQueue, and lead to an IllegalStateException. And also set the
          // ackedBlockLength first, otherwise we may use a wrong length to commit the block. This
          // may lead to multiple removes and assigns but that is OK. The semantics of iter.remove
          // is to remove the entry returned by the previous next call, so if the entry has already
          // been removed then it is a no-op, and for the assign, the values are the same so no
          // problem.
          iter.remove();
          ackedBlockLength = c.ackedLength;
          // the future.complete check is to confirm that we are the only one who grabbed the work,
          // otherwise just give up and return.
          if (c.future.complete(c.ackedLength)) {
            // also wake up flush requests which have the same length.
            while (iter.hasNext()) {
              Callback maybeDummyCb = iter.next();
              if (maybeDummyCb.ackedLength == c.ackedLength) {
                iter.remove();
                maybeDummyCb.future.complete(c.ackedLength);
              } else {
                break;
              }
            }
          }
        }
        return;
      }
    }
  }

  // This usually does not happen, which means it is not on the critical path, so we make it
  // synchronized so that the implementation stays easy to reason about despite the multiple state
  // changes and checks.
  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
    if (state == State.CLOSED) {
      return;
    }
    if (state == State.BROKEN) {
      failWaitingAckQueue(channel, errorSupplier);
      return;
    }
    if (state == State.CLOSING) {
      Callback c = waitingAckQueue.peekFirst();
      if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
        // nothing to do, the endBlock request has already finished.
        return;
      }
    }
    // disable further writes, and fail all pending acks.
    state = State.BROKEN;
    failWaitingAckQueue(channel, errorSupplier);
    datanodeInfoMap.keySet().forEach(NettyFutureUtils::safeClose);
  }

  private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) {
    Throwable error = errorSupplier.get();
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // find the first sync request which we have not acked yet and fail all the requests after
      // it.
      if (!c.unfinishedReplicas.contains(channel.id())) {
        continue;
      }
      for (;;) {
        c.future.completeExceptionally(error);
        if (!iter.hasNext()) {
          break;
        }
        c = iter.next();
      }
      break;
    }
  }

  @Sharable
  private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> {

    private final int timeoutMs;

    public AckHandler(int timeoutMs) {
      this.timeoutMs = timeoutMs;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
      Status reply = getStatus(ack);
      if (reply != Status.SUCCESS) {
        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " + block
          + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (PipelineAck.isRestartOOBStatus(reply)) {
        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block "
          + block + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (ack.getSeqno() == HEART_BEAT_SEQNO) {
        return;
      }
      completed(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      if (state == State.CLOSED) {
        return;
      }
      failed(ctx.channel(),
        () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      failed(ctx.channel(), () -> cause);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        if (e.state() == READER_IDLE) {
          failed(ctx.channel(),
            () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
        } else if (e.state() == WRITER_IDLE) {
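          // the channel has been write-idle for half the timeout, so send an empty heartbeat
          // packet (seqno HEART_BEAT_SEQNO) to keep the connection alive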
          PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
          int len = heartbeat.getSerializedSize();
          ByteBuf buf = alloc.buffer(len);
          heartbeat.putInBuffer(buf.nioBuffer(0, len));
          buf.writerIndex(len);
          safeWriteAndFlush(ctx.channel(), buf);
        }
        return;
      }
      super.userEventTriggered(ctx, evt);
    }
  }

  private void setupReceiver(int timeoutMs) {
    AckHandler ackHandler = new AckHandler(timeoutMs);
    for (Channel ch : datanodeInfoMap.keySet()) {
      ch.pipeline().addLast(
        new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
        new ProtobufVarint32FrameDecoder(),
        new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler);
      ch.config().setAutoRead(true);
    }
  }

  FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
    ClientProtocol namenode, String clientName, String src, long fileId, LocatedBlock locatedBlock,
    Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer,
    ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
    this.conf = conf;
    this.dfs = dfs;
    this.client = client;
    this.namenode = namenode;
    this.fileId = fileId;
    this.clientName = clientName;
    this.src = src;
    this.block = locatedBlock.getBlock();
    this.locations = getLocatedBlockLocations(locatedBlock);
    this.encryptor = encryptor;
    this.datanodeInfoMap = datanodeInfoMap;
    this.summer = summer;
    this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
    this.alloc = alloc;
    this.buf = alloc.directBuffer(sendBufSizePredictor.initialSize());
    this.state = State.STREAMING;
    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
    this.streamSlowMonitor = streamSlowMonitor;
  }

  @Override
  public void writeInt(int i) {
    buf.ensureWritable(4);
    buf.writeInt(i);
  }

  @Override
  public void write(ByteBuffer bb) {
    buf.ensureWritable(bb.remaining());
    buf.writeBytes(bb);
  }

  @Override
  public void write(byte[] b) {
    write(b, 0, b.length);
  }

  @Override
  public void write(byte[] b, int off, int len) {
    buf.ensureWritable(len);
    buf.writeBytes(b, off, len);
  }

  @Override
  public int buffered() {
    return buf.readableBytes();
  }

  @Override
  public DatanodeInfo[] getPipeline() {
    return locations;
  }

  private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
    long nextPacketOffsetInBlock, boolean syncBlock) {
    int dataLen = dataBuf.readableBytes();
    int chunkLen = summer.getBytesPerChecksum();
    int trailingPartialChunkLen = dataLen % chunkLen;
    int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
    int checksumLen = numChecks * summer.getChecksumSize();
    ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
    summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
    checksumBuf.writerIndex(checksumLen);
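    // Per the HDFS data transfer framing, each packet on the wire is the serialized header
    // (which embeds the packet length: 4 + checksumLen + dataLen, the 4 being the length word
    // itself), followed by the checksum bytes and then the data bytes.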
    PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
      nextPacketSeqno, false, dataLen, syncBlock);
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.buffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    Callback c =
      new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeInfoMap.keySet(), dataLen);
    waitingAckQueue.addLast(c);
    // recheck again after we have pushed the callback to the queue
    if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
      future.completeExceptionally(new IOException("stream already broken"));
      // it's the one we have just pushed, or removing it is just a no-op
      waitingAckQueue.removeFirst();

      checksumBuf.release();
      headerBuf.release();

      // This method takes ownership of the dataBuf, so we need to release it before returning.
      dataBuf.release();
      return;
    }
    // TODO: we should perhaps measure time taken per DN here;
    // we could collect statistics per DN, and/or exclude bad nodes in createOutput.
    datanodeInfoMap.keySet().forEach(ch -> {
      safeWrite(ch, headerBuf.retainedDuplicate());
      safeWrite(ch, checksumBuf.retainedDuplicate());
      safeWriteAndFlush(ch, dataBuf.retainedDuplicate());
    });
    checksumBuf.release();
    headerBuf.release();
    dataBuf.release();
    nextPacketSeqno++;
  }


  private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
    if (state != State.STREAMING) {
      future.completeExceptionally(new IOException("stream already broken"));
      return;
    }
    int dataLen = buf.readableBytes();
    if (dataLen == trailingPartialChunkLength) {
      // no new data
      long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
      Callback lastFlush = waitingAckQueue.peekLast();
      if (lastFlush != null) {
        Callback c = new Callback(future, lengthAfterFlush, Collections.emptySet(), dataLen);
        waitingAckQueue.addLast(c);
        // recheck here if we have already removed the previous callback from the queue
        if (waitingAckQueue.peekFirst() == c) {
          // all previous callbacks have been removed
          // notice that this does not mean we will always win here, because the background thread
          // may have already started to mark the future as completed in the completed or failed
          // methods but has not removed it from the queue yet. That's also why the removeFirst
          // call below may be a no-op.
          if (state != State.STREAMING) {
            future.completeExceptionally(new IOException("stream already broken"));
          } else {
            future.complete(lengthAfterFlush);
          }
          // it's the one we have just pushed, or removing it is just a no-op
          waitingAckQueue.removeFirst();
        }
      } else {
        // we must have acked all the data, so the ackedBlockLength must be the same as
        // lengthAfterFlush
        future.complete(lengthAfterFlush);
      }
      return;
    }

    if (encryptor != null) {
      ByteBuf encryptBuf = alloc.directBuffer(dataLen);
      buf.readBytes(encryptBuf, trailingPartialChunkLength);
      int toEncryptLength = dataLen - trailingPartialChunkLength;
      try {
        encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
          encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
      } catch (IOException e) {
        encryptBuf.release();
        future.completeExceptionally(e);
        return;
      }
      encryptBuf.writerIndex(dataLen);
      buf.release();
      buf = encryptBuf;
    }

    if (dataLen > maxDataLen) {
      // We need to write out the data in multiple packets, as the max packet size allowed is 16MB.
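      // For example, with the chunk-aligned ~12MB maxDataLen, a 30MB flush goes out as two 12MB
      // sub-packets plus a final 6MB one; only the final sub-packet carries the caller's future.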
      long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
      for (int remaining = dataLen;;) {
        if (remaining < maxDataLen) {
          flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
            syncBlock);
          break;
        } else {
          flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
            nextSubPacketOffsetInBlock, syncBlock);
          remaining -= maxDataLen;
          nextSubPacketOffsetInBlock += maxDataLen;
        }
      }
    } else {
      flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
    }
    trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
    ByteBuf newBuf = alloc.directBuffer(sendBufSizePredictor.guess(dataLen))
      .ensureWritable(trailingPartialChunkLength);
    if (trailingPartialChunkLength != 0) {
      buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
        trailingPartialChunkLength);
    }
    buf.release();
    this.buf = newBuf;
    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
  }


  /**
   * Flush the buffer out to the datanodes.
   * @param syncBlock will call hsync if true, otherwise hflush.
   * @return a CompletableFuture that holds the acked length after flushing.
   */
  @Override
  public CompletableFuture<Long> flush(boolean syncBlock) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    flush0(future, syncBlock);
    return future;
  }

  private void endBlock() throws IOException {
    Preconditions.checkState(waitingAckQueue.isEmpty(),
      "should call flush first before calling close");
    if (state != State.STREAMING) {
      throw new IOException("stream already broken");
    }
    state = State.CLOSING;
    long finalizedLength = ackedBlockLength;
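    // an empty packet with lastPacketInBlock set tells each datanode to finalize the block at the
    // acked length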
    PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
    buf.release();
    buf = null;
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.directBuffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    CompletableFuture<Long> future = new CompletableFuture<>();
    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
    datanodeInfoMap.keySet().forEach(ch -> safeWriteAndFlush(ch, headerBuf.retainedDuplicate()));
    headerBuf.release();
    FutureUtils.get(future);
  }

  private void closeDataNodeChannelsAndAwait() {
    List<ChannelFuture> futures = new ArrayList<>();
    for (Channel ch : datanodeInfoMap.keySet()) {
      futures.add(ch.close());
    }
    for (ChannelFuture future : futures) {
      consume(future.awaitUninterruptibly());
    }
  }

  /**
   * The close method used when an error has occurred. Currently we just call recoverFileLease.
   */
  @Override
  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
    if (buf != null) {
      buf.release();
      buf = null;
    }
    closeDataNodeChannelsAndAwait();
    endFileLease(client, fileId);
    RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
      reporter == null ? new CancelOnClose(client) : reporter);
  }

  /**
   * End the current block and complete the file at the namenode. You should call
   * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
   */
  @Override
  public void close() throws IOException {
    endBlock();
    state = State.CLOSED;
    closeDataNodeChannelsAndAwait();
    block.setNumBytes(ackedBlockLength);
    completeFile(client, namenode, src, clientName, block, fileId);
  }

  @Override
  public boolean isBroken() {
    return state == State.BROKEN;
  }

  @Override
  public long getSyncedLength() {
    return this.ackedBlockLength;
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  Map<Channel, DatanodeInfo> getDatanodeInfoMap() {
    return this.datanodeInfoMap;
  }
}