/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.addListener;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
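 * <p>
 * A rough usage sketch (given a Configuration {@code conf} and a DistributedFileSystem
 * {@code dfs}; the event loop group, channel class and parameter values below are illustrative,
 * not prescriptive):
 *
 * <pre>
 * EventLoopGroup group = new NioEventLoopGroup();
 * StreamSlowMonitor monitor = StreamSlowMonitor.create(conf, "monitor");
 * FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs,
 *   new Path("/test/file"), true, false, (short) 3, 64 * 1024 * 1024, group,
 *   NioSocketChannel.class, monitor, false);
 * out.write(new byte[] { 1, 2, 3 });
 * out.flush(false).join();
 * out.close();
 * </pre>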
 */
@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final Logger LOG =
    LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);

  private FanOutOneBlockAsyncDFSOutputHelper() {
  }

  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
  // use pooled allocator for performance.
  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

  // copied from DFSPacket since it is package private.
  public static final long HEART_BEAT_SEQNO = -1L;

  // Timeouts for communicating with DataNode for streaming writes/reads
  public static final int READ_TIMEOUT = 60 * 1000;

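  // The DFSClient lease methods we need are package private and their signatures have changed
  // across Hadoop 3.x releases, so we access them reflectively through this small adapter
  // interface; see the createLeaseManager factory methods below.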
  private interface LeaseManager {

    void begin(FanOutOneBlockAsyncDFSOutput output);

    void end(FanOutOneBlockAsyncDFSOutput output);
  }

  private static final LeaseManager LEASE_MANAGER;

  // helper class for creating files.
  private interface FileCreator {
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions) throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    }

    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
      EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;

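  // Hadoop 3.4+ keys file leases by a unique String obtained from DFSOutputStream.getUniqKey,
  // instead of the numeric file id used by earlier releases.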
  private static LeaseManager createLeaseManager3_4() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
      DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
    endFileLeaseMethod.setAccessible(true);
    Method getUniqKeyMethod = DFSOutputStream.class.getMethod("getUniqKey");
    return new LeaseManager() {

      private String getUniqKey(FanOutOneBlockAsyncDFSOutput output)
        throws IllegalAccessException, InvocationTargetException {
        return (String) getUniqKeyMethod.invoke(output.getDummyStream());
      }

      @Override
      public void begin(FanOutOneBlockAsyncDFSOutput output) {
        try {
          beginFileLeaseMethod.invoke(output.getClient(), getUniqKey(output),
            output.getDummyStream());
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(FanOutOneBlockAsyncDFSOutput output) {
        try {
          endFileLeaseMethod.invoke(output.getClient(), getUniqKey(output));
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

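  // Hadoop 3.3 and earlier key file leases by the numeric file id.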
  private static LeaseManager createLeaseManager3() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
      DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
    endFileLeaseMethod.setAccessible(true);
    return new LeaseManager() {

      @Override
      public void begin(FanOutOneBlockAsyncDFSOutput output) {
        try {
          beginFileLeaseMethod.invoke(output.getClient(), output.getDummyStream().getFileId(),
            output.getDummyStream());
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(FanOutOneBlockAsyncDFSOutput output) {
        try {
          endFileLeaseMethod.invoke(output.getClient(), output.getDummyStream().getFileId());
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
    try {
      return createLeaseManager3_4();
    } catch (NoSuchMethodException e) {
      LOG.debug("DFSClient::beginFileLease wrong arguments, should be hadoop 3.3 or below");
    }

    return createLeaseManager3();
  }

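  // Hadoop 3.3+ added an extra trailing String parameter to ClientProtocol.create; we pass null
  // for the trailing parameters we do not use.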
  private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null, null);
    };
  }

  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null);
    };
  }

  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3_3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 3.2 or below");
    }

    return createFileCreator3();
  }

  // cancel the processing if DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {

    private final DFSClient client;

    public CancelOnClose(DFSClient client) {
      this.client = client;
    }

    @Override
    public boolean progress() {
      return client.isClientRunning();
    }
  }

  static {
    try {
      LEASE_MANAGER = createLeaseManager();
      FILE_CREATOR = createFileCreator();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please "
        + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
        + "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }

  private static void beginFileLease(FanOutOneBlockAsyncDFSOutput output) {
    LEASE_MANAGER.begin(output);
  }

  static void endFileLease(FanOutOneBlockAsyncDFSOutput output) {
    LEASE_MANAGER.end(output);
  }

  static DataChecksum createChecksum(DFSClient client) {
    return client.getConf().createChecksum(null);
  }

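  // A pipeline ack carries the reply status either in the combined ECN+status header flags, or,
  // for datanodes that do not send flags, in the plain reply list; normalize both to a Status.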
  static Status getStatus(PipelineAckProto ack) {
    List<Integer> flagList = ack.getFlagList();
    Integer headerFlag;
    if (flagList.isEmpty()) {
      Status reply = ack.getReply(0);
      headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
    } else {
      headerFlag = flagList.get(0);
    }
    return PipelineAck.getStatusFromHeader(headerFlag);
  }

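  // Set up a one-shot pipeline that decodes the BlockOpResponseProto for our write-block request,
  // fails the promise on error, timeout or disconnect, and on success strips the temporary
  // handlers again before handing the raw channel over to the caller.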
  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
    Promise<Channel> promise, int timeoutMs) {
    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
      new ProtobufVarint32FrameDecoder(),
      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
      new SimpleChannelInboundHandler<BlockOpResponseProto>() {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
          throws Exception {
          Status pipelineStatus = resp.getStatus();
          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
            throw new IOException("datanode " + dnInfo + " is restarting");
          }
          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
          if (resp.getStatus() != Status.SUCCESS) {
            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException("Got access token error" + ", status message "
                + resp.getMessage() + ", " + logInfo);
            } else {
              throw new IOException("Got error" + ", status=" + resp.getStatus().name()
                + ", status message " + resp.getMessage() + ", " + logInfo);
            }
          }
          // success
          ChannelPipeline p = ctx.pipeline();
          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
            // do not remove all handlers because we may have wrap or unwrap handlers at the head
            // of the pipeline.
            if (handler instanceof IdleStateHandler) {
              break;
            }
          }
          // Disable auto read here. Enable it after we set up the streaming pipeline in
          // FanOutOneBlockAsyncDFSOutput.
          ctx.channel().config().setAutoRead(false);
          promise.trySuccess(ctx.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
            promise
              .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
          } else {
            super.userEventTriggered(ctx, evt);
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          promise.tryFailure(cause);
        }
      });
  }

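  // Send the write-block request. The wire format is: a 2-byte data transfer protocol version,
  // a 1-byte WRITE_BLOCK opcode, then the varint32-length-delimited OpWriteBlockProto.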
  private static void requestWriteBlock(Channel channel, StorageType storageType,
    OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
    OpWriteBlockProto proto =
      writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
    int protoLen = proto.getSerializedSize();
    ByteBuf buffer =
      channel.alloc().buffer(3 + CodedOutputStream.computeUInt32SizeNoTag(protoLen) + protoLen);
    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    buffer.writeByte(Op.WRITE_BLOCK.code);
    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
    safeWriteAndFlush(channel, buffer);
  }

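  // SASL negotiation (if required) must finish before any data transfer traffic, so only install
  // the response handler and send the write-block request once the SASL promise succeeds.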
  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
    StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
    DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
    throws IOException {
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
    addListener(saslPromise, new FutureListener<Void>() {

      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // setup response processing pipeline first, then send request.
          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
        } else {
          promise.tryFailure(future.cause());
        }
      }
    });
  }

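  // Unlike DFSOutputStream, which chains the datanodes into a single pipeline, we open a direct
  // connection to every datanode of the block and declare a pipeline size of 1 to each of them;
  // this is what makes the output a "fan-out" output.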
  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
    String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
    BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass) {
    StorageType[] storageTypes = locatedBlock.getStorageTypes();
    DatanodeInfo[] datanodeInfos = getLocatedBlockLocations(locatedBlock);
    boolean connectToDnViaHostname =
      conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
    blockCopy.setNumBytes(locatedBlock.getBlockSize());
    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
      .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
        .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
      .setClientName(clientName).build();
    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
    OpWriteBlockProto.Builder writeBlockProtoBuilder =
      OpWriteBlockProto.newBuilder().setHeader(header)
        .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())).setPipelineSize(1)
        .setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()).setMaxBytesRcvd(maxBytesRcvd)
        .setLatestGenerationStamp(latestGS).setRequestedChecksum(checksumProto)
        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
    for (int i = 0; i < datanodeInfos.length; i++) {
      DatanodeInfo dnInfo = datanodeInfos[i];
      StorageType storageType = storageTypes[i];
      Promise<Channel> promise = eventLoopGroup.next().newPromise();
      futureList.add(promise);
      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
      addListener(new Bootstrap().group(eventLoopGroup).channel(channelClass)
        .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

          @Override
          protected void initChannel(Channel ch) throws Exception {
            // We need the remote address of the channel, so we can only move on after the channel
            // is connected. Leave an empty implementation here because Netty does not allow a
            // null handler.
          }
        }).connect(NetUtils.createSocketAddr(dnAddr)), new ChannelFutureListener() {

          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
              initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
                timeoutMs, client, locatedBlock.getBlockToken(), promise);
            } else {
              promise.tryFailure(future.cause());
            }
          }
        });
    }
    return futureList;
  }

  /**
   * Wraps an exception other than {@link RemoteException} thrown when calling create on the
   * namenode.
   */
  public static class NameNodeException extends IOException {

    private static final long serialVersionUID = 3143237406477095390L;

    public NameNodeException(Throwable cause) {
      super(cause);
    }
  }

  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite,
    boolean noLocalWrite) {
    List<CreateFlag> flags = new ArrayList<>();
    flags.add(CreateFlag.CREATE);
    if (overwrite) {
      flags.add(CreateFlag.OVERWRITE);
    }
    if (noLocalWrite) {
      flags.add(CreateFlag.NO_LOCAL_WRITE);
    }
    flags.add(CreateFlag.SHOULD_REPLICATE);
    return new EnumSetWritable<>(EnumSet.copyOf(flags));
  }

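  // The overall create flow: ask the namenode to create the file, allocate the first block,
  // connect to all of its datanodes, and begin the lease. On failure we exclude the datanodes
  // that broke, optionally back off, and retry up to createMaxRetries times.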
  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor,
    boolean noLocalWrite) throws IOException {
    Configuration conf = dfs.getConf();
    DFSClient client = dfs.getClient();
    String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
    int createMaxRetries =
      conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
    ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager();
    Set<DatanodeInfo> toExcludeNodes =
      new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet());
    for (int retry = 0;; retry++) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("When creating output stream for {}, exclude list is {}, retry={}", src,
          getDataNodeInfo(toExcludeNodes), retry);
      }
      EnumSetWritable<CreateFlag> createFlags = getCreateFlags(overwrite, noLocalWrite);
      HdfsFileStatus stat;
      try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
          createFlags, createParent, replication, blockSize, CryptoProtocolVersion.supported());
      } catch (Exception e) {
        if (e instanceof RemoteException) {
          throw (RemoteException) e;
        } else {
          throw new NameNodeException(e);
        }
      }
      boolean succ = false;
      LocatedBlock locatedBlock = null;
      List<Future<Channel>> futureList = null;
      try {
        DataChecksum summer = createChecksum(client);
        locatedBlock = namenode.addBlock(src, client.getClientName(), null,
          toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
        Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
        for (int i = 0, n = futureList.size(); i < n; i++) {
          DatanodeInfo datanodeInfo = getLocatedBlockLocations(locatedBlock)[i];
          try {
            datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
          } catch (Exception e) {
            // exclude the broken DN next time
            toExcludeNodes.add(datanodeInfo);
            excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error");
            throw e;
          }
        }
        Encryptor encryptor = createEncryptor(conf, stat, client);
        FanOutOneBlockAsyncDFSOutput output =
          new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, stat,
            createFlags.get(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
        beginFileLease(output);
        succ = true;
        return output;
      } catch (RemoteException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (shouldRetryCreate(e)) {
          if (retry >= createMaxRetries) {
            throw e.unwrapRemoteException();
          }
        } else {
          throw e.unwrapRemoteException();
        }
      } catch (IOException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (retry >= createMaxRetries) {
          throw e;
        }
        // overwrite the old broken file.
        overwrite = true;
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          throw new InterruptedIOException();
        }
      } finally {
        if (!succ) {
          if (futureList != null) {
            for (Future<Channel> f : futureList) {
              addListener(f, new FutureListener<Channel>() {

                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                  if (future.isSuccess()) {
                    safeClose(future.getNow());
                  }
                }
              });
            }
          }
        }
      }
    }
  }


  /**
   * Create a {@link FanOutOneBlockAsyncDFSOutput}. This method may block, so do not call it
   * inside an {@link EventLoop}.
   */
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
    final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException {
    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

      @Override
      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
        throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
          blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
      }

      @Override
      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
        throw new UnsupportedOperationException();
      }
    }.resolve(dfs, f);
  }

  public static boolean shouldRetryCreate(RemoteException e) {
    // RetryStartFileException was introduced in HDFS 2.6+, so here we can only use the class name.
    // For exceptions other than this, we just throw them out. This is the same as
    // DFSOutputStream.newStreamForCreate.
    return e.getClassName().endsWith("RetryStartFileException");
  }

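  // Keep asking the namenode to complete the file until it succeeds; only an expired lease makes
  // us give up.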
  static void completeFile(FanOutOneBlockAsyncDFSOutput output, DFSClient client,
    ClientProtocol namenode, String src, String clientName, ExtendedBlock block,
    HdfsFileStatus stat) {
    for (int retry = 0;; retry++) {
      try {
        if (namenode.complete(src, clientName, block, stat.getFileId())) {
          endFileLease(output);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
        }
      } catch (RemoteException e) {
        IOException ioe = e.unwrapRemoteException();
        if (ioe instanceof LeaseExpiredException) {
          LOG.warn("lease for file " + src + " is expired, give up", e);
          return;
        } else {
          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
        }
      } catch (Exception e) {
        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
      }
      sleepIgnoreInterrupt(retry);
    }
  }

  static void sleepIgnoreInterrupt(int retry) {
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
    } catch (InterruptedException e) {
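      // deliberately ignored, as the method name says; this sleep is only a best-effort backoff
      // between retries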
    }
  }

  public static String getDataNodeInfo(Collection<DatanodeInfo> datanodeInfos) {
    if (datanodeInfos.isEmpty()) {
      return "[]";
    }
    return datanodeInfos.stream()
      .map(datanodeInfo -> new StringBuilder().append("(").append(datanodeInfo.getHostName())
        .append("/").append(datanodeInfo.getInfoAddr()).append(":")
        .append(datanodeInfo.getInfoPort()).append(")").toString())
      .collect(Collectors.joining(",", "[", "]"));
  }
}