001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
021import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
022
023import com.google.protobuf.ByteString;
024import com.google.protobuf.CodedOutputStream;
025import java.io.IOException;
026import java.lang.reflect.Field;
027import java.lang.reflect.InvocationTargetException;
028import java.lang.reflect.Method;
029import java.net.InetAddress;
030import java.net.InetSocketAddress;
031import java.nio.ByteBuffer;
032import java.security.GeneralSecurityException;
033import java.util.Arrays;
034import java.util.Collections;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicBoolean;
040import javax.security.auth.callback.Callback;
041import javax.security.auth.callback.CallbackHandler;
042import javax.security.auth.callback.NameCallback;
043import javax.security.auth.callback.PasswordCallback;
044import javax.security.auth.callback.UnsupportedCallbackException;
045import javax.security.sasl.RealmCallback;
046import javax.security.sasl.RealmChoiceCallback;
047import javax.security.sasl.Sasl;
048import javax.security.sasl.SaslClient;
049import javax.security.sasl.SaslException;
050import org.apache.commons.codec.binary.Base64;
051import org.apache.commons.lang3.StringUtils;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.crypto.CipherOption;
054import org.apache.hadoop.crypto.CipherSuite;
055import org.apache.hadoop.crypto.CryptoCodec;
056import org.apache.hadoop.crypto.Decryptor;
057import org.apache.hadoop.crypto.Encryptor;
058import org.apache.hadoop.crypto.key.KeyProvider;
059import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
060import org.apache.hadoop.fs.FileEncryptionInfo;
061import org.apache.hadoop.hdfs.DFSClient;
062import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
063import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
064import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
065import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
066import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
067import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
068import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
069import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
070import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
071import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
072import org.apache.hadoop.security.SaslPropertiesResolver;
073import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
074import org.apache.hadoop.security.UserGroupInformation;
075import org.apache.hadoop.security.token.Token;
076import org.apache.yetus.audience.InterfaceAudience;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
081import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
082import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
083import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
084import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
085import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
086import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf;
087import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
088import org.apache.hbase.thirdparty.io.netty.channel.Channel;
089import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
090import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
091import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter;
092import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
093import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
094import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
095import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
096import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
097import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
098import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
099import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
100import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
101import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
102
103/**
104 * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}.
105 */
106@InterfaceAudience.Private
107public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
108  private static final Logger LOG =
109      LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class);
110
  // This class only exposes static helpers; the private constructor prevents instantiation.
  private FanOutOneBlockAsyncDFSOutputSaslHelper() {
  }
113
  // Server name passed to Sasl.createSaslClient when the negotiation client is built.
  private static final String SERVER_NAME = "0";
  // Protocol name passed to Sasl.createSaslClient.
  private static final String PROTOCOL = "hdfs";
  // The only SASL mechanism offered to Sasl.createSaslClient.
  private static final String MECHANISM = "DIGEST-MD5";
  // Written as a 4-byte int ahead of the first SASL message when negotiation starts.
  private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
  // Separates key id, block pool id and encoded nonce in the encryption-key based user name.
  private static final String NAME_DELIMITER = " ";
119
120  private interface SaslAdaptor {
121
122    TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient);
123
124    SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient);
125
126    AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient);
127  }
128
129  private static final SaslAdaptor SASL_ADAPTOR;
130
131  // helper class for convert protos.
132  private interface PBHelper {
133
134    List<CipherOptionProto> convertCipherOptions(List<CipherOption> options);
135
136    List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options);
137  }
138
139  private static final PBHelper PB_HELPER;
140
  /**
   * Creates the {@link Encryptor} for a file from its {@link FileEncryptionInfo}. Two
   * implementations exist in this file, selected by which reflective lookup succeeds.
   */
  private interface TransparentCryptoHelper {

    /**
     * @param conf configuration used to look up the crypto codec
     * @param feInfo encryption info of the file being written
     * @param client client used to decrypt the encrypted data encryption key
     * @return an initialized encryptor
     * @throws IOException if the key cannot be decrypted or the encryptor cannot be set up
     */
    Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
        throws IOException;
  }
146
147  private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER;
148
149  private static SaslAdaptor createSaslAdaptor()
150      throws NoSuchFieldException, NoSuchMethodException {
151    Field saslPropsResolverField =
152        SaslDataTransferClient.class.getDeclaredField("saslPropsResolver");
153    saslPropsResolverField.setAccessible(true);
154    Field trustedChannelResolverField =
155        SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver");
156    trustedChannelResolverField.setAccessible(true);
157    Field fallbackToSimpleAuthField =
158        SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth");
159    fallbackToSimpleAuthField.setAccessible(true);
160    return new SaslAdaptor() {
161
162      @Override
163      public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) {
164        try {
165          return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient);
166        } catch (IllegalAccessException e) {
167          throw new RuntimeException(e);
168        }
169      }
170
171      @Override
172      public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) {
173        try {
174          return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient);
175        } catch (IllegalAccessException e) {
176          throw new RuntimeException(e);
177        }
178      }
179
180      @Override
181      public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) {
182        try {
183          return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient);
184        } catch (IllegalAccessException e) {
185          throw new RuntimeException(e);
186        }
187      }
188    };
189  }
190
191  private static PBHelper createPBHelper() throws NoSuchMethodException {
192    Class<?> helperClass;
193    try {
194      helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
195    } catch (ClassNotFoundException e) {
196      LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
197      helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
198    }
199    Method convertCipherOptionsMethod = helperClass.getMethod("convertCipherOptions", List.class);
200    Method convertCipherOptionProtosMethod =
201        helperClass.getMethod("convertCipherOptionProtos", List.class);
202    return new PBHelper() {
203
204      @SuppressWarnings("unchecked")
205      @Override
206      public List<CipherOptionProto> convertCipherOptions(List<CipherOption> options) {
207        try {
208          return (List<CipherOptionProto>) convertCipherOptionsMethod.invoke(null, options);
209        } catch (IllegalAccessException | InvocationTargetException e) {
210          throw new RuntimeException(e);
211        }
212      }
213
214      @SuppressWarnings("unchecked")
215      @Override
216      public List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options) {
217        try {
218          return (List<CipherOption>) convertCipherOptionProtosMethod.invoke(null, options);
219        } catch (IllegalAccessException | InvocationTargetException e) {
220          throw new RuntimeException(e);
221        }
222      }
223    };
224  }
225
226  private static TransparentCryptoHelper createTransparentCryptoHelper27()
227      throws NoSuchMethodException {
228    Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
229      .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
230    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
231    return new TransparentCryptoHelper() {
232
233      @Override
234      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
235          DFSClient client) throws IOException {
236        try {
237          KeyVersion decryptedKey =
238            (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
239          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
240          Encryptor encryptor = cryptoCodec.createEncryptor();
241          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
242          return encryptor;
243        } catch (InvocationTargetException e) {
244          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
245          throw new RuntimeException(e.getTargetException());
246        } catch (GeneralSecurityException e) {
247          throw new IOException(e);
248        } catch (IllegalAccessException e) {
249          throw new RuntimeException(e);
250        }
251      }
252    };
253  }
254
255  private static TransparentCryptoHelper createTransparentCryptoHelper28()
256      throws ClassNotFoundException, NoSuchMethodException {
257    Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil");
258    Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod(
259      "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class);
260    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
261    return new TransparentCryptoHelper() {
262
263      @Override
264      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
265          DFSClient client) throws IOException {
266        try {
267          KeyVersion decryptedKey = (KeyVersion) decryptEncryptedDataEncryptionKeyMethod
268            .invoke(null, feInfo, client.getKeyProvider());
269          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
270          Encryptor encryptor = cryptoCodec.createEncryptor();
271          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
272          return encryptor;
273        } catch (InvocationTargetException e) {
274          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
275          throw new RuntimeException(e.getTargetException());
276        } catch (GeneralSecurityException e) {
277          throw new IOException(e);
278        } catch (IllegalAccessException e) {
279          throw new RuntimeException(e);
280        }
281      }
282    };
283  }
284
285  private static TransparentCryptoHelper createTransparentCryptoHelper()
286      throws NoSuchMethodException, ClassNotFoundException {
287    try {
288      return createTransparentCryptoHelper27();
289    } catch (NoSuchMethodException e) {
290      LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient, should be hadoop 2.8+",
291        e);
292    }
293    return createTransparentCryptoHelper28();
294  }
295
296  static {
297    try {
298      SASL_ADAPTOR = createSaslAdaptor();
299      PB_HELPER = createPBHelper();
300      TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
301    } catch (Exception e) {
302      String msg = "Couldn't properly initialize access to HDFS internals. Please "
303          + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
304          + "HBASE-16110 for more information.";
305      LOG.error(msg, e);
306      throw new Error(msg, e);
307    }
308  }
309
310  /**
311   * Sets user name and password when asked by the client-side SASL object.
312   */
313  private static final class SaslClientCallbackHandler implements CallbackHandler {
314
315    private final char[] password;
316    private final String userName;
317
318    /**
319     * Creates a new SaslClientCallbackHandler.
320     * @param userName SASL user name
321     * @param password SASL password
322     */
323    public SaslClientCallbackHandler(String userName, char[] password) {
324      this.password = password;
325      this.userName = userName;
326    }
327
328    @Override
329    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
330      NameCallback nc = null;
331      PasswordCallback pc = null;
332      RealmCallback rc = null;
333      for (Callback callback : callbacks) {
334        if (callback instanceof RealmChoiceCallback) {
335          continue;
336        } else if (callback instanceof NameCallback) {
337          nc = (NameCallback) callback;
338        } else if (callback instanceof PasswordCallback) {
339          pc = (PasswordCallback) callback;
340        } else if (callback instanceof RealmCallback) {
341          rc = (RealmCallback) callback;
342        } else {
343          throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
344        }
345      }
346      if (nc != null) {
347        nc.setName(userName);
348      }
349      if (pc != null) {
350        pc.setPassword(password);
351      }
352      if (rc != null) {
353        rc.setText(rc.getDefaultText());
354      }
355    }
356  }
357
358  private static final class SaslNegotiateHandler extends ChannelDuplexHandler {
359
360    private final Configuration conf;
361
362    private final Map<String, String> saslProps;
363
364    private final SaslClient saslClient;
365
366    private final int timeoutMs;
367
368    private final Promise<Void> promise;
369
370    private final DFSClient dfsClient;
371
372    private int step = 0;
373
374    public SaslNegotiateHandler(Configuration conf, String username, char[] password,
375        Map<String, String> saslProps, int timeoutMs, Promise<Void> promise,
376        DFSClient dfsClient) throws SaslException {
377      this.conf = conf;
378      this.saslProps = saslProps;
379      this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL,
380        SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password));
381      this.timeoutMs = timeoutMs;
382      this.promise = promise;
383      this.dfsClient = dfsClient;
384    }
385
386    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
387      sendSaslMessage(ctx, payload, null);
388    }
389
390    private List<CipherOption> getCipherOptions() throws IOException {
391      // Negotiate cipher suites if configured. Currently, the only supported
392      // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
393      // values for future expansion.
394      String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
395      if (StringUtils.isBlank(cipherSuites)) {
396        return null;
397      }
398      if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
399        throw new IOException(String.format("Invalid cipher suite, %s=%s",
400          DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
401      }
402      return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
403    }
404
405    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
406        List<CipherOption> options) throws IOException {
407      DataTransferEncryptorMessageProto.Builder builder =
408          DataTransferEncryptorMessageProto.newBuilder();
409      builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
410      if (payload != null) {
411        // Was ByteStringer; fix w/o using ByteStringer. Its in hbase-protocol
412        // and we want to keep that out of hbase-server.
413        builder.setPayload(ByteString.copyFrom(payload));
414      }
415      if (options != null) {
416        builder.addAllCipherOption(PB_HELPER.convertCipherOptions(options));
417      }
418      DataTransferEncryptorMessageProto proto = builder.build();
419      int size = proto.getSerializedSize();
420      size += CodedOutputStream.computeRawVarint32Size(size);
421      ByteBuf buf = ctx.alloc().buffer(size);
422      proto.writeDelimitedTo(new ByteBufOutputStream(buf));
423      ctx.write(buf);
424    }
425
426    @Override
427    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
428      ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
429      sendSaslMessage(ctx, new byte[0]);
430      ctx.flush();
431      step++;
432    }
433
434    @Override
435    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
436      saslClient.dispose();
437    }
438
439    private void check(DataTransferEncryptorMessageProto proto) throws IOException {
440      if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
441        dfsClient.clearDataEncryptionKey();
442        throw new InvalidEncryptionKeyException(proto.getMessage());
443      } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
444        throw new IOException(proto.getMessage());
445      }
446    }
447
448    private String getNegotiatedQop() {
449      return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
450    }
451
452    private boolean isNegotiatedQopPrivacy() {
453      String qop = getNegotiatedQop();
454      return qop != null && "auth-conf".equalsIgnoreCase(qop);
455    }
456
457    private boolean requestedQopContainsPrivacy() {
458      Set<String> requestedQop =
459          ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
460      return requestedQop.contains("auth-conf");
461    }
462
463    private void checkSaslComplete() throws IOException {
464      if (!saslClient.isComplete()) {
465        throw new IOException("Failed to complete SASL handshake");
466      }
467      Set<String> requestedQop =
468          ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
469      String negotiatedQop = getNegotiatedQop();
470      LOG.debug(
471        "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop);
472      if (!requestedQop.contains(negotiatedQop)) {
473        throw new IOException(String.format("SASL handshake completed, but "
474            + "channel does not have acceptable quality of protection, "
475            + "requested = %s, negotiated = %s",
476          requestedQop, negotiatedQop));
477      }
478    }
479
480    private boolean useWrap() {
481      String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
482      return qop != null && !"auth".equalsIgnoreCase(qop);
483    }
484
485    private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException {
486      byte[] inKey = option.getInKey();
487      if (inKey != null) {
488        inKey = saslClient.unwrap(inKey, 0, inKey.length);
489      }
490      byte[] outKey = option.getOutKey();
491      if (outKey != null) {
492        outKey = saslClient.unwrap(outKey, 0, outKey.length);
493      }
494      return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey,
495          option.getOutIv());
496    }
497
498    private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
499        boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
500      List<CipherOption> cipherOptions =
501          PB_HELPER.convertCipherOptionProtos(proto.getCipherOptionList());
502      if (cipherOptions == null || cipherOptions.isEmpty()) {
503        return null;
504      }
505      CipherOption cipherOption = cipherOptions.get(0);
506      return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
507    }
508
509    @Override
510    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
511      if (msg instanceof DataTransferEncryptorMessageProto) {
512        DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
513        check(proto);
514        byte[] challenge = proto.getPayload().toByteArray();
515        byte[] response = saslClient.evaluateChallenge(challenge);
516        switch (step) {
517          case 1: {
518            List<CipherOption> cipherOptions = null;
519            if (requestedQopContainsPrivacy()) {
520              cipherOptions = getCipherOptions();
521            }
522            sendSaslMessage(ctx, response, cipherOptions);
523            ctx.flush();
524            step++;
525            break;
526          }
527          case 2: {
528            assert response == null;
529            checkSaslComplete();
530            CipherOption cipherOption =
531                getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
532            ChannelPipeline p = ctx.pipeline();
533            while (p.first() != null) {
534              p.removeFirst();
535            }
536            if (cipherOption != null) {
537              CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());
538              p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()),
539                new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv()));
540            } else {
541              if (useWrap()) {
542                p.addLast(new SaslWrapHandler(saslClient),
543                  new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
544                  new SaslUnwrapHandler(saslClient));
545              }
546            }
547            promise.trySuccess(null);
548            break;
549          }
550          default:
551            throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
552        }
553      } else {
554        ctx.fireChannelRead(msg);
555      }
556    }
557
558    @Override
559    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
560      promise.tryFailure(cause);
561    }
562
563    @Override
564    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
565      if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
566        promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
567      } else {
568        super.userEventTriggered(ctx, evt);
569      }
570    }
571  }
572
573  private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
574
575    private final SaslClient saslClient;
576
577    public SaslUnwrapHandler(SaslClient saslClient) {
578      this.saslClient = saslClient;
579    }
580
581    @Override
582    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
583      saslClient.dispose();
584    }
585
586    @Override
587    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
588      msg.skipBytes(4);
589      byte[] b = new byte[msg.readableBytes()];
590      msg.readBytes(b);
591      ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length)));
592    }
593  }
594
595  private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
596
597    private final SaslClient saslClient;
598
599    private CompositeByteBuf cBuf;
600
601    public SaslWrapHandler(SaslClient saslClient) {
602      this.saslClient = saslClient;
603    }
604
605    @Override
606    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
607      cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE);
608    }
609
610    @Override
611    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
612        throws Exception {
613      if (msg instanceof ByteBuf) {
614        ByteBuf buf = (ByteBuf) msg;
615        cBuf.addComponent(buf);
616        cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
617      } else {
618        ctx.write(msg);
619      }
620    }
621
622    @Override
623    public void flush(ChannelHandlerContext ctx) throws Exception {
624      if (cBuf.isReadable()) {
625        byte[] b = new byte[cBuf.readableBytes()];
626        cBuf.readBytes(b);
627        cBuf.discardReadComponents();
628        byte[] wrapped = saslClient.wrap(b, 0, b.length);
629        ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
630        buf.writeInt(wrapped.length);
631        buf.writeBytes(wrapped);
632        ctx.write(buf);
633      }
634      ctx.flush();
635    }
636
637    @Override
638    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
639      cBuf.release();
640      cBuf = null;
641    }
642  }
643
644  private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
645
646    private final Decryptor decryptor;
647
648    public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
649        throws GeneralSecurityException, IOException {
650      this.decryptor = codec.createDecryptor();
651      this.decryptor.init(key, Arrays.copyOf(iv, iv.length));
652    }
653
654    @Override
655    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
656      ByteBuf inBuf;
657      boolean release = false;
658      if (msg.nioBufferCount() == 1) {
659        inBuf = msg;
660      } else {
661        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
662        msg.readBytes(inBuf);
663        release = true;
664      }
665      ByteBuffer inBuffer = inBuf.nioBuffer();
666      ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes());
667      ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes());
668      decryptor.decrypt(inBuffer, outBuffer);
669      outBuf.writerIndex(inBuf.readableBytes());
670      if (release) {
671        inBuf.release();
672      }
673      ctx.fireChannelRead(outBuf);
674    }
675  }
676
677  private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
678
679    private final Encryptor encryptor;
680
681    public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
682        throws GeneralSecurityException, IOException {
683      this.encryptor = codec.createEncryptor();
684      this.encryptor.init(key, Arrays.copyOf(iv, iv.length));
685    }
686
687    @Override
688    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
689        throws Exception {
690      if (preferDirect) {
691        return ctx.alloc().directBuffer(msg.readableBytes());
692      } else {
693        return ctx.alloc().buffer(msg.readableBytes());
694      }
695    }
696
697    @Override
698    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
699      ByteBuf inBuf;
700      boolean release = false;
701      if (msg.nioBufferCount() == 1) {
702        inBuf = msg;
703      } else {
704        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
705        msg.readBytes(inBuf);
706        release = true;
707      }
708      ByteBuffer inBuffer = inBuf.nioBuffer();
709      ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes());
710      encryptor.encrypt(inBuffer, outBuffer);
711      out.writerIndex(inBuf.readableBytes());
712      if (release) {
713        inBuf.release();
714      }
715    }
716  }
717
718  private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) {
719    return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER
720        + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
721  }
722
723  private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
724    return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray();
725  }
726
727  private static String buildUsername(Token<BlockTokenIdentifier> blockToken) {
728    return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8);
729  }
730
731  private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
732    return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8)
733        .toCharArray();
734  }
735
736  private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) {
737    Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
738    saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
739    saslProps.put(Sasl.SERVER_AUTH, "true");
740    saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
741    return saslProps;
742  }
743
744  private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
745      String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise,
746      DFSClient dfsClient) {
747    try {
748      channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
749        new ProtobufVarint32FrameDecoder(),
750        new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()),
751        new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise,
752            dfsClient));
753    } catch (SaslException e) {
754      saslPromise.tryFailure(e);
755    }
756  }
757
758  static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
759      int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
760      Promise<Void> saslPromise) throws IOException {
761    SaslDataTransferClient saslClient = client.getSaslDataTransferClient();
762    SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient);
763    TrustedChannelResolver trustedChannelResolver =
764        SASL_ADAPTOR.getTrustedChannelResolver(saslClient);
765    AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient);
766    InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
767    if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
768      saslPromise.trySuccess(null);
769      return;
770    }
771    DataEncryptionKey encryptionKey = client.newDataEncryptionKey();
772    if (encryptionKey != null) {
773      if (LOG.isDebugEnabled()) {
774        LOG.debug(
775          "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo);
776      }
777      doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
778        encryptionKeyToPassword(encryptionKey.encryptionKey),
779        createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise,
780          client);
781    } else if (!UserGroupInformation.isSecurityEnabled()) {
782      if (LOG.isDebugEnabled()) {
783        LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
784            + ", datanodeId = " + dnInfo);
785      }
786      saslPromise.trySuccess(null);
787    } else if (dnInfo.getXferPort() < 1024) {
788      if (LOG.isDebugEnabled()) {
789        LOG.debug("SASL client skipping handshake in secured configuration with "
790            + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo);
791      }
792      saslPromise.trySuccess(null);
793    } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
794      if (LOG.isDebugEnabled()) {
795        LOG.debug("SASL client skipping handshake in secured configuration with "
796            + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo);
797      }
798      saslPromise.trySuccess(null);
799    } else if (saslPropsResolver != null) {
800      if (LOG.isDebugEnabled()) {
801        LOG.debug(
802          "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo);
803      }
804      doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
805        buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise,
806          client);
807    } else {
808      // It's a secured cluster using non-privileged ports, but no SASL. The only way this can
809      // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
810      // edge case.
811      if (LOG.isDebugEnabled()) {
812        LOG.debug("SASL client skipping handshake in secured configuration with no SASL "
813            + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo);
814      }
815      saslPromise.trySuccess(null);
816    }
817  }
818
819  static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client)
820      throws IOException {
821    FileEncryptionInfo feInfo = stat.getFileEncryptionInfo();
822    if (feInfo == null) {
823      return null;
824    }
825    return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client);
826  }
827}