001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor; 021import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate; 022import static org.apache.hadoop.hbase.util.NettyFutureUtils.addListener; 023import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose; 024import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush; 025import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; 026import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; 027import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; 028import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE; 029import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; 030import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; 031 032import com.google.protobuf.CodedOutputStream; 033import java.io.IOException; 034import java.io.InterruptedIOException; 035import java.lang.reflect.InvocationTargetException; 036import java.lang.reflect.Method; 037import java.util.ArrayList; 038import java.util.EnumSet; 039import java.util.HashSet; 040import java.util.IdentityHashMap; 041import java.util.List; 042import java.util.Map; 043import java.util.Set; 044import java.util.concurrent.TimeUnit; 045import org.apache.hadoop.conf.Configuration; 046import org.apache.hadoop.crypto.CryptoProtocolVersion; 047import org.apache.hadoop.crypto.Encryptor; 048import org.apache.hadoop.fs.CreateFlag; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.FileSystemLinkResolver; 051import org.apache.hadoop.fs.Path; 052import org.apache.hadoop.fs.StorageType; 053import org.apache.hadoop.fs.UnresolvedLinkException; 054import org.apache.hadoop.fs.permission.FsPermission; 055import org.apache.hadoop.hbase.client.ConnectionUtils; 056import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager; 057import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 058import org.apache.hadoop.hbase.util.CancelableProgressable; 059import org.apache.hadoop.hdfs.DFSClient; 060import org.apache.hadoop.hdfs.DFSOutputStream; 061import org.apache.hadoop.hdfs.DistributedFileSystem; 062import org.apache.hadoop.hdfs.protocol.ClientProtocol; 063import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 064import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 065import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 066import org.apache.hadoop.hdfs.protocol.LocatedBlock; 067import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; 068import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; 069import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; 070import org.apache.hadoop.hdfs.protocol.datatransfer.Op; 071import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; 072import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN; 073import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; 074import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; 075import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; 076import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; 077import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; 078import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; 079import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; 080import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; 081import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; 082import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 083import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; 084import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; 085import org.apache.hadoop.io.EnumSetWritable; 086import org.apache.hadoop.ipc.RemoteException; 087import org.apache.hadoop.net.NetUtils; 088import org.apache.hadoop.security.token.Token; 089import org.apache.hadoop.util.DataChecksum; 090import org.apache.yetus.audience.InterfaceAudience; 091import org.slf4j.Logger; 092import org.slf4j.LoggerFactory; 093 094import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap; 095import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 096import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; 097import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 098import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; 099import org.apache.hbase.thirdparty.io.netty.channel.Channel; 100import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture; 101import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener; 102import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler; 103import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 104import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; 105import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 106import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 107import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 108import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 109import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 110import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; 111import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; 112import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future; 113import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener; 114import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 115 116/** 117 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}. 118 */ 119@InterfaceAudience.Private 120public final class FanOutOneBlockAsyncDFSOutputHelper { 121 private static final Logger LOG = 122 LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class); 123 124 private FanOutOneBlockAsyncDFSOutputHelper() { 125 } 126 127 public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries"; 128 129 public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10; 130 // use pooled allocator for performance. 131 private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT; 132 133 // copied from DFSPacket since it is package private. 134 public static final long HEART_BEAT_SEQNO = -1L; 135 136 // Timeouts for communicating with DataNode for streaming writes/reads 137 public static final int READ_TIMEOUT = 60 * 1000; 138 139 private interface LeaseManager { 140 141 void begin(DFSClient client, long inodeId); 142 143 void end(DFSClient client, long inodeId); 144 } 145 146 private static final LeaseManager LEASE_MANAGER; 147 148 // This is used to terminate a recoverFileLease call when FileSystem is already closed. 149 // isClientRunning is not public so we need to use reflection. 150 private interface DFSClientAdaptor { 151 152 boolean isClientRunning(DFSClient client); 153 } 154 155 private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR; 156 157 // helper class for creating files. 158 private interface FileCreator { 159 default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked, 160 String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, 161 long blockSize, CryptoProtocolVersion[] supportedVersions) throws Exception { 162 try { 163 return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent, 164 replication, blockSize, supportedVersions); 165 } catch (InvocationTargetException e) { 166 if (e.getCause() instanceof Exception) { 167 throw (Exception) e.getCause(); 168 } else { 169 throw new RuntimeException(e.getCause()); 170 } 171 } 172 }; 173 174 Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName, 175 EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize, 176 CryptoProtocolVersion[] supportedVersions) throws Exception; 177 } 178 179 private static final FileCreator FILE_CREATOR; 180 181 // CreateFlag.SHOULD_REPLICATE is to make OutputStream on a EC directory support hflush/hsync, but 182 // EC is introduced in hadoop 3.x so we do not have this enum on 2.x, that's why we need to 183 // indirectly reference it through reflection. 184 private static final CreateFlag SHOULD_REPLICATE_FLAG; 185 186 private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException { 187 Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning"); 188 isClientRunningMethod.setAccessible(true); 189 return new DFSClientAdaptor() { 190 191 @Override 192 public boolean isClientRunning(DFSClient client) { 193 try { 194 return (Boolean) isClientRunningMethod.invoke(client); 195 } catch (IllegalAccessException | InvocationTargetException e) { 196 throw new RuntimeException(e); 197 } 198 } 199 }; 200 } 201 202 private static LeaseManager createLeaseManager() throws NoSuchMethodException { 203 Method beginFileLeaseMethod = 204 DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class); 205 beginFileLeaseMethod.setAccessible(true); 206 Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class); 207 endFileLeaseMethod.setAccessible(true); 208 return new LeaseManager() { 209 210 @Override 211 public void begin(DFSClient client, long inodeId) { 212 try { 213 beginFileLeaseMethod.invoke(client, inodeId, null); 214 } catch (IllegalAccessException | InvocationTargetException e) { 215 throw new RuntimeException(e); 216 } 217 } 218 219 @Override 220 public void end(DFSClient client, long inodeId) { 221 try { 222 endFileLeaseMethod.invoke(client, inodeId); 223 } catch (IllegalAccessException | InvocationTargetException e) { 224 throw new RuntimeException(e); 225 } 226 } 227 }; 228 } 229 230 private static FileCreator createFileCreator3_3() throws NoSuchMethodException { 231 Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, 232 String.class, EnumSetWritable.class, boolean.class, short.class, long.class, 233 CryptoProtocolVersion[].class, String.class, String.class); 234 235 return (instance, src, masked, clientName, flag, createParent, replication, blockSize, 236 supportedVersions) -> { 237 return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag, 238 createParent, replication, blockSize, supportedVersions, null, null); 239 }; 240 } 241 242 private static FileCreator createFileCreator3() throws NoSuchMethodException { 243 Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, 244 String.class, EnumSetWritable.class, boolean.class, short.class, long.class, 245 CryptoProtocolVersion[].class, String.class); 246 247 return (instance, src, masked, clientName, flag, createParent, replication, blockSize, 248 supportedVersions) -> { 249 return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag, 250 createParent, replication, blockSize, supportedVersions, null); 251 }; 252 } 253 254 private static FileCreator createFileCreator2() throws NoSuchMethodException { 255 Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, 256 String.class, EnumSetWritable.class, boolean.class, short.class, long.class, 257 CryptoProtocolVersion[].class); 258 259 return (instance, src, masked, clientName, flag, createParent, replication, blockSize, 260 supportedVersions) -> { 261 return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag, 262 createParent, replication, blockSize, supportedVersions); 263 }; 264 } 265 266 private static FileCreator createFileCreator() throws NoSuchMethodException { 267 try { 268 return createFileCreator3_3(); 269 } catch (NoSuchMethodException e) { 270 LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 3.2 or below"); 271 } 272 273 try { 274 return createFileCreator3(); 275 } catch (NoSuchMethodException e) { 276 LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x"); 277 } 278 return createFileCreator2(); 279 } 280 281 private static CreateFlag loadShouldReplicateFlag() { 282 try { 283 return CreateFlag.valueOf("SHOULD_REPLICATE"); 284 } catch (IllegalArgumentException e) { 285 LOG.debug("can not find SHOULD_REPLICATE flag, should be hadoop 2.x", e); 286 return null; 287 } 288 } 289 290 // cancel the processing if DFSClient is already closed. 291 static final class CancelOnClose implements CancelableProgressable { 292 293 private final DFSClient client; 294 295 public CancelOnClose(DFSClient client) { 296 this.client = client; 297 } 298 299 @Override 300 public boolean progress() { 301 return DFS_CLIENT_ADAPTOR.isClientRunning(client); 302 } 303 } 304 305 static { 306 try { 307 LEASE_MANAGER = createLeaseManager(); 308 DFS_CLIENT_ADAPTOR = createDFSClientAdaptor(); 309 FILE_CREATOR = createFileCreator(); 310 SHOULD_REPLICATE_FLAG = loadShouldReplicateFlag(); 311 } catch (Exception e) { 312 String msg = "Couldn't properly initialize access to HDFS internals. Please " 313 + "update your WAL Provider to not make use of the 'asyncfs' provider. See " 314 + "HBASE-16110 for more information."; 315 LOG.error(msg, e); 316 throw new Error(msg, e); 317 } 318 } 319 320 static void beginFileLease(DFSClient client, long inodeId) { 321 LEASE_MANAGER.begin(client, inodeId); 322 } 323 324 static void endFileLease(DFSClient client, long inodeId) { 325 LEASE_MANAGER.end(client, inodeId); 326 } 327 328 static DataChecksum createChecksum(DFSClient client) { 329 return client.getConf().createChecksum(null); 330 } 331 332 static Status getStatus(PipelineAckProto ack) { 333 List<Integer> flagList = ack.getFlagList(); 334 Integer headerFlag; 335 if (flagList.isEmpty()) { 336 Status reply = ack.getReply(0); 337 headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply); 338 } else { 339 headerFlag = flagList.get(0); 340 } 341 return PipelineAck.getStatusFromHeader(headerFlag); 342 } 343 344 private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo, 345 Promise<Channel> promise, int timeoutMs) { 346 channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), 347 new ProtobufVarint32FrameDecoder(), 348 new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()), 349 new SimpleChannelInboundHandler<BlockOpResponseProto>() { 350 351 @Override 352 protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp) 353 throws Exception { 354 Status pipelineStatus = resp.getStatus(); 355 if (PipelineAck.isRestartOOBStatus(pipelineStatus)) { 356 throw new IOException("datanode " + dnInfo + " is restarting"); 357 } 358 String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink(); 359 if (resp.getStatus() != Status.SUCCESS) { 360 if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) { 361 throw new InvalidBlockTokenException("Got access token error" + ", status message " 362 + resp.getMessage() + ", " + logInfo); 363 } else { 364 throw new IOException("Got error" + ", status=" + resp.getStatus().name() 365 + ", status message " + resp.getMessage() + ", " + logInfo); 366 } 367 } 368 // success 369 ChannelPipeline p = ctx.pipeline(); 370 for (ChannelHandler handler; (handler = p.removeLast()) != null;) { 371 // do not remove all handlers because we may have wrap or unwrap handlers at the header 372 // of pipeline. 373 if (handler instanceof IdleStateHandler) { 374 break; 375 } 376 } 377 // Disable auto read here. Enable it after we setup the streaming pipeline in 378 // FanOutOneBLockAsyncDFSOutput. 379 ctx.channel().config().setAutoRead(false); 380 promise.trySuccess(ctx.channel()); 381 } 382 383 @Override 384 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 385 promise.tryFailure(new IOException("connection to " + dnInfo + " is closed")); 386 } 387 388 @Override 389 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 390 if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { 391 promise 392 .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); 393 } else { 394 super.userEventTriggered(ctx, evt); 395 } 396 } 397 398 @Override 399 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 400 promise.tryFailure(cause); 401 } 402 }); 403 } 404 405 private static void requestWriteBlock(Channel channel, StorageType storageType, 406 OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException { 407 OpWriteBlockProto proto = 408 writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build(); 409 int protoLen = proto.getSerializedSize(); 410 ByteBuf buffer = 411 channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen); 412 buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); 413 buffer.writeByte(Op.WRITE_BLOCK.code); 414 proto.writeDelimitedTo(new ByteBufOutputStream(buffer)); 415 safeWriteAndFlush(channel, buffer); 416 } 417 418 private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo, 419 StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs, 420 DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise) 421 throws IOException { 422 Promise<Void> saslPromise = channel.eventLoop().newPromise(); 423 trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise); 424 addListener(saslPromise, new FutureListener<Void>() { 425 426 @Override 427 public void operationComplete(Future<Void> future) throws Exception { 428 if (future.isSuccess()) { 429 // setup response processing pipeline first, then send request. 430 processWriteBlockResponse(channel, dnInfo, promise, timeoutMs); 431 requestWriteBlock(channel, storageType, writeBlockProtoBuilder); 432 } else { 433 promise.tryFailure(future.cause()); 434 } 435 } 436 }); 437 } 438 439 private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client, 440 String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, 441 BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup, 442 Class<? extends Channel> channelClass) { 443 StorageType[] storageTypes = locatedBlock.getStorageTypes(); 444 DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); 445 boolean connectToDnViaHostname = 446 conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); 447 int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT); 448 ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock()); 449 blockCopy.setNumBytes(locatedBlock.getBlockSize()); 450 ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder() 451 .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy)) 452 .setToken(PBHelperClient.convert(locatedBlock.getBlockToken()))) 453 .setClientName(clientName).build(); 454 ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer); 455 OpWriteBlockProto.Builder writeBlockProtoBuilder = 456 OpWriteBlockProto.newBuilder().setHeader(header) 457 .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())).setPipelineSize(1) 458 .setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()).setMaxBytesRcvd(maxBytesRcvd) 459 .setLatestGenerationStamp(latestGS).setRequestedChecksum(checksumProto) 460 .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build()); 461 List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length); 462 for (int i = 0; i < datanodeInfos.length; i++) { 463 DatanodeInfo dnInfo = datanodeInfos[i]; 464 StorageType storageType = storageTypes[i]; 465 Promise<Channel> promise = eventLoopGroup.next().newPromise(); 466 futureList.add(promise); 467 String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname); 468 addListener(new Bootstrap().group(eventLoopGroup).channel(channelClass) 469 .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() { 470 471 @Override 472 protected void initChannel(Channel ch) throws Exception { 473 // we need to get the remote address of the channel so we can only move on after 474 // channel connected. Leave an empty implementation here because netty does not allow 475 // a null handler. 476 } 477 }).connect(NetUtils.createSocketAddr(dnAddr)), new ChannelFutureListener() { 478 479 @Override 480 public void operationComplete(ChannelFuture future) throws Exception { 481 if (future.isSuccess()) { 482 initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder, 483 timeoutMs, client, locatedBlock.getBlockToken(), promise); 484 } else { 485 promise.tryFailure(future.cause()); 486 } 487 } 488 }); 489 } 490 return futureList; 491 } 492 493 /** 494 * Exception other than RemoteException thrown when calling create on namenode 495 */ 496 public static class NameNodeException extends IOException { 497 498 private static final long serialVersionUID = 3143237406477095390L; 499 500 public NameNodeException(Throwable cause) { 501 super(cause); 502 } 503 } 504 505 private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) { 506 List<CreateFlag> flags = new ArrayList<>(); 507 flags.add(CreateFlag.CREATE); 508 if (overwrite) { 509 flags.add(CreateFlag.OVERWRITE); 510 } 511 if (SHOULD_REPLICATE_FLAG != null) { 512 flags.add(SHOULD_REPLICATE_FLAG); 513 } 514 return new EnumSetWritable<>(EnumSet.copyOf(flags)); 515 } 516 517 private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, 518 boolean overwrite, boolean createParent, short replication, long blockSize, 519 EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor) 520 throws IOException { 521 Configuration conf = dfs.getConf(); 522 DFSClient client = dfs.getClient(); 523 String clientName = client.getClientName(); 524 ClientProtocol namenode = client.getNamenode(); 525 int createMaxRetries = 526 conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES); 527 ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager(); 528 Set<DatanodeInfo> toExcludeNodes = 529 new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet()); 530 for (int retry = 0;; retry++) { 531 LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src, 532 toExcludeNodes, retry); 533 HdfsFileStatus stat; 534 try { 535 stat = FILE_CREATOR.create(namenode, src, 536 FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, 537 getCreateFlags(overwrite), createParent, replication, blockSize, 538 CryptoProtocolVersion.supported()); 539 } catch (Exception e) { 540 if (e instanceof RemoteException) { 541 throw (RemoteException) e; 542 } else { 543 throw new NameNodeException(e); 544 } 545 } 546 beginFileLease(client, stat.getFileId()); 547 boolean succ = false; 548 LocatedBlock locatedBlock = null; 549 List<Future<Channel>> futureList = null; 550 try { 551 DataChecksum summer = createChecksum(client); 552 locatedBlock = namenode.addBlock(src, client.getClientName(), null, 553 toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null); 554 Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>(); 555 futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, 556 PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass); 557 for (int i = 0, n = futureList.size(); i < n; i++) { 558 DatanodeInfo datanodeInfo = locatedBlock.getLocations()[i]; 559 try { 560 datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo); 561 } catch (Exception e) { 562 // exclude the broken DN next time 563 toExcludeNodes.add(datanodeInfo); 564 excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error"); 565 throw e; 566 } 567 } 568 Encryptor encryptor = createEncryptor(conf, stat, client); 569 FanOutOneBlockAsyncDFSOutput output = 570 new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, 571 stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor); 572 succ = true; 573 return output; 574 } catch (RemoteException e) { 575 LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e); 576 if (shouldRetryCreate(e)) { 577 if (retry >= createMaxRetries) { 578 throw e.unwrapRemoteException(); 579 } 580 } else { 581 throw e.unwrapRemoteException(); 582 } 583 } catch (IOException e) { 584 LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e); 585 if (retry >= createMaxRetries) { 586 throw e; 587 } 588 // overwrite the old broken file. 589 overwrite = true; 590 try { 591 Thread.sleep(ConnectionUtils.getPauseTime(100, retry)); 592 } catch (InterruptedException ie) { 593 throw new InterruptedIOException(); 594 } 595 } finally { 596 if (!succ) { 597 if (futureList != null) { 598 for (Future<Channel> f : futureList) { 599 addListener(f, new FutureListener<Channel>() { 600 601 @Override 602 public void operationComplete(Future<Channel> future) throws Exception { 603 if (future.isSuccess()) { 604 safeClose(future.getNow()); 605 } 606 } 607 }); 608 } 609 } 610 endFileLease(client, stat.getFileId()); 611 } 612 } 613 } 614 } 615 616 /** 617 * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method maybe blocked so do not call it 618 * inside an {@link EventLoop}. 619 */ 620 public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f, 621 boolean overwrite, boolean createParent, short replication, long blockSize, 622 EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, 623 final StreamSlowMonitor monitor) throws IOException { 624 return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() { 625 626 @Override 627 public FanOutOneBlockAsyncDFSOutput doCall(Path p) 628 throws IOException, UnresolvedLinkException { 629 return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication, 630 blockSize, eventLoopGroup, channelClass, monitor); 631 } 632 633 @Override 634 public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException { 635 throw new UnsupportedOperationException(); 636 } 637 }.resolve(dfs, f); 638 } 639 640 public static boolean shouldRetryCreate(RemoteException e) { 641 // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name. 642 // For exceptions other than this, we just throw it out. This is same with 643 // DFSOutputStream.newStreamForCreate. 644 return e.getClassName().endsWith("RetryStartFileException"); 645 } 646 647 static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName, 648 ExtendedBlock block, long fileId) { 649 for (int retry = 0;; retry++) { 650 try { 651 if (namenode.complete(src, clientName, block, fileId)) { 652 endFileLease(client, fileId); 653 return; 654 } else { 655 LOG.warn("complete file " + src + " not finished, retry = " + retry); 656 } 657 } catch (RemoteException e) { 658 IOException ioe = e.unwrapRemoteException(); 659 if (ioe instanceof LeaseExpiredException) { 660 LOG.warn("lease for file " + src + " is expired, give up", e); 661 return; 662 } else { 663 LOG.warn("complete file " + src + " failed, retry = " + retry, e); 664 } 665 } catch (Exception e) { 666 LOG.warn("complete file " + src + " failed, retry = " + retry, e); 667 } 668 sleepIgnoreInterrupt(retry); 669 } 670 } 671 672 static void sleepIgnoreInterrupt(int retry) { 673 try { 674 Thread.sleep(ConnectionUtils.getPauseTime(100, retry)); 675 } catch (InterruptedException e) { 676 } 677 } 678}