/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.addListener;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
 */
@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final Logger LOG =
    LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);

  private FanOutOneBlockAsyncDFSOutputHelper() {
  }

  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
  // use pooled allocator for performance.
  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

  // copied from DFSPacket since it is package private.
  public static final long HEART_BEAT_SEQNO = -1L;

  // Timeouts for communicating with DataNode for streaming writes/reads
  public static final int READ_TIMEOUT = 60 * 1000;

  private interface LeaseManager {

    void begin(FanOutOneBlockAsyncDFSOutput output);

    void end(FanOutOneBlockAsyncDFSOutput output);
  }

  private static final LeaseManager LEASE_MANAGER;

  // helper class for creating files.
  private interface FileCreator {
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions) throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    }

    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
      EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;

  private static LeaseManager createLeaseManager3_4() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
      DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
    endFileLeaseMethod.setAccessible(true);
    Method getUniqKeyMethod = DFSOutputStream.class.getMethod("getUniqKey");
    return new LeaseManager() {

      private String getUniqKey(FanOutOneBlockAsyncDFSOutput output)
        throws IllegalAccessException, InvocationTargetException {
        return (String) getUniqKeyMethod.invoke(output.getDummyStream());
      }

      @Override
      public void begin(FanOutOneBlockAsyncDFSOutput output) {
        try {
          beginFileLeaseMethod.invoke(output.getClient(), getUniqKey(output),
            output.getDummyStream());
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(FanOutOneBlockAsyncDFSOutput output) {
        try {
          endFileLeaseMethod.invoke(output.getClient(), getUniqKey(output));
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }
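
  // Hadoop 3.3 and earlier identify a file lease by the long file id rather than the String
  // key used above, so this variant binds to the older beginFileLease/endFileLease signatures.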
  private static LeaseManager createLeaseManager3() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
      DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
    endFileLeaseMethod.setAccessible(true);
    return new LeaseManager() {

      @Override
      public void begin(FanOutOneBlockAsyncDFSOutput output) {
        try {
          beginFileLeaseMethod.invoke(output.getClient(), output.getDummyStream().getFileId(),
            output.getDummyStream());
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(FanOutOneBlockAsyncDFSOutput output) {
        try {
          endFileLeaseMethod.invoke(output.getClient(), output.getDummyStream().getFileId());
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
    try {
      return createLeaseManager3_4();
    } catch (NoSuchMethodException e) {
      LOG.debug("DFSClient::beginFileLease wrong arguments, should be hadoop 3.3 or below");
    }

    return createLeaseManager3();
  }

  private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null, null);
    };
  }

  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null);
    };
  }

  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3_3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 3.2 or below");
    }

    return createFileCreator3();
  }

  // cancel the processing if DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {

    private final DFSClient client;

    public CancelOnClose(DFSClient client) {
      this.client = client;
    }

    @Override
    public boolean progress() {
      return client.isClientRunning();
    }
  }
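
  // Resolve the reflection-based accessors once, at class load time. If no known variant
  // matches, fail fast with an Error: without access to these HDFS internals the 'asyncfs'
  // WAL provider cannot work at all.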
  static {
    try {
      LEASE_MANAGER = createLeaseManager();
      FILE_CREATOR = createFileCreator();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please "
        + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
        + "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }

  private static void beginFileLease(FanOutOneBlockAsyncDFSOutput output) {
    LEASE_MANAGER.begin(output);
  }

  static void endFileLease(FanOutOneBlockAsyncDFSOutput output) {
    LEASE_MANAGER.end(output);
  }

  static DataChecksum createChecksum(DFSClient client) {
    return client.getConf().createChecksum(null);
  }

  static Status getStatus(PipelineAckProto ack) {
    List<Integer> flagList = ack.getFlagList();
    Integer headerFlag;
    if (flagList.isEmpty()) {
      Status reply = ack.getReply(0);
      headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
    } else {
      headerFlag = flagList.get(0);
    }
    return PipelineAck.getStatusFromHeader(headerFlag);
  }
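
  // Installs a one-shot response handler for the write-block request. On success the handlers
  // added here remove themselves again (everything down to and including the IdleStateHandler),
  // so that only the SASL wrap/unwrap handlers, if any, stay in place for the streaming phase.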
  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
    Promise<Channel> promise, int timeoutMs) {
    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
      new ProtobufVarint32FrameDecoder(),
      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
      new SimpleChannelInboundHandler<BlockOpResponseProto>() {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
          throws Exception {
          Status pipelineStatus = resp.getStatus();
          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
            throw new IOException("datanode " + dnInfo + " is restarting");
          }
          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink() + " from datanode "
            + dnInfo.getHostName() + "/" + dnInfo.getInfoAddr();
          if (resp.getStatus() != Status.SUCCESS) {
            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException("Got access token error" + ", status message "
                + resp.getMessage() + ", " + logInfo);
            } else {
              throw new IOException("Got error" + ", status=" + resp.getStatus().name()
                + ", status message " + resp.getMessage() + ", " + logInfo);
            }
          }
          // success
          ChannelPipeline p = ctx.pipeline();
          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
            // do not remove all handlers because we may have wrap or unwrap handlers at the
            // head of the pipeline.
            if (handler instanceof IdleStateHandler) {
              break;
            }
          }
          // Disable auto read here. Enable it after we setup the streaming pipeline in
          // FanOutOneBlockAsyncDFSOutput.
          ctx.channel().config().setAutoRead(false);
          promise.trySuccess(ctx.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
            promise.tryFailure(
              new IOException("Timeout(" + timeoutMs + "ms) waiting for response from datanode "
                + dnInfo.getHostName() + "/" + dnInfo.getInfoAddr()));
          } else {
            super.userEventTriggered(ctx, evt);
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          promise.tryFailure(cause);
        }
      });
  }

  private static void requestWriteBlock(Channel channel, StorageType storageType,
    OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
    OpWriteBlockProto proto =
      writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
    int protoLen = proto.getSerializedSize();
    ByteBuf buffer =
      channel.alloc().buffer(3 + CodedOutputStream.computeUInt32SizeNoTag(protoLen) + protoLen);
    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    buffer.writeByte(Op.WRITE_BLOCK.code);
    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
    safeWriteAndFlush(channel, buffer);
  }

  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
    StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
    DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
    throws IOException {
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
    addListener(saslPromise, new FutureListener<Void>() {

      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // setup response processing pipeline first, then send request.
          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
        } else {
          promise.tryFailure(future.cause());
        }
      }
    });
  }
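
  // Unlike the chained pipeline used by DFSOutputStream, the fan-out output connects to every
  // datanode directly (note pipelineSize(1) below) and returns one future per datanode; each
  // future completes once that datanode has acknowledged the write-block request.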
  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
    String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
    BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass) {
    StorageType[] storageTypes = locatedBlock.getStorageTypes();
    DatanodeInfo[] datanodeInfos = getLocatedBlockLocations(locatedBlock);
    boolean connectToDnViaHostname =
      conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
    blockCopy.setNumBytes(locatedBlock.getBlockSize());
    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
      .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
        .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
      .setClientName(clientName).build();
    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
    OpWriteBlockProto.Builder writeBlockProtoBuilder =
      OpWriteBlockProto.newBuilder().setHeader(header)
        .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())).setPipelineSize(1)
        .setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()).setMaxBytesRcvd(maxBytesRcvd)
        .setLatestGenerationStamp(latestGS).setRequestedChecksum(checksumProto)
        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
    for (int i = 0; i < datanodeInfos.length; i++) {
      DatanodeInfo dnInfo = datanodeInfos[i];
      StorageType storageType = storageTypes[i];
      Promise<Channel> promise = eventLoopGroup.next().newPromise();
      futureList.add(promise);
      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
      addListener(new Bootstrap().group(eventLoopGroup).channel(channelClass)
        .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

          @Override
          protected void initChannel(Channel ch) throws Exception {
            // We need the remote address of the channel, so we can only move on after the
            // channel is connected. Leave an empty implementation here because netty does not
            // allow a null handler.
          }
        }).connect(NetUtils.createSocketAddr(dnAddr)), new ChannelFutureListener() {

          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
              initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
                timeoutMs, client, locatedBlock.getBlockToken(), promise);
            } else {
              promise.tryFailure(future.cause());
            }
          }
        });
    }
    return futureList;
  }

  /**
   * Exception other than RemoteException thrown when calling create on namenode
   */
  public static class NameNodeException extends IOException {

    private static final long serialVersionUID = 3143237406477095390L;

    public NameNodeException(Throwable cause) {
      super(cause);
    }
  }
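
  // SHOULD_REPLICATE is always set so the file is created as a plain replicated file even if
  // the parent directory carries an erasure coding policy; the fan-out output assumes
  // replicated, non-EC blocks.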
  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite,
    boolean noLocalWrite) {
    List<CreateFlag> flags = new ArrayList<>();
    flags.add(CreateFlag.CREATE);
    if (overwrite) {
      flags.add(CreateFlag.OVERWRITE);
    }
    if (noLocalWrite) {
      flags.add(CreateFlag.NO_LOCAL_WRITE);
    }
    flags.add(CreateFlag.SHOULD_REPLICATE);
    return new EnumSetWritable<>(EnumSet.copyOf(flags));
  }
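
  // Creates the file on the namenode, allocates the first (and only) block, and connects to all
  // chosen datanodes. Datanodes that fail to connect are added to the exclude list before the
  // whole sequence is retried, up to hbase.fs.async.create.retries times.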
  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor,
    boolean noLocalWrite) throws IOException {
    Configuration conf = dfs.getConf();
    DFSClient client = dfs.getClient();
    String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
    int createMaxRetries =
      conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
    ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager();
    Set<DatanodeInfo> toExcludeNodes =
      new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet());
    for (int retry = 0;; retry++) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("When creating output stream for {}, exclude list is {}, retry={}", src,
          getDataNodeInfo(toExcludeNodes), retry);
      }
      EnumSetWritable<CreateFlag> createFlags = getCreateFlags(overwrite, noLocalWrite);
      HdfsFileStatus stat;
      try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
          createFlags, createParent, replication, blockSize, CryptoProtocolVersion.supported());
      } catch (Exception e) {
        if (e instanceof RemoteException) {
          throw (RemoteException) e;
        } else {
          throw new NameNodeException(e);
        }
      }
      boolean succ = false;
      LocatedBlock locatedBlock = null;
      List<Future<Channel>> futureList = null;
      try {
        DataChecksum summer = createChecksum(client);
        locatedBlock = namenode.addBlock(src, client.getClientName(), null,
          toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
        Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
        for (int i = 0, n = futureList.size(); i < n; i++) {
          DatanodeInfo datanodeInfo = getLocatedBlockLocations(locatedBlock)[i];
          try {
            datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
          } catch (Exception e) {
            // exclude the broken DN next time
            toExcludeNodes.add(datanodeInfo);
            excludeDatanodeManager.tryAddExcludeDN(datanodeInfo,
              ExcludeDatanodeManager.ExcludeCause.CONNECT_ERROR.getCause());
            throw e;
          }
        }
        Encryptor encryptor = createEncryptor(conf, stat, client);
        FanOutOneBlockAsyncDFSOutput output =
          new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, stat,
            createFlags.get(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
        beginFileLease(output);
        succ = true;
        return output;
      } catch (RemoteException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (shouldRetryCreate(e)) {
          if (retry >= createMaxRetries) {
            throw e.unwrapRemoteException();
          }
        } else {
          throw e.unwrapRemoteException();
        }
      } catch (IOException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (retry >= createMaxRetries) {
          throw e;
        }
        // overwrite the old broken file.
        overwrite = true;
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          throw new InterruptedIOException();
        }
      } finally {
        if (!succ) {
          if (futureList != null) {
            for (Future<Channel> f : futureList) {
              addListener(f, new FutureListener<Channel>() {

                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                  if (future.isSuccess()) {
                    safeClose(future.getNow());
                  }
                }
              });
            }
          }
        }
      }
    }
  }
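
  // A minimal usage sketch; the event loop group, channel class and monitor shown here are
  // illustrative, callers normally obtain them from the surrounding HBase infrastructure:
  //
  //   EventLoopGroup group = new NioEventLoopGroup();
  //   FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs,
  //     new Path("/demo/wal"), true, false, (short) 3, dfs.getDefaultBlockSize(), group,
  //     NioSocketChannel.class, monitor, false);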
  /**
   * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method may block, so do not call it inside
   * an {@link EventLoop}.
   */
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
    final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException {
    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

      @Override
      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
        throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
          blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
      }

      @Override
      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
        throw new UnsupportedOperationException();
      }
    }.resolve(dfs, f);
  }

  public static boolean shouldRetryCreate(RemoteException e) {
    // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name.
    // For exceptions other than this, we just throw it out. This is the same as
    // DFSOutputStream.newStreamForCreate.
    return e.getClassName().endsWith("RetryStartFileException");
  }

  static void completeFile(FanOutOneBlockAsyncDFSOutput output, DFSClient client,
    ClientProtocol namenode, String src, String clientName, ExtendedBlock block,
    HdfsFileStatus stat) throws IOException {
    int maxRetries = client.getConf().getNumBlockWriteLocateFollowingRetry();
    for (int retry = 0; retry < maxRetries; retry++) {
      try {
        if (namenode.complete(src, clientName, block, stat.getFileId())) {
          endFileLease(output);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
        }
      } catch (RemoteException e) {
        throw e.unwrapRemoteException();
      }
      sleepIgnoreInterrupt(retry);
    }
    throw new IOException("cannot complete file after retrying " + maxRetries + " times");
  }

  static void sleepIgnoreInterrupt(int retry) {
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
    } catch (InterruptedException e) {
    }
  }

  public static String getDataNodeInfo(Collection<DatanodeInfo> datanodeInfos) {
    if (datanodeInfos.isEmpty()) {
      return "[]";
    }
    return datanodeInfos.stream()
      .map(datanodeInfo -> new StringBuilder().append("(").append(datanodeInfo.getHostName())
        .append("/").append(datanodeInfo.getInfoAddr()).append(":")
        .append(datanodeInfo.getInfoPort()).append(")").toString())
      .collect(Collectors.joining(",", "[", "]"));
  }
}