/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
 */
@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final Logger LOG =
      LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);

  private FanOutOneBlockAsyncDFSOutputHelper() {
  }

  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
  // use pooled allocator for performance.
  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

  // copied from DFSPacket since it is package private.
  public static final long HEART_BEAT_SEQNO = -1L;

  // Timeouts for communicating with DataNode for streaming writes/reads
  public static final int READ_TIMEOUT = 60 * 1000;

  private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];

  // helper class for getting the Status from a PipelineAckProto. In hadoop 2.6 or before there is
  // a getStatus method, and for hadoop 2.7 or after the status is retrieved from the flag. The
  // flag may come from the proto directly, or be combined from the reply field of the proto and
  // an ECN value. See createPipelineAckStatusGetter for more details.
  private interface PipelineAckStatusGetter {
    Status get(PipelineAckProto ack);
  }

  private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;

  // The StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7. So
  // here we need to use reflection to set it. See createStorageTypeSetter for more details.
  private interface StorageTypeSetter {
    OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
  }

  private static final StorageTypeSetter STORAGE_TYPE_SETTER;

  // helper class for calling the addBlock method on the namenode. There is an addBlockFlags
  // parameter for hadoop 2.8 or later. See createBlockAdder for more details.
  private interface BlockAdder {

    LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
        ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
        throws IOException;
  }

  private static final BlockAdder BLOCK_ADDER;

  private interface LeaseManager {

    void begin(DFSClient client, long inodeId);

    void end(DFSClient client, long inodeId);
  }

  private static final LeaseManager LEASE_MANAGER;

  // This is used to terminate a recoverFileLease call when the FileSystem is already closed.
  // isClientRunning is not public so we need to use reflection.
  private interface DFSClientAdaptor {

    boolean isClientRunning(DFSClient client);
  }

  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;

  // helper class for converting protos.
  private interface PBHelper {

    ExtendedBlockProto convert(ExtendedBlock b);

    TokenProto convert(Token<?> tok);
  }

  private static final PBHelper PB_HELPER;

  // helper class for creating a data checksum.
  private interface ChecksumCreater {
    DataChecksum createChecksum(DFSClient client);
  }

  private static final ChecksumCreater CHECKSUM_CREATER;

  // helper class for creating files.
  private interface FileCreator {
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
        String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
        short replication, long blockSize, CryptoProtocolVersion[] supportedVersions)
        throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    };

    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
        EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
        CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;

  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
    isClientRunningMethod.setAccessible(true);
    return new DFSClientAdaptor() {

      @Override
      public boolean isClientRunning(DFSClient client) {
        try {
          return (Boolean) isClientRunningMethod.invoke(client);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

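  /**
   * Creates the {@link LeaseManager} by reflecting on DFSClient's package-private beginFileLease
   * and endFileLease methods, so we can manage the file lease of the output we create ourselves.
   */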
  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
        DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
    endFileLeaseMethod.setAccessible(true);
    return new LeaseManager() {

      @Override
      public void begin(DFSClient client, long inodeId) {
        try {
          beginFileLeaseMethod.invoke(client, inodeId, null);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(DFSClient client, long inodeId) {
        try {
          endFileLeaseMethod.invoke(client, inodeId);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
      throws NoSuchMethodException {
    Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
    @SuppressWarnings("rawtypes")
    Class<? extends Enum> ecnClass;
    try {
      ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
          .asSubclass(Enum.class);
    } catch (ClassNotFoundException e) {
      String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " +
          "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
          "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
    @SuppressWarnings("unchecked")
    Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
    Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
    Method combineHeaderMethod =
        PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
    Method getStatusFromHeaderMethod =
        PipelineAck.class.getMethod("getStatusFromHeader", int.class);
    return new PipelineAckStatusGetter() {

      @Override
      public Status get(PipelineAckProto ack) {
        try {
          @SuppressWarnings("unchecked")
          List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
          Integer headerFlag;
          if (flagList.isEmpty()) {
            Status reply = (Status) getReplyMethod.invoke(ack, 0);
            headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
          } else {
            headerFlag = flagList.get(0);
          }
          return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
      throws NoSuchMethodException {
    Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
    return new PipelineAckStatusGetter() {

      @Override
      public Status get(PipelineAckProto ack) {
        try {
          return (Status) getStatusMethod.invoke(ack, 0);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static PipelineAckStatusGetter createPipelineAckStatusGetter()
      throws NoSuchMethodException {
    try {
      return createPipelineAckStatusGetter27();
    } catch (NoSuchMethodException e) {
      LOG.debug("Cannot get expected method " + e.getMessage() +
          ", this is usually because your Hadoop is pre 2.7.0, " +
          "will try the methods in Hadoop 2.6.x instead.");
    }
    return createPipelineAckStatusGetter26();
  }

  private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
    Method setStorageTypeMethod =
        OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
    ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
    for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
      builder.put(storageTypeProto.name(), storageTypeProto);
    }
    ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
    return new StorageTypeSetter() {

      @Override
      public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType) {
        Object protoEnum = name2ProtoEnum.get(storageType.name());
        try {
          setStorageTypeMethod.invoke(builder, protoEnum);
        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
        return builder;
      }
    };
  }

  private static BlockAdder createBlockAdder() throws NoSuchMethodException {
    for (Method method : ClientProtocol.class.getMethods()) {
      if (method.getName().equals("addBlock")) {
        Method addBlockMethod = method;
        Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
        if (paramTypes[paramTypes.length - 1] == String[].class) {
          return new BlockAdder() {

            @Override
            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
                ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
                String[] favoredNodes) throws IOException {
              try {
                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
                  excludeNodes, fileId, favoredNodes);
              } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
              } catch (InvocationTargetException e) {
                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
                throw new RuntimeException(e);
              }
            }
          };
        } else {
          return new BlockAdder() {

            @Override
            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
                ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
                String[] favoredNodes) throws IOException {
              try {
                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
                  excludeNodes, fileId, favoredNodes, null);
              } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
              } catch (InvocationTargetException e) {
                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
                throw new RuntimeException(e);
              }
            }
          };
        }
      }
    }
    throw new NoSuchMethodException("Cannot find addBlock method in ClientProtocol");
  }

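  /**
   * Creates the {@link PBHelper} on top of whichever proto conversion helper is available:
   * PBHelperClient in hadoop 2.8 or later, or the older PBHelper class before that.
   */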
  private static PBHelper createPBHelper() throws NoSuchMethodException {
    Class<?> helperClass;
    String clazzName = "org.apache.hadoop.hdfs.protocolPB.PBHelperClient";
    try {
      helperClass = Class.forName(clazzName);
    } catch (ClassNotFoundException e) {
      helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
      LOG.debug(clazzName + " not found (Hadoop is pre-2.8.0?); using " +
          helperClass.toString() + " instead.");
    }
    Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
    Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
    return new PBHelper() {

      @Override
      public ExtendedBlockProto convert(ExtendedBlock b) {
        try {
          return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public TokenProto convert(Token<?> tok) {
        try {
          return (TokenProto) convertTokenMethod.invoke(null, tok);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static ChecksumCreater createChecksumCreater28(Method getConfMethod, Class<?> confClass)
      throws NoSuchMethodException {
    for (Method method : confClass.getMethods()) {
      if (method.getName().equals("createChecksum")) {
        Method createChecksumMethod = method;
        return new ChecksumCreater() {

          @Override
          public DataChecksum createChecksum(DFSClient client) {
            try {
              return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client),
                (Object) null);
            } catch (IllegalAccessException | InvocationTargetException e) {
              throw new RuntimeException(e);
            }
          }
        };
      }
    }
    throw new NoSuchMethodException("Cannot find createChecksum method in DfsClientConf");
  }

  private static ChecksumCreater createChecksumCreater27(Method getConfMethod, Class<?> confClass)
      throws NoSuchMethodException {
    Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
    createChecksumMethod.setAccessible(true);
    return new ChecksumCreater() {

      @Override
      public DataChecksum createChecksum(DFSClient client) {
        try {
          return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client));
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

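  /**
   * Creates the {@link ChecksumCreater}, probing for the hadoop 2.8+ DfsClientConf class first
   * and falling back to the hadoop 2.7 DFSClient$Conf class.
   */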
  private static ChecksumCreater createChecksumCreater()
      throws NoSuchMethodException, ClassNotFoundException {
    Method getConfMethod = DFSClient.class.getMethod("getConf");
    try {
      return createChecksumCreater28(getConfMethod,
        Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
    } catch (ClassNotFoundException e) {
      LOG.debug("No DfsClientConf class found, should be hadoop 2.7 or earlier", e);
    }
    return createChecksumCreater27(getConfMethod,
      Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
  }

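  /**
   * Creates a {@link FileCreator} against the hadoop 3.x ClientProtocol.create signature, which
   * takes one extra trailing String argument (the erasure coding policy name); we always pass
   * null for it.
   */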
  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
        supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null);
    };
  }

  private static FileCreator createFileCreator2() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
        supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions);
    };
  }

  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create has no hadoop 3.x overload, assuming this is hadoop 2.x");
    }
    return createFileCreator2();
  }

  // cancel the processing if the DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {

    private final DFSClient client;

    public CancelOnClose(DFSClient client) {
      this.client = client;
    }

    @Override
    public boolean progress() {
      return DFS_CLIENT_ADAPTOR.isClientRunning(client);
    }
  }

  static {
    try {
      PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
      STORAGE_TYPE_SETTER = createStorageTypeSetter();
      BLOCK_ADDER = createBlockAdder();
      LEASE_MANAGER = createLeaseManager();
      DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
      PB_HELPER = createPBHelper();
      CHECKSUM_CREATER = createChecksumCreater();
      FILE_CREATOR = createFileCreator();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please " +
          "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
          "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }

  static void beginFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.begin(client, inodeId);
  }

  static void endFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.end(client, inodeId);
  }

  static DataChecksum createChecksum(DFSClient client) {
    return CHECKSUM_CREATER.createChecksum(client);
  }

  static Status getStatus(PipelineAckProto ack) {
    return PIPELINE_ACK_STATUS_GETTER.get(ack);
  }

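  /**
   * Adds handlers to the channel pipeline that decode the BlockOpResponseProto for our write
   * block request and then complete the given promise with the channel on success, or fail it on
   * error, timeout or disconnect.
   */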
  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
      Promise<Channel> promise, int timeoutMs) {
    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
      new ProtobufVarint32FrameDecoder(),
      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
      new SimpleChannelInboundHandler<BlockOpResponseProto>() {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
            throws Exception {
          Status pipelineStatus = resp.getStatus();
          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
            throw new IOException("datanode " + dnInfo + " is restarting");
          }
          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
          if (resp.getStatus() != Status.SUCCESS) {
            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException("Got access token error" + ", status message " +
                  resp.getMessage() + ", " + logInfo);
            } else {
              throw new IOException("Got error" + ", status=" + resp.getStatus().name() +
                  ", status message " + resp.getMessage() + ", " + logInfo);
            }
          }
          // success
          ChannelPipeline p = ctx.pipeline();
          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
            // do not remove all handlers because we may have wrap or unwrap handlers at the head
            // of the pipeline.
            if (handler instanceof IdleStateHandler) {
              break;
            }
          }
          // Disable auto read here. Enable it after we setup the streaming pipeline in
          // FanOutOneBlockAsyncDFSOutput.
          ctx.channel().config().setAutoRead(false);
          promise.trySuccess(ctx.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
            promise
                .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
          } else {
            super.userEventTriggered(ctx, evt);
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          promise.tryFailure(cause);
        }
      });
  }

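  /**
   * Sends the write block request to the datanode: the data transfer version, the WRITE_BLOCK op
   * code and then the varint-delimited OpWriteBlockProto.
   */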
  private static void requestWriteBlock(Channel channel, Enum<?> storageType,
      OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
    OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
    int protoLen = proto.getSerializedSize();
    ByteBuf buffer =
        channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    buffer.writeByte(Op.WRITE_BLOCK.code);
    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
    channel.writeAndFlush(buffer);
  }

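  /**
   * Performs SASL negotiation with the datanode first, and only then installs the response
   * handlers and sends the write block request, completing or failing the given promise.
   */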
  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
      Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
      DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
      throws IOException {
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
    saslPromise.addListener(new FutureListener<Void>() {

      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // setup response processing pipeline first, then send request.
          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
        } else {
          promise.tryFailure(future.cause());
        }
      }
    });
  }

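  /**
   * Connects to every datanode of the located block in parallel and returns one future per
   * datanode; each future completes with a channel that has already finished the write block
   * handshake.
   */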
  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
      String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
      BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
      Class<? extends Channel> channelClass) {
    Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
    DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
    boolean connectToDnViaHostname =
        conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
    blockCopy.setNumBytes(locatedBlock.getBlockSize());
    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
        .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy))
            .setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
        .setClientName(clientName).build();
    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
    OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
        .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
        .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
        .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
        .setRequestedChecksum(checksumProto)
        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
    for (int i = 0; i < datanodeInfos.length; i++) {
      DatanodeInfo dnInfo = datanodeInfos[i];
      Enum<?> storageType = storageTypes[i];
      Promise<Channel> promise = eventLoopGroup.next().newPromise();
      futureList.add(promise);
      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
      new Bootstrap().group(eventLoopGroup).channel(channelClass)
          .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
              // We need the remote address of the channel, so we can only move on after the
              // channel is connected. Leave an empty implementation here because netty does not
              // allow a null handler.
            }
          }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
              if (future.isSuccess()) {
                initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
                  timeoutMs, client, locatedBlock.getBlockToken(), promise);
              } else {
                promise.tryFailure(future.cause());
              }
            }
          });
    }
    return futureList;
  }

  /**
   * Exception other than RemoteException thrown when calling create on the namenode.
   */
  public static class NameNodeException extends IOException {

    private static final long serialVersionUID = 3143237406477095390L;

    public NameNodeException(Throwable cause) {
      super(cause);
    }
  }

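  /**
   * The actual create logic: create the file on the namenode, begin the file lease, allocate the
   * first block and connect to all of its datanodes, retrying with the broken datanodes excluded
   * if anything goes wrong.
   */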
  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
      boolean overwrite, boolean createParent, short replication, long blockSize,
      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
    Configuration conf = dfs.getConf();
    FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
    DFSClient client = dfs.getClient();
    String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
    int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
      DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
    DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
    for (int retry = 0;; retry++) {
      HdfsFileStatus stat;
      try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
          new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
          createParent, replication, blockSize, CryptoProtocolVersion.supported());
      } catch (Exception e) {
        if (e instanceof RemoteException) {
          throw (RemoteException) e;
        } else {
          throw new NameNodeException(e);
        }
      }
      beginFileLease(client, stat.getFileId());
      boolean succ = false;
      LocatedBlock locatedBlock = null;
      List<Future<Channel>> futureList = null;
      try {
        DataChecksum summer = createChecksum(client);
        locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
          excludesNodes, stat.getFileId(), null);
        List<Channel> datanodeList = new ArrayList<>();
        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
        for (int i = 0, n = futureList.size(); i < n; i++) {
          try {
            datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
          } catch (Exception e) {
            // exclude the broken DN next time
            excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
            throw e;
          }
        }
        Encryptor encryptor = createEncryptor(conf, stat, client);
        FanOutOneBlockAsyncDFSOutput output =
          new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
              stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
        succ = true;
        return output;
      } catch (RemoteException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (shouldRetryCreate(e)) {
          if (retry >= createMaxRetries) {
            throw e.unwrapRemoteException();
          }
        } else {
          throw e.unwrapRemoteException();
        }
      } catch (IOException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (retry >= createMaxRetries) {
          throw e;
        }
        // overwrite the old broken file.
        overwrite = true;
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          throw new InterruptedIOException();
        }
      } finally {
        if (!succ) {
          if (futureList != null) {
            for (Future<Channel> f : futureList) {
              f.addListener(new FutureListener<Channel>() {

                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                  if (future.isSuccess()) {
                    future.getNow().close();
                  }
                }
              });
            }
          }
          endFileLease(client, stat.getFileId());
        }
      }
    }
  }

  /**
   * Create a {@link FanOutOneBlockAsyncDFSOutput}. This method may block, so do not call it
   * inside an {@link EventLoop}.
   */
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
      boolean overwrite, boolean createParent, short replication, long blockSize,
      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

      @Override
      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
          throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
          blockSize, eventLoopGroup, channelClass);
      }

      @Override
      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
        throw new UnsupportedOperationException();
      }
    }.resolve(dfs, f);
  }
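
  // A minimal usage sketch of the createOutput method above. This is illustrative only: the
  // NioEventLoopGroup/NioSocketChannel pair and the path are placeholders, any netty event loop
  // group and matching channel class can be used instead.
  //
  //   EventLoopGroup group = new NioEventLoopGroup();
  //   FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(
  //     dfs, new Path("/example/file"), true, false, (short) 3, 64 * 1024 * 1024, group,
  //     NioSocketChannel.class);
  //   // write to and flush the output, then close it when done.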

  public static boolean shouldRetryCreate(RemoteException e) {
    // RetryStartFileException is only introduced in HDFS 2.6+, so here we can only check the
    // class name. For exceptions other than this, we just throw them out. This is the same as
    // DFSOutputStream.newStreamForCreate.
    return e.getClassName().endsWith("RetryStartFileException");
  }

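  /**
   * Asks the namenode to complete the file, retrying with backoff until it succeeds, and ends the
   * file lease once it does. Gives up only if the lease has already expired.
   */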
  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
      ExtendedBlock block, long fileId) {
    for (int retry = 0;; retry++) {
      try {
        if (namenode.complete(src, clientName, block, fileId)) {
          endFileLease(client, fileId);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
        }
      } catch (RemoteException e) {
        IOException ioe = e.unwrapRemoteException();
        if (ioe instanceof LeaseExpiredException) {
          LOG.warn("lease for file " + src + " is expired, giving up", e);
          return;
        } else {
          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
        }
      } catch (Exception e) {
        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
      }
      sleepIgnoreInterrupt(retry);
    }
  }

  static void sleepIgnoreInterrupt(int retry) {
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
    } catch (InterruptedException e) {
      // swallow the interrupt and simply return, as the method name says
    }
  }
}