/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
 */
@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final Logger LOG =
      LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);

  private FanOutOneBlockAsyncDFSOutputHelper() {
  }

  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
  // use pooled allocator for performance.
  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

  // copied from DFSPacket since it is package private.
  public static final long HEART_BEAT_SEQNO = -1L;

  // Timeouts for communicating with DataNode for streaming writes/reads
  public static final int READ_TIMEOUT = 60 * 1000;

  private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];

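  // beginFileLease and endFileLease on DFSClient are not public, so the implementation created in
  // createLeaseManager() below has to call them via reflection.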
  private interface LeaseManager {

    void begin(DFSClient client, long inodeId);

    void end(DFSClient client, long inodeId);
  }

  private static final LeaseManager LEASE_MANAGER;

  // This is used to terminate a recoverFileLease call when the FileSystem is already closed.
  // isClientRunning is not public so we need to use reflection.
  private interface DFSClientAdaptor {

    boolean isClientRunning(DFSClient client);
  }

  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;

  // Helper for creating files; papers over the differing ClientProtocol.create signatures across
  // Hadoop versions (see createFileCreator() below).
  private interface FileCreator {
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
        String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
        short replication, long blockSize, CryptoProtocolVersion[] supportedVersions)
        throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    }

    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
        EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
        CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;

  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
    isClientRunningMethod.setAccessible(true);
    return new DFSClientAdaptor() {

      @Override
      public boolean isClientRunning(DFSClient client) {
        try {
          return (Boolean) isClientRunningMethod.invoke(client);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
        DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
    endFileLeaseMethod.setAccessible(true);
    return new LeaseManager() {

      @Override
      public void begin(DFSClient client, long inodeId) {
        try {
          beginFileLeaseMethod.invoke(client, inodeId, null);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(DFSClient client, long inodeId) {
        try {
          endFileLeaseMethod.invoke(client, inodeId);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

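  // Hadoop 3.x added a trailing String parameter (the erasure coding policy name) to
  // ClientProtocol.create, so we probe for the newer signature first and pass null for it, then
  // fall back to the Hadoop 2.x signature.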
  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
        supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null);
    };
  }

  private static FileCreator createFileCreator2() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
        supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions);
    };
  }

  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create has a different signature, assuming this is Hadoop 2.x");
    }
    return createFileCreator2();
  }

  // cancel the processing if DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {

    private final DFSClient client;

    public CancelOnClose(DFSClient client) {
      this.client = client;
    }

    @Override
    public boolean progress() {
      return DFS_CLIENT_ADAPTOR.isClientRunning(client);
    }
  }

  static {
    try {
      LEASE_MANAGER = createLeaseManager();
      DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
      FILE_CREATOR = createFileCreator();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please " +
          "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
          "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }

  static void beginFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.begin(client, inodeId);
  }

  static void endFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.end(client, inodeId);
  }

  static DataChecksum createChecksum(DFSClient client) {
    return client.getConf().createChecksum(null);
  }

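  /**
   * Extract the reply status from a pipeline ack. Newer datanodes put a combined ECN/status
   * header in the flag list; when the flag list is empty (an older datanode) we synthesize an
   * equivalent header from the first reply, with ECN disabled, before decoding.
   */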
  static Status getStatus(PipelineAckProto ack) {
    List<Integer> flagList = ack.getFlagList();
    Integer headerFlag;
    if (flagList.isEmpty()) {
      Status reply = ack.getReply(0);
      headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
    } else {
      headerFlag = flagList.get(0);
    }
    return PipelineAck.getStatusFromHeader(headerFlag);
  }

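  /**
   * Install a one-shot response pipeline on the freshly connected channel: an idle-state handler
   * for the read timeout, a varint32 frame decoder plus a protobuf decoder for the
   * BlockOpResponseProto, and a handler that completes the given promise with the channel on a
   * SUCCESS ack, or fails it on any error, timeout or disconnect.
   */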
  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
      Promise<Channel> promise, int timeoutMs) {
    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
      new ProtobufVarint32FrameDecoder(),
      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
      new SimpleChannelInboundHandler<BlockOpResponseProto>() {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
            throws Exception {
          Status pipelineStatus = resp.getStatus();
          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
            throw new IOException("datanode " + dnInfo + " is restarting");
          }
          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
          if (resp.getStatus() != Status.SUCCESS) {
            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException("Got access token error" + ", status message " +
                  resp.getMessage() + ", " + logInfo);
            } else {
              throw new IOException("Got error" + ", status=" + resp.getStatus().name() +
                  ", status message " + resp.getMessage() + ", " + logInfo);
            }
          }
          // success
          ChannelPipeline p = ctx.pipeline();
          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
            // do not remove all handlers because we may have wrap or unwrap handlers at the head
            // of the pipeline.
            if (handler instanceof IdleStateHandler) {
              break;
            }
          }
          // Disable auto read here. Enable it after we setup the streaming pipeline in
          // FanOutOneBlockAsyncDFSOutput.
          ctx.channel().config().setAutoRead(false);
          promise.trySuccess(ctx.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
            promise
                .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
          } else {
            super.userEventTriggered(ctx, evt);
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          promise.tryFailure(cause);
        }
      });
  }

  private static void requestWriteBlock(Channel channel, StorageType storageType,
      OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
    OpWriteBlockProto proto =
      writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
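    // The request frame is: a 2-byte data transfer protocol version, a 1-byte WRITE_BLOCK op code,
    // then the varint-length-delimited OpWriteBlockProto, hence the 3 extra bytes below.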
    int protoLen = proto.getSerializedSize();
    ByteBuf buffer =
      channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    buffer.writeByte(Op.WRITE_BLOCK.code);
    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
    channel.writeAndFlush(buffer);
  }

  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
      StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
      DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
      throws IOException {
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
    saslPromise.addListener(new FutureListener<Void>() {

      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // setup response processing pipeline first, then send request.
          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
        } else {
          promise.tryFailure(future.cause());
        }
      }
    });
  }

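  /**
   * Connect to every datanode of the located block in parallel. This is where the "fan-out"
   * happens: instead of a daisy-chained write pipeline, each datanode gets its own connection
   * (note that pipelineSize is set to 1), and one future per datanode is returned which completes
   * once that connection is ready for streaming.
   */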
  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
      String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
      BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
      Class<? extends Channel> channelClass) {
    StorageType[] storageTypes = locatedBlock.getStorageTypes();
    DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
    boolean connectToDnViaHostname =
        conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
    blockCopy.setNumBytes(locatedBlock.getBlockSize());
    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
      .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
        .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
      .setClientName(clientName).build();
    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
    OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
        .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
        .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
        .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
        .setRequestedChecksum(checksumProto)
        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
    for (int i = 0; i < datanodeInfos.length; i++) {
      DatanodeInfo dnInfo = datanodeInfos[i];
      StorageType storageType = storageTypes[i];
      Promise<Channel> promise = eventLoopGroup.next().newPromise();
      futureList.add(promise);
      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
      new Bootstrap().group(eventLoopGroup).channel(channelClass)
          .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
              // we need the remote address of the channel, so we can only move on after the
              // channel is connected. Leave an empty implementation here because netty does not
              // allow a null handler.
            }
          }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
              if (future.isSuccess()) {
                initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
                  timeoutMs, client, locatedBlock.getBlockToken(), promise);
              } else {
                promise.tryFailure(future.cause());
              }
            }
          });
    }
    return futureList;
  }

  /**
   * An exception other than RemoteException thrown when calling create on the namenode.
   */
  public static class NameNodeException extends IOException {

    private static final long serialVersionUID = 3143237406477095390L;

    public NameNodeException(Throwable cause) {
      super(cause);
    }
  }

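  // The create/retry loop: create the file on the namenode, allocate the first block, then
  // connect to all of its datanodes. If any datanode fails we exclude it from the next attempt,
  // close any opened channels, release the lease, and retry (overwriting the half-created file)
  // up to createMaxRetries times.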
  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
      boolean overwrite, boolean createParent, short replication, long blockSize,
      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
    Configuration conf = dfs.getConf();
    FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
    DFSClient client = dfs.getClient();
    String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
    int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
      DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
    DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
    for (int retry = 0;; retry++) {
      HdfsFileStatus stat;
      try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
          new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
          createParent, replication, blockSize, CryptoProtocolVersion.supported());
      } catch (Exception e) {
        if (e instanceof RemoteException) {
          throw (RemoteException) e;
        } else {
          throw new NameNodeException(e);
        }
      }
      beginFileLease(client, stat.getFileId());
      boolean succ = false;
      LocatedBlock locatedBlock = null;
      List<Future<Channel>> futureList = null;
      try {
        DataChecksum summer = createChecksum(client);
        locatedBlock = namenode.addBlock(src, client.getClientName(), null, excludesNodes,
          stat.getFileId(), null, null);
        List<Channel> datanodeList = new ArrayList<>();
        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
        for (int i = 0, n = futureList.size(); i < n; i++) {
          try {
            datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
          } catch (Exception e) {
            // exclude the broken DN next time
            excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
            throw e;
          }
        }
        Encryptor encryptor = createEncryptor(conf, stat, client);
        FanOutOneBlockAsyncDFSOutput output =
          new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
              stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
        succ = true;
        return output;
      } catch (RemoteException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (shouldRetryCreate(e)) {
          if (retry >= createMaxRetries) {
            throw e.unwrapRemoteException();
          }
        } else {
          throw e.unwrapRemoteException();
        }
      } catch (IOException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (retry >= createMaxRetries) {
          throw e;
        }
        // overwrite the old broken file.
        overwrite = true;
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          throw new InterruptedIOException();
        }
      } finally {
        if (!succ) {
          if (futureList != null) {
            for (Future<Channel> f : futureList) {
              f.addListener(new FutureListener<Channel>() {

                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                  if (future.isSuccess()) {
                    future.getNow().close();
                  }
                }
              });
            }
          }
          endFileLease(client, stat.getFileId());
        }
      }
    }
  }

  /**
   * Create a {@link FanOutOneBlockAsyncDFSOutput}. This method may block, so do not call it
   * inside an {@link EventLoop}.
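   * <p>
   * A minimal usage sketch (the event loop group and channel class here are just illustrative
   * choices):
   *
   * <pre>
   * EventLoopGroup group = new NioEventLoopGroup();
   * FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs,
   *   new Path("/tmp/test"), true, false, (short) 3, dfs.getDefaultBlockSize(), group,
   *   NioSocketChannel.class);
   * </pre>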
   */
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
      boolean overwrite, boolean createParent, short replication, long blockSize,
      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

      @Override
      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
          throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
          blockSize, eventLoopGroup, channelClass);
      }

      @Override
      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
        throw new UnsupportedOperationException();
      }
    }.resolve(dfs, f);
  }

  public static boolean shouldRetryCreate(RemoteException e) {
    // RetryStartFileException was introduced in HDFS 2.6+, so we can only check the class name
    // here. Exceptions other than this are just thrown out, which is the same as what
    // DFSOutputStream.newStreamForCreate does.
    return e.getClassName().endsWith("RetryStartFileException");
  }

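  /**
   * Best-effort completion of the file on the namenode. complete() returns false until the file's
   * last block has reached its minimal replication, so keep retrying; release the lease on
   * success, and give up only when the lease has already expired.
   */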
  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
      ExtendedBlock block, long fileId) {
    for (int retry = 0;; retry++) {
      try {
        if (namenode.complete(src, clientName, block, fileId)) {
          endFileLease(client, fileId);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
        }
      } catch (RemoteException e) {
        IOException ioe = e.unwrapRemoteException();
        if (ioe instanceof LeaseExpiredException) {
          LOG.warn("lease for file " + src + " is expired, give up", e);
          return;
        } else {
          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
        }
      } catch (Exception e) {
        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
      }
      sleepIgnoreInterrupt(retry);
    }
  }

  static void sleepIgnoreInterrupt(int retry) {
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
    } catch (InterruptedException e) {
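      // deliberately swallowed, as the method name says; callers sit in retry loops that must run
      // to completion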
    }
  }
}