001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
021import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
022
023import com.google.protobuf.CodedOutputStream;
024import java.io.IOException;
025import java.lang.reflect.Constructor;
026import java.lang.reflect.Field;
027import java.lang.reflect.InvocationTargetException;
028import java.lang.reflect.Method;
029import java.net.InetAddress;
030import java.net.InetSocketAddress;
031import java.nio.ByteBuffer;
032import java.security.GeneralSecurityException;
033import java.util.Arrays;
034import java.util.Base64;
035import java.util.Collections;
036import java.util.List;
037import java.util.Map;
038import java.util.Set;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicBoolean;
041import javax.security.auth.callback.Callback;
042import javax.security.auth.callback.CallbackHandler;
043import javax.security.auth.callback.NameCallback;
044import javax.security.auth.callback.PasswordCallback;
045import javax.security.auth.callback.UnsupportedCallbackException;
046import javax.security.sasl.RealmCallback;
047import javax.security.sasl.RealmChoiceCallback;
048import javax.security.sasl.Sasl;
049import javax.security.sasl.SaslClient;
050import javax.security.sasl.SaslException;
051import org.apache.commons.lang3.StringUtils;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.crypto.CipherOption;
054import org.apache.hadoop.crypto.CipherSuite;
055import org.apache.hadoop.crypto.CryptoCodec;
056import org.apache.hadoop.crypto.Decryptor;
057import org.apache.hadoop.crypto.Encryptor;
058import org.apache.hadoop.crypto.key.KeyProvider;
059import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
060import org.apache.hadoop.fs.FileEncryptionInfo;
061import org.apache.hadoop.hdfs.DFSClient;
062import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
063import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
064import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
065import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
066import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
067import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
068import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
069import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
070import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
071import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
072import org.apache.hadoop.security.SaslPropertiesResolver;
073import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
074import org.apache.hadoop.security.UserGroupInformation;
075import org.apache.hadoop.security.token.Token;
076import org.apache.yetus.audience.InterfaceAudience;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
081import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
082import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
083import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
084import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
085import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf;
086import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
087import org.apache.hbase.thirdparty.io.netty.channel.Channel;
088import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
089import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
090import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter;
091import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
092import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
093import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
094import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
095import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
096import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
097import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
098import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
099import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
100
101/**
102 * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}.
103 */
104@InterfaceAudience.Private
105public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
  /** Logger used by this helper and its nested classes. */
  private static final Logger LOG =
      LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class);

  /** Private constructor; prevents instantiation from outside this class. */
  private FanOutOneBlockAsyncDFSOutputSaslHelper() {
  }

  /** Server name handed to {@code Sasl.createSaslClient} when the SASL client is created. */
  private static final String SERVER_NAME = "0";
  /** Protocol name handed to {@code Sasl.createSaslClient} when the SASL client is created. */
  private static final String PROTOCOL = "hdfs";
  /** The only SASL mechanism offered when the SASL client is created. */
  private static final String MECHANISM = "DIGEST-MD5";
  /** Four byte marker written to the channel before the first SASL message. */
  private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
  /** Separator placed between the parts of a user name derived from a data encryption key. */
  private static final String NAME_DELIMITER = " ";
117
  /**
   * Accessors for state held by a {@link SaslDataTransferClient}. The implementation created by
   * {@code createSaslAdaptor()} reads the corresponding private fields through reflection.
   */
  private interface SaslAdaptor {

    /** Returns the {@link TrustedChannelResolver} held by the given client. */
    TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient);

    /** Returns the {@link SaslPropertiesResolver} held by the given client. */
    SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient);

    /** Returns the {@code fallbackToSimpleAuth} flag object held by the given client. */
    AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient);
  }

  /** Adaptor instance, assigned once in the static initializer of this class. */
  private static final SaslAdaptor SASL_ADAPTOR;
128
  /**
   * Creates the {@link Encryptor} for a file described by a {@link FileEncryptionInfo}. Two
   * implementations exist in this class, one per location of the HDFS
   * {@code decryptEncryptedDataEncryptionKey} method.
   */
  private interface TransparentCryptoHelper {

    /**
     * Creates and initializes an {@link Encryptor} for the given file encryption info.
     * @param conf configuration used to look up the {@link CryptoCodec}
     * @param feInfo encryption info of the file
     * @param client client used to decrypt the encrypted data encryption key
     * @throws IOException if the key can not be decrypted or the encryptor can not be set up
     */
    Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
        throws IOException;
  }

  /** Helper instance, assigned once in the static initializer of this class. */
  private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER;
136
137  private static SaslAdaptor createSaslAdaptor()
138      throws NoSuchFieldException, NoSuchMethodException {
139    Field saslPropsResolverField =
140        SaslDataTransferClient.class.getDeclaredField("saslPropsResolver");
141    saslPropsResolverField.setAccessible(true);
142    Field trustedChannelResolverField =
143        SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver");
144    trustedChannelResolverField.setAccessible(true);
145    Field fallbackToSimpleAuthField =
146        SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth");
147    fallbackToSimpleAuthField.setAccessible(true);
148    return new SaslAdaptor() {
149
150      @Override
151      public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) {
152        try {
153          return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient);
154        } catch (IllegalAccessException e) {
155          throw new RuntimeException(e);
156        }
157      }
158
159      @Override
160      public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) {
161        try {
162          return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient);
163        } catch (IllegalAccessException e) {
164          throw new RuntimeException(e);
165        }
166      }
167
168      @Override
169      public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) {
170        try {
171          return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient);
172        } catch (IllegalAccessException e) {
173          throw new RuntimeException(e);
174        }
175      }
176    };
177  }
178
179  private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396()
180      throws NoSuchMethodException {
181    Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
182      .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
183    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
184    return new TransparentCryptoHelper() {
185
186      @Override
187      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
188          DFSClient client) throws IOException {
189        try {
190          KeyVersion decryptedKey =
191            (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
192          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
193          Encryptor encryptor = cryptoCodec.createEncryptor();
194          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
195          return encryptor;
196        } catch (InvocationTargetException e) {
197          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
198          throw new RuntimeException(e.getTargetException());
199        } catch (GeneralSecurityException e) {
200          throw new IOException(e);
201        } catch (IllegalAccessException e) {
202          throw new RuntimeException(e);
203        }
204      }
205    };
206  }
207
208  private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396()
209      throws ClassNotFoundException, NoSuchMethodException {
210    Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil");
211    Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod(
212      "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class);
213    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
214    return new TransparentCryptoHelper() {
215
216      @Override
217      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
218          DFSClient client) throws IOException {
219        try {
220          KeyVersion decryptedKey = (KeyVersion) decryptEncryptedDataEncryptionKeyMethod
221            .invoke(null, feInfo, client.getKeyProvider());
222          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
223          Encryptor encryptor = cryptoCodec.createEncryptor();
224          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
225          return encryptor;
226        } catch (InvocationTargetException e) {
227          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
228          throw new RuntimeException(e.getTargetException());
229        } catch (GeneralSecurityException e) {
230          throw new IOException(e);
231        } catch (IllegalAccessException e) {
232          throw new RuntimeException(e);
233        }
234      }
235    };
236  }
237
238  private static TransparentCryptoHelper createTransparentCryptoHelper()
239      throws NoSuchMethodException, ClassNotFoundException {
240    try {
241      return createTransparentCryptoHelperWithoutHDFS12396();
242    } catch (NoSuchMethodException e) {
243      LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," +
244        " should be hadoop version with HDFS-12396", e);
245    }
246    return createTransparentCryptoHelperWithHDFS12396();
247  }
248
  // Resolve the reflective hooks into HDFS once, when this class is initialized. Any failure is
  // logged and rethrown as an Error, so class initialization fails.
  static {
    try {
      SASL_ADAPTOR = createSaslAdaptor();
      TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please "
          + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
          + "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }
261
262  /**
263   * Sets user name and password when asked by the client-side SASL object.
264   */
265  private static final class SaslClientCallbackHandler implements CallbackHandler {
266
267    private final char[] password;
268    private final String userName;
269
270    /**
271     * Creates a new SaslClientCallbackHandler.
272     * @param userName SASL user name
273     * @param password SASL password
274     */
275    public SaslClientCallbackHandler(String userName, char[] password) {
276      this.password = password;
277      this.userName = userName;
278    }
279
280    @Override
281    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
282      NameCallback nc = null;
283      PasswordCallback pc = null;
284      RealmCallback rc = null;
285      for (Callback callback : callbacks) {
286        if (callback instanceof RealmChoiceCallback) {
287          continue;
288        } else if (callback instanceof NameCallback) {
289          nc = (NameCallback) callback;
290        } else if (callback instanceof PasswordCallback) {
291          pc = (PasswordCallback) callback;
292        } else if (callback instanceof RealmCallback) {
293          rc = (RealmCallback) callback;
294        } else {
295          throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
296        }
297      }
298      if (nc != null) {
299        nc.setName(userName);
300      }
301      if (pc != null) {
302        pc.setPassword(password);
303      }
304      if (rc != null) {
305        rc.setText(rc.getDefaultText());
306      }
307    }
308  }
309
310  private static final class SaslNegotiateHandler extends ChannelDuplexHandler {
311
    /** Configuration used to read the cipher suite setting and to look up crypto codecs. */
    private final Configuration conf;

    /** SASL properties handed to the SASL client; the requested QOP is read from here. */
    private final Map<String, String> saslProps;

    /** SASL client that evaluates the challenges received from the peer. */
    private final SaslClient saslClient;

    /** Timeout in milliseconds; only used in the message of the read timeout failure. */
    private final int timeoutMs;

    /** Completed when the negotiation finishes, failed on error or read timeout. */
    private final Promise<Void> promise;

    /** Client whose cached data encryption key is cleared when the peer reports an unknown key. */
    private final DFSClient dfsClient;

    /** Negotiation step; 0 initially and incremented after each SASL message has been sent. */
    private int step = 0;

    /**
     * Creates the handler together with its DIGEST-MD5 SASL client.
     * @param conf configuration used for cipher negotiation
     * @param username SASL user name
     * @param password SASL password
     * @param saslProps properties passed to the SASL client
     * @param timeoutMs timeout reported in the read timeout failure message
     * @param promise promise to complete with the outcome of the negotiation
     * @param dfsClient client whose data encryption key is cleared on an unknown key error
     * @throws SaslException if the SASL client can not be created
     */
    public SaslNegotiateHandler(Configuration conf, String username, char[] password,
        Map<String, String> saslProps, int timeoutMs, Promise<Void> promise,
        DFSClient dfsClient) throws SaslException {
      this.conf = conf;
      this.saslProps = saslProps;
      this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL,
        SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password));
      this.timeoutMs = timeoutMs;
      this.promise = promise;
      this.dfsClient = dfsClient;
    }
337
    /**
     * Writes a SASL message that carries only a payload, without cipher options.
     * @param ctx context to write to
     * @param payload SASL payload
     * @throws IOException if the message can not be built or serialized
     */
    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
      sendSaslMessage(ctx, payload, null);
    }
341
342    private List<CipherOption> getCipherOptions() throws IOException {
343      // Negotiate cipher suites if configured. Currently, the only supported
344      // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
345      // values for future expansion.
346      String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
347      if (StringUtils.isBlank(cipherSuites)) {
348        return null;
349      }
350      if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
351        throw new IOException(String.format("Invalid cipher suite, %s=%s",
352          DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
353      }
354      return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
355    }
356
357    /**
358     * The asyncfs subsystem emulates a HDFS client by sending protobuf messages via netty.
359     * After Hadoop 3.3.0, the protobuf classes are relocated to org.apache.hadoop.thirdparty.protobuf.*.
360     * Use Reflection to check which ones to use.
361     */
362    private static class BuilderPayloadSetter {
363      private static Method setPayloadMethod;
364      private static Constructor<?> constructor;
365
366      /**
367       * Create a ByteString from byte array without copying (wrap), and then set it as the payload
368       * for the builder.
369       *
370       * @param builder builder for HDFS DataTransferEncryptorMessage.
371       * @param payload byte array of payload.
372       * @throws IOException
373       */
374      static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder, byte[] payload)
375        throws IOException {
376        Object byteStringObject;
377        try {
378          // byteStringObject = new LiteralByteString(payload);
379          byteStringObject = constructor.newInstance(payload);
380          // builder.setPayload(byteStringObject);
381          setPayloadMethod.invoke(builder, constructor.getDeclaringClass().cast(byteStringObject));
382        } catch (IllegalAccessException | InstantiationException e) {
383          throw new RuntimeException(e);
384
385        } catch (InvocationTargetException e) {
386          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
387          throw new RuntimeException(e.getTargetException());
388        }
389      }
390
391      static {
392        Class<?> builderClass = DataTransferEncryptorMessageProto.Builder.class;
393
394        // Try the unrelocated ByteString
395        Class<?> byteStringClass = com.google.protobuf.ByteString.class;
396        try {
397          // See if it can load the relocated ByteString, which comes from hadoop-thirdparty.
398          byteStringClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString");
399          LOG.debug("Found relocated ByteString class from hadoop-thirdparty." +
400            " Assuming this is Hadoop 3.3.0+.");
401        } catch (ClassNotFoundException e) {
402          LOG.debug("Did not find relocated ByteString class from hadoop-thirdparty." +
403            " Assuming this is below Hadoop 3.3.0", e);
404        }
405
406        // LiteralByteString is a package private class in protobuf. Make it accessible.
407        Class<?> literalByteStringClass;
408        try {
409          literalByteStringClass = Class.forName(
410            "org.apache.hadoop.thirdparty.protobuf.ByteString$LiteralByteString");
411          LOG.debug("Shaded LiteralByteString from hadoop-thirdparty is found.");
412        } catch (ClassNotFoundException e) {
413          try {
414            literalByteStringClass = Class.forName("com.google.protobuf.LiteralByteString");
415            LOG.debug("com.google.protobuf.LiteralByteString found.");
416          } catch (ClassNotFoundException ex) {
417            throw new RuntimeException(ex);
418          }
419        }
420
421        try {
422          constructor = literalByteStringClass.getDeclaredConstructor(byte[].class);
423          constructor.setAccessible(true);
424        } catch (NoSuchMethodException e) {
425          throw new RuntimeException(e);
426        }
427
428        try {
429          setPayloadMethod = builderClass.getMethod("setPayload", byteStringClass);
430        } catch (NoSuchMethodException e) {
431          // if either method is not found, we are in big trouble. Abort.
432          throw new RuntimeException(e);
433        }
434      }
435    }
436
437    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
438        List<CipherOption> options) throws IOException {
439      DataTransferEncryptorMessageProto.Builder builder =
440          DataTransferEncryptorMessageProto.newBuilder();
441      builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
442      if (payload != null) {
443        BuilderPayloadSetter.wrapAndSetPayload(builder, payload);
444      }
445      if (options != null) {
446        builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
447      }
448      DataTransferEncryptorMessageProto proto = builder.build();
449      int size = proto.getSerializedSize();
450      size += CodedOutputStream.computeRawVarint32Size(size);
451      ByteBuf buf = ctx.alloc().buffer(size);
452      proto.writeDelimitedTo(new ByteBufOutputStream(buf));
453      ctx.write(buf);
454    }
455
456    @Override
457    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
458      ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
459      sendSaslMessage(ctx, new byte[0]);
460      ctx.flush();
461      step++;
462    }
463
464    @Override
465    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
466      saslClient.dispose();
467    }
468
469    private void check(DataTransferEncryptorMessageProto proto) throws IOException {
470      if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
471        dfsClient.clearDataEncryptionKey();
472        throw new InvalidEncryptionKeyException(proto.getMessage());
473      } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
474        throw new IOException(proto.getMessage());
475      }
476    }
477
478    private String getNegotiatedQop() {
479      return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
480    }
481
482    private boolean isNegotiatedQopPrivacy() {
483      String qop = getNegotiatedQop();
484      return qop != null && "auth-conf".equalsIgnoreCase(qop);
485    }
486
487    private boolean requestedQopContainsPrivacy() {
488      Set<String> requestedQop =
489          ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
490      return requestedQop.contains("auth-conf");
491    }
492
493    private void checkSaslComplete() throws IOException {
494      if (!saslClient.isComplete()) {
495        throw new IOException("Failed to complete SASL handshake");
496      }
497      Set<String> requestedQop =
498          ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
499      String negotiatedQop = getNegotiatedQop();
500      LOG.debug(
501        "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop);
502      if (!requestedQop.contains(negotiatedQop)) {
503        throw new IOException(String.format("SASL handshake completed, but "
504            + "channel does not have acceptable quality of protection, "
505            + "requested = %s, negotiated = %s",
506          requestedQop, negotiatedQop));
507      }
508    }
509
510    private boolean useWrap() {
511      String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
512      return qop != null && !"auth".equalsIgnoreCase(qop);
513    }
514
515    private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException {
516      byte[] inKey = option.getInKey();
517      if (inKey != null) {
518        inKey = saslClient.unwrap(inKey, 0, inKey.length);
519      }
520      byte[] outKey = option.getOutKey();
521      if (outKey != null) {
522        outKey = saslClient.unwrap(outKey, 0, outKey.length);
523      }
524      return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey,
525          option.getOutIv());
526    }
527
528    private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
529        boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
530      List<CipherOption> cipherOptions =
531          PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList());
532      if (cipherOptions == null || cipherOptions.isEmpty()) {
533        return null;
534      }
535      CipherOption cipherOption = cipherOptions.get(0);
536      return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
537    }
538
    /**
     * Drives the client side of the handshake for every decoded
     * {@link DataTransferEncryptorMessageProto}; any other message is passed on unchanged.
     * <p>
     * In step 1 the SASL response is sent back, together with the configured cipher options when
     * the requested QOP includes privacy. In step 2 the handshake is verified, every handler is
     * removed from the pipeline, the encryption or SASL wrap handlers are installed if needed, and
     * the promise is completed successfully.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      if (msg instanceof DataTransferEncryptorMessageProto) {
        DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
        // Throws if the peer reported an error status.
        check(proto);
        byte[] challenge = proto.getPayload().toByteArray();
        byte[] response = saslClient.evaluateChallenge(challenge);
        switch (step) {
          case 1: {
            List<CipherOption> cipherOptions = null;
            if (requestedQopContainsPrivacy()) {
              cipherOptions = getCipherOptions();
            }
            sendSaslMessage(ctx, response, cipherOptions);
            ctx.flush();
            step++;
            break;
          }
          case 2: {
            assert response == null;
            checkSaslComplete();
            CipherOption cipherOption =
                getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
            // Negotiation is over: drop every handler currently in the pipeline, this one included.
            ChannelPipeline p = ctx.pipeline();
            while (p.first() != null) {
              p.removeFirst();
            }
            if (cipherOption != null) {
              // A cipher was negotiated: outbound data is encrypted with the "in" key and IV,
              // inbound data is decrypted with the "out" key and IV.
              CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());
              p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()),
                new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv()));
            } else {
              if (useWrap()) {
                // No cipher, but the QOP is not plain "auth": frame and wrap/unwrap through SASL.
                p.addLast(new SaslWrapHandler(saslClient),
                  new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
                  new SaslUnwrapHandler(saslClient));
              }
            }
            promise.trySuccess(null);
            break;
          }
          default:
            throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
        }
      } else {
        ctx.fireChannelRead(msg);
      }
    }
587
    /** Fails the negotiation promise with {@code cause}; the exception is not passed on. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      promise.tryFailure(cause);
    }
592
593    @Override
594    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
595      if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
596        promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
597      } else {
598        super.userEventTriggered(ctx, evt);
599      }
600    }
601  }
602
603  private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
604
605    private final SaslClient saslClient;
606
607    public SaslUnwrapHandler(SaslClient saslClient) {
608      this.saslClient = saslClient;
609    }
610
611    @Override
612    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
613      saslClient.dispose();
614    }
615
616    @Override
617    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
618      msg.skipBytes(4);
619      byte[] b = new byte[msg.readableBytes()];
620      msg.readBytes(b);
621      ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length)));
622    }
623  }
624
625  private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
626
627    private final SaslClient saslClient;
628
629    private CompositeByteBuf cBuf;
630
631    public SaslWrapHandler(SaslClient saslClient) {
632      this.saslClient = saslClient;
633    }
634
635    @Override
636    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
637      cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE);
638    }
639
640    @Override
641    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
642        throws Exception {
643      if (msg instanceof ByteBuf) {
644        ByteBuf buf = (ByteBuf) msg;
645        cBuf.addComponent(buf);
646        cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
647      } else {
648        ctx.write(msg);
649      }
650    }
651
652    @Override
653    public void flush(ChannelHandlerContext ctx) throws Exception {
654      if (cBuf.isReadable()) {
655        byte[] b = new byte[cBuf.readableBytes()];
656        cBuf.readBytes(b);
657        cBuf.discardReadComponents();
658        byte[] wrapped = saslClient.wrap(b, 0, b.length);
659        ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
660        buf.writeInt(wrapped.length);
661        buf.writeBytes(wrapped);
662        ctx.write(buf);
663      }
664      ctx.flush();
665    }
666
667    @Override
668    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
669      cBuf.release();
670      cBuf = null;
671    }
672  }
673
674  private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
675
676    private final Decryptor decryptor;
677
678    public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
679        throws GeneralSecurityException, IOException {
680      this.decryptor = codec.createDecryptor();
681      this.decryptor.init(key, Arrays.copyOf(iv, iv.length));
682    }
683
684    @Override
685    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
686      ByteBuf inBuf;
687      boolean release = false;
688      if (msg.nioBufferCount() == 1) {
689        inBuf = msg;
690      } else {
691        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
692        msg.readBytes(inBuf);
693        release = true;
694      }
695      ByteBuffer inBuffer = inBuf.nioBuffer();
696      ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes());
697      ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes());
698      decryptor.decrypt(inBuffer, outBuffer);
699      outBuf.writerIndex(inBuf.readableBytes());
700      if (release) {
701        inBuf.release();
702      }
703      ctx.fireChannelRead(outBuf);
704    }
705  }
706
707  private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
708
709    private final Encryptor encryptor;
710
711    public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
712        throws GeneralSecurityException, IOException {
713      this.encryptor = codec.createEncryptor();
714      this.encryptor.init(key, Arrays.copyOf(iv, iv.length));
715    }
716
717    @Override
718    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
719        throws Exception {
720      if (preferDirect) {
721        return ctx.alloc().directBuffer(msg.readableBytes());
722      } else {
723        return ctx.alloc().buffer(msg.readableBytes());
724      }
725    }
726
727    @Override
728    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
729      ByteBuf inBuf;
730      boolean release = false;
731      if (msg.nioBufferCount() == 1) {
732        inBuf = msg;
733      } else {
734        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
735        msg.readBytes(inBuf);
736        release = true;
737      }
738      ByteBuffer inBuffer = inBuf.nioBuffer();
739      ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes());
740      encryptor.encrypt(inBuffer, outBuffer);
741      out.writerIndex(inBuf.readableBytes());
742      if (release) {
743        inBuf.release();
744      }
745    }
746  }
747
748  private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) {
749    return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER
750        + Base64.getEncoder().encodeToString(encryptionKey.nonce);
751  }
752
753  private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
754    return Base64.getEncoder().encodeToString(encryptionKey).toCharArray();
755  }
756
757  private static String buildUsername(Token<BlockTokenIdentifier> blockToken) {
758    return Base64.getEncoder().encodeToString(blockToken.getIdentifier());
759  }
760
761  private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
762    return Base64.getEncoder().encodeToString(blockToken.getPassword()).toCharArray();
763  }
764
765  private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) {
766    Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
767    saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
768    saslProps.put(Sasl.SERVER_AUTH, "true");
769    saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
770    return saslProps;
771  }
772
773  private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
774      String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise,
775      DFSClient dfsClient) {
776    try {
777      channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
778        new ProtobufVarint32FrameDecoder(),
779        new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()),
780        new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise,
781            dfsClient));
782    } catch (SaslException e) {
783      saslPromise.tryFailure(e);
784    }
785  }
786
787  static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
788      int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
789      Promise<Void> saslPromise) throws IOException {
790    SaslDataTransferClient saslClient = client.getSaslDataTransferClient();
791    SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient);
792    TrustedChannelResolver trustedChannelResolver =
793        SASL_ADAPTOR.getTrustedChannelResolver(saslClient);
794    AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient);
795    InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
796    if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
797      saslPromise.trySuccess(null);
798      return;
799    }
800    DataEncryptionKey encryptionKey = client.newDataEncryptionKey();
801    if (encryptionKey != null) {
802      if (LOG.isDebugEnabled()) {
803        LOG.debug(
804          "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo);
805      }
806      doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
807        encryptionKeyToPassword(encryptionKey.encryptionKey),
808        createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise,
809          client);
810    } else if (!UserGroupInformation.isSecurityEnabled()) {
811      if (LOG.isDebugEnabled()) {
812        LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
813            + ", datanodeId = " + dnInfo);
814      }
815      saslPromise.trySuccess(null);
816    } else if (dnInfo.getXferPort() < 1024) {
817      if (LOG.isDebugEnabled()) {
818        LOG.debug("SASL client skipping handshake in secured configuration with "
819            + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo);
820      }
821      saslPromise.trySuccess(null);
822    } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
823      if (LOG.isDebugEnabled()) {
824        LOG.debug("SASL client skipping handshake in secured configuration with "
825            + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo);
826      }
827      saslPromise.trySuccess(null);
828    } else if (saslPropsResolver != null) {
829      if (LOG.isDebugEnabled()) {
830        LOG.debug(
831          "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo);
832      }
833      doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
834        buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise,
835          client);
836    } else {
837      // It's a secured cluster using non-privileged ports, but no SASL. The only way this can
838      // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
839      // edge case.
840      if (LOG.isDebugEnabled()) {
841        LOG.debug("SASL client skipping handshake in secured configuration with no SASL "
842            + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo);
843      }
844      saslPromise.trySuccess(null);
845    }
846  }
847
848  static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client)
849      throws IOException {
850    FileEncryptionInfo feInfo = stat.getFileEncryptionInfo();
851    if (feInfo == null) {
852      return null;
853    }
854    return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client);
855  }
856}