/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
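 * <p>
 * A minimal usage sketch follows; the event loop group, channel class, replication and block size
 * shown here are illustrative choices, not requirements of this helper:
 *
 * <pre>
 * DistributedFileSystem dfs = ...;
 * Path f = new Path("/example/file");
 * EventLoopGroup group = new NioEventLoopGroup();
 * StreamSlowMonitor monitor = ...; // see StreamSlowMonitor on how to obtain one
 * try (FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs, f,
 *   true, false, (short) 3, dfs.getDefaultBlockSize(f), group, NioSocketChannel.class, monitor)) {
 *   // write to and flush the returned output
 * }
 * </pre>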
 */
@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final Logger LOG =
    LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);

  private FanOutOneBlockAsyncDFSOutputHelper() {
  }

  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
  // use pooled allocator for performance.
  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

  // copied from DFSPacket since it is package private.
  public static final long HEART_BEAT_SEQNO = -1L;

  // Timeouts for communicating with DataNode for streaming writes/reads
  public static final int READ_TIMEOUT = 60 * 1000;

  private interface LeaseManager {

    void begin(DFSClient client, long inodeId);

    void end(DFSClient client, long inodeId);
  }

  private static final LeaseManager LEASE_MANAGER;

  // helper class for creating files.
  private interface FileCreator {
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions) throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    }

    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
      EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;

  // We create files directly through ClientProtocol, so we have to manage the file lease
  // ourselves. DFSClient's beginFileLease/endFileLease are not exposed as public API, hence the
  // reflective lookup below.
  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
      DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
    endFileLeaseMethod.setAccessible(true);
    return new LeaseManager() {

      @Override
      public void begin(DFSClient client, long inodeId) {
        try {
          beginFileLeaseMethod.invoke(client, inodeId, null);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(DFSClient client, long inodeId) {
        try {
          endFileLeaseMethod.invoke(client, inodeId);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }
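
  // ClientProtocol::create takes a different number of arguments depending on the Hadoop release:
  // createFileCreator3_3 matches the Hadoop 3.3+ signature, which adds a storage policy argument,
  // while createFileCreator3 matches earlier Hadoop 3 releases. The available overload is looked
  // up reflectively once, and the trailing arguments this class does not use are passed as null.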
  private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null, null);
    };
  }

  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null);
    };
  }

  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3_3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 3.2 or below");
    }

    return createFileCreator3();
  }

  // cancel the processing if DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {

    private final DFSClient client;

    public CancelOnClose(DFSClient client) {
      this.client = client;
    }

    @Override
    public boolean progress() {
      return client.isClientRunning();
    }
  }

  static {
    try {
      LEASE_MANAGER = createLeaseManager();
      FILE_CREATOR = createFileCreator();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please "
        + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
        + "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }

  static void beginFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.begin(client, inodeId);
  }

  static void endFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.end(client, inodeId);
  }

  static DataChecksum createChecksum(DFSClient client) {
    return client.getConf().createChecksum(null);
  }

  static Status getStatus(PipelineAckProto ack) {
    List<Integer> flagList = ack.getFlagList();
    Integer headerFlag;
    if (flagList.isEmpty()) {
      Status reply = ack.getReply(0);
      headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
    } else {
      headerFlag = flagList.get(0);
    }
    return PipelineAck.getStatusFromHeader(headerFlag);
  }

  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
    Promise<Channel> promise, int timeoutMs) {
    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
      new ProtobufVarint32FrameDecoder(),
      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
      new SimpleChannelInboundHandler<BlockOpResponseProto>() {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
          throws Exception {
          Status pipelineStatus = resp.getStatus();
          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
            throw new IOException("datanode " + dnInfo + " is restarting");
          }
          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
          if (resp.getStatus() != Status.SUCCESS) {
            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException("Got access token error" + ", status message "
                + resp.getMessage() + ", " + logInfo);
            } else {
              throw new IOException("Got error" + ", status=" + resp.getStatus().name()
                + ", status message " + resp.getMessage() + ", " + logInfo);
            }
          }
          // success
          ChannelPipeline p = ctx.pipeline();
          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
            // do not remove all handlers because we may have wrap or unwrap handlers at the head
            // of the pipeline.
            if (handler instanceof IdleStateHandler) {
              break;
            }
          }
          // Disable auto read here. Enable it after we setup the streaming pipeline in
          // FanOutOneBlockAsyncDFSOutput.
          ctx.channel().config().setAutoRead(false);
          promise.trySuccess(ctx.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
            promise
              .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
          } else {
            super.userEventTriggered(ctx, evt);
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          promise.tryFailure(cause);
        }
      });
  }

  private static void requestWriteBlock(Channel channel, StorageType storageType,
    OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
    OpWriteBlockProto proto =
      writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
    int protoLen = proto.getSerializedSize();
    ByteBuf buffer =
      channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    buffer.writeByte(Op.WRITE_BLOCK.code);
    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
    channel.writeAndFlush(buffer);
  }

  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
    StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
    DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
    throws IOException {
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
    saslPromise.addListener(new FutureListener<Void>() {

      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // setup response processing pipeline first, then send request.
          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
        } else {
          promise.tryFailure(future.cause());
        }
      }
    });
  }

  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
    String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
    BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass) {
    StorageType[] storageTypes = locatedBlock.getStorageTypes();
    DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
    boolean connectToDnViaHostname =
      conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
    blockCopy.setNumBytes(locatedBlock.getBlockSize());
    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
      .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
        .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
      .setClientName(clientName).build();
    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
    OpWriteBlockProto.Builder writeBlockProtoBuilder =
      OpWriteBlockProto.newBuilder().setHeader(header)
        .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())).setPipelineSize(1)
        .setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()).setMaxBytesRcvd(maxBytesRcvd)
        .setLatestGenerationStamp(latestGS).setRequestedChecksum(checksumProto)
        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
    for (int i = 0; i < datanodeInfos.length; i++) {
      DatanodeInfo dnInfo = datanodeInfos[i];
      StorageType storageType = storageTypes[i];
      Promise<Channel> promise = eventLoopGroup.next().newPromise();
      futureList.add(promise);
      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
      new Bootstrap().group(eventLoopGroup).channel(channelClass)
        .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

          @Override
          protected void initChannel(Channel ch) throws Exception {
            // We need the remote address of the channel, so we can only move on after the channel
            // is connected. Leave an empty implementation here because netty does not allow a
            // null handler.
          }
        }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {

          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
              initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
                timeoutMs, client, locatedBlock.getBlockToken(), promise);
            } else {
              promise.tryFailure(future.cause());
            }
          }
        });
    }
    return futureList;
  }

  /**
   * Exception other than RemoteException thrown when calling create on namenode.
   */
  public static class NameNodeException extends IOException {

    private static final long serialVersionUID = 3143237406477095390L;

    public NameNodeException(Throwable cause) {
      super(cause);
    }
  }

  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
    List<CreateFlag> flags = new ArrayList<>();
    flags.add(CreateFlag.CREATE);
    if (overwrite) {
      flags.add(CreateFlag.OVERWRITE);
    }
    flags.add(CreateFlag.SHOULD_REPLICATE);
    return new EnumSetWritable<>(EnumSet.copyOf(flags));
  }

  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
    throws IOException {
    Configuration conf = dfs.getConf();
    DFSClient client = dfs.getClient();
    String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
    int createMaxRetries =
      conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
    ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager();
    Set<DatanodeInfo> toExcludeNodes =
      new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet());
    for (int retry = 0;; retry++) {
      LOG.debug("Creating output stream for {}, exclude list is {}, retry={}", src, toExcludeNodes,
        retry);
      HdfsFileStatus stat;
      try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
          getCreateFlags(overwrite), createParent, replication, blockSize,
          CryptoProtocolVersion.supported());
      } catch (Exception e) {
        if (e instanceof RemoteException) {
          throw (RemoteException) e;
        } else {
          throw new NameNodeException(e);
        }
      }
      beginFileLease(client, stat.getFileId());
      boolean succ = false;
      LocatedBlock locatedBlock = null;
      List<Future<Channel>> futureList = null;
      try {
        DataChecksum summer = createChecksum(client);
        locatedBlock = namenode.addBlock(src, client.getClientName(), null,
          toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
        Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
        for (int i = 0, n = futureList.size(); i < n; i++) {
          DatanodeInfo datanodeInfo = locatedBlock.getLocations()[i];
          try {
            datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
          } catch (Exception e) {
            // exclude the broken DN next time
            toExcludeNodes.add(datanodeInfo);
            excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error");
            throw e;
          }
        }
        Encryptor encryptor = createEncryptor(conf, stat, client);
        FanOutOneBlockAsyncDFSOutput output =
          new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
            stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
        succ = true;
        return output;
      } catch (RemoteException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (shouldRetryCreate(e)) {
          if (retry >= createMaxRetries) {
            throw e.unwrapRemoteException();
          }
        } else {
          throw e.unwrapRemoteException();
        }
      } catch (IOException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (retry >= createMaxRetries) {
          throw e;
        }
        // overwrite the old broken file.
        overwrite = true;
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          throw new InterruptedIOException();
        }
      } finally {
        if (!succ) {
          if (futureList != null) {
            for (Future<Channel> f : futureList) {
              f.addListener(new FutureListener<Channel>() {

                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                  if (future.isSuccess()) {
                    future.getNow().close();
                  }
                }
              });
            }
          }
          endFileLease(client, stat.getFileId());
        }
      }
    }
  }

  /**
   * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method may block, so do not call it inside
   * an {@link EventLoop}.
   */
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
    final StreamSlowMonitor monitor) throws IOException {
    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

      @Override
      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
        throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
          blockSize, eventLoopGroup, channelClass, monitor);
      }

      @Override
      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
        throw new UnsupportedOperationException();
      }
    }.resolve(dfs, f);
  }

  public static boolean shouldRetryCreate(RemoteException e) {
    // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name.
    // For exceptions other than this, we just throw it out. This is the same as
    // DFSOutputStream.newStreamForCreate.
    return e.getClassName().endsWith("RetryStartFileException");
  }

  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
    ExtendedBlock block, long fileId) {
    for (int retry = 0;; retry++) {
      try {
        if (namenode.complete(src, clientName, block, fileId)) {
          endFileLease(client, fileId);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
        }
      } catch (RemoteException e) {
        IOException ioe = e.unwrapRemoteException();
        if (ioe instanceof LeaseExpiredException) {
          LOG.warn("lease for file " + src + " is expired, give up", e);
          return;
        } else {
          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
        }
      } catch (Exception e) {
        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
      }
      sleepIgnoreInterrupt(retry);
    }
  }

  static void sleepIgnoreInterrupt(int retry) {
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
    } catch (InterruptedException e) {
    }
  }
}