/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;

/**
 * An asynchronous HDFS output stream implementation which fans out data to all datanodes and only
 * supports writing files with a single block.
 * <p>
 * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create an
 * instance. The main use of this class is implementing the WAL, so we only expose a few HDFS
 * configurations in that method. We place it under the io package because we want to keep it
 * independent of the WAL implementation, and thus easier to move to the HDFS project eventually.
 * <p>
 * Note that, although we support pipelined flush, i.e., writing new data and then flushing before
 * the previous flush succeeds, the implementation is not thread safe, so you should not call its
 * methods concurrently.
 * <p>
 * Advantages compared to DFSOutputStream:
 * <ol>
 * <li>The fan out mechanism. This will reduce the latency.</li>
 * <li>Fail-fast on datanode connection errors. The WAL implementation can open a new writer
 * ASAP.</li>
 * <li>We can benefit from netty's ByteBuf management mechanism.</li>
 * </ol>
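 * <p>
 * A minimal usage sketch. The exact createOutput parameter list lives in
 * {@link FanOutOneBlockAsyncDFSOutputHelper} and may differ between versions, so treat the call
 * below as illustrative rather than authoritative:
 *
 * <pre>
 * FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs, path,
 *   true, false, (short) 3, blockSize, eventLoopGroup, channelClass, streamSlowMonitor);
 * out.write(data);
 * // flush(false) gives hflush semantics; the returned future completes with the acked length
 * out.flush(false).whenComplete((ackedLength, error) -> {
 *   // on success, all datanodes have acked ackedLength bytes
 * });
 * out.close();
 * </pre>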
 */
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

  // The MAX_PACKET_SIZE is 16MB, but it includes the header size and checksum size. So here we
  // set a smaller limit for the data size.
  private static final int MAX_DATA_LEN = 12 * 1024 * 1024;

  private final Configuration conf;

  private final DistributedFileSystem dfs;

  private final DFSClient client;

  private final ClientProtocol namenode;

  private final String clientName;

  private final String src;

  private final long fileId;

  private final ExtendedBlock block;

  private final DatanodeInfo[] locations;

  private final Encryptor encryptor;

  private final Map<Channel, DatanodeInfo> datanodeInfoMap;

  private final DataChecksum summer;

  private final int maxDataLen;

  private final ByteBufAllocator alloc;

  private static final class Callback {

    private final CompletableFuture<Long> future;

    private final long ackedLength;

    // should be backed by a thread safe collection
    private final Set<ChannelId> unfinishedReplicas;
    private final long packetDataLen;
    private final long flushTimestamp;
    private long lastAckTimestamp = -1;

    public Callback(CompletableFuture<Long> future, long ackedLength,
      final Collection<Channel> replicas, long packetDataLen) {
      this.future = future;
      this.ackedLength = ackedLength;
      this.packetDataLen = packetDataLen;
      this.flushTimestamp = EnvironmentEdgeManager.currentTime();
      if (replicas.isEmpty()) {
        this.unfinishedReplicas = Collections.emptySet();
      } else {
        this.unfinishedReplicas =
          Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
        replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add);
      }
    }
  }

  private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();

  private volatile long ackedBlockLength = 0L;

  // this could be different from the acked block length because a packet cannot start in the
  // middle of a chunk.
  private long nextPacketOffsetInBlock = 0L;

  // the length of the trailing partial chunk; the packet start offset must be aligned with the
  // checksum chunk size, so we need to resend this data in the next packet.
  private int trailingPartialChunkLength = 0;

  private long nextPacketSeqno = 0L;

  private ByteBuf buf;

  private final SendBufSizePredictor sendBufSizePredictor = new SendBufSizePredictor();

  // state of the connections to the datanodes
  private enum State {
    STREAMING,
    CLOSING,
    BROKEN,
    CLOSED
  }

  private volatile State state;

  private final StreamSlowMonitor streamSlowMonitor;
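
  // Each in-flight flush is represented by a Callback in waitingAckQueue, in send order. A
  // callback tracks the set of datanode channels that have not yet acked its packet; once that
  // set becomes empty in completed() below, the entry is removed from the queue and its future is
  // completed with the acked length. This is how pipelined flushes complete in order without
  // locking on the write path.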
  // all lock-free to make it run faster
  private void completed(Channel channel) {
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // if the current unfinished replicas do not contain us then we have already acked this one;
      // keep iterating to find the first one we have not acked yet.
      if (c.unfinishedReplicas.remove(channel.id())) {
        long current = EnvironmentEdgeManager.currentTime();
        streamSlowMonitor.checkProcessTimeAndSpeed(datanodeInfoMap.get(channel), c.packetDataLen,
          current - c.flushTimestamp, c.lastAckTimestamp, c.unfinishedReplicas.size());
        c.lastAckTimestamp = current;
        if (c.unfinishedReplicas.isEmpty()) {
          // We need to remove the entry before completing the future. Otherwise the upper layer
          // may call close immediately after we complete the future, but before we remove the
          // entry from waitingAckQueue, which leads to an IllegalStateException. Also set
          // ackedBlockLength first, otherwise we may use a wrong length to commit the block.
          // This may lead to multiple removes and assigns, but that is OK: the semantics of
          // iter.remove is to remove the entry returned by the previous next, so removing an
          // already removed entry is a no-op, and for the assign the values are the same.
          iter.remove();
          ackedBlockLength = c.ackedLength;
          // the future.complete check is to confirm that we are the only one who grabbed the
          // work, otherwise just give up and return.
          if (c.future.complete(c.ackedLength)) {
            // also wake up flush requests which have the same length.
            while (iter.hasNext()) {
              Callback maybeDummyCb = iter.next();
              if (maybeDummyCb.ackedLength == c.ackedLength) {
                iter.remove();
                maybeDummyCb.future.complete(c.ackedLength);
              } else {
                break;
              }
            }
          }
        }
        return;
      }
    }
  }

  // This usually does not happen, which means it is not on the critical path, so make it
  // synchronized so that the implementation will not burn up our brain, as there are multiple
  // state changes and checks.
  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
    if (state == State.CLOSED) {
      return;
    }
    if (state == State.BROKEN) {
      failWaitingAckQueue(channel, errorSupplier);
      return;
    }
    if (state == State.CLOSING) {
      Callback c = waitingAckQueue.peekFirst();
      if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
        // nothing to do, the endBlock request has already finished.
        return;
      }
    }
    // disable further writes, and fail all pending acks.
    state = State.BROKEN;
    failWaitingAckQueue(channel, errorSupplier);
    datanodeInfoMap.keySet().forEach(NettyFutureUtils::safeClose);
  }

  private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) {
    Throwable error = errorSupplier.get();
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // find the first sync request which we have not acked yet, and fail it and all the
      // requests after it.
      if (!c.unfinishedReplicas.contains(channel.id())) {
        continue;
      }
      for (;;) {
        c.future.completeExceptionally(error);
        if (!iter.hasNext()) {
          break;
        }
        c = iter.next();
      }
      break;
    }
  }

  @Sharable
  private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> {

    private final int timeoutMs;

    public AckHandler(int timeoutMs) {
      this.timeoutMs = timeoutMs;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
      Status reply = getStatus(ack);
      // check for the restart OOB status first; it is also not SUCCESS, so testing SUCCESS first
      // would report it as a plain bad response and make this branch unreachable.
      if (PipelineAck.isRestartOOBStatus(reply)) {
        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block "
          + block + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (reply != Status.SUCCESS) {
        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " + block
          + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (ack.getSeqno() == HEART_BEAT_SEQNO) {
        return;
      }
      completed(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      if (state == State.CLOSED) {
        return;
      }
      failed(ctx.channel(),
        () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      failed(ctx.channel(), () -> cause);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        if (e.state() == READER_IDLE) {
          failed(ctx.channel(),
            () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
        } else if (e.state() == WRITER_IDLE) {
          // send an empty heartbeat packet to keep the connection alive
          PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
          int len = heartbeat.getSerializedSize();
          ByteBuf buf = alloc.buffer(len);
          heartbeat.putInBuffer(buf.nioBuffer(0, len));
          buf.writerIndex(len);
          safeWriteAndFlush(ctx.channel(), buf);
        }
        return;
      }
      super.userEventTriggered(ctx, evt);
    }
  }
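
  // Wire up the ack-processing pipeline on each datanode connection: the IdleStateHandler fires
  // READER_IDLE (ack timeout, fail the stream) and WRITER_IDLE (send a heartbeat) events into
  // AckHandler above, while the two protobuf handlers reassemble and parse the
  // varint-length-prefixed PipelineAckProto responses from the datanode.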
  private void setupReceiver(int timeoutMs) {
    AckHandler ackHandler = new AckHandler(timeoutMs);
    for (Channel ch : datanodeInfoMap.keySet()) {
      ch.pipeline().addLast(
        new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
        new ProtobufVarint32FrameDecoder(),
        new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler);
      ch.config().setAutoRead(true);
    }
  }

  FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
    ClientProtocol namenode, String clientName, String src, long fileId, LocatedBlock locatedBlock,
    Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer,
    ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
    this.conf = conf;
    this.dfs = dfs;
    this.client = client;
    this.namenode = namenode;
    this.fileId = fileId;
    this.clientName = clientName;
    this.src = src;
    this.block = locatedBlock.getBlock();
    this.locations = locatedBlock.getLocations();
    this.encryptor = encryptor;
    this.datanodeInfoMap = datanodeInfoMap;
    this.summer = summer;
    // round the data limit down to a multiple of the checksum chunk size, so a packet never ends
    // in the middle of a chunk
    this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
    this.alloc = alloc;
    this.buf = alloc.directBuffer(sendBufSizePredictor.initialSize());
    this.state = State.STREAMING;
    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
    this.streamSlowMonitor = streamSlowMonitor;
  }

  @Override
  public void writeInt(int i) {
    buf.ensureWritable(4);
    buf.writeInt(i);
  }

  @Override
  public void write(ByteBuffer bb) {
    buf.ensureWritable(bb.remaining());
    buf.writeBytes(bb);
  }

  @Override
  public void write(byte[] b) {
    write(b, 0, b.length);
  }

  @Override
  public void write(byte[] b, int off, int len) {
    buf.ensureWritable(len);
    buf.writeBytes(b, off, len);
  }

  @Override
  public int buffered() {
    return buf.readableBytes();
  }

  @Override
  public DatanodeInfo[] getPipeline() {
    return locations;
  }

  private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
    long nextPacketOffsetInBlock, boolean syncBlock) {
    int dataLen = dataBuf.readableBytes();
    int chunkLen = summer.getBytesPerChecksum();
    int trailingPartialChunkLen = dataLen % chunkLen;
    int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
    int checksumLen = numChecks * summer.getChecksumSize();
    ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
    summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
    checksumBuf.writerIndex(checksumLen);
    PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
      nextPacketSeqno, false, dataLen, syncBlock);
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.buffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    Callback c =
      new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeInfoMap.keySet(), dataLen);
    waitingAckQueue.addLast(c);
    // recheck after we have pushed the callback onto the queue
    if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
      future.completeExceptionally(new IOException("stream already broken"));
      // it's the one we have just pushed, or just a no-op
      waitingAckQueue.removeFirst();

      checksumBuf.release();
      headerBuf.release();

      // This method takes ownership of the dataBuf, so we need to release it before returning.
      dataBuf.release();
      return;
    }
    // TODO: we should perhaps measure time taken per DN here;
    // we could collect statistics per DN, and/or exclude bad nodes in createOutput.
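    // Fan the packet out to every datanode in parallel rather than through a chained pipeline:
    // each channel gets a retained duplicate of the same header, checksum and data buffers, and
    // the originals are released below once every write holds its own reference.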
    datanodeInfoMap.keySet().forEach(ch -> {
      safeWrite(ch, headerBuf.retainedDuplicate());
      safeWrite(ch, checksumBuf.retainedDuplicate());
      safeWriteAndFlush(ch, dataBuf.retainedDuplicate());
    });
    checksumBuf.release();
    headerBuf.release();
    dataBuf.release();
    nextPacketSeqno++;
  }

  private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
    if (state != State.STREAMING) {
      future.completeExceptionally(new IOException("stream already broken"));
      return;
    }
    int dataLen = buf.readableBytes();
    if (dataLen == trailingPartialChunkLength) {
      // no new data
      long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
      Callback lastFlush = waitingAckQueue.peekLast();
      if (lastFlush != null) {
        Callback c = new Callback(future, lengthAfterFlush, Collections.emptySet(), dataLen);
        waitingAckQueue.addLast(c);
        // recheck here if we have already removed the previous callback from the queue
        if (waitingAckQueue.peekFirst() == c) {
          // all previous callbacks have been removed
          // Note that this does not mean we will always win here, because the background thread
          // may have already started to mark the future as completed in the completed or failed
          // methods but has not removed it from the queue yet. That's also why the removeFirst
          // call below may be a no-op.
          if (state != State.STREAMING) {
            future.completeExceptionally(new IOException("stream already broken"));
          } else {
            future.complete(lengthAfterFlush);
          }
          // it's the one we have just pushed, or just a no-op
          waitingAckQueue.removeFirst();
        }
      } else {
        // we must have acked all the data, so the ackedBlockLength must be the same as
        // lengthAfterFlush
        future.complete(lengthAfterFlush);
      }
      return;
    }

    if (encryptor != null) {
      ByteBuf encryptBuf = alloc.directBuffer(dataLen);
      buf.readBytes(encryptBuf, trailingPartialChunkLength);
      int toEncryptLength = dataLen - trailingPartialChunkLength;
      try {
        encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
          encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
      } catch (IOException e) {
        encryptBuf.release();
        future.completeExceptionally(e);
        return;
      }
      encryptBuf.writerIndex(dataLen);
      buf.release();
      buf = encryptBuf;
    }

    if (dataLen > maxDataLen) {
      // We need to write the data out in multiple packets, as a single packet cannot exceed
      // maxDataLen (the 16MB protocol limit minus header and checksum overhead).
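      // Only the last sub-packet carries the caller's future; the intermediate sub-packets get
      // fresh dummy futures, since the caller only cares about the acked length once all of the
      // data has been flushed.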
      long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
      for (int remaining = dataLen;;) {
        if (remaining < maxDataLen) {
          flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
            syncBlock);
          break;
        } else {
          flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
            nextSubPacketOffsetInBlock, syncBlock);
          remaining -= maxDataLen;
          nextSubPacketOffsetInBlock += maxDataLen;
        }
      }
    } else {
      flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
    }
    trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
    ByteBuf newBuf = alloc.directBuffer(sendBufSizePredictor.guess(dataLen))
      .ensureWritable(trailingPartialChunkLength);
    if (trailingPartialChunkLength != 0) {
      buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
        trailingPartialChunkLength);
    }
    buf.release();
    this.buf = newBuf;
    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
  }

  /**
   * Flush the buffer out to the datanodes.
   * @param syncBlock will call hsync if true, otherwise hflush.
   * @return A CompletableFuture that holds the acked length after flushing.
   */
  @Override
  public CompletableFuture<Long> flush(boolean syncBlock) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    flush0(future, syncBlock);
    return future;
  }

  private void endBlock() throws IOException {
    Preconditions.checkState(waitingAckQueue.isEmpty(),
      "should call flush first before calling close");
    if (state != State.STREAMING) {
      throw new IOException("stream already broken");
    }
    state = State.CLOSING;
    long finalizedLength = ackedBlockLength;
    PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
    buf.release();
    buf = null;
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.directBuffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    CompletableFuture<Long> future = new CompletableFuture<>();
    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
    datanodeInfoMap.keySet().forEach(ch -> safeWriteAndFlush(ch, headerBuf.retainedDuplicate()));
    headerBuf.release();
    FutureUtils.get(future);
  }

  private void closeDataNodeChannelsAndAwait() {
    List<ChannelFuture> futures = new ArrayList<>();
    for (Channel ch : datanodeInfoMap.keySet()) {
      futures.add(ch.close());
    }
    for (ChannelFuture future : futures) {
      consume(future.awaitUninterruptibly());
    }
  }

  /**
   * The close method to use when an error has occurred. Currently we just call recoverFileLease.
   */
  @Override
  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
    if (buf != null) {
      buf.release();
      buf = null;
    }
    closeDataNodeChannelsAndAwait();
    endFileLease(client, fileId);
    RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
      reporter == null ? new CancelOnClose(client) : reporter);
  }

  /**
   * End the current block and complete the file at the namenode. You should call
   * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
   */
  @Override
  public void close() throws IOException {
    endBlock();
    state = State.CLOSED;
    closeDataNodeChannelsAndAwait();
    block.setNumBytes(ackedBlockLength);
    completeFile(client, namenode, src, clientName, block, fileId);
  }

  @Override
  public boolean isBroken() {
    return state == State.BROKEN;
  }

  @Override
  public long getSyncedLength() {
    return this.ackedBlockLength;
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
    allowedOnPath = ".*/src/test/.*")
  Map<Channel, DatanodeInfo> getDatanodeInfoMap() {
    return this.datanodeInfoMap;
  }
}