/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
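 * <p>
 * A minimal usage sketch is shown below. {@code conf} and {@code dfs} are assumed to come from
 * the caller, and the event loop group, channel class and {@link StreamSlowMonitor} factory call
 * are caller-side assumptions rather than values mandated by this helper:
 *
 * <pre>
 * EventLoopGroup group = new NioEventLoopGroup();
 * StreamSlowMonitor monitor = StreamSlowMonitor.create(conf, "default");
 * FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs,
 *   new Path("/example"), true, false, (short) 3, dfs.getDefaultBlockSize(), group,
 *   NioSocketChannel.class, monitor);
 * </pre>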
 */
@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final Logger LOG =
    LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);

  private FanOutOneBlockAsyncDFSOutputHelper() {
  }

  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
  // use pooled allocator for performance.
  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

  // copied from DFSPacket since it is package private.
  public static final long HEART_BEAT_SEQNO = -1L;

  // Timeouts for communicating with DataNode for streaming writes/reads
  public static final int READ_TIMEOUT = 60 * 1000;

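  // helper for beginning/ending the client-side file lease. DFSClient's lease methods are not
  // public, so they are invoked via reflection (see createLeaseManager()).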
  private interface LeaseManager {

    void begin(DFSClient client, long inodeId);

    void end(DFSClient client, long inodeId);
  }

  private static final LeaseManager LEASE_MANAGER;

  // helper class for creating files.
  private interface FileCreator {
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions) throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    }

    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
      EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;

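  // Look up DFSClient.beginFileLease/endFileLease reflectively and wrap them in a LeaseManager.
  // Reflection failures at invocation time are rethrown as RuntimeException.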
  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
      DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
    endFileLeaseMethod.setAccessible(true);
    return new LeaseManager() {

      @Override
      public void begin(DFSClient client, long inodeId) {
        try {
          beginFileLeaseMethod.invoke(client, inodeId, null);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(DFSClient client, long inodeId) {
        try {
          endFileLeaseMethod.invoke(client, inodeId);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

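  // create() signature for Hadoop 3.3 and above, which takes two extra trailing String parameters
  // (the erasure coding policy name and the storage policy); both are passed as null here.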
  private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null, null);
    };
  }

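  // create() signature for Hadoop 3.0 - 3.2, with a single extra trailing String parameter (the
  // erasure coding policy name), passed as null here.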
  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null);
    };
  }

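  // Probe for the newest create() signature first, then fall back to the older 3.x signature if
  // it is not available on the classpath.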
  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3_3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 3.2 or below");
    }

    return createFileCreator3();
  }

  // cancel the processing if DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {

    private final DFSClient client;

    public CancelOnClose(DFSClient client) {
      this.client = client;
    }

    @Override
    public boolean progress() {
      return client.isClientRunning();
    }
  }

  static {
    try {
      LEASE_MANAGER = createLeaseManager();
      FILE_CREATOR = createFileCreator();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please "
        + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
        + "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }

  static void beginFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.begin(client, inodeId);
  }

  static void endFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.end(client, inodeId);
  }

  static DataChecksum createChecksum(DFSClient client) {
    return client.getConf().createChecksum(null);
  }

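  // Extract the reply status from a pipeline ack. If the DataNode sent the combined header flags,
  // decode the status from the first flag; otherwise fall back to the first plain reply combined
  // with ECN.DISABLED.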
  static Status getStatus(PipelineAckProto ack) {
    List<Integer> flagList = ack.getFlagList();
    Integer headerFlag;
    if (flagList.isEmpty()) {
      Status reply = ack.getReply(0);
      headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
    } else {
      headerFlag = flagList.get(0);
    }
    return PipelineAck.getStatusFromHeader(headerFlag);
  }

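  // Install a temporary pipeline on the channel that waits for the BlockOpResponseProto of our
  // write-block request. On success the temporary handlers are removed again (any SASL wrap/unwrap
  // handlers installed earlier stay in place), auto read is disabled and the channel is handed to
  // the promise; on error, timeout or disconnect the promise is failed instead.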
  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
    Promise<Channel> promise, int timeoutMs) {
    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
      new ProtobufVarint32FrameDecoder(),
      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
      new SimpleChannelInboundHandler<BlockOpResponseProto>() {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
          throws Exception {
          Status pipelineStatus = resp.getStatus();
          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
            throw new IOException("datanode " + dnInfo + " is restarting");
          }
          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
          if (resp.getStatus() != Status.SUCCESS) {
            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException("Got access token error" + ", status message "
                + resp.getMessage() + ", " + logInfo);
            } else {
              throw new IOException("Got error" + ", status=" + resp.getStatus().name()
                + ", status message " + resp.getMessage() + ", " + logInfo);
            }
          }
          // success
          ChannelPipeline p = ctx.pipeline();
          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
            // do not remove all handlers because we may have wrap or unwrap handlers at the head
            // of the pipeline.
            if (handler instanceof IdleStateHandler) {
              break;
            }
          }
          // Disable auto read here. Enable it after we set up the streaming pipeline in
          // FanOutOneBlockAsyncDFSOutput.
          ctx.channel().config().setAutoRead(false);
          promise.trySuccess(ctx.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
            promise
              .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
          } else {
            super.userEventTriggered(ctx, evt);
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          promise.tryFailure(cause);
        }
      });
  }

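  // Send the write-block request: a 2 byte data transfer protocol version, a 1 byte op code for
  // WRITE_BLOCK, then the OpWriteBlockProto in varint-delimited form.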
  private static void requestWriteBlock(Channel channel, StorageType storageType,
    OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
    OpWriteBlockProto proto =
      writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
    int protoLen = proto.getSerializedSize();
    ByteBuf buffer =
      channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    buffer.writeByte(Op.WRITE_BLOCK.code);
    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
    channel.writeAndFlush(buffer);
  }

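  // Run SASL negotiation with the DataNode first; only once it succeeds do we install the
  // response handlers and send the write-block request, otherwise the promise is failed.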
  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
    StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
    DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
    throws IOException {
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
    saslPromise.addListener(new FutureListener<Void>() {

      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // set up the response processing pipeline first, then send the request.
          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
        } else {
          promise.tryFailure(future.cause());
        }
      }
    });
  }

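  // Connect to all DataNodes of the located block in parallel and return one future per node.
  // Each future completes with a channel that has passed SASL negotiation and received a
  // successful write-block response, ready for streaming data.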
  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
    String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
    BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass) {
    StorageType[] storageTypes = locatedBlock.getStorageTypes();
    DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
    boolean connectToDnViaHostname =
      conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
    blockCopy.setNumBytes(locatedBlock.getBlockSize());
    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
      .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
        .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
      .setClientName(clientName).build();
    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
    OpWriteBlockProto.Builder writeBlockProtoBuilder =
      OpWriteBlockProto.newBuilder().setHeader(header)
        .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())).setPipelineSize(1)
        .setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()).setMaxBytesRcvd(maxBytesRcvd)
        .setLatestGenerationStamp(latestGS).setRequestedChecksum(checksumProto)
        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
    for (int i = 0; i < datanodeInfos.length; i++) {
      DatanodeInfo dnInfo = datanodeInfos[i];
      StorageType storageType = storageTypes[i];
      Promise<Channel> promise = eventLoopGroup.next().newPromise();
      futureList.add(promise);
      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
      new Bootstrap().group(eventLoopGroup).channel(channelClass)
        .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

          @Override
          protected void initChannel(Channel ch) throws Exception {
            // We need the remote address of the channel, so we can only move on after the
            // channel is connected. Leave an empty implementation here because netty does not
            // allow a null handler.
          }
        }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {

          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
              initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
                timeoutMs, client, locatedBlock.getBlockToken(), promise);
            } else {
              promise.tryFailure(future.cause());
            }
          }
        });
    }
    return futureList;
  }

  /**
   * Exception other than RemoteException thrown when calling create on the namenode.
   */
  public static class NameNodeException extends IOException {

    private static final long serialVersionUID = 3143237406477095390L;

    public NameNodeException(Throwable cause) {
      super(cause);
    }
  }

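  // Build the create flags. SHOULD_REPLICATE is always set so the file is stored with plain
  // replication even if the target directory has an erasure coding policy, since the fan-out
  // output only works with replicated blocks.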
  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
    List<CreateFlag> flags = new ArrayList<>();
    flags.add(CreateFlag.CREATE);
    if (overwrite) {
      flags.add(CreateFlag.OVERWRITE);
    }
    flags.add(CreateFlag.SHOULD_REPLICATE);
    return new EnumSetWritable<>(EnumSet.copyOf(flags));
  }

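  // The real creation logic: create the file on the NameNode, allocate the first block, connect
  // to all of its DataNodes, and wrap everything into a FanOutOneBlockAsyncDFSOutput. Retries up
  // to the configured maximum, adding DataNodes that failed to connect to the exclude list before
  // the next attempt.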
  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
    throws IOException {
    Configuration conf = dfs.getConf();
    DFSClient client = dfs.getClient();
    String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
    int createMaxRetries =
      conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
    ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager();
    Set<DatanodeInfo> toExcludeNodes =
      new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet());
    for (int retry = 0;; retry++) {
      LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src,
        toExcludeNodes, retry);
      HdfsFileStatus stat;
      try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
          getCreateFlags(overwrite), createParent, replication, blockSize,
          CryptoProtocolVersion.supported());
      } catch (Exception e) {
        if (e instanceof RemoteException) {
          throw (RemoteException) e;
        } else {
          throw new NameNodeException(e);
        }
      }
      beginFileLease(client, stat.getFileId());
      boolean succ = false;
      LocatedBlock locatedBlock = null;
      List<Future<Channel>> futureList = null;
      try {
        DataChecksum summer = createChecksum(client);
        locatedBlock = namenode.addBlock(src, client.getClientName(), null,
          toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
        Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
        for (int i = 0, n = futureList.size(); i < n; i++) {
          DatanodeInfo datanodeInfo = locatedBlock.getLocations()[i];
          try {
            datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
          } catch (Exception e) {
            // exclude the broken DN next time
            toExcludeNodes.add(datanodeInfo);
            excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error");
            throw e;
          }
        }
        Encryptor encryptor = createEncryptor(conf, stat, client);
        FanOutOneBlockAsyncDFSOutput output =
          new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
            stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
        succ = true;
        return output;
      } catch (RemoteException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (shouldRetryCreate(e)) {
          if (retry >= createMaxRetries) {
            throw e.unwrapRemoteException();
          }
        } else {
          throw e.unwrapRemoteException();
        }
      } catch (IOException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (retry >= createMaxRetries) {
          throw e;
        }
        // overwrite the old broken file.
        overwrite = true;
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          throw new InterruptedIOException();
        }
      } finally {
        if (!succ) {
          if (futureList != null) {
            for (Future<Channel> f : futureList) {
              f.addListener(new FutureListener<Channel>() {

                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                  if (future.isSuccess()) {
                    future.getNow().close();
                  }
                }
              });
            }
          }
          endFileLease(client, stat.getFileId());
        }
      }
    }
  }

  /**
   * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method may block, so do not call it
   * inside an {@link EventLoop}.
   */
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
    final StreamSlowMonitor monitor) throws IOException {
    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

      @Override
      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
        throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
          blockSize, eventLoopGroup, channelClass, monitor);
      }

      @Override
      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
        throw new UnsupportedOperationException();
      }
    }.resolve(dfs, f);
  }

  public static boolean shouldRetryCreate(RemoteException e) {
    // RetryStartFileException was introduced in HDFS 2.6+, so here we can only use the class name.
    // Exceptions other than this one are simply rethrown. This is the same as
    // DFSOutputStream.newStreamForCreate.
    return e.getClassName().endsWith("RetryStartFileException");
  }

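  // Ask the NameNode to complete the file, retrying until it succeeds; the lease is released once
  // complete() returns true. If the lease has already expired we log a warning and give up.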
  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
    ExtendedBlock block, long fileId) {
    for (int retry = 0;; retry++) {
      try {
        if (namenode.complete(src, clientName, block, fileId)) {
          endFileLease(client, fileId);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
        }
      } catch (RemoteException e) {
        IOException ioe = e.unwrapRemoteException();
        if (ioe instanceof LeaseExpiredException) {
          LOG.warn("lease for file " + src + " is expired, give up", e);
          return;
        } else {
          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
        }
      } catch (Exception e) {
        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
      }
      sleepIgnoreInterrupt(retry);
    }
  }

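  // Back off before the next retry using the standard pause time; interrupts are deliberately
  // swallowed, as the method name says.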
  static void sleepIgnoreInterrupt(int retry) {
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
    } catch (InterruptedException e) {
    }
  }
}