/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
 */
@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final Logger LOG =
      LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);

  private FanOutOneBlockAsyncDFSOutputHelper() {
  }

  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
  // use pooled allocator for performance.
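  // Netty's pooled allocator reuses buffers, so we avoid allocating a fresh ByteBuf for every
  // packet on the write path.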
  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

  // copied from DFSPacket since it is package private.
  public static final long HEART_BEAT_SEQNO = -1L;

  // Timeouts for communicating with DataNode for streaming writes/reads
  public static final int READ_TIMEOUT = 60 * 1000;

  private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];

  private interface LeaseManager {

    void begin(DFSClient client, long inodeId);

    void end(DFSClient client, long inodeId);
  }

  private static final LeaseManager LEASE_MANAGER;

  // This is used to terminate a recoverFileLease call when the FileSystem is already closed.
  // isClientRunning is not public so we need to use reflection.
  private interface DFSClientAdaptor {

    boolean isClientRunning(DFSClient client);
  }

  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;

  // helper interface for creating files.
  private interface FileCreator {
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
        String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
        short replication, long blockSize, CryptoProtocolVersion[] supportedVersions)
        throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    }

    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
        EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
        CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;

  // CreateFlag.SHOULD_REPLICATE is to make OutputStream on an EC directory support hflush/hsync,
  // but EC is introduced in hadoop 3.x so we do not have this enum on 2.x, that's why we need to
  // look it up by name at runtime instead of referencing the enum constant directly.
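  // The flag is resolved in loadShouldReplicateFlag below; on hadoop 2.x it stays null and
  // getCreateFlags simply leaves it out.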
  private static final CreateFlag SHOULD_REPLICATE_FLAG;

  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
    isClientRunningMethod.setAccessible(true);
    return new DFSClientAdaptor() {

      @Override
      public boolean isClientRunning(DFSClient client) {
        try {
          return (Boolean) isClientRunningMethod.invoke(client);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
        DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
    endFileLeaseMethod.setAccessible(true);
    return new LeaseManager() {

      @Override
      public void begin(DFSClient client, long inodeId) {
        try {
          beginFileLeaseMethod.invoke(client, inodeId, null);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(DFSClient client, long inodeId) {
        try {
          endFileLeaseMethod.invoke(client, inodeId);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
        supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null, null);
    };
  }

  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
        supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null);
    };
  }

  private static FileCreator createFileCreator2() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
        supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions);
    };
  }

  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3_3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 3.2 or below");
    }

    try {
      return createFileCreator3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
    }
    return createFileCreator2();
  }

  private static CreateFlag loadShouldReplicateFlag() {
    try {
      return CreateFlag.valueOf("SHOULD_REPLICATE");
    } catch (IllegalArgumentException e) {
      LOG.debug("cannot find SHOULD_REPLICATE flag, should be hadoop 2.x", e);
      return null;
    }
  }

  // cancel the processing if DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {

    private final DFSClient client;

    public CancelOnClose(DFSClient client) {
      this.client = client;
    }

    @Override
    public boolean progress() {
      return DFS_CLIENT_ADAPTOR.isClientRunning(client);
    }
  }

  static {
    try {
      LEASE_MANAGER = createLeaseManager();
      DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
      FILE_CREATOR = createFileCreator();
      SHOULD_REPLICATE_FLAG = loadShouldReplicateFlag();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please " +
        "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
        "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }

  static void beginFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.begin(client, inodeId);
  }

  static void endFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.end(client, inodeId);
  }

  static DataChecksum createChecksum(DFSClient client) {
    return client.getConf().createChecksum(null);
  }

  static Status getStatus(PipelineAckProto ack) {
    List<Integer> flagList = ack.getFlagList();
    Integer headerFlag;
    if (flagList.isEmpty()) {
      Status reply = ack.getReply(0);
      headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
    } else {
      headerFlag = flagList.get(0);
    }
    return PipelineAck.getStatusFromHeader(headerFlag);
  }

  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
      Promise<Channel> promise, int timeoutMs) {
    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
      new ProtobufVarint32FrameDecoder(),
      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
      new SimpleChannelInboundHandler<BlockOpResponseProto>() {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
            throws Exception {
          Status pipelineStatus = resp.getStatus();
          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
            throw new IOException("datanode " + dnInfo + " is restarting");
          }
          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
          if (resp.getStatus() != Status.SUCCESS) {
            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException("Got access token error" + ", status message " +
                resp.getMessage() + ", " + logInfo);
            } else {
              throw new IOException("Got error" + ", status=" + resp.getStatus().name() +
                ", status message " + resp.getMessage() + ", " + logInfo);
            }
          }
          // success
          ChannelPipeline p = ctx.pipeline();
          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
            // do not remove all handlers because we may have wrap or unwrap handlers at the head
            // of the pipeline.
            if (handler instanceof IdleStateHandler) {
              break;
            }
          }
          // Disable auto read here. Enable it after we set up the streaming pipeline in
          // FanOutOneBlockAsyncDFSOutput.
          ctx.channel().config().setAutoRead(false);
          promise.trySuccess(ctx.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
            promise
              .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
          } else {
            super.userEventTriggered(ctx, evt);
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          promise.tryFailure(cause);
        }
      });
  }

  private static void requestWriteBlock(Channel channel, StorageType storageType,
      OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
    OpWriteBlockProto proto =
      writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
    int protoLen = proto.getSerializedSize();
    ByteBuf buffer =
      channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    buffer.writeByte(Op.WRITE_BLOCK.code);
    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
    channel.writeAndFlush(buffer);
  }

  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
      StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
      DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
      throws IOException {
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
    saslPromise.addListener(new FutureListener<Void>() {

      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // setup response processing pipeline first, then send request.
          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
        } else {
          promise.tryFailure(future.cause());
        }
      }
    });
  }

  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
      String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
      BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
      Class<? extends Channel> channelClass) {
    StorageType[] storageTypes = locatedBlock.getStorageTypes();
    DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
    boolean connectToDnViaHostname =
      conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
    blockCopy.setNumBytes(locatedBlock.getBlockSize());
    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
      .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
        .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
      .setClientName(clientName).build();
    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
    OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
      .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
      .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
      .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
      .setRequestedChecksum(checksumProto)
      .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
    for (int i = 0; i < datanodeInfos.length; i++) {
      DatanodeInfo dnInfo = datanodeInfos[i];
      StorageType storageType = storageTypes[i];
      Promise<Channel> promise = eventLoopGroup.next().newPromise();
      futureList.add(promise);
      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
      new Bootstrap().group(eventLoopGroup).channel(channelClass)
        .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

          @Override
          protected void initChannel(Channel ch) throws Exception {
            // we need the remote address of the channel, so we can only move on after the channel
            // is connected. Leave an empty implementation here because netty does not allow
            // a null handler.
          }
        }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {

          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
              initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
                timeoutMs, client, locatedBlock.getBlockToken(), promise);
            } else {
              promise.tryFailure(future.cause());
            }
          }
        });
    }
    return futureList;
  }

  /**
   * Exception other than RemoteException thrown when calling create on namenode
   */
  public static class NameNodeException extends IOException {

    private static final long serialVersionUID = 3143237406477095390L;

    public NameNodeException(Throwable cause) {
      super(cause);
    }
  }

  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
    List<CreateFlag> flags = new ArrayList<>();
    flags.add(CreateFlag.CREATE);
    if (overwrite) {
      flags.add(CreateFlag.OVERWRITE);
    }
    if (SHOULD_REPLICATE_FLAG != null) {
      flags.add(SHOULD_REPLICATE_FLAG);
    }
    return new EnumSetWritable<>(EnumSet.copyOf(flags));
  }

  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
      boolean overwrite, boolean createParent, short replication, long blockSize,
      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
    Configuration conf = dfs.getConf();
    DFSClient client = dfs.getClient();
    String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
    int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
      DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
    DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
    for (int retry = 0;; retry++) {
      HdfsFileStatus stat;
      try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
          getCreateFlags(overwrite), createParent, replication, blockSize,
          CryptoProtocolVersion.supported());
      } catch (Exception e) {
        if (e instanceof RemoteException) {
          throw (RemoteException) e;
        } else {
          throw new NameNodeException(e);
        }
      }
      beginFileLease(client, stat.getFileId());
      boolean succ = false;
      LocatedBlock locatedBlock = null;
      List<Future<Channel>> futureList = null;
      try {
        DataChecksum summer = createChecksum(client);
        locatedBlock = namenode.addBlock(src, client.getClientName(), null, excludesNodes,
          stat.getFileId(), null, null);
        List<Channel> datanodeList = new ArrayList<>();
        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
        for (int i = 0, n = futureList.size(); i < n; i++) {
          try {
            datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
          } catch (Exception e) {
            // exclude the broken DN next time
            excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
            throw e;
          }
        }
        Encryptor encryptor = createEncryptor(conf, stat, client);
        FanOutOneBlockAsyncDFSOutput output =
          new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
            stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
        succ = true;
        return output;
      } catch (RemoteException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (shouldRetryCreate(e)) {
          if (retry >= createMaxRetries) {
            throw e.unwrapRemoteException();
          }
        } else {
          throw e.unwrapRemoteException();
        }
      } catch (IOException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (retry >= createMaxRetries) {
          throw e;
        }
        // overwrite the old broken file.
        overwrite = true;
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          throw new InterruptedIOException();
        }
      } finally {
        if (!succ) {
          if (futureList != null) {
            for (Future<Channel> f : futureList) {
              f.addListener(new FutureListener<Channel>() {

                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                  if (future.isSuccess()) {
                    future.getNow().close();
                  }
                }
              });
            }
          }
          endFileLease(client, stat.getFileId());
        }
      }
    }
  }

  /**
   * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method may block, so do not call it inside
   * an {@link EventLoop}.
   */
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
      boolean overwrite, boolean createParent, short replication, long blockSize,
      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

      @Override
      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
          throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
          blockSize, eventLoopGroup, channelClass);
      }

      @Override
      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
        throw new UnsupportedOperationException();
      }
    }.resolve(dfs, f);
  }

  public static boolean shouldRetryCreate(RemoteException e) {
    // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name.
    // For exceptions other than this, we just throw them out. This is the same as
    // DFSOutputStream.newStreamForCreate.
    return e.getClassName().endsWith("RetryStartFileException");
  }

  static void completeFile(DFSClient client, ClientProtocol namenode, String src,
      String clientName, ExtendedBlock block, long fileId) {
    for (int retry = 0;; retry++) {
      try {
        if (namenode.complete(src, clientName, block, fileId)) {
          endFileLease(client, fileId);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
        }
      } catch (RemoteException e) {
        IOException ioe = e.unwrapRemoteException();
        if (ioe instanceof LeaseExpiredException) {
          LOG.warn("lease for file " + src + " is expired, give up", e);
          return;
        } else {
          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
        }
      } catch (Exception e) {
        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
      }
      sleepIgnoreInterrupt(retry);
    }
  }

  static void sleepIgnoreInterrupt(int retry) {
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
    } catch (InterruptedException e) {
      // as the method name says, just ignore the interrupt and keep retrying.
    }
  }
}