001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.fs.CreateFlag.CREATE; 021import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; 022import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor; 023import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate; 024import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; 025import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; 026import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; 027import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE; 028import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; 029import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; 030 031import com.google.protobuf.CodedOutputStream; 032import java.io.IOException; 033import java.io.InterruptedIOException; 034import java.lang.reflect.InvocationTargetException; 035import java.lang.reflect.Method; 036import java.util.ArrayList; 037import java.util.EnumSet; 038import java.util.List; 039import java.util.concurrent.TimeUnit; 040import org.apache.commons.lang3.ArrayUtils; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.crypto.CryptoProtocolVersion; 043import org.apache.hadoop.crypto.Encryptor; 044import org.apache.hadoop.fs.CreateFlag; 045import org.apache.hadoop.fs.FileSystem; 046import org.apache.hadoop.fs.FileSystemLinkResolver; 047import org.apache.hadoop.fs.Path; 048import org.apache.hadoop.fs.StorageType; 049import org.apache.hadoop.fs.UnresolvedLinkException; 050import org.apache.hadoop.fs.permission.FsPermission; 051import org.apache.hadoop.hbase.client.ConnectionUtils; 052import org.apache.hadoop.hbase.util.CancelableProgressable; 053import org.apache.hadoop.hbase.util.FSUtils; 054import org.apache.hadoop.hdfs.DFSClient; 055import org.apache.hadoop.hdfs.DFSOutputStream; 056import org.apache.hadoop.hdfs.DistributedFileSystem; 057import org.apache.hadoop.hdfs.protocol.ClientProtocol; 058import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 059import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 060import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 061import org.apache.hadoop.hdfs.protocol.LocatedBlock; 062import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; 063import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; 064import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; 065import org.apache.hadoop.hdfs.protocol.datatransfer.Op; 066import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; 067import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN; 068import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; 069import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; 070import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; 071import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; 072import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; 073import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; 074import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; 075import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; 076import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; 077import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 078import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; 079import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; 080import org.apache.hadoop.io.EnumSetWritable; 081import org.apache.hadoop.ipc.RemoteException; 082import org.apache.hadoop.net.NetUtils; 083import org.apache.hadoop.security.token.Token; 084import org.apache.hadoop.util.DataChecksum; 085import org.apache.yetus.audience.InterfaceAudience; 086import org.slf4j.Logger; 087import org.slf4j.LoggerFactory; 088 089import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap; 090import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 091import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; 092import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 093import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; 094import org.apache.hbase.thirdparty.io.netty.channel.Channel; 095import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture; 096import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener; 097import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler; 098import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 099import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; 100import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 101import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 102import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 103import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 104import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder; 105import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 106import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; 107import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; 108import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future; 109import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener; 110import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 111 112/** 113 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}. 114 */ 115@InterfaceAudience.Private 116public final class FanOutOneBlockAsyncDFSOutputHelper { 117 private static final Logger LOG = 118 LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class); 119 120 private FanOutOneBlockAsyncDFSOutputHelper() { 121 } 122 123 public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries"; 124 125 public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10; 126 // use pooled allocator for performance. 127 private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT; 128 129 // copied from DFSPacket since it is package private. 130 public static final long HEART_BEAT_SEQNO = -1L; 131 132 // Timeouts for communicating with DataNode for streaming writes/reads 133 public static final int READ_TIMEOUT = 60 * 1000; 134 135 private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0]; 136 137 private interface LeaseManager { 138 139 void begin(DFSClient client, long inodeId); 140 141 void end(DFSClient client, long inodeId); 142 } 143 144 private static final LeaseManager LEASE_MANAGER; 145 146 // This is used to terminate a recoverFileLease call when FileSystem is already closed. 147 // isClientRunning is not public so we need to use reflection. 148 private interface DFSClientAdaptor { 149 150 boolean isClientRunning(DFSClient client); 151 } 152 153 private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR; 154 155 // helper class for creating files. 156 private interface FileCreator { 157 default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked, 158 String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, 159 short replication, long blockSize, CryptoProtocolVersion[] supportedVersions) 160 throws Exception { 161 try { 162 return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent, 163 replication, blockSize, supportedVersions); 164 } catch (InvocationTargetException e) { 165 if (e.getCause() instanceof Exception) { 166 throw (Exception) e.getCause(); 167 } else { 168 throw new RuntimeException(e.getCause()); 169 } 170 } 171 }; 172 173 Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName, 174 EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize, 175 CryptoProtocolVersion[] supportedVersions) throws Exception; 176 } 177 178 private static final FileCreator FILE_CREATOR; 179 180 private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException { 181 Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning"); 182 isClientRunningMethod.setAccessible(true); 183 return new DFSClientAdaptor() { 184 185 @Override 186 public boolean isClientRunning(DFSClient client) { 187 try { 188 return (Boolean) isClientRunningMethod.invoke(client); 189 } catch (IllegalAccessException | InvocationTargetException e) { 190 throw new RuntimeException(e); 191 } 192 } 193 }; 194 } 195 196 private static LeaseManager createLeaseManager() throws NoSuchMethodException { 197 Method beginFileLeaseMethod = 198 DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class); 199 beginFileLeaseMethod.setAccessible(true); 200 Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class); 201 endFileLeaseMethod.setAccessible(true); 202 return new LeaseManager() { 203 204 @Override 205 public void begin(DFSClient client, long inodeId) { 206 try { 207 beginFileLeaseMethod.invoke(client, inodeId, null); 208 } catch (IllegalAccessException | InvocationTargetException e) { 209 throw new RuntimeException(e); 210 } 211 } 212 213 @Override 214 public void end(DFSClient client, long inodeId) { 215 try { 216 endFileLeaseMethod.invoke(client, inodeId); 217 } catch (IllegalAccessException | InvocationTargetException e) { 218 throw new RuntimeException(e); 219 } 220 } 221 }; 222 } 223 224 private static FileCreator createFileCreator3() throws NoSuchMethodException { 225 Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, 226 String.class, EnumSetWritable.class, boolean.class, short.class, long.class, 227 CryptoProtocolVersion[].class, String.class); 228 229 return (instance, src, masked, clientName, flag, createParent, replication, blockSize, 230 supportedVersions) -> { 231 return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag, 232 createParent, replication, blockSize, supportedVersions, null); 233 }; 234 } 235 236 private static FileCreator createFileCreator2() throws NoSuchMethodException { 237 Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, 238 String.class, EnumSetWritable.class, boolean.class, short.class, long.class, 239 CryptoProtocolVersion[].class); 240 241 return (instance, src, masked, clientName, flag, createParent, replication, blockSize, 242 supportedVersions) -> { 243 return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag, 244 createParent, replication, blockSize, supportedVersions); 245 }; 246 } 247 248 private static FileCreator createFileCreator() throws NoSuchMethodException { 249 try { 250 return createFileCreator3(); 251 } catch (NoSuchMethodException e) { 252 LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x"); 253 } 254 return createFileCreator2(); 255 } 256 257 // cancel the processing if DFSClient is already closed. 258 static final class CancelOnClose implements CancelableProgressable { 259 260 private final DFSClient client; 261 262 public CancelOnClose(DFSClient client) { 263 this.client = client; 264 } 265 266 @Override 267 public boolean progress() { 268 return DFS_CLIENT_ADAPTOR.isClientRunning(client); 269 } 270 } 271 272 static { 273 try { 274 LEASE_MANAGER = createLeaseManager(); 275 DFS_CLIENT_ADAPTOR = createDFSClientAdaptor(); 276 FILE_CREATOR = createFileCreator(); 277 } catch (Exception e) { 278 String msg = "Couldn't properly initialize access to HDFS internals. Please " + 279 "update your WAL Provider to not make use of the 'asyncfs' provider. See " + 280 "HBASE-16110 for more information."; 281 LOG.error(msg, e); 282 throw new Error(msg, e); 283 } 284 } 285 286 static void beginFileLease(DFSClient client, long inodeId) { 287 LEASE_MANAGER.begin(client, inodeId); 288 } 289 290 static void endFileLease(DFSClient client, long inodeId) { 291 LEASE_MANAGER.end(client, inodeId); 292 } 293 294 static DataChecksum createChecksum(DFSClient client) { 295 return client.getConf().createChecksum(null); 296 } 297 298 static Status getStatus(PipelineAckProto ack) { 299 List<Integer> flagList = ack.getFlagList(); 300 Integer headerFlag; 301 if (flagList.isEmpty()) { 302 Status reply = ack.getReply(0); 303 headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply); 304 } else { 305 headerFlag = flagList.get(0); 306 } 307 return PipelineAck.getStatusFromHeader(headerFlag); 308 } 309 310 private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo, 311 Promise<Channel> promise, int timeoutMs) { 312 channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), 313 new ProtobufVarint32FrameDecoder(), 314 new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()), 315 new SimpleChannelInboundHandler<BlockOpResponseProto>() { 316 317 @Override 318 protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp) 319 throws Exception { 320 Status pipelineStatus = resp.getStatus(); 321 if (PipelineAck.isRestartOOBStatus(pipelineStatus)) { 322 throw new IOException("datanode " + dnInfo + " is restarting"); 323 } 324 String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink(); 325 if (resp.getStatus() != Status.SUCCESS) { 326 if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) { 327 throw new InvalidBlockTokenException("Got access token error" + ", status message " + 328 resp.getMessage() + ", " + logInfo); 329 } else { 330 throw new IOException("Got error" + ", status=" + resp.getStatus().name() + 331 ", status message " + resp.getMessage() + ", " + logInfo); 332 } 333 } 334 // success 335 ChannelPipeline p = ctx.pipeline(); 336 for (ChannelHandler handler; (handler = p.removeLast()) != null;) { 337 // do not remove all handlers because we may have wrap or unwrap handlers at the header 338 // of pipeline. 339 if (handler instanceof IdleStateHandler) { 340 break; 341 } 342 } 343 // Disable auto read here. Enable it after we setup the streaming pipeline in 344 // FanOutOneBLockAsyncDFSOutput. 345 ctx.channel().config().setAutoRead(false); 346 promise.trySuccess(ctx.channel()); 347 } 348 349 @Override 350 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 351 promise.tryFailure(new IOException("connection to " + dnInfo + " is closed")); 352 } 353 354 @Override 355 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 356 if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { 357 promise 358 .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); 359 } else { 360 super.userEventTriggered(ctx, evt); 361 } 362 } 363 364 @Override 365 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 366 promise.tryFailure(cause); 367 } 368 }); 369 } 370 371 private static void requestWriteBlock(Channel channel, StorageType storageType, 372 OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException { 373 OpWriteBlockProto proto = 374 writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build(); 375 int protoLen = proto.getSerializedSize(); 376 ByteBuf buffer = 377 channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen); 378 buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); 379 buffer.writeByte(Op.WRITE_BLOCK.code); 380 proto.writeDelimitedTo(new ByteBufOutputStream(buffer)); 381 channel.writeAndFlush(buffer); 382 } 383 384 private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo, 385 StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs, 386 DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise) 387 throws IOException { 388 Promise<Void> saslPromise = channel.eventLoop().newPromise(); 389 trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise); 390 saslPromise.addListener(new FutureListener<Void>() { 391 392 @Override 393 public void operationComplete(Future<Void> future) throws Exception { 394 if (future.isSuccess()) { 395 // setup response processing pipeline first, then send request. 396 processWriteBlockResponse(channel, dnInfo, promise, timeoutMs); 397 requestWriteBlock(channel, storageType, writeBlockProtoBuilder); 398 } else { 399 promise.tryFailure(future.cause()); 400 } 401 } 402 }); 403 } 404 405 private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client, 406 String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, 407 BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup, 408 Class<? extends Channel> channelClass) { 409 StorageType[] storageTypes = locatedBlock.getStorageTypes(); 410 DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); 411 boolean connectToDnViaHostname = 412 conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); 413 int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT); 414 ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock()); 415 blockCopy.setNumBytes(locatedBlock.getBlockSize()); 416 ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder() 417 .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy)) 418 .setToken(PBHelperClient.convert(locatedBlock.getBlockToken()))) 419 .setClientName(clientName).build(); 420 ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer); 421 OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder() 422 .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())) 423 .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()) 424 .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS) 425 .setRequestedChecksum(checksumProto) 426 .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build()); 427 List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length); 428 for (int i = 0; i < datanodeInfos.length; i++) { 429 DatanodeInfo dnInfo = datanodeInfos[i]; 430 StorageType storageType = storageTypes[i]; 431 Promise<Channel> promise = eventLoopGroup.next().newPromise(); 432 futureList.add(promise); 433 String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname); 434 new Bootstrap().group(eventLoopGroup).channel(channelClass) 435 .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() { 436 437 @Override 438 protected void initChannel(Channel ch) throws Exception { 439 // we need to get the remote address of the channel so we can only move on after 440 // channel connected. Leave an empty implementation here because netty does not allow 441 // a null handler. 442 } 443 }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() { 444 445 @Override 446 public void operationComplete(ChannelFuture future) throws Exception { 447 if (future.isSuccess()) { 448 initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder, 449 timeoutMs, client, locatedBlock.getBlockToken(), promise); 450 } else { 451 promise.tryFailure(future.cause()); 452 } 453 } 454 }); 455 } 456 return futureList; 457 } 458 459 /** 460 * Exception other than RemoteException thrown when calling create on namenode 461 */ 462 public static class NameNodeException extends IOException { 463 464 private static final long serialVersionUID = 3143237406477095390L; 465 466 public NameNodeException(Throwable cause) { 467 super(cause); 468 } 469 } 470 471 private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, 472 boolean overwrite, boolean createParent, short replication, long blockSize, 473 EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException { 474 Configuration conf = dfs.getConf(); 475 FSUtils fsUtils = FSUtils.getInstance(dfs, conf); 476 DFSClient client = dfs.getClient(); 477 String clientName = client.getClientName(); 478 ClientProtocol namenode = client.getNamenode(); 479 int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, 480 DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES); 481 DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY; 482 for (int retry = 0;; retry++) { 483 HdfsFileStatus stat; 484 try { 485 stat = FILE_CREATOR.create(namenode, src, 486 FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, 487 new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)), 488 createParent, replication, blockSize, CryptoProtocolVersion.supported()); 489 } catch (Exception e) { 490 if (e instanceof RemoteException) { 491 throw (RemoteException) e; 492 } else { 493 throw new NameNodeException(e); 494 } 495 } 496 beginFileLease(client, stat.getFileId()); 497 boolean succ = false; 498 LocatedBlock locatedBlock = null; 499 List<Future<Channel>> futureList = null; 500 try { 501 DataChecksum summer = createChecksum(client); 502 locatedBlock = namenode.addBlock(src, client.getClientName(), null, excludesNodes, 503 stat.getFileId(), null, null); 504 List<Channel> datanodeList = new ArrayList<>(); 505 futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, 506 PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass); 507 for (int i = 0, n = futureList.size(); i < n; i++) { 508 try { 509 datanodeList.add(futureList.get(i).syncUninterruptibly().getNow()); 510 } catch (Exception e) { 511 // exclude the broken DN next time 512 excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]); 513 throw e; 514 } 515 } 516 Encryptor encryptor = createEncryptor(conf, stat, client); 517 FanOutOneBlockAsyncDFSOutput output = 518 new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src, 519 stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC); 520 succ = true; 521 return output; 522 } catch (RemoteException e) { 523 LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e); 524 if (shouldRetryCreate(e)) { 525 if (retry >= createMaxRetries) { 526 throw e.unwrapRemoteException(); 527 } 528 } else { 529 throw e.unwrapRemoteException(); 530 } 531 } catch (IOException e) { 532 LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e); 533 if (retry >= createMaxRetries) { 534 throw e; 535 } 536 // overwrite the old broken file. 537 overwrite = true; 538 try { 539 Thread.sleep(ConnectionUtils.getPauseTime(100, retry)); 540 } catch (InterruptedException ie) { 541 throw new InterruptedIOException(); 542 } 543 } finally { 544 if (!succ) { 545 if (futureList != null) { 546 for (Future<Channel> f : futureList) { 547 f.addListener(new FutureListener<Channel>() { 548 549 @Override 550 public void operationComplete(Future<Channel> future) throws Exception { 551 if (future.isSuccess()) { 552 future.getNow().close(); 553 } 554 } 555 }); 556 } 557 } 558 endFileLease(client, stat.getFileId()); 559 } 560 } 561 } 562 } 563 564 /** 565 * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method maybe blocked so do not call it 566 * inside an {@link EventLoop}. 567 */ 568 public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f, 569 boolean overwrite, boolean createParent, short replication, long blockSize, 570 EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException { 571 return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() { 572 573 @Override 574 public FanOutOneBlockAsyncDFSOutput doCall(Path p) 575 throws IOException, UnresolvedLinkException { 576 return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication, 577 blockSize, eventLoopGroup, channelClass); 578 } 579 580 @Override 581 public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException { 582 throw new UnsupportedOperationException(); 583 } 584 }.resolve(dfs, f); 585 } 586 587 public static boolean shouldRetryCreate(RemoteException e) { 588 // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name. 589 // For exceptions other than this, we just throw it out. This is same with 590 // DFSOutputStream.newStreamForCreate. 591 return e.getClassName().endsWith("RetryStartFileException"); 592 } 593 594 static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName, 595 ExtendedBlock block, long fileId) { 596 for (int retry = 0;; retry++) { 597 try { 598 if (namenode.complete(src, clientName, block, fileId)) { 599 endFileLease(client, fileId); 600 return; 601 } else { 602 LOG.warn("complete file " + src + " not finished, retry = " + retry); 603 } 604 } catch (RemoteException e) { 605 IOException ioe = e.unwrapRemoteException(); 606 if (ioe instanceof LeaseExpiredException) { 607 LOG.warn("lease for file " + src + " is expired, give up", e); 608 return; 609 } else { 610 LOG.warn("complete file " + src + " failed, retry = " + retry, e); 611 } 612 } catch (Exception e) { 613 LOG.warn("complete file " + src + " failed, retry = " + retry, e); 614 } 615 sleepIgnoreInterrupt(retry); 616 } 617 } 618 619 static void sleepIgnoreInterrupt(int retry) { 620 try { 621 Thread.sleep(ConnectionUtils.getPauseTime(100, retry)); 622 } catch (InterruptedException e) { 623 } 624 } 625}