/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
 */
@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final Logger LOG =
      LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);

  private FanOutOneBlockAsyncDFSOutputHelper() {
  }

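  // Configuration key and default for how many times createOutput() retries the create call
  // against the NameNode before giving up.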
  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
  // use pooled allocator for performance.
  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

  // copied from DFSPacket since it is package private.
  public static final long HEART_BEAT_SEQNO = -1L;

  // Default timeout in ms for communicating with a DataNode during streaming writes/reads.
  public static final int READ_TIMEOUT = 60 * 1000;

  private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];

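  // Abstraction over starting and stopping the client side lease for a file, so the rest of this
  // class does not have to deal with the reflection needed to reach into DFSClient.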
  private interface LeaseManager {

    void begin(DFSClient client, long inodeId);

    void end(DFSClient client, long inodeId);
  }

  private static final LeaseManager LEASE_MANAGER;

  // This is used to terminate a recoverFileLease call when FileSystem is already closed.
  // isClientRunning is not public so we need to use reflection.
  private interface DFSClientAdaptor {

    boolean isClientRunning(DFSClient client);
  }

  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;

  // helper for creating files.
  private interface FileCreator {
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
        String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
        short replication, long blockSize, CryptoProtocolVersion[] supportedVersions)
        throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    }

    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
        EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
        CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;

  // CreateFlag.SHOULD_REPLICATE makes an OutputStream on an EC directory support hflush/hsync,
  // but EC was only introduced in hadoop 3.x so the enum does not exist on 2.x. That is why we
  // look it up by name at runtime instead of referencing the constant directly.
  private static final CreateFlag SHOULD_REPLICATE_FLAG;

  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
    isClientRunningMethod.setAccessible(true);
    return new DFSClientAdaptor() {

      @Override
      public boolean isClientRunning(DFSClient client) {
        try {
          return (Boolean) isClientRunningMethod.invoke(client);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

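  // beginFileLease and endFileLease are not public in DFSClient, so look them up reflectively.
  // We pass null for the DFSOutputStream argument because we do not have one.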
  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
        DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
    endFileLeaseMethod.setAccessible(true);
    return new LeaseManager() {

      @Override
      public void begin(DFSClient client, long inodeId) {
        try {
          beginFileLeaseMethod.invoke(client, inodeId, null);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(DFSClient client, long inodeId) {
        try {
          endFileLeaseMethod.invoke(client, inodeId);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

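  // ClientProtocol.create has gained extra trailing parameters over hadoop releases. The three
  // factories below bind to the 3.3+, 3.0-3.2 and 2.x signatures; the extra trailing parameters
  // are always passed as null.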
  private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
        String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
        CryptoProtocolVersion[].class, String.class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
        supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
          createParent, replication, blockSize, supportedVersions, null, null);
    };
  }

  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
        supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null);
    };
  }

  private static FileCreator createFileCreator2() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
        supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions);
    };
  }

  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3_3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 3.2 or below");
    }

    try {
      return createFileCreator3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
    }
    return createFileCreator2();
  }

  private static CreateFlag loadShouldReplicateFlag() {
    try {
      return CreateFlag.valueOf("SHOULD_REPLICATE");
    } catch (IllegalArgumentException e) {
      LOG.debug("cannot find SHOULD_REPLICATE flag, should be hadoop 2.x", e);
      return null;
    }
  }

  // cancel the processing if DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {

    private final DFSClient client;

    public CancelOnClose(DFSClient client) {
      this.client = client;
    }

    @Override
    public boolean progress() {
      return DFS_CLIENT_ADAPTOR.isClientRunning(client);
    }
  }

  static {
    try {
      LEASE_MANAGER = createLeaseManager();
      DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
      FILE_CREATOR = createFileCreator();
      SHOULD_REPLICATE_FLAG = loadShouldReplicateFlag();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please " +
          "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
          "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }

  static void beginFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.begin(client, inodeId);
  }

  static void endFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.end(client, inodeId);
  }

  static DataChecksum createChecksum(DFSClient client) {
    return client.getConf().createChecksum(null);
  }

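  // Extract the reply status from a pipeline ack. The ack may carry a combined header flag; when
  // it is absent we synthesize one from the first reply with ECN disabled.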
  static Status getStatus(PipelineAckProto ack) {
    List<Integer> flagList = ack.getFlagList();
    Integer headerFlag;
    if (flagList.isEmpty()) {
      Status reply = ack.getReply(0);
      headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
    } else {
      headerFlag = flagList.get(0);
    }
    return PipelineAck.getStatusFromHeader(headerFlag);
  }

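  // Add a temporary set of handlers to parse the BlockOpResponseProto for our write-block
  // request. On success everything added here is removed again (from the tail of the pipeline
  // back to and including the IdleStateHandler) and the promise completes with the channel; an
  // error status, read timeout or disconnect fails the promise instead.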
  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
      Promise<Channel> promise, int timeoutMs) {
    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
      new ProtobufVarint32FrameDecoder(),
      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
      new SimpleChannelInboundHandler<BlockOpResponseProto>() {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
            throws Exception {
          Status pipelineStatus = resp.getStatus();
          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
            throw new IOException("datanode " + dnInfo + " is restarting");
          }
          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
          if (resp.getStatus() != Status.SUCCESS) {
            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException("Got access token error" + ", status message " +
                  resp.getMessage() + ", " + logInfo);
            } else {
              throw new IOException("Got error" + ", status=" + resp.getStatus().name() +
                  ", status message " + resp.getMessage() + ", " + logInfo);
            }
          }
          // success
          ChannelPipeline p = ctx.pipeline();
          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
            // do not remove all handlers because we may have wrap or unwrap handlers at the head
            // of the pipeline.
            if (handler instanceof IdleStateHandler) {
              break;
            }
          }
          // Disable auto read here. Enable it after we set up the streaming pipeline in
          // FanOutOneBlockAsyncDFSOutput.
          ctx.channel().config().setAutoRead(false);
          promise.trySuccess(ctx.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
            promise
                .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
          } else {
            super.userEventTriggered(ctx, evt);
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          promise.tryFailure(cause);
        }
      });
  }

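  // Send the write block request: a two byte data transfer version, a one byte op code, then the
  // varint length delimited OpWriteBlockProto.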
  private static void requestWriteBlock(Channel channel, StorageType storageType,
      OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
    OpWriteBlockProto proto =
      writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
    int protoLen = proto.getSerializedSize();
    ByteBuf buffer =
      channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    buffer.writeByte(Op.WRITE_BLOCK.code);
    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
    channel.writeAndFlush(buffer);
  }

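  // Negotiate SASL with the datanode first if needed; once that succeeds, install the response
  // handlers and send the write block request.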
  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
      StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
      DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
      throws IOException {
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
    saslPromise.addListener(new FutureListener<Void>() {

      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // setup response processing pipeline first, then send request.
          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
        } else {
          promise.tryFailure(future.cause());
        }
      }
    });
  }

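  // Connect to every datanode of the located block directly, one connection per datanode with a
  // pipeline size of 1, instead of letting the datanodes chain the write. Returns one future per
  // datanode; each future completes with the channel once the datanode has accepted the request.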
  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
      String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
      BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
      Class<? extends Channel> channelClass) {
    StorageType[] storageTypes = locatedBlock.getStorageTypes();
    DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
    boolean connectToDnViaHostname =
        conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
    blockCopy.setNumBytes(locatedBlock.getBlockSize());
    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
      .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
        .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
      .setClientName(clientName).build();
    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
    OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
        .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
        .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
        .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
        .setRequestedChecksum(checksumProto)
        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
    for (int i = 0; i < datanodeInfos.length; i++) {
      DatanodeInfo dnInfo = datanodeInfos[i];
      StorageType storageType = storageTypes[i];
      Promise<Channel> promise = eventLoopGroup.next().newPromise();
      futureList.add(promise);
      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
      new Bootstrap().group(eventLoopGroup).channel(channelClass)
          .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
              // We need the remote address of the channel, so we can only proceed once the channel
              // has connected. Leave an empty implementation here because netty does not allow a
              // null handler.
            }
          }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
              if (future.isSuccess()) {
                initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
                  timeoutMs, client, locatedBlock.getBlockToken(), promise);
              } else {
                promise.tryFailure(future.cause());
              }
            }
          });
    }
    return futureList;
  }

  /**
   * Wraps an exception other than RemoteException thrown when calling create on the namenode.
   */
  public static class NameNodeException extends IOException {

    private static final long serialVersionUID = 3143237406477095390L;

    public NameNodeException(Throwable cause) {
      super(cause);
    }
  }

  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
    List<CreateFlag> flags = new ArrayList<>();
    flags.add(CreateFlag.CREATE);
    if (overwrite) {
      flags.add(CreateFlag.OVERWRITE);
    }
    if (SHOULD_REPLICATE_FLAG != null) {
      flags.add(SHOULD_REPLICATE_FLAG);
    }
    return new EnumSetWritable<>(EnumSet.copyOf(flags));
  }

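  // Create the file on the NameNode, allocate its first block and connect to all of the block's
  // datanodes. Retries up to createMaxRetries times, excluding any datanode we failed to connect
  // to from the next attempt.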
  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
      boolean overwrite, boolean createParent, short replication, long blockSize,
      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
    Configuration conf = dfs.getConf();
    DFSClient client = dfs.getClient();
    String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
    int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
      DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
    DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
    for (int retry = 0;; retry++) {
      HdfsFileStatus stat;
      try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
          getCreateFlags(overwrite), createParent, replication, blockSize,
          CryptoProtocolVersion.supported());
      } catch (Exception e) {
        if (e instanceof RemoteException) {
          throw (RemoteException) e;
        } else {
          throw new NameNodeException(e);
        }
      }
      beginFileLease(client, stat.getFileId());
      boolean succ = false;
      LocatedBlock locatedBlock = null;
      List<Future<Channel>> futureList = null;
      try {
        DataChecksum summer = createChecksum(client);
        locatedBlock = namenode.addBlock(src, client.getClientName(), null, excludesNodes,
          stat.getFileId(), null, null);
        List<Channel> datanodeList = new ArrayList<>();
        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
        for (int i = 0, n = futureList.size(); i < n; i++) {
          try {
            datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
          } catch (Exception e) {
            // exclude the broken DN next time
            excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
            throw e;
          }
        }
        Encryptor encryptor = createEncryptor(conf, stat, client);
        FanOutOneBlockAsyncDFSOutput output =
          new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
            stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
        succ = true;
        return output;
      } catch (RemoteException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (shouldRetryCreate(e)) {
          if (retry >= createMaxRetries) {
            throw e.unwrapRemoteException();
          }
        } else {
          throw e.unwrapRemoteException();
        }
      } catch (IOException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (retry >= createMaxRetries) {
          throw e;
        }
        // overwrite the old broken file.
        overwrite = true;
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          throw new InterruptedIOException();
        }
      } finally {
        if (!succ) {
          if (futureList != null) {
            for (Future<Channel> f : futureList) {
              f.addListener(new FutureListener<Channel>() {

                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                  if (future.isSuccess()) {
                    future.getNow().close();
                  }
                }
              });
            }
          }
          endFileLease(client, stat.getFileId());
        }
      }
    }
  }

  /**
   * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method may block, so do not call it
   * inside an {@link EventLoop}.
   */
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
      boolean overwrite, boolean createParent, short replication, long blockSize,
      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

      @Override
      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
          throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
          blockSize, eventLoopGroup, channelClass);
      }

      @Override
      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
        throw new UnsupportedOperationException();
      }
    }.resolve(dfs, f);
  }

  public static boolean shouldRetryCreate(RemoteException e) {
    // RetryStartFileException was introduced in HDFS 2.6+, so here we can only check the class
    // name. For any other exception we simply rethrow it. This is the same behavior as
    // DFSOutputStream.newStreamForCreate.
    return e.getClassName().endsWith("RetryStartFileException");
  }

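  // Keep calling ClientProtocol.complete until the NameNode acknowledges the file is closed,
  // then release the lease. Gives up early only if the lease has already expired.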
  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
      ExtendedBlock block, long fileId) {
    for (int retry = 0;; retry++) {
      try {
        if (namenode.complete(src, clientName, block, fileId)) {
          endFileLease(client, fileId);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
        }
      } catch (RemoteException e) {
        IOException ioe = e.unwrapRemoteException();
        if (ioe instanceof LeaseExpiredException) {
          LOG.warn("lease for file " + src + " is expired, give up", e);
          return;
        } else {
          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
        }
      } catch (Exception e) {
        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
      }
      sleepIgnoreInterrupt(retry);
    }
  }

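  // Back off between retries; an interrupt during the sleep is intentionally ignored.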
  static void sleepIgnoreInterrupt(int retry) {
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
    } catch (InterruptedException e) {
    }
  }
}