/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;

/**
 * An asynchronous HDFS output stream implementation which fans out data to all the datanodes and
 * supports writing files with exactly one block.
 * <p>
 * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create instances.
 * The main use of this class is implementing the WAL, so we only expose a small number of HDFS
 * configurations in that method. We place it here under the io package because we want to keep it
 * independent of the WAL implementation, and thus easier to eventually move to the HDFS project.
 * <p>
 * Note that, although we support pipelined flush, i.e., writing new data and then flushing it
 * before the previous flush succeeds, the implementation is not thread safe, so you should not
 * call its methods concurrently.
 * <p>
 * Advantages compared to DFSOutputStream:
 * <ol>
 * <li>The fan-out mechanism, which reduces latency.</li>
 * <li>Fail-fast when a connection to a datanode fails, so the WAL implementation can open a new
 * writer as soon as possible.</li>
 * <li>We can benefit from netty's ByteBuf management mechanism.</li>
 * </ol>
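 * <p>
 * A minimal usage sketch (the exact parameter list of {@code createOutput} is defined in
 * {@link FanOutOneBlockAsyncDFSOutputHelper}; names such as {@code eventLoopGroup} and
 * {@code channelClass} below are illustrative):
 *
 * <pre>
 * FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs,
 *   new Path("/wal/log.0001"), true, false, (short) 3, blockSize, eventLoopGroup, channelClass);
 * out.write(payload);
 * out.flush(false).whenComplete((ackedLength, error) -> {
 *   // on success, all datanodes have acked the data up to ackedLength
 * });
 * out.close();
 * </pre>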
 */
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

  // The MAX_PACKET_SIZE is 16MB, but it includes the header size and checksum size, so here we
  // set a smaller limit for the data size.
  private static final int MAX_DATA_LEN = 12 * 1024 * 1024;

  private final Configuration conf;

  private final FSUtils fsUtils;

  private final DistributedFileSystem dfs;

  private final DFSClient client;

  private final ClientProtocol namenode;

  private final String clientName;

  private final String src;

  private final long fileId;

  private final ExtendedBlock block;

  private final DatanodeInfo[] locations;

  private final Encryptor encryptor;

  private final List<Channel> datanodeList;

  private final DataChecksum summer;

  private final int maxDataLen;

  private final ByteBufAllocator alloc;

  private static final class Callback {

    private final CompletableFuture<Long> future;

    private final long ackedLength;

    // should be backed by a thread safe collection
    private final Set<ChannelId> unfinishedReplicas;

    public Callback(CompletableFuture<Long> future, long ackedLength,
        Collection<Channel> replicas) {
      this.future = future;
      this.ackedLength = ackedLength;
      if (replicas.isEmpty()) {
        this.unfinishedReplicas = Collections.emptySet();
      } else {
        this.unfinishedReplicas =
          Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
        replicas.stream().map(c -> c.id()).forEachOrdered(unfinishedReplicas::add);
      }
    }
  }

  private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();

  private volatile long ackedBlockLength = 0L;

  // this can differ from the acked block length because a packet cannot start in the middle of a
  // chunk.
  private long nextPacketOffsetInBlock = 0L;

  // the length of the trailing partial chunk; the packet start offset must be aligned with the
  // checksum chunk size, so we need to resend the same data.
  private int trailingPartialChunkLength = 0;
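  // Example (illustrative numbers): with a 512-byte checksum chunk, flushing 1000 bytes sends a
  // packet covering bytes [0, 1000) and leaves trailingPartialChunkLength = 488; the next packet
  // must start at offset 512 and resend those 488 bytes so that it begins on a chunk boundary.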
  private long nextPacketSeqno = 0L;

  private ByteBuf buf;

  private final SendBufSizePredictor sendBufSizePredictor = new SendBufSizePredictor();

  // State for connections to DN
  private enum State {
    STREAMING, CLOSING, BROKEN, CLOSED
  }

  private volatile State state;

  // all lock-free to make it run faster
  private void completed(Channel channel) {
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // if the current unfinished replicas do not contain us then it means that we have already
      // acked this one, so let's iterate to find the one we have not acked yet.
      if (c.unfinishedReplicas.remove(channel.id())) {
        if (c.unfinishedReplicas.isEmpty()) {
          // we need to remove the entry before completing the future. It is possible that after
          // we complete the future the upper layer will call close immediately, before we remove
          // the entry from waitingAckQueue, leading to an IllegalStateException. Also set
          // ackedBlockLength first, otherwise we may use a wrong length to commit the block. This
          // may lead to multiple removes and assigns, but that is OK. The semantic of iter.remove
          // is removing the entry returned by the previous call to next, so if the entry has
          // already been removed then it is a no-op; and for the assign, the values are the same
          // so there is no problem.
          iter.remove();
          ackedBlockLength = c.ackedLength;
          // the future.complete check confirms that we are the only one who grabbed the work;
          // otherwise just give up and return.
          if (c.future.complete(c.ackedLength)) {
            // also wake up flush requests which have the same length.
            while (iter.hasNext()) {
              Callback maybeDummyCb = iter.next();
              if (maybeDummyCb.ackedLength == c.ackedLength) {
                iter.remove();
                maybeDummyCb.future.complete(c.ackedLength);
              } else {
                break;
              }
            }
          }
        }
        return;
      }
    }
  }

  // this usually does not happen, which means it is not on the critical path, so we make it
  // synchronized to keep the implementation simple, as there are multiple state changes and
  // checks.
  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
    if (state == State.BROKEN || state == State.CLOSED) {
      return;
    }
    if (state == State.CLOSING) {
      Callback c = waitingAckQueue.peekFirst();
      if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
        // nothing to do, the endBlock request has already finished.
        return;
      }
    }
    // disable further writes, and fail all pending acks.
    state = State.BROKEN;
    Throwable error = errorSupplier.get();
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // find the first sync request which we have not acked yet and fail all the requests
      // after it.
      if (!c.unfinishedReplicas.contains(channel.id())) {
        continue;
      }
      for (;;) {
        c.future.completeExceptionally(error);
        if (!iter.hasNext()) {
          break;
        }
        c = iter.next();
      }
      break;
    }
    datanodeList.forEach(ch -> ch.close());
  }

  @Sharable
  private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> {

    private final int timeoutMs;

    public AckHandler(int timeoutMs) {
      this.timeoutMs = timeoutMs;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
      Status reply = getStatus(ack);
      if (reply != Status.SUCCESS) {
        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " +
          block + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (PipelineAck.isRestartOOBStatus(reply)) {
        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block " +
          block + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (ack.getSeqno() == HEART_BEAT_SEQNO) {
        return;
      }
      completed(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      if (state == State.CLOSED) {
        return;
      }
      failed(ctx.channel(),
        () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      failed(ctx.channel(), () -> cause);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        if (e.state() == READER_IDLE) {
          failed(ctx.channel(),
            () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
        } else if (e.state() == WRITER_IDLE) {
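          // a heartbeat is an empty packet: no payload and no checksum data (hence packet length
          // 4), offset 0, and the reserved HEART_BEAT_SEQNO; it keeps the idle connection alive
          // without appending anything to the block.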
          PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
          int len = heartbeat.getSerializedSize();
          ByteBuf buf = alloc.buffer(len);
          heartbeat.putInBuffer(buf.nioBuffer(0, len));
          buf.writerIndex(len);
          ctx.channel().writeAndFlush(buf);
        }
        return;
      }
      super.userEventTriggered(ctx, evt);
    }
  }

  private void setupReceiver(int timeoutMs) {
    AckHandler ackHandler = new AckHandler(timeoutMs);
    for (Channel ch : datanodeList) {
      ch.pipeline().addLast(
        new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
        new ProtobufVarint32FrameDecoder(),
        new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler);
      ch.config().setAutoRead(true);
    }
  }

  FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs,
      DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
      LocatedBlock locatedBlock, Encryptor encryptor, List<Channel> datanodeList,
      DataChecksum summer, ByteBufAllocator alloc) {
    this.conf = conf;
    this.fsUtils = fsUtils;
    this.dfs = dfs;
    this.client = client;
    this.namenode = namenode;
    this.fileId = fileId;
    this.clientName = clientName;
    this.src = src;
    this.block = locatedBlock.getBlock();
    this.locations = locatedBlock.getLocations();
    this.encryptor = encryptor;
    this.datanodeList = datanodeList;
    this.summer = summer;
    this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
    this.alloc = alloc;
    this.buf = alloc.directBuffer(sendBufSizePredictor.initialSize());
    this.state = State.STREAMING;
    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
  }

  @Override
  public void writeInt(int i) {
    buf.ensureWritable(4);
    buf.writeInt(i);
  }

  @Override
  public void write(ByteBuffer bb) {
    buf.ensureWritable(bb.remaining());
    buf.writeBytes(bb);
  }

  @Override
  public void write(byte[] b) {
    write(b, 0, b.length);
  }

  @Override
  public void write(byte[] b, int off, int len) {
    buf.ensureWritable(len);
    buf.writeBytes(b, off, len);
  }

  @Override
  public int buffered() {
    return buf.readableBytes();
  }

  @Override
  public DatanodeInfo[] getPipeline() {
    return locations;
  }

  private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
      long nextPacketOffsetInBlock, boolean syncBlock) {
    int dataLen = dataBuf.readableBytes();
    int chunkLen = summer.getBytesPerChecksum();
    int trailingPartialChunkLen = dataLen % chunkLen;
    int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
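    // e.g. (illustrative numbers) with bytesPerChecksum = 512 and dataLen = 1300, numChecks = 3:
    // two full 512-byte chunks plus one 276-byte trailing partial chunk, so the checksum buffer
    // below holds three checksums.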
    int checksumLen = numChecks * summer.getChecksumSize();
    ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
    summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
    checksumBuf.writerIndex(checksumLen);
    PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
      nextPacketSeqno, false, dataLen, syncBlock);
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.buffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList);
    waitingAckQueue.addLast(c);
    // recheck again after we have pushed the callback onto the queue
    if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
      future.completeExceptionally(new IOException("stream already broken"));
      // it's the one we have just pushed, or the remove is just a no-op
      waitingAckQueue.removeFirst();
      return;
    }
    datanodeList.forEach(ch -> {
      ch.write(headerBuf.retainedDuplicate());
      ch.write(checksumBuf.retainedDuplicate());
      ch.writeAndFlush(dataBuf.retainedDuplicate());
    });
    checksumBuf.release();
    headerBuf.release();
    dataBuf.release();
    nextPacketSeqno++;
  }

  private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
    if (state != State.STREAMING) {
      future.completeExceptionally(new IOException("stream already broken"));
      return;
    }
    int dataLen = buf.readableBytes();
    if (dataLen == trailingPartialChunkLength) {
      // no new data
      long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
      Callback lastFlush = waitingAckQueue.peekLast();
      if (lastFlush != null) {
        Callback c = new Callback(future, lengthAfterFlush, Collections.emptyList());
        waitingAckQueue.addLast(c);
        // recheck here if we have already removed the previous callback from the queue
        if (waitingAckQueue.peekFirst() == c) {
          // all previous callbacks have been removed
          // notice that this does not mean we will always win here, because the background
          // thread may have already started to mark the future here as completed in the
          // completed or failed methods but has not removed it from the queue yet. That's also
          // why the removeFirst call below may be a no-op.
          if (state != State.STREAMING) {
            future.completeExceptionally(new IOException("stream already broken"));
          } else {
            future.complete(lengthAfterFlush);
          }
          // it's the one we have just pushed, or the remove is just a no-op
          waitingAckQueue.removeFirst();
        }
      } else {
        // we must have acked all the data, so the ackedBlockLength must be the same as
        // lengthAfterFlush
        future.complete(lengthAfterFlush);
      }
      return;
    }

    if (encryptor != null) {
      ByteBuf encryptBuf = alloc.directBuffer(dataLen);
      buf.readBytes(encryptBuf, trailingPartialChunkLength);
      int toEncryptLength = dataLen - trailingPartialChunkLength;
      try {
        encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
          encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
      } catch (IOException e) {
        encryptBuf.release();
        future.completeExceptionally(e);
        return;
      }
      encryptBuf.writerIndex(dataLen);
      buf.release();
      buf = encryptBuf;
    }

    if (dataLen > maxDataLen) {
      // We need to write out the data in multiple packets, as the max packet size allowed
      // is 16MB.
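      // Only the last sub-packet carries the caller's future; the intermediate sub-packets get
      // fresh dummy futures, since the caller only cares about the ack for the final, largest
      // length.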
      long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
      for (int remaining = dataLen;;) {
        if (remaining < maxDataLen) {
          flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
            syncBlock);
          break;
        } else {
          flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
            nextSubPacketOffsetInBlock, syncBlock);
          remaining -= maxDataLen;
          nextSubPacketOffsetInBlock += maxDataLen;
        }
      }
    } else {
      flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
    }
    trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
    ByteBuf newBuf = alloc.directBuffer(sendBufSizePredictor.guess(dataLen))
      .ensureWritable(trailingPartialChunkLength);
    if (trailingPartialChunkLength != 0) {
      buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
        trailingPartialChunkLength);
    }
    buf.release();
    this.buf = newBuf;
    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
  }

  /**
   * Flush the buffer out to the datanodes.
   * @param syncBlock will call hsync if true, otherwise hflush.
   * @return a CompletableFuture that holds the acked length after flushing.
   */
  @Override
  public CompletableFuture<Long> flush(boolean syncBlock) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    flush0(future, syncBlock);
    return future;
  }

  private void endBlock() throws IOException {
    Preconditions.checkState(waitingAckQueue.isEmpty(),
      "should call flush first before calling close");
    if (state != State.STREAMING) {
      throw new IOException("stream already broken");
    }
    state = State.CLOSING;
    long finalizedLength = ackedBlockLength;
    PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
    buf.release();
    buf = null;
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.directBuffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    CompletableFuture<Long> future = new CompletableFuture<>();
    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeList));
    datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
    headerBuf.release();
    try {
      future.get();
    } catch (InterruptedException e) {
      throw (IOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      Throwables.propagateIfPossible(cause, IOException.class);
      throw new IOException(cause);
    }
  }

  /**
   * The close method to use when an error has occurred. For now we just call recoverFileLease.
   */
  @Override
  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
    if (buf != null) {
      buf.release();
      buf = null;
    }
    datanodeList.forEach(ch -> ch.close());
    datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
    endFileLease(client, fileId);
    fsUtils.recoverFileLease(dfs, new Path(src), conf,
      reporter == null ? new CancelOnClose(client) : reporter);
  }

  /**
   * End the current block and complete the file at the namenode. You should call
   * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
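   * <p>
   * A typical (illustrative) caller-side pattern following that rule:
   *
   * <pre>
   * try {
   *   out.close();
   * } catch (IOException e) {
   *   out.recoverAndClose(null);
   *   throw e;
   * }
   * </pre>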
   */
  @Override
  public void close() throws IOException {
    endBlock();
    state = State.CLOSED;
    datanodeList.forEach(ch -> ch.close());
    datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
    block.setNumBytes(ackedBlockLength);
    completeFile(client, namenode, src, clientName, block, fileId);
  }

  @Override
  public boolean isBroken() {
    return state == State.BROKEN;
  }
}