/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
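 * <p>
 * A rough usage sketch; the event loop group, channel class and path below are assumed to be
 * supplied by the caller and are not defined in this class:
 *
 * <pre>
 * // channelClass must match the event loop group, e.g. NioSocketChannel for a NioEventLoopGroup.
 * FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs, path,
 *   true, false, (short) 3, dfs.getDefaultBlockSize(), eventLoopGroup, channelClass);
 * </pre>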
 */
@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final Logger LOG =
      LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);

  private FanOutOneBlockAsyncDFSOutputHelper() {
  }

  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
  // use pooled allocator for performance.
  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

  // copied from DFSPacket since it is package private.
  public static final long HEART_BEAT_SEQNO = -1L;

  // Timeouts for communicating with DataNode for streaming writes/reads
  public static final int READ_TIMEOUT = 60 * 1000;

  private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];

  // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
  // getStatus method; for hadoop 2.7 or later, the status is retrieved from the flag. The flag may
  // come from the proto directly, or be combined from the reply field of the proto and an ECN
  // object. See createPipelineAckStatusGetter for more details.
  private interface PipelineAckStatusGetter {
    Status get(PipelineAckProto ack);
  }

  private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;

  // The StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7, so
  // here we need to use reflection to set it. See createStorageTypeSetter for more details.
  private interface StorageTypeSetter {
    OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
  }

  private static final StorageTypeSetter STORAGE_TYPE_SETTER;

  // helper class for calling the addBlock method on the namenode. There is an addBlockFlags
  // parameter for hadoop 2.8 or later. See createBlockAdder for more details.
  private interface BlockAdder {

    LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
        ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
        throws IOException;
  }

  private static final BlockAdder BLOCK_ADDER;

  private interface LeaseManager {

    void begin(DFSClient client, long inodeId);

    void end(DFSClient client, long inodeId);
  }

  private static final LeaseManager LEASE_MANAGER;

  // This is used to terminate a recoverFileLease call when FileSystem is already closed.
  // isClientRunning is not public so we need to use reflection.
  private interface DFSClientAdaptor {

    boolean isClientRunning(DFSClient client);
  }

  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;

  // helper class for converting protos.
  private interface PBHelper {

    ExtendedBlockProto convert(ExtendedBlock b);

    TokenProto convert(Token<?> tok);
  }

  private static final PBHelper PB_HELPER;

  // helper class for creating data checksums.
  private interface ChecksumCreater {
    DataChecksum createChecksum(DFSClient client);
  }

  private static final ChecksumCreater CHECKSUM_CREATER;

  // helper class for creating files.
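  // The default create() method below unwraps InvocationTargetException once, so the hadoop-2 and
  // hadoop-3 createObject implementations can share the same error handling.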
  private interface FileCreator {
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
        String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
        short replication, long blockSize, CryptoProtocolVersion[] supportedVersions)
        throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    };

    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
        EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
        CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;

  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
    isClientRunningMethod.setAccessible(true);
    return new DFSClientAdaptor() {

      @Override
      public boolean isClientRunning(DFSClient client) {
        try {
          return (Boolean) isClientRunningMethod.invoke(client);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
        DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
    endFileLeaseMethod.setAccessible(true);
    return new LeaseManager() {

      @Override
      public void begin(DFSClient client, long inodeId) {
        try {
          beginFileLeaseMethod.invoke(client, inodeId, null);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(DFSClient client, long inodeId) {
        try {
          endFileLeaseMethod.invoke(client, inodeId);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

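  // For hadoop 2.7+ the ack carries a combined header flag (ECN + Status). If the flag list is
  // empty we rebuild the flag from the reply field with ECN.DISABLED and then extract the Status
  // from it, all via reflection since these APIs do not exist in hadoop 2.6.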
  private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
      throws NoSuchMethodException {
    Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
    @SuppressWarnings("rawtypes")
    Class<? extends Enum> ecnClass;
    try {
      ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
          .asSubclass(Enum.class);
    } catch (ClassNotFoundException e) {
      String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " +
          "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
          "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
    @SuppressWarnings("unchecked")
    Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
    Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
    Method combineHeaderMethod =
        PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
    Method getStatusFromHeaderMethod =
        PipelineAck.class.getMethod("getStatusFromHeader", int.class);
    return new PipelineAckStatusGetter() {

      @Override
      public Status get(PipelineAckProto ack) {
        try {
          @SuppressWarnings("unchecked")
          List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
          Integer headerFlag;
          if (flagList.isEmpty()) {
            Status reply = (Status) getReplyMethod.invoke(ack, 0);
            headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
          } else {
            headerFlag = flagList.get(0);
          }
          return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
      throws NoSuchMethodException {
    Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
    return new PipelineAckStatusGetter() {

      @Override
      public Status get(PipelineAckProto ack) {
        try {
          return (Status) getStatusMethod.invoke(ack, 0);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static PipelineAckStatusGetter createPipelineAckStatusGetter()
      throws NoSuchMethodException {
    try {
      return createPipelineAckStatusGetter27();
    } catch (NoSuchMethodException e) {
      LOG.debug("Can not get expected method " + e.getMessage() +
          ", this is usually because your Hadoop is pre 2.7.0, " +
          "try the methods in Hadoop 2.6.x instead.");
    }
    return createPipelineAckStatusGetter26();
  }

  private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
    Method setStorageTypeMethod =
        OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
    ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
    for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
      builder.put(storageTypeProto.name(), storageTypeProto);
    }
    ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
    return new StorageTypeSetter() {

      @Override
      public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType) {
        Object protoEnum = name2ProtoEnum.get(storageType.name());
        try {
          setStorageTypeMethod.invoke(builder, protoEnum);
        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
        return builder;
      }
    };
  }

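  // Pick the addBlock overload at class-load time: hadoop 2.8+ appends an addBlockFlags
  // parameter, which we simply pass as null.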
  private static BlockAdder createBlockAdder() throws NoSuchMethodException {
    for (Method method : ClientProtocol.class.getMethods()) {
      if (method.getName().equals("addBlock")) {
        Method addBlockMethod = method;
        Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
        if (paramTypes[paramTypes.length - 1] == String[].class) {
          return new BlockAdder() {

            @Override
            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
                ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
                String[] favoredNodes) throws IOException {
              try {
                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
                  excludeNodes, fileId, favoredNodes);
              } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
              } catch (InvocationTargetException e) {
                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
                throw new RuntimeException(e);
              }
            }
          };
        } else {
          return new BlockAdder() {

            @Override
            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
                ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
                String[] favoredNodes) throws IOException {
              try {
                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
                  excludeNodes, fileId, favoredNodes, null);
              } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
              } catch (InvocationTargetException e) {
                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
                throw new RuntimeException(e);
              }
            }
          };
        }
      }
    }
    throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
  }

  private static PBHelper createPBHelper() throws NoSuchMethodException {
    Class<?> helperClass;
    String clazzName = "org.apache.hadoop.hdfs.protocolPB.PBHelperClient";
    try {
      helperClass = Class.forName(clazzName);
    } catch (ClassNotFoundException e) {
      helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
      LOG.debug("" + clazzName + " not found (Hadoop is pre-2.8.0?); using " +
          helperClass.toString() + " instead.");
    }
    Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
    Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
    return new PBHelper() {

      @Override
      public ExtendedBlockProto convert(ExtendedBlock b) {
        try {
          return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public TokenProto convert(Token<?> tok) {
        try {
          return (TokenProto) convertTokenMethod.invoke(null, tok);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static ChecksumCreater createChecksumCreater28(Method getConfMethod, Class<?> confClass)
      throws NoSuchMethodException {
    for (Method method : confClass.getMethods()) {
      if (method.getName().equals("createChecksum")) {
        Method createChecksumMethod = method;
        return new ChecksumCreater() {

          @Override
          public DataChecksum createChecksum(DFSClient client) {
            try {
              return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client),
                (Object) null);
            } catch (IllegalAccessException | InvocationTargetException e) {
              throw new RuntimeException(e);
            }
          }
        };
      }
    }
    throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
  }

  private static ChecksumCreater createChecksumCreater27(Method getConfMethod, Class<?> confClass)
      throws NoSuchMethodException {
    Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
    createChecksumMethod.setAccessible(true);
    return new ChecksumCreater() {

      @Override
      public DataChecksum createChecksum(DFSClient client) {
        try {
          return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client));
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static ChecksumCreater createChecksumCreater()
      throws NoSuchMethodException, ClassNotFoundException {
    Method getConfMethod = DFSClient.class.getMethod("getConf");
    try {
      return createChecksumCreater28(getConfMethod,
        Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
    } catch (ClassNotFoundException e) {
      LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
    }
    return createChecksumCreater27(getConfMethod,
      Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
  }

  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
        supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null);
    };
  }

  private static FileCreator createFileCreator2() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
        supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions);
    };
  }

  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
    }
    return createFileCreator2();
  }

  // cancel the processing if DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {

    private final DFSClient client;

    public CancelOnClose(DFSClient client) {
      this.client = client;
    }

    @Override
    public boolean progress() {
      return DFS_CLIENT_ADAPTOR.isClientRunning(client);
    }
  }

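  // Resolve all the reflection-based adaptors once at class load time. If any lookup fails we
  // throw an Error so that users switch their WAL provider away from 'asyncfs', as described in
  // HBASE-16110.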
  static {
    try {
      PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
      STORAGE_TYPE_SETTER = createStorageTypeSetter();
      BLOCK_ADDER = createBlockAdder();
      LEASE_MANAGER = createLeaseManager();
      DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
      PB_HELPER = createPBHelper();
      CHECKSUM_CREATER = createChecksumCreater();
      FILE_CREATOR = createFileCreator();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please " +
          "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
          "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }

  static void beginFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.begin(client, inodeId);
  }

  static void endFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.end(client, inodeId);
  }

  static DataChecksum createChecksum(DFSClient client) {
    return CHECKSUM_CREATER.createChecksum(client);
  }

  static Status getStatus(PipelineAckProto ack) {
    return PIPELINE_ACK_STATUS_GETTER.get(ack);
  }

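  // Sets up a temporary response pipeline (idle-state handler plus protobuf decoders) on the
  // freshly connected channel, waits for the datanode's BlockOpResponseProto, and on SUCCESS
  // strips those handlers again before handing the channel over to the streaming output.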
  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
      Promise<Channel> promise, int timeoutMs) {
    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
      new ProtobufVarint32FrameDecoder(),
      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
      new SimpleChannelInboundHandler<BlockOpResponseProto>() {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
            throws Exception {
          Status pipelineStatus = resp.getStatus();
          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
            throw new IOException("datanode " + dnInfo + " is restarting");
          }
          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
          if (resp.getStatus() != Status.SUCCESS) {
            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException("Got access token error" + ", status message " +
                  resp.getMessage() + ", " + logInfo);
            } else {
              throw new IOException("Got error" + ", status=" + resp.getStatus().name() +
                  ", status message " + resp.getMessage() + ", " + logInfo);
            }
          }
          // success
          ChannelPipeline p = ctx.pipeline();
          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
            // do not remove all handlers because we may have wrap or unwrap handlers at the head
            // of the pipeline.
            if (handler instanceof IdleStateHandler) {
              break;
            }
          }
          // Disable auto read here. Enable it after we set up the streaming pipeline in
          // FanOutOneBlockAsyncDFSOutput.
          ctx.channel().config().setAutoRead(false);
          promise.trySuccess(ctx.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
            promise
                .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
          } else {
            super.userEventTriggered(ctx, evt);
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          promise.tryFailure(cause);
        }
      });
  }

  private static void requestWriteBlock(Channel channel, Enum<?> storageType,
      OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
    OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
    int protoLen = proto.getSerializedSize();
    ByteBuf buffer =
        channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    buffer.writeByte(Op.WRITE_BLOCK.code);
    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
    channel.writeAndFlush(buffer);
  }

  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
      Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
      DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
      throws IOException {
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
    saslPromise.addListener(new FutureListener<Void>() {

      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // setup response processing pipeline first, then send request.
          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
        } else {
          promise.tryFailure(future.cause());
        }
      }
    });
  }

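  // Connects to every replica location of the block directly (the "fan-out") instead of relying
  // on a datanode-to-datanode pipeline. Each connection negotiates SASL and sends its own
  // write-block request; the returned futures complete once the datanode acks that request.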
  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
      String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
      BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
      Class<? extends Channel> channelClass) {
    Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
    DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
    boolean connectToDnViaHostname =
        conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
    blockCopy.setNumBytes(locatedBlock.getBlockSize());
    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
        .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy))
            .setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
        .setClientName(clientName).build();
    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
    OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
        .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
        .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
        .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
        .setRequestedChecksum(checksumProto)
        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
    for (int i = 0; i < datanodeInfos.length; i++) {
      DatanodeInfo dnInfo = datanodeInfos[i];
      Enum<?> storageType = storageTypes[i];
      Promise<Channel> promise = eventLoopGroup.next().newPromise();
      futureList.add(promise);
      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
      new Bootstrap().group(eventLoopGroup).channel(channelClass)
          .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
              // we need to get the remote address of the channel so we can only move on after the
              // channel is connected. Leave an empty implementation here because netty does not
              // allow a null handler.
            }
          }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
              if (future.isSuccess()) {
                initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
                  timeoutMs, client, locatedBlock.getBlockToken(), promise);
              } else {
                promise.tryFailure(future.cause());
              }
            }
          });
    }
    return futureList;
  }

  /**
   * Exception other than RemoteException thrown when calling create on namenode
   */
  public static class NameNodeException extends IOException {

    private static final long serialVersionUID = 3143237406477095390L;

    public NameNodeException(Throwable cause) {
      super(cause);
    }
  }

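  // Creates the file on the namenode, allocates the first block and connects to all of its
  // replica datanodes. A datanode that fails to respond is added to the exclude list and the
  // whole sequence is retried, up to hbase.fs.async.create.retries times.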
  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
      boolean overwrite, boolean createParent, short replication, long blockSize,
      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
    Configuration conf = dfs.getConf();
    FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
    DFSClient client = dfs.getClient();
    String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
    int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
      DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
    DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
    for (int retry = 0;; retry++) {
      HdfsFileStatus stat;
      try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
          new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
          createParent, replication, blockSize, CryptoProtocolVersion.supported());
      } catch (Exception e) {
        if (e instanceof RemoteException) {
          throw (RemoteException) e;
        } else {
          throw new NameNodeException(e);
        }
      }
      beginFileLease(client, stat.getFileId());
      boolean succ = false;
      LocatedBlock locatedBlock = null;
      List<Future<Channel>> futureList = null;
      try {
        DataChecksum summer = createChecksum(client);
        locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
          excludesNodes, stat.getFileId(), null);
        List<Channel> datanodeList = new ArrayList<>();
        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
        for (int i = 0, n = futureList.size(); i < n; i++) {
          try {
            datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
          } catch (Exception e) {
            // exclude the broken DN next time
            excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
            throw e;
          }
        }
        Encryptor encryptor = createEncryptor(conf, stat, client);
        FanOutOneBlockAsyncDFSOutput output =
            new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
                stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
        succ = true;
        return output;
      } catch (RemoteException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (shouldRetryCreate(e)) {
          if (retry >= createMaxRetries) {
            throw e.unwrapRemoteException();
          }
        } else {
          throw e.unwrapRemoteException();
        }
      } catch (IOException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (retry >= createMaxRetries) {
          throw e;
        }
        // overwrite the old broken file.
        overwrite = true;
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          throw new InterruptedIOException();
        }
      } finally {
        if (!succ) {
          if (futureList != null) {
            for (Future<Channel> f : futureList) {
              f.addListener(new FutureListener<Channel>() {

                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                  if (future.isSuccess()) {
                    future.getNow().close();
                  }
                }
              });
            }
          }
          endFileLease(client, stat.getFileId());
        }
      }
    }
  }

  /**
   * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method may block, so do not call it inside
   * an {@link EventLoop}.
   */
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
      boolean overwrite, boolean createParent, short replication, long blockSize,
      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

      @Override
      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
          throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
          blockSize, eventLoopGroup, channelClass);
      }

      @Override
      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
        throw new UnsupportedOperationException();
      }
    }.resolve(dfs, f);
  }

  public static boolean shouldRetryCreate(RemoteException e) {
    // RetryStartFileException was introduced in HDFS 2.6+, so here we can only use the class name.
    // For exceptions other than this, we just throw them out. This is the same as
    // DFSOutputStream.newStreamForCreate.
    return e.getClassName().endsWith("RetryStartFileException");
  }

  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
      ExtendedBlock block, long fileId) {
    for (int retry = 0;; retry++) {
      try {
        if (namenode.complete(src, clientName, block, fileId)) {
          endFileLease(client, fileId);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
        }
      } catch (RemoteException e) {
        IOException ioe = e.unwrapRemoteException();
        if (ioe instanceof LeaseExpiredException) {
          LOG.warn("lease for file " + src + " is expired, give up", e);
          return;
        } else {
          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
        }
      } catch (Exception e) {
        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
      }
      sleepIgnoreInterrupt(retry);
    }
  }

  static void sleepIgnoreInterrupt(int retry) {
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
    } catch (InterruptedException e) {
    }
  }
}