/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.addListener;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
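 * <p>
 * A minimal usage sketch. The event loop group, channel class, path and monitor shown below are
 * illustrative only; callers wire in whatever their environment provides:
 *
 * <pre>
 * EventLoopGroup group = new NioEventLoopGroup();
 * FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs,
 *   new Path("/example/file"), true, false, (short) 3, dfs.getDefaultBlockSize(), group,
 *   NioSocketChannel.class, monitor);
 * </pre>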
 */
@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final Logger LOG =
    LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);

  private FanOutOneBlockAsyncDFSOutputHelper() {
  }

  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
  // use pooled allocator for performance.
  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

  // copied from DFSPacket since it is package private.
  public static final long HEART_BEAT_SEQNO = -1L;

  // Timeout for communicating with DataNode for streaming writes/reads
  public static final int READ_TIMEOUT = 60 * 1000;

  private interface LeaseManager {

    void begin(DFSClient client, long inodeId);

    void end(DFSClient client, long inodeId);
  }

  private static final LeaseManager LEASE_MANAGER;

  // This is used to terminate a recoverFileLease call when FileSystem is already closed.
  // isClientRunning is not public so we need to use reflection.
  private interface DFSClientAdaptor {

    boolean isClientRunning(DFSClient client);
  }

  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;

  // helper class for creating files.
  private interface FileCreator {
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions) throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    }

    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
      EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;

  // CreateFlag.SHOULD_REPLICATE is used to make an OutputStream on an EC directory support
  // hflush/hsync, but EC was only introduced in hadoop 3.x, so the enum does not exist on 2.x.
  // That is why we have to look it up by name instead of referencing the constant directly.
  private static final CreateFlag SHOULD_REPLICATE_FLAG;

  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
    isClientRunningMethod.setAccessible(true);
    return new DFSClientAdaptor() {

      @Override
      public boolean isClientRunning(DFSClient client) {
        try {
          return (Boolean) isClientRunningMethod.invoke(client);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

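  // Bind reflectively to DFSClient's package-private beginFileLease/endFileLease so that the
  // lease of the file we create can be managed without going through DFSOutputStream.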
  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
      DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
    endFileLeaseMethod.setAccessible(true);
    return new LeaseManager() {

      @Override
      public void begin(DFSClient client, long inodeId) {
        try {
          beginFileLeaseMethod.invoke(client, inodeId, null);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(DFSClient client, long inodeId) {
        try {
          endFileLeaseMethod.invoke(client, inodeId);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

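  // ClientProtocol.create gained extra parameters over time: on hadoop 3.3+ it has two extra
  // String parameters at the end, on earlier 3.x one, and on 2.x none. The variants below locate
  // the matching signature reflectively and pass null for the extra parameters.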
  private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null, null);
    };
  }

  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null);
    };
  }

  private static FileCreator createFileCreator2() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions);
    };
  }

  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3_3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 3.2 or below");
    }

    try {
      return createFileCreator3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
    }
    return createFileCreator2();
  }

  private static CreateFlag loadShouldReplicateFlag() {
    try {
      return CreateFlag.valueOf("SHOULD_REPLICATE");
    } catch (IllegalArgumentException e) {
      LOG.debug("can not find SHOULD_REPLICATE flag, should be hadoop 2.x", e);
      return null;
    }
  }

  // cancel the processing if DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {

    private final DFSClient client;

    public CancelOnClose(DFSClient client) {
      this.client = client;
    }

    @Override
    public boolean progress() {
      return DFS_CLIENT_ADAPTOR.isClientRunning(client);
    }
  }

  static {
    try {
      LEASE_MANAGER = createLeaseManager();
      DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
      FILE_CREATOR = createFileCreator();
      SHOULD_REPLICATE_FLAG = loadShouldReplicateFlag();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please "
        + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
        + "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }

  static void beginFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.begin(client, inodeId);
  }

  static void endFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.end(client, inodeId);
  }

  static DataChecksum createChecksum(DFSClient client) {
    return client.getConf().createChecksum(null);
  }

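  // Extract the reply status from a pipeline ack. If the ack carries no combined header flag we
  // synthesize one with ECN disabled from the first reply.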
  static Status getStatus(PipelineAckProto ack) {
    List<Integer> flagList = ack.getFlagList();
    Integer headerFlag;
    if (flagList.isEmpty()) {
      Status reply = ack.getReply(0);
      headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
    } else {
      headerFlag = flagList.get(0);
    }
    return PipelineAck.getStatusFromHeader(headerFlag);
  }

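  // Wait for the BlockOpResponseProto of our write-block request. On success the response
  // handlers are stripped again (down to and including the IdleStateHandler), auto read is
  // disabled and the channel is handed to the promise; any error, timeout or disconnect fails
  // the promise instead.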
  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
    Promise<Channel> promise, int timeoutMs) {
    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
      new ProtobufVarint32FrameDecoder(),
      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
      new SimpleChannelInboundHandler<BlockOpResponseProto>() {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
          throws Exception {
          Status pipelineStatus = resp.getStatus();
          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
            throw new IOException("datanode " + dnInfo + " is restarting");
          }
          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
          if (resp.getStatus() != Status.SUCCESS) {
            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException("Got access token error" + ", status message "
                + resp.getMessage() + ", " + logInfo);
            } else {
              throw new IOException("Got error" + ", status=" + resp.getStatus().name()
                + ", status message " + resp.getMessage() + ", " + logInfo);
            }
          }
          // success
          ChannelPipeline p = ctx.pipeline();
          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
            // do not remove all handlers because we may have wrap or unwrap handlers at the head
            // of the pipeline.
            if (handler instanceof IdleStateHandler) {
              break;
            }
          }
          // Disable auto read here. Enable it after we set up the streaming pipeline in
          // FanOutOneBlockAsyncDFSOutput.
          ctx.channel().config().setAutoRead(false);
          promise.trySuccess(ctx.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
            promise
              .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
          } else {
            super.userEventTriggered(ctx, evt);
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          promise.tryFailure(cause);
        }
      });
  }

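  // Send the OP_WRITE_BLOCK request: a 2-byte data transfer version, a 1-byte opcode and the
  // varint32-delimited OpWriteBlockProto, all written into a single buffer.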
  private static void requestWriteBlock(Channel channel, StorageType storageType,
    OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
    OpWriteBlockProto proto =
      writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
    int protoLen = proto.getSerializedSize();
    ByteBuf buffer =
      channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    buffer.writeByte(Op.WRITE_BLOCK.code);
    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
    safeWriteAndFlush(channel, buffer);
  }

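  // Run SASL negotiation with the datanode first; only when it succeeds do we install the
  // response handlers and send the write-block request, otherwise the promise is failed.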
  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
    StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
    DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
    throws IOException {
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
    addListener(saslPromise, new FutureListener<Void>() {

      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // set up the response processing pipeline first, then send the request.
          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
        } else {
          promise.tryFailure(future.cause());
        }
      }
    });
  }

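  // Connect to every datanode of the located block in parallel. Each returned future completes
  // with a channel that has finished SASL negotiation and the write-block handshake.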
  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
    String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
    BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass) {
    StorageType[] storageTypes = locatedBlock.getStorageTypes();
    DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
    boolean connectToDnViaHostname =
      conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
    blockCopy.setNumBytes(locatedBlock.getBlockSize());
    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
      .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
        .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
      .setClientName(clientName).build();
    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
    OpWriteBlockProto.Builder writeBlockProtoBuilder =
      OpWriteBlockProto.newBuilder().setHeader(header)
        .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())).setPipelineSize(1)
        .setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()).setMaxBytesRcvd(maxBytesRcvd)
        .setLatestGenerationStamp(latestGS).setRequestedChecksum(checksumProto)
        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
    for (int i = 0; i < datanodeInfos.length; i++) {
      DatanodeInfo dnInfo = datanodeInfos[i];
      StorageType storageType = storageTypes[i];
      Promise<Channel> promise = eventLoopGroup.next().newPromise();
      futureList.add(promise);
      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
      addListener(new Bootstrap().group(eventLoopGroup).channel(channelClass)
        .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

          @Override
          protected void initChannel(Channel ch) throws Exception {
            // We need the remote address of the channel, so we can only move on after the
            // channel is connected. Leave an empty implementation here because netty does not
            // allow a null handler.
          }
        }).connect(NetUtils.createSocketAddr(dnAddr)), new ChannelFutureListener() {

          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
              initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
                timeoutMs, client, locatedBlock.getBlockToken(), promise);
            } else {
              promise.tryFailure(future.cause());
            }
          }
        });
    }
    return futureList;
  }

  /**
   * Exception, other than RemoteException, thrown when calling create on the namenode.
   */
  public static class NameNodeException extends IOException {

    private static final long serialVersionUID = 3143237406477095390L;

    public NameNodeException(Throwable cause) {
      super(cause);
    }
  }

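  // Always request CREATE, add OVERWRITE when asked for, and add SHOULD_REPLICATE when the
  // running hadoop version has it, so that files under an EC directory are still written as
  // replicated blocks and keep supporting hflush/hsync.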
  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
    List<CreateFlag> flags = new ArrayList<>();
    flags.add(CreateFlag.CREATE);
    if (overwrite) {
      flags.add(CreateFlag.OVERWRITE);
    }
    if (SHOULD_REPLICATE_FLAG != null) {
      flags.add(SHOULD_REPLICATE_FLAG);
    }
    return new EnumSetWritable<>(EnumSet.copyOf(flags));
  }

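  // Create the file on the namenode, allocate the first block, connect to all of its datanodes
  // and wrap everything into a FanOutOneBlockAsyncDFSOutput. On connection failure the broken
  // datanode is added to the exclude list and the whole sequence is retried.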
  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
    throws IOException {
    Configuration conf = dfs.getConf();
    DFSClient client = dfs.getClient();
    String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
    int createMaxRetries =
      conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
    ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager();
    Set<DatanodeInfo> toExcludeNodes =
      new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet());
    for (int retry = 0;; retry++) {
      LOG.debug("When creating output stream for {}, exclude list is {}, retry={}", src,
        toExcludeNodes, retry);
      HdfsFileStatus stat;
      try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
          getCreateFlags(overwrite), createParent, replication, blockSize,
          CryptoProtocolVersion.supported());
      } catch (Exception e) {
        if (e instanceof RemoteException) {
          throw (RemoteException) e;
        } else {
          throw new NameNodeException(e);
        }
      }
      beginFileLease(client, stat.getFileId());
      boolean succ = false;
      LocatedBlock locatedBlock = null;
      List<Future<Channel>> futureList = null;
      try {
        DataChecksum summer = createChecksum(client);
        locatedBlock = namenode.addBlock(src, client.getClientName(), null,
          toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
        Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
        for (int i = 0, n = futureList.size(); i < n; i++) {
          DatanodeInfo datanodeInfo = locatedBlock.getLocations()[i];
          try {
            datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
          } catch (Exception e) {
            // exclude the broken DN next time
            toExcludeNodes.add(datanodeInfo);
            excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error");
            throw e;
          }
        }
        Encryptor encryptor = createEncryptor(conf, stat, client);
        FanOutOneBlockAsyncDFSOutput output =
          new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
            stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
        succ = true;
        return output;
      } catch (RemoteException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (shouldRetryCreate(e)) {
          if (retry >= createMaxRetries) {
            throw e.unwrapRemoteException();
          }
        } else {
          throw e.unwrapRemoteException();
        }
      } catch (IOException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (retry >= createMaxRetries) {
          throw e;
        }
        // overwrite the old broken file.
        overwrite = true;
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          throw new InterruptedIOException();
        }
      } finally {
        if (!succ) {
          if (futureList != null) {
            for (Future<Channel> f : futureList) {
              addListener(f, new FutureListener<Channel>() {

                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                  if (future.isSuccess()) {
                    safeClose(future.getNow());
                  }
                }
              });
            }
          }
          endFileLease(client, stat.getFileId());
        }
      }
    }
  }

  /**
   * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method may block, so do not call it inside
   * an {@link EventLoop}.
   */
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
    final StreamSlowMonitor monitor) throws IOException {
    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

      @Override
      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
        throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
          blockSize, eventLoopGroup, channelClass, monitor);
      }

      @Override
      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
        throw new UnsupportedOperationException();
      }
    }.resolve(dfs, f);
  }

  public static boolean shouldRetryCreate(RemoteException e) {
    // RetryStartFileException was introduced in HDFS 2.6+, so here we can only check the class
    // name. For exceptions other than this one we just rethrow, which is the same behavior as
    // DFSOutputStream.newStreamForCreate.
    return e.getClassName().endsWith("RetryStartFileException");
  }

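  // Keep calling ClientProtocol.complete until the namenode confirms the file is closed, then
  // release the lease. If the lease has already expired there is nothing more we can do, so we
  // give up; other failures are retried after a backoff.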
  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
    ExtendedBlock block, long fileId) {
    for (int retry = 0;; retry++) {
      try {
        if (namenode.complete(src, clientName, block, fileId)) {
          endFileLease(client, fileId);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
        }
      } catch (RemoteException e) {
        IOException ioe = e.unwrapRemoteException();
        if (ioe instanceof LeaseExpiredException) {
          LOG.warn("lease for file " + src + " is expired, give up", e);
          return;
        } else {
          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
        }
      } catch (Exception e) {
        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
      }
      sleepIgnoreInterrupt(retry);
    }
  }

  static void sleepIgnoreInterrupt(int retry) {
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
    } catch (InterruptedException e) {
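      // the interrupt is deliberately swallowed, as the method name says; callers just retry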
    }
  }
}