/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;

/**
 * An asynchronous HDFS output stream implementation which fans out data to all the datanodes and
 * only supports writing a file with a single block.
 * <p>
 * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create an
 * instance. The main usage of this class is implementing the WAL, so we only expose a few HDFS
 * configurations in that method. And we place it here under the io package because we want to
 * make it independent of the WAL implementation and thus easier to move it to the HDFS project
 * eventually.
 * <p>
 * Note that, although we support pipelined flush, i.e., writing new data and then flushing before
 * the previous flush succeeds, the implementation is not thread safe, so you should not call its
 * methods concurrently.
 * <p>
 * Advantages compared to DFSOutputStream:
 * <ol>
 * <li>The fan out mechanism. This reduces latency.</li>
 * <li>Fail fast when there is an error on a connection to a datanode, so the WAL implementation
 * can open a new writer ASAP.</li>
 * <li>We can benefit from netty's ByteBuf management mechanism.</li>
 * </ol>
 */
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
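
  // Typical usage, as a minimal sketch. The createOutput parameters shown here are illustrative
  // assumptions (the authoritative signature lives in FanOutOneBlockAsyncDFSOutputHelper and may
  // differ between HBase versions):
  //
  //   FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(
  //     dfs, new Path("/hbase/WALs/example"), true, false, (short) 3, blockSize,
  //     eventLoopGroup, channelClass);
  //   out.write(walEntryBytes);
  //   out.flush(false).whenComplete((ackedLength, error) -> {
  //     if (error != null) {
  //       // the stream is broken; open a new writer and call recoverAndClose on this one
  //     }
  //   });
  //   out.close();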

  // The MAX_PACKET_SIZE is 16MB, but it includes the header size and the checksum size. So here
  // we set a smaller limit for the data size.
  private static final int MAX_DATA_LEN = 12 * 1024 * 1024;

  private final Configuration conf;

  private final DistributedFileSystem dfs;

  private final DFSClient client;

  private final ClientProtocol namenode;

  private final String clientName;

  private final String src;

  private final long fileId;

  private final ExtendedBlock block;

  private final DatanodeInfo[] locations;

  private final Encryptor encryptor;

  private final List<Channel> datanodeList;

  private final DataChecksum summer;

  private final int maxDataLen;

  private final ByteBufAllocator alloc;

  private static final class Callback {

    private final CompletableFuture<Long> future;

    private final long ackedLength;

    // should be backed by a thread safe collection
    private final Set<ChannelId> unfinishedReplicas;

    public Callback(CompletableFuture<Long> future, long ackedLength,
        Collection<Channel> replicas) {
      this.future = future;
      this.ackedLength = ackedLength;
      if (replicas.isEmpty()) {
        this.unfinishedReplicas = Collections.emptySet();
      } else {
        this.unfinishedReplicas =
          Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
        replicas.stream().map(c -> c.id()).forEachOrdered(unfinishedReplicas::add);
      }
    }
  }

  private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();

  private volatile long ackedBlockLength = 0L;

  // this could be different from the acked block length because a packet cannot start in the
  // middle of a checksum chunk.
  private long nextPacketOffsetInBlock = 0L;

  // the length of the trailing partial chunk; this is because the packet start offset must be
  // aligned with the length of a checksum chunk, so we need to resend the same data.
  private int trailingPartialChunkLength = 0;

  private long nextPacketSeqno = 0L;

  private ByteBuf buf;

  private final SendBufSizePredictor sendBufSizePredictor = new SendBufSizePredictor();

  // State for connections to DN
  private enum State {
    STREAMING, CLOSING, BROKEN, CLOSED
  }

  private volatile State state;
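
  // A sketch of how the state field and the ack queue above are used by the methods below:
  //
  //   STREAMING --endBlock()--> CLOSING --close()--> CLOSED
  //       |
  //       +--failed(...)--> BROKEN (all pending acks are completed exceptionally)
  //
  // waitingAckQueue holds one Callback per in-flight packet, plus dummy callbacks (with an empty
  // unfinishedReplicas set) for flushes that carry no new data; a Callback completes once every
  // datanode channel in unfinishedReplicas has acked its packet.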

  // all lock-free to make it run faster
  private void completed(Channel channel) {
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // if the current unfinished replicas do not contain us then it means that we have already
      // acked this one, so let's iterate to find the one we have not acked yet.
      if (c.unfinishedReplicas.remove(channel.id())) {
        if (c.unfinishedReplicas.isEmpty()) {
          // we need to remove the entry before completing the future. It is possible that after
          // we complete the future the upper layer will call close immediately, before we remove
          // the entry from waitingAckQueue, which leads to an IllegalStateException. And also set
          // the ackedBlockLength first, otherwise we may use a wrong length to commit the block.
          // This may lead to multiple removes and assigns, but that is OK. The semantic of
          // iter.remove is removing the entry returned by the previous call to next, so if the
          // entry has already been removed then it is a no-op, and for the assign, the values are
          // the same so there is no problem.
          iter.remove();
          ackedBlockLength = c.ackedLength;
          // the future.complete check is to confirm that we are the only one who grabbed the
          // work, otherwise just give up and return.
          if (c.future.complete(c.ackedLength)) {
            // also wake up flush requests which have the same length.
            while (iter.hasNext()) {
              Callback maybeDummyCb = iter.next();
              if (maybeDummyCb.ackedLength == c.ackedLength) {
                iter.remove();
                maybeDummyCb.future.complete(c.ackedLength);
              } else {
                break;
              }
            }
          }
        }
        return;
      }
    }
  }

  // this usually does not happen, which means it is not on the critical path, so make it
  // synchronized so that the implementation will not burn up our brains, as there are multiple
  // state changes and checks.
  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
    if (state == State.BROKEN || state == State.CLOSED) {
      return;
    }
    if (state == State.CLOSING) {
      Callback c = waitingAckQueue.peekFirst();
      if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
        // nothing to do, the endBlock request has already finished.
        return;
      }
    }
    // disable further writes, and fail all pending acks.
    state = State.BROKEN;
    Throwable error = errorSupplier.get();
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // find the first request which we have not fully acked yet, and fail it and all the
      // requests after it.
      if (!c.unfinishedReplicas.contains(channel.id())) {
        continue;
      }
      for (;;) {
        c.future.completeExceptionally(error);
        if (!iter.hasNext()) {
          break;
        }
        c = iter.next();
      }
      break;
    }
    datanodeList.forEach(ch -> ch.close());
  }
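
  // A worked example of the queue walk above (a sketch; channel ids abbreviated). Suppose
  // waitingAckQueue holds [c1(len=100, {dn1, dn2}), c2(len=100, {}), c3(len=150, {dn1, dn2})].
  // When the last outstanding ack for c1 arrives, completed() removes c1, sets
  // ackedBlockLength = 100, completes c1's future, and then also completes the dummy flush c2
  // because it waits for the same length; c3 keeps waiting for its own acks. If instead the
  // connection to dn1 breaks, failed() completes c1, c2 and c3 exceptionally and closes all the
  // datanode channels.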

  @Sharable
  private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> {

    private final int timeoutMs;

    public AckHandler(int timeoutMs) {
      this.timeoutMs = timeoutMs;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
      Status reply = getStatus(ack);
      if (reply != Status.SUCCESS) {
        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " +
          block + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (PipelineAck.isRestartOOBStatus(reply)) {
        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block " +
          block + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (ack.getSeqno() == HEART_BEAT_SEQNO) {
        return;
      }
      completed(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      if (state == State.CLOSED) {
        return;
      }
      failed(ctx.channel(),
        () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      failed(ctx.channel(), () -> cause);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        if (e.state() == READER_IDLE) {
          failed(ctx.channel(),
            () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
        } else if (e.state() == WRITER_IDLE) {
          PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
          int len = heartbeat.getSerializedSize();
          ByteBuf buf = alloc.buffer(len);
          heartbeat.putInBuffer(buf.nioBuffer(0, len));
          buf.writerIndex(len);
          ctx.channel().writeAndFlush(buf);
        }
        return;
      }
      super.userEventTriggered(ctx, evt);
    }
  }

  private void setupReceiver(int timeoutMs) {
    AckHandler ackHandler = new AckHandler(timeoutMs);
    for (Channel ch : datanodeList) {
      ch.pipeline().addLast(
        new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
        new ProtobufVarint32FrameDecoder(),
        new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler);
      ch.config().setAutoRead(true);
    }
  }

  FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs,
      DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
      LocatedBlock locatedBlock, Encryptor encryptor, List<Channel> datanodeList,
      DataChecksum summer, ByteBufAllocator alloc) {
    this.conf = conf;
    this.dfs = dfs;
    this.client = client;
    this.namenode = namenode;
    this.fileId = fileId;
    this.clientName = clientName;
    this.src = src;
    this.block = locatedBlock.getBlock();
    this.locations = locatedBlock.getLocations();
    this.encryptor = encryptor;
    this.datanodeList = datanodeList;
    this.summer = summer;
    this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
    this.alloc = alloc;
    this.buf = alloc.directBuffer(sendBufSizePredictor.initialSize());
    this.state = State.STREAMING;
    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
  }

  @Override
  public void writeInt(int i) {
    buf.ensureWritable(4);
    buf.writeInt(i);
  }

  @Override
  public void write(ByteBuffer bb) {
    buf.ensureWritable(bb.remaining());
    buf.writeBytes(bb);
  }

  @Override
  public void write(byte[] b) {
    write(b, 0, b.length);
  }

  @Override
  public void write(byte[] b, int off, int len) {
    buf.ensureWritable(len);
    buf.writeBytes(b, off, len);
  }

  @Override
  public int buffered() {
    return buf.readableBytes();
  }

  @Override
  public DatanodeInfo[] getPipeline() {
    return locations;
  }
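
  // A sketch of the packet that flushBuffer below fans out to each datanode. It follows the HDFS
  // DataTransferProtocol framing as used by this class: the serialized PacketHeader (which itself
  // starts with the 4-byte payload length, here 4 + checksumLen + dataLen), then the checksum
  // bytes, then the data bytes:
  //
  //   | PacketHeader (payload length, offset in block, seqno, ...) | checksums | data |
  //
  // Each part is written as a retained duplicate so the same buffers can be shared across all the
  // datanode channels and released once here.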

  private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
      long nextPacketOffsetInBlock, boolean syncBlock) {
    int dataLen = dataBuf.readableBytes();
    int chunkLen = summer.getBytesPerChecksum();
    int trailingPartialChunkLen = dataLen % chunkLen;
    int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
    int checksumLen = numChecks * summer.getChecksumSize();
    ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
    summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
    checksumBuf.writerIndex(checksumLen);
    PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
      nextPacketSeqno, false, dataLen, syncBlock);
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.buffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList);
    waitingAckQueue.addLast(c);
    // recheck again after we pushed the callback to the queue
    if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
      future.completeExceptionally(new IOException("stream already broken"));
      // it's the one we have just pushed or just a no-op
      waitingAckQueue.removeFirst();
      return;
    }
    datanodeList.forEach(ch -> {
      ch.write(headerBuf.retainedDuplicate());
      ch.write(checksumBuf.retainedDuplicate());
      ch.writeAndFlush(dataBuf.retainedDuplicate());
    });
    checksumBuf.release();
    headerBuf.release();
    dataBuf.release();
    nextPacketSeqno++;
  }

  private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
    if (state != State.STREAMING) {
      future.completeExceptionally(new IOException("stream already broken"));
      return;
    }
    int dataLen = buf.readableBytes();
    if (dataLen == trailingPartialChunkLength) {
      // no new data
      long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
      Callback lastFlush = waitingAckQueue.peekLast();
      if (lastFlush != null) {
        Callback c = new Callback(future, lengthAfterFlush, Collections.emptyList());
        waitingAckQueue.addLast(c);
        // recheck here if we have already removed the previous callback from the queue
        if (waitingAckQueue.peekFirst() == c) {
          // all previous callbacks have been removed
          // notice that this does not mean we will always win here, because the background thread
          // may have already started to mark the future here as completed in the completed or
          // failed methods but have not removed it from the queue yet. That's also why the
          // removeFirst call below may be a no-op.
          if (state != State.STREAMING) {
            future.completeExceptionally(new IOException("stream already broken"));
          } else {
            future.complete(lengthAfterFlush);
          }
          // it's the one we have just pushed or just a no-op
          waitingAckQueue.removeFirst();
        }
      } else {
        // we must have acked all the data, so the ackedBlockLength must be the same as
        // lengthAfterFlush
        future.complete(lengthAfterFlush);
      }
      return;
    }

    if (encryptor != null) {
      ByteBuf encryptBuf = alloc.directBuffer(dataLen);
      buf.readBytes(encryptBuf, trailingPartialChunkLength);
      int toEncryptLength = dataLen - trailingPartialChunkLength;
      try {
        encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
          encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
      } catch (IOException e) {
        encryptBuf.release();
        future.completeExceptionally(e);
        return;
      }
      encryptBuf.writerIndex(dataLen);
      buf.release();
      buf = encryptBuf;
    }
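
    // A worked example of the chunk alignment handled below (numbers are illustrative): with
    // bytesPerChecksum = 512 and dataLen = 1300, the last 1300 % 512 = 276 bytes form a trailing
    // partial chunk. They are sent now, but also copied into the new buffer and resent with the
    // next packet, because a packet's start offset must be aligned to a checksum chunk boundary.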

    if (dataLen > maxDataLen) {
      // We need to write out the data with multiple packets, as the max data length allowed for
      // a single packet is maxDataLen (derived from the 16MB packet size limit, see MAX_DATA_LEN).
      long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
      for (int remaining = dataLen;;) {
        if (remaining < maxDataLen) {
          flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
            syncBlock);
          break;
        } else {
          flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
            nextSubPacketOffsetInBlock, syncBlock);
          remaining -= maxDataLen;
          nextSubPacketOffsetInBlock += maxDataLen;
        }
      }
    } else {
      flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
    }
    trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
    ByteBuf newBuf = alloc.directBuffer(sendBufSizePredictor.guess(dataLen))
      .ensureWritable(trailingPartialChunkLength);
    if (trailingPartialChunkLength != 0) {
      buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
        trailingPartialChunkLength);
    }
    buf.release();
    this.buf = newBuf;
    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
  }

  /**
   * Flush the buffer out to the datanodes.
   * @param syncBlock will call hsync if true, otherwise hflush.
   * @return A CompletableFuture that holds the acked length after flushing.
   */
  @Override
  public CompletableFuture<Long> flush(boolean syncBlock) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    flush0(future, syncBlock);
    return future;
  }

  private void endBlock() throws IOException {
    Preconditions.checkState(waitingAckQueue.isEmpty(),
      "should call flush first before calling close");
    if (state != State.STREAMING) {
      throw new IOException("stream already broken");
    }
    state = State.CLOSING;
    long finalizedLength = ackedBlockLength;
    PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
    buf.release();
    buf = null;
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.directBuffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    CompletableFuture<Long> future = new CompletableFuture<>();
    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeList));
    datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
    headerBuf.release();
    try {
      future.get();
    } catch (InterruptedException e) {
      throw (IOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      Throwables.propagateIfPossible(cause, IOException.class);
      throw new IOException(cause);
    }
  }

  /**
   * The close method to use when an error has occurred. Currently we just call recoverFileLease.
   */
  @Override
  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
    if (buf != null) {
      buf.release();
      buf = null;
    }
    datanodeList.forEach(ch -> ch.close());
    datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
    endFileLease(client, fileId);
    RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
      reporter == null ? new CancelOnClose(client) : reporter);
  }

  /**
   * End the current block and complete the file at the namenode. You should call
   * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
   */
  @Override
  public void close() throws IOException {
    endBlock();
    state = State.CLOSED;
    datanodeList.forEach(ch -> ch.close());
    datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
    block.setNumBytes(ackedBlockLength);
    completeFile(client, namenode, src, clientName, block, fileId);
  }
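
  // A minimal sketch of the close protocol suggested by the javadoc above (error handling in a
  // real caller, e.g. the WAL, is more involved):
  //
  //   out.flush(true).get();        // endBlock requires that all pending acks have arrived
  //   try {
  //     out.close();                // endBlock, then completeFile at the namenode
  //   } catch (IOException e) {
  //     out.recoverAndClose(null);  // give up and recover the file lease instead
  //   }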

  @Override
  public boolean isBroken() {
    return state == State.BROKEN;
  }
}