/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DummyDFSOutputStream;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;

/**
 * An asynchronous HDFS output stream implementation which fans out data to all datanodes and only
 * supports writing files with a single block.
 * <p>
 * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create an
 * instance. The main use of this class is implementing the WAL, so we only expose a small set of
 * HDFS configurations through that method. We place it under the io package to keep it
 * independent of the WAL implementation, which should make it easier to eventually move it to the
 * HDFS project.
 * <p>
 * Note that, although we support pipelined flush, i.e., writing new data and then flushing before
 * the previous flush succeeds, the implementation is not thread safe, so you should not call its
 * methods concurrently.
 * <p>
 * Advantages compared to DFSOutputStream:
 * <ol>
 * <li>The fan-out mechanism, which reduces latency.</li>
 * <li>Fail-fast on datanode connection errors, so the WAL implementation can open a new writer
 * ASAP.</li>
 * <li>We can benefit from netty's ByteBuf management mechanism.</li>
 * </ol>
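 * <p>
 * A minimal usage sketch (illustrative only; the createOutput parameters are elided and
 * walEditBytes is a placeholder, see {@link FanOutOneBlockAsyncDFSOutputHelper} for the actual
 * factory signature):
 *
 * <pre>
 * FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(...);
 * out.write(walEditBytes); // buffered locally until flush
 * out.flush(false).whenComplete((ackedLength, error) -> {
 *   // on success, data up to ackedLength has been acked by all datanodes in the pipeline
 * });
 * out.close();
 * </pre>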
 */
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

  // The MAX_PACKET_SIZE is 16MB, but it includes the header size and checksum size, so here we
  // set a smaller limit for the data size.
  private static final int MAX_DATA_LEN = 12 * 1024 * 1024;

  private final Configuration conf;

  private final DistributedFileSystem dfs;

  private final DFSClient client;

  private final ClientProtocol namenode;

  private final String clientName;

  private final String src;

  private final HdfsFileStatus stat;

  private final ExtendedBlock block;

  private final DatanodeInfo[] locations;

  private final Encryptor encryptor;

  private final Map<Channel, DatanodeInfo> datanodeInfoMap;

  private final DataChecksum summer;

  private final int maxDataLen;

  private final ByteBufAllocator alloc;

  // a dummy DFSOutputStream used for lease renewal
  private final DummyDFSOutputStream dummyStream;

  private static final class Callback {

    private final CompletableFuture<Long> future;

    private final long ackedLength;

    // should be backed by a thread safe collection
    private final Set<ChannelId> unfinishedReplicas;
    private final long packetDataLen;
    private final long flushTimestamp;
    private long lastAckTimestamp = -1;

    public Callback(CompletableFuture<Long> future, long ackedLength,
      final Collection<Channel> replicas, long packetDataLen) {
      this.future = future;
      this.ackedLength = ackedLength;
      this.packetDataLen = packetDataLen;
      this.flushTimestamp = EnvironmentEdgeManager.currentTime();
      if (replicas.isEmpty()) {
        this.unfinishedReplicas = Collections.emptySet();
      } else {
        this.unfinishedReplicas =
          Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
        replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add);
      }
    }
  }

  private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();

  private volatile long ackedBlockLength = 0L;

  // this could be different from the acked block length because a packet cannot start in the
  // middle of a chunk.
  private long nextPacketOffsetInBlock = 0L;

  // the length of the trailing partial chunk; since the packet start offset must be aligned with
  // the checksum chunk length, we need to resend this data.
  private int trailingPartialChunkLength = 0;

  private long nextPacketSeqno = 0L;

  private ByteBuf buf;

  private final SendBufSizePredictor sendBufSizePredictor = new SendBufSizePredictor();

  // State for connections to DN
  private enum State {
    STREAMING,
    CLOSING,
    BROKEN,
    CLOSED
  }

  private volatile State state;

  private final StreamSlowMonitor streamSlowMonitor;
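  // A rough sketch of the ack bookkeeping implemented below (descriptive, not normative): every
  // flush appends one Callback to waitingAckQueue; when a datanode acks, completed() removes that
  // channel's id from the first Callback that still contains it, and once a Callback's replica
  // set drains, its future completes with the acked length and the entry leaves the queue. A
  // failure instead marks the stream BROKEN and fails that Callback and everything after it.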
  // all lock-free to make it run faster
  private void completed(Channel channel) {
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // if the current set of unfinished replicas does not contain us, then we have already acked
      // this one; iterate on to find the one we have not acked yet.
      if (c.unfinishedReplicas.remove(channel.id())) {
        long current = EnvironmentEdgeManager.currentTime();
        streamSlowMonitor.checkProcessTimeAndSpeed(datanodeInfoMap.get(channel), c.packetDataLen,
          current - c.flushTimestamp, c.lastAckTimestamp, c.unfinishedReplicas.size());
        c.lastAckTimestamp = current;
        if (c.unfinishedReplicas.isEmpty()) {
          // We need to remove the entry before completing the future. It is possible that after
          // we complete the future the upper layer will call close immediately, before we remove
          // the entry from waitingAckQueue, which would lead to an IllegalStateException. We also
          // set ackedBlockLength first, otherwise we may use a wrong length to commit the block.
          // This may lead to multiple removes and assignments but that is OK: the semantic of
          // iter.remove is to remove the entry returned by the previous call to next, so if the
          // entry has already been removed it is a no-op, and for the assignment the values are
          // the same so there is no problem.
          iter.remove();
          ackedBlockLength = c.ackedLength;
          // the future.complete check is to confirm that we are the only one who grabbed the
          // work, otherwise just give up and return.
          if (c.future.complete(c.ackedLength)) {
            // also wake up flush requests which have the same length.
            while (iter.hasNext()) {
              Callback maybeDummyCb = iter.next();
              if (maybeDummyCb.ackedLength == c.ackedLength) {
                iter.remove();
                maybeDummyCb.future.complete(c.ackedLength);
              } else {
                break;
              }
            }
          }
        }
        return;
      }
    }
  }

  // This rarely happens, which means it is not on the critical path, so make it synchronized so
  // that the implementation stays easy to reason about despite the multiple state changes and
  // checks.
  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
    if (state == State.CLOSED) {
      return;
    }
    if (state == State.BROKEN) {
      failWaitingAckQueue(channel, errorSupplier);
      return;
    }
    if (state == State.CLOSING) {
      Callback c = waitingAckQueue.peekFirst();
      if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
        // nothing to do, the endBlock request has already finished.
        return;
      }
    }
    // disable further writes, and fail all pending acks.
    state = State.BROKEN;
    failWaitingAckQueue(channel, errorSupplier);
    datanodeInfoMap.keySet().forEach(NettyFutureUtils::safeClose);
  }
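  // Hedged summary of the state handling in failed() above (not an exhaustive specification): a
  // healthy stream goes STREAMING -> CLOSING (endBlock) -> CLOSED (close); a replica failure
  // while an ack is still outstanding moves STREAMING or CLOSING to BROKEN, after which all
  // pending and future flushes complete exceptionally and the datanode channels are closed.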
  private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) {
    Throwable error = errorSupplier.get();
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // find the first sync request which we have not acked yet and fail all the requests after
      // it.
      if (!c.unfinishedReplicas.contains(channel.id())) {
        continue;
      }
      for (;;) {
        c.future.completeExceptionally(error);
        if (!iter.hasNext()) {
          break;
        }
        c = iter.next();
      }
      break;
    }
  }

  @Sharable
  private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> {

    private final int timeoutMs;

    public AckHandler(int timeoutMs) {
      this.timeoutMs = timeoutMs;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
      Status reply = getStatus(ack);
      if (reply != Status.SUCCESS) {
        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " + block
          + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (PipelineAck.isRestartOOBStatus(reply)) {
        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block "
          + block + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (ack.getSeqno() == HEART_BEAT_SEQNO) {
        return;
      }
      completed(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      if (state == State.CLOSED) {
        return;
      }
      failed(ctx.channel(),
        () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      failed(ctx.channel(), () -> cause);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        if (e.state() == READER_IDLE) {
          failed(ctx.channel(),
            () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
        } else if (e.state() == WRITER_IDLE) {
          PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
          int len = heartbeat.getSerializedSize();
          ByteBuf buf = alloc.buffer(len);
          heartbeat.putInBuffer(buf.nioBuffer(0, len));
          buf.writerIndex(len);
          safeWriteAndFlush(ctx.channel(), buf);
        }
        return;
      }
      super.userEventTriggered(ctx, evt);
    }
  }

  private void setupReceiver(int timeoutMs) {
    AckHandler ackHandler = new AckHandler(timeoutMs);
    for (Channel ch : datanodeInfoMap.keySet()) {
      ch.pipeline().addLast(
        new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
        new ProtobufVarint32FrameDecoder(),
        new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler);
      ch.config().setAutoRead(true);
    }
  }
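  // Receive-side pipeline wired in setupReceiver above, for orientation: IdleStateHandler fires
  // READER_IDLE after timeoutMs without a read (treated as a failure by AckHandler) and
  // WRITER_IDLE after timeoutMs / 2 without a write (which triggers the heartbeat packet);
  // ProtobufVarint32FrameDecoder plus ProtobufDecoder reassemble the PipelineAckProto messages
  // that AckHandler consumes.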
  FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
    ClientProtocol namenode, String clientName, String src, HdfsFileStatus stat,
    EnumSet<CreateFlag> createFlags, LocatedBlock locatedBlock, Encryptor encryptor,
    Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer, ByteBufAllocator alloc,
    StreamSlowMonitor streamSlowMonitor) {
    this.conf = conf;
    this.dfs = dfs;
    this.client = client;
    this.namenode = namenode;
    this.stat = stat;
    this.clientName = clientName;
    this.src = src;
    this.block = locatedBlock.getBlock();
    this.locations = getLocatedBlockLocations(locatedBlock);
    this.encryptor = encryptor;
    this.datanodeInfoMap = datanodeInfoMap;
    this.summer = summer;
    this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
    this.alloc = alloc;
    this.buf = alloc.directBuffer(sendBufSizePredictor.initialSize());
    this.state = State.STREAMING;
    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
    this.streamSlowMonitor = streamSlowMonitor;
    this.dummyStream = new DummyDFSOutputStream(this, client, src, stat, createFlags, summer);
  }

  @Override
  public void writeInt(int i) {
    buf.ensureWritable(4);
    buf.writeInt(i);
  }

  @Override
  public void write(ByteBuffer bb) {
    buf.ensureWritable(bb.remaining());
    buf.writeBytes(bb);
  }

  @Override
  public void write(byte[] b) {
    write(b, 0, b.length);
  }

  @Override
  public void write(byte[] b, int off, int len) {
    buf.ensureWritable(len);
    buf.writeBytes(b, off, len);
  }

  @Override
  public int buffered() {
    return buf.readableBytes();
  }

  @Override
  public DatanodeInfo[] getPipeline() {
    return locations;
  }
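  // The packet written by flushBuffer below is PacketHeader + checksum bytes + data bytes. A
  // worked example with assumed (but common) checksum settings of 512-byte chunks and 4-byte
  // CRC32C checksums: a 1000-byte flush covers ceil(1000 / 512) = 2 chunks, so checksumLen is
  // 2 * 4 = 8 and the header advertises a packet length of 4 + 8 + 1000 bytes.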
  private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
    long nextPacketOffsetInBlock, boolean syncBlock) {
    int dataLen = dataBuf.readableBytes();
    int chunkLen = summer.getBytesPerChecksum();
    int trailingPartialChunkLen = dataLen % chunkLen;
    int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
    int checksumLen = numChecks * summer.getChecksumSize();
    ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
    summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
    checksumBuf.writerIndex(checksumLen);
    PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
      nextPacketSeqno, false, dataLen, syncBlock);
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.buffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    Callback c =
      new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeInfoMap.keySet(), dataLen);
    waitingAckQueue.addLast(c);
    // recheck again after we pushed the callback to the queue
    if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
      future.completeExceptionally(new IOException("stream already broken"));
      // it's the one we have just pushed or just a no-op
      waitingAckQueue.removeFirst();

      checksumBuf.release();
      headerBuf.release();

      // This method takes ownership of the dataBuf, so we need to release it before returning.
      dataBuf.release();
      return;
    }
    // TODO: we should perhaps measure time taken per DN here;
    // we could collect statistics per DN, and/or exclude bad nodes in createOutput.
    datanodeInfoMap.keySet().forEach(ch -> {
      safeWrite(ch, headerBuf.retainedDuplicate());
      safeWrite(ch, checksumBuf.retainedDuplicate());
      safeWriteAndFlush(ch, dataBuf.retainedDuplicate());
    });
    checksumBuf.release();
    headerBuf.release();
    dataBuf.release();
    nextPacketSeqno++;
  }
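  // Worked example of the partial-chunk carry-over in flush0 below, assuming 512-byte checksum
  // chunks: flushing 700 bytes sends a packet for bytes [0, 700) and sets
  // trailingPartialChunkLength to 700 % 512 = 188; since the next packet must start on a chunk
  // boundary, nextPacketOffsetInBlock only advances to 512 and the trailing 188 bytes are copied
  // into the new buffer so they are resent together with the new data.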
  private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
    if (state != State.STREAMING) {
      future.completeExceptionally(new IOException("stream already broken"));
      return;
    }
    int dataLen = buf.readableBytes();
    if (dataLen == trailingPartialChunkLength) {
      // no new data
      long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
      Callback lastFlush = waitingAckQueue.peekLast();
      if (lastFlush != null) {
        Callback c = new Callback(future, lengthAfterFlush, Collections.emptySet(), dataLen);
        waitingAckQueue.addLast(c);
        // recheck here if we have already removed the previous callback from the queue
        if (waitingAckQueue.peekFirst() == c) {
          // all previous callbacks have been removed
          // notice that this does not mean we will always win here, because the background
          // thread may have already started to mark the future here as completed in the
          // completed or failed methods but has not removed it from the queue yet. That's also
          // why the removeFirst call below may be a no-op.
          if (state != State.STREAMING) {
            future.completeExceptionally(new IOException("stream already broken"));
          } else {
            future.complete(lengthAfterFlush);
          }
          // it's the one we have just pushed or just a no-op
          waitingAckQueue.removeFirst();
        }
      } else {
        // we must have acked all the data, so the ackedBlockLength must be the same as
        // lengthAfterFlush
        future.complete(lengthAfterFlush);
      }
      return;
    }

    if (encryptor != null) {
      ByteBuf encryptBuf = alloc.directBuffer(dataLen);
      buf.readBytes(encryptBuf, trailingPartialChunkLength);
      int toEncryptLength = dataLen - trailingPartialChunkLength;
      try {
        encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
          encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
      } catch (IOException e) {
        encryptBuf.release();
        future.completeExceptionally(e);
        return;
      }
      encryptBuf.writerIndex(dataLen);
      buf.release();
      buf = encryptBuf;
    }

    if (dataLen > maxDataLen) {
      // We need to write out the data in multiple packets as the max packet allowed is 16M.
      long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
      for (int remaining = dataLen;;) {
        if (remaining < maxDataLen) {
          flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
            syncBlock);
          break;
        } else {
          flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
            nextSubPacketOffsetInBlock, syncBlock);
          remaining -= maxDataLen;
          nextSubPacketOffsetInBlock += maxDataLen;
        }
      }
    } else {
      flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
    }
    trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
    ByteBuf newBuf = alloc.directBuffer(sendBufSizePredictor.guess(dataLen))
      .ensureWritable(trailingPartialChunkLength);
    if (trailingPartialChunkLength != 0) {
      buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
        trailingPartialChunkLength);
    }
    buf.release();
    this.buf = newBuf;
    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
  }

  /**
   * Flush the buffer out to datanodes.
   * @param syncBlock will call hsync if true, otherwise hflush.
   * @return A CompletableFuture that holds the acked length after flushing.
   */
  @Override
  public CompletableFuture<Long> flush(boolean syncBlock) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    flush0(future, syncBlock);
    return future;
  }

  private void endBlock() throws IOException {
    Preconditions.checkState(waitingAckQueue.isEmpty(),
      "should call flush first before calling close");
    if (state != State.STREAMING) {
      throw new IOException("stream already broken");
    }
    state = State.CLOSING;
    long finalizedLength = ackedBlockLength;
    PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
    buf.release();
    buf = null;
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.directBuffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    CompletableFuture<Long> future = new CompletableFuture<>();
    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
    datanodeInfoMap.keySet().forEach(ch -> safeWriteAndFlush(ch, headerBuf.retainedDuplicate()));
    headerBuf.release();
    FutureUtils.get(future);
  }

  private void closeDataNodeChannelsAndAwait() {
    List<ChannelFuture> futures = new ArrayList<>();
    for (Channel ch : datanodeInfoMap.keySet()) {
      futures.add(ch.close());
    }
    for (ChannelFuture future : futures) {
      consume(future.awaitUninterruptibly());
    }
  }

  /**
   * The close method used when an error has occurred. Currently we just call recoverFileLease.
   */
  @Override
  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
    if (buf != null) {
      buf.release();
      buf = null;
    }
    closeDataNodeChannelsAndAwait();
    endFileLease(this);
    RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
      reporter == null ? new CancelOnClose(client) : reporter);
  }
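  // Hedged sketch of the close contract documented below (illustrative, mirrors the javadoc):
  //
  //   try {
  //     out.close();
  //   } catch (IOException e) {
  //     // passing null makes recoverAndClose fall back to CancelOnClose(client)
  //     out.recoverAndClose(null);
  //   }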
  /**
   * End the current block and complete the file at the namenode. You should call
   * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
   */
  @Override
  public void close() throws IOException {
    endBlock();
    state = State.CLOSED;
    closeDataNodeChannelsAndAwait();
    block.setNumBytes(ackedBlockLength);
    completeFile(this, client, namenode, src, clientName, block, stat);
  }

  @Override
  public boolean isBroken() {
    return state == State.BROKEN;
  }

  @Override
  public long getSyncedLength() {
    return this.ackedBlockLength;
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  Map<Channel, DatanodeInfo> getDatanodeInfoMap() {
    return this.datanodeInfoMap;
  }

  DFSClient getClient() {
    return client;
  }

  DummyDFSOutputStream getDummyStream() {
    return dummyStream;
  }

  boolean isClosed() {
    return state == State.CLOSED;
  }

  HdfsFileStatus getStat() {
    return stat;
  }
}