001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; 021import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; 022 023import com.google.protobuf.ByteString; 024import com.google.protobuf.CodedOutputStream; 025import java.io.IOException; 026import java.lang.reflect.Field; 027import java.lang.reflect.InvocationTargetException; 028import java.lang.reflect.Method; 029import java.net.InetAddress; 030import java.net.InetSocketAddress; 031import java.nio.ByteBuffer; 032import java.security.GeneralSecurityException; 033import java.util.Arrays; 034import java.util.Collections; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicBoolean; 040import javax.security.auth.callback.Callback; 041import javax.security.auth.callback.CallbackHandler; 042import javax.security.auth.callback.NameCallback; 043import javax.security.auth.callback.PasswordCallback; 044import javax.security.auth.callback.UnsupportedCallbackException; 045import javax.security.sasl.RealmCallback; 046import javax.security.sasl.RealmChoiceCallback; 047import javax.security.sasl.Sasl; 048import javax.security.sasl.SaslClient; 049import javax.security.sasl.SaslException; 050import org.apache.commons.codec.binary.Base64; 051import org.apache.commons.lang3.StringUtils; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.crypto.CipherOption; 054import org.apache.hadoop.crypto.CipherSuite; 055import org.apache.hadoop.crypto.CryptoCodec; 056import org.apache.hadoop.crypto.Decryptor; 057import org.apache.hadoop.crypto.Encryptor; 058import org.apache.hadoop.crypto.key.KeyProvider; 059import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; 060import org.apache.hadoop.fs.FileEncryptionInfo; 061import org.apache.hadoop.hdfs.DFSClient; 062import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 063import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 064import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; 065import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; 066import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; 067import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; 068import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; 069import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; 070import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 071import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; 072import org.apache.hadoop.security.SaslPropertiesResolver; 073import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; 074import org.apache.hadoop.security.UserGroupInformation; 075import org.apache.hadoop.security.token.Token; 076import org.apache.yetus.audience.InterfaceAudience; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080import org.apache.hbase.thirdparty.com.google.common.base.Charsets; 081import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 082import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 083import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 084import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 085import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 086import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf; 087import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; 088import org.apache.hbase.thirdparty.io.netty.channel.Channel; 089import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler; 090import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 091import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter; 092import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 093import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; 094import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 095import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; 096import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder; 097import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder; 098import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 099import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; 100import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; 101import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 102 103/** 104 * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}. 105 */ 106@InterfaceAudience.Private 107public final class FanOutOneBlockAsyncDFSOutputSaslHelper { 108 private static final Logger LOG = 109 LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class); 110 111 private FanOutOneBlockAsyncDFSOutputSaslHelper() { 112 } 113 114 private static final String SERVER_NAME = "0"; 115 private static final String PROTOCOL = "hdfs"; 116 private static final String MECHANISM = "DIGEST-MD5"; 117 private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; 118 private static final String NAME_DELIMITER = " "; 119 120 private interface SaslAdaptor { 121 122 TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient); 123 124 SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient); 125 126 AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient); 127 } 128 129 private static final SaslAdaptor SASL_ADAPTOR; 130 131 private interface TransparentCryptoHelper { 132 133 Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client) 134 throws IOException; 135 } 136 137 private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER; 138 139 private static SaslAdaptor createSaslAdaptor() 140 throws NoSuchFieldException, NoSuchMethodException { 141 Field saslPropsResolverField = 142 SaslDataTransferClient.class.getDeclaredField("saslPropsResolver"); 143 saslPropsResolverField.setAccessible(true); 144 Field trustedChannelResolverField = 145 SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver"); 146 trustedChannelResolverField.setAccessible(true); 147 Field fallbackToSimpleAuthField = 148 SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth"); 149 fallbackToSimpleAuthField.setAccessible(true); 150 return new SaslAdaptor() { 151 152 @Override 153 public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) { 154 try { 155 return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient); 156 } catch (IllegalAccessException e) { 157 throw new RuntimeException(e); 158 } 159 } 160 161 @Override 162 public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) { 163 try { 164 return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient); 165 } catch (IllegalAccessException e) { 166 throw new RuntimeException(e); 167 } 168 } 169 170 @Override 171 public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) { 172 try { 173 return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient); 174 } catch (IllegalAccessException e) { 175 throw new RuntimeException(e); 176 } 177 } 178 }; 179 } 180 181 private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396() 182 throws NoSuchMethodException { 183 Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class 184 .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class); 185 decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); 186 return new TransparentCryptoHelper() { 187 188 @Override 189 public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, 190 DFSClient client) throws IOException { 191 try { 192 KeyVersion decryptedKey = 193 (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo); 194 CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); 195 Encryptor encryptor = cryptoCodec.createEncryptor(); 196 encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); 197 return encryptor; 198 } catch (InvocationTargetException e) { 199 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 200 throw new RuntimeException(e.getTargetException()); 201 } catch (GeneralSecurityException e) { 202 throw new IOException(e); 203 } catch (IllegalAccessException e) { 204 throw new RuntimeException(e); 205 } 206 } 207 }; 208 } 209 210 private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396() 211 throws ClassNotFoundException, NoSuchMethodException { 212 Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil"); 213 Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod( 214 "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class); 215 decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); 216 return new TransparentCryptoHelper() { 217 218 @Override 219 public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, 220 DFSClient client) throws IOException { 221 try { 222 KeyVersion decryptedKey = (KeyVersion) decryptEncryptedDataEncryptionKeyMethod 223 .invoke(null, feInfo, client.getKeyProvider()); 224 CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); 225 Encryptor encryptor = cryptoCodec.createEncryptor(); 226 encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); 227 return encryptor; 228 } catch (InvocationTargetException e) { 229 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 230 throw new RuntimeException(e.getTargetException()); 231 } catch (GeneralSecurityException e) { 232 throw new IOException(e); 233 } catch (IllegalAccessException e) { 234 throw new RuntimeException(e); 235 } 236 } 237 }; 238 } 239 240 private static TransparentCryptoHelper createTransparentCryptoHelper() 241 throws NoSuchMethodException, ClassNotFoundException { 242 try { 243 return createTransparentCryptoHelperWithoutHDFS12396(); 244 } catch (NoSuchMethodException e) { 245 LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," + 246 " should be hadoop version with HDFS-12396", e); 247 } 248 return createTransparentCryptoHelperWithHDFS12396(); 249 } 250 251 static { 252 try { 253 SASL_ADAPTOR = createSaslAdaptor(); 254 TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper(); 255 } catch (Exception e) { 256 String msg = "Couldn't properly initialize access to HDFS internals. Please " 257 + "update your WAL Provider to not make use of the 'asyncfs' provider. See " 258 + "HBASE-16110 for more information."; 259 LOG.error(msg, e); 260 throw new Error(msg, e); 261 } 262 } 263 264 /** 265 * Sets user name and password when asked by the client-side SASL object. 266 */ 267 private static final class SaslClientCallbackHandler implements CallbackHandler { 268 269 private final char[] password; 270 private final String userName; 271 272 /** 273 * Creates a new SaslClientCallbackHandler. 274 * @param userName SASL user name 275 * @param password SASL password 276 */ 277 public SaslClientCallbackHandler(String userName, char[] password) { 278 this.password = password; 279 this.userName = userName; 280 } 281 282 @Override 283 public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { 284 NameCallback nc = null; 285 PasswordCallback pc = null; 286 RealmCallback rc = null; 287 for (Callback callback : callbacks) { 288 if (callback instanceof RealmChoiceCallback) { 289 continue; 290 } else if (callback instanceof NameCallback) { 291 nc = (NameCallback) callback; 292 } else if (callback instanceof PasswordCallback) { 293 pc = (PasswordCallback) callback; 294 } else if (callback instanceof RealmCallback) { 295 rc = (RealmCallback) callback; 296 } else { 297 throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); 298 } 299 } 300 if (nc != null) { 301 nc.setName(userName); 302 } 303 if (pc != null) { 304 pc.setPassword(password); 305 } 306 if (rc != null) { 307 rc.setText(rc.getDefaultText()); 308 } 309 } 310 } 311 312 private static final class SaslNegotiateHandler extends ChannelDuplexHandler { 313 314 private final Configuration conf; 315 316 private final Map<String, String> saslProps; 317 318 private final SaslClient saslClient; 319 320 private final int timeoutMs; 321 322 private final Promise<Void> promise; 323 324 private int step = 0; 325 326 public SaslNegotiateHandler(Configuration conf, String username, char[] password, 327 Map<String, String> saslProps, int timeoutMs, Promise<Void> promise) throws SaslException { 328 this.conf = conf; 329 this.saslProps = saslProps; 330 this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, 331 SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password)); 332 this.timeoutMs = timeoutMs; 333 this.promise = promise; 334 } 335 336 private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException { 337 sendSaslMessage(ctx, payload, null); 338 } 339 340 private List<CipherOption> getCipherOptions() throws IOException { 341 // Negotiate cipher suites if configured. Currently, the only supported 342 // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple 343 // values for future expansion. 344 String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); 345 if (StringUtils.isBlank(cipherSuites)) { 346 return null; 347 } 348 if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) { 349 throw new IOException(String.format("Invalid cipher suite, %s=%s", 350 DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); 351 } 352 return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING)); 353 } 354 355 private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, 356 List<CipherOption> options) throws IOException { 357 DataTransferEncryptorMessageProto.Builder builder = 358 DataTransferEncryptorMessageProto.newBuilder(); 359 builder.setStatus(DataTransferEncryptorStatus.SUCCESS); 360 if (payload != null) { 361 // Was ByteStringer; fix w/o using ByteStringer. Its in hbase-protocol 362 // and we want to keep that out of hbase-server. 363 builder.setPayload(ByteString.copyFrom(payload)); 364 } 365 if (options != null) { 366 builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options)); 367 } 368 DataTransferEncryptorMessageProto proto = builder.build(); 369 int size = proto.getSerializedSize(); 370 size += CodedOutputStream.computeRawVarint32Size(size); 371 ByteBuf buf = ctx.alloc().buffer(size); 372 proto.writeDelimitedTo(new ByteBufOutputStream(buf)); 373 ctx.write(buf); 374 } 375 376 @Override 377 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 378 ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); 379 sendSaslMessage(ctx, new byte[0]); 380 ctx.flush(); 381 step++; 382 } 383 384 @Override 385 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 386 saslClient.dispose(); 387 } 388 389 private void check(DataTransferEncryptorMessageProto proto) throws IOException { 390 if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { 391 throw new InvalidEncryptionKeyException(proto.getMessage()); 392 } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { 393 throw new IOException(proto.getMessage()); 394 } 395 } 396 397 private String getNegotiatedQop() { 398 return (String) saslClient.getNegotiatedProperty(Sasl.QOP); 399 } 400 401 private boolean isNegotiatedQopPrivacy() { 402 String qop = getNegotiatedQop(); 403 return qop != null && "auth-conf".equalsIgnoreCase(qop); 404 } 405 406 private boolean requestedQopContainsPrivacy() { 407 Set<String> requestedQop = 408 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); 409 return requestedQop.contains("auth-conf"); 410 } 411 412 private void checkSaslComplete() throws IOException { 413 if (!saslClient.isComplete()) { 414 throw new IOException("Failed to complete SASL handshake"); 415 } 416 Set<String> requestedQop = 417 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); 418 String negotiatedQop = getNegotiatedQop(); 419 LOG.debug( 420 "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop); 421 if (!requestedQop.contains(negotiatedQop)) { 422 throw new IOException(String.format("SASL handshake completed, but " 423 + "channel does not have acceptable quality of protection, " 424 + "requested = %s, negotiated = %s", 425 requestedQop, negotiatedQop)); 426 } 427 } 428 429 private boolean useWrap() { 430 String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); 431 return qop != null && !"auth".equalsIgnoreCase(qop); 432 } 433 434 private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException { 435 byte[] inKey = option.getInKey(); 436 if (inKey != null) { 437 inKey = saslClient.unwrap(inKey, 0, inKey.length); 438 } 439 byte[] outKey = option.getOutKey(); 440 if (outKey != null) { 441 outKey = saslClient.unwrap(outKey, 0, outKey.length); 442 } 443 return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey, 444 option.getOutIv()); 445 } 446 447 private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto, 448 boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { 449 List<CipherOption> cipherOptions = 450 PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList()); 451 if (cipherOptions == null || cipherOptions.isEmpty()) { 452 return null; 453 } 454 CipherOption cipherOption = cipherOptions.get(0); 455 return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption; 456 } 457 458 @Override 459 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 460 if (msg instanceof DataTransferEncryptorMessageProto) { 461 DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg; 462 check(proto); 463 byte[] challenge = proto.getPayload().toByteArray(); 464 byte[] response = saslClient.evaluateChallenge(challenge); 465 switch (step) { 466 case 1: { 467 List<CipherOption> cipherOptions = null; 468 if (requestedQopContainsPrivacy()) { 469 cipherOptions = getCipherOptions(); 470 } 471 sendSaslMessage(ctx, response, cipherOptions); 472 ctx.flush(); 473 step++; 474 break; 475 } 476 case 2: { 477 assert response == null; 478 checkSaslComplete(); 479 CipherOption cipherOption = 480 getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); 481 ChannelPipeline p = ctx.pipeline(); 482 while (p.first() != null) { 483 p.removeFirst(); 484 } 485 if (cipherOption != null) { 486 CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite()); 487 p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()), 488 new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv())); 489 } else { 490 if (useWrap()) { 491 p.addLast(new SaslWrapHandler(saslClient), 492 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), 493 new SaslUnwrapHandler(saslClient)); 494 } 495 } 496 promise.trySuccess(null); 497 break; 498 } 499 default: 500 throw new IllegalArgumentException("Unrecognized negotiation step: " + step); 501 } 502 } else { 503 ctx.fireChannelRead(msg); 504 } 505 } 506 507 @Override 508 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 509 promise.tryFailure(cause); 510 } 511 512 @Override 513 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 514 if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { 515 promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); 516 } else { 517 super.userEventTriggered(ctx, evt); 518 } 519 } 520 } 521 522 private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> { 523 524 private final SaslClient saslClient; 525 526 public SaslUnwrapHandler(SaslClient saslClient) { 527 this.saslClient = saslClient; 528 } 529 530 @Override 531 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 532 saslClient.dispose(); 533 } 534 535 @Override 536 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 537 msg.skipBytes(4); 538 byte[] b = new byte[msg.readableBytes()]; 539 msg.readBytes(b); 540 ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length))); 541 } 542 } 543 544 private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter { 545 546 private final SaslClient saslClient; 547 548 private CompositeByteBuf cBuf; 549 550 public SaslWrapHandler(SaslClient saslClient) { 551 this.saslClient = saslClient; 552 } 553 554 @Override 555 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 556 cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE); 557 } 558 559 @Override 560 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 561 throws Exception { 562 if (msg instanceof ByteBuf) { 563 ByteBuf buf = (ByteBuf) msg; 564 cBuf.addComponent(buf); 565 cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes()); 566 } else { 567 ctx.write(msg); 568 } 569 } 570 571 @Override 572 public void flush(ChannelHandlerContext ctx) throws Exception { 573 if (cBuf.isReadable()) { 574 byte[] b = new byte[cBuf.readableBytes()]; 575 cBuf.readBytes(b); 576 cBuf.discardReadComponents(); 577 byte[] wrapped = saslClient.wrap(b, 0, b.length); 578 ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length); 579 buf.writeInt(wrapped.length); 580 buf.writeBytes(wrapped); 581 ctx.write(buf); 582 } 583 ctx.flush(); 584 } 585 586 @Override 587 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { 588 cBuf.release(); 589 cBuf = null; 590 } 591 } 592 593 private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> { 594 595 private final Decryptor decryptor; 596 597 public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv) 598 throws GeneralSecurityException, IOException { 599 this.decryptor = codec.createDecryptor(); 600 this.decryptor.init(key, Arrays.copyOf(iv, iv.length)); 601 } 602 603 @Override 604 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 605 ByteBuf inBuf; 606 boolean release = false; 607 if (msg.nioBufferCount() == 1) { 608 inBuf = msg; 609 } else { 610 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); 611 msg.readBytes(inBuf); 612 release = true; 613 } 614 ByteBuffer inBuffer = inBuf.nioBuffer(); 615 ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes()); 616 ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes()); 617 decryptor.decrypt(inBuffer, outBuffer); 618 outBuf.writerIndex(inBuf.readableBytes()); 619 if (release) { 620 inBuf.release(); 621 } 622 ctx.fireChannelRead(outBuf); 623 } 624 } 625 626 private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> { 627 628 private final Encryptor encryptor; 629 630 public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv) 631 throws GeneralSecurityException, IOException { 632 this.encryptor = codec.createEncryptor(); 633 this.encryptor.init(key, Arrays.copyOf(iv, iv.length)); 634 } 635 636 @Override 637 protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) 638 throws Exception { 639 if (preferDirect) { 640 return ctx.alloc().directBuffer(msg.readableBytes()); 641 } else { 642 return ctx.alloc().buffer(msg.readableBytes()); 643 } 644 } 645 646 @Override 647 protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { 648 ByteBuf inBuf; 649 boolean release = false; 650 if (msg.nioBufferCount() == 1) { 651 inBuf = msg; 652 } else { 653 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); 654 msg.readBytes(inBuf); 655 release = true; 656 } 657 ByteBuffer inBuffer = inBuf.nioBuffer(); 658 ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes()); 659 encryptor.encrypt(inBuffer, outBuffer); 660 out.writerIndex(inBuf.readableBytes()); 661 if (release) { 662 inBuf.release(); 663 } 664 } 665 } 666 667 private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) { 668 return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER 669 + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8); 670 } 671 672 private static char[] encryptionKeyToPassword(byte[] encryptionKey) { 673 return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray(); 674 } 675 676 private static String buildUsername(Token<BlockTokenIdentifier> blockToken) { 677 return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8); 678 } 679 680 private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) { 681 return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8) 682 .toCharArray(); 683 } 684 685 private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) { 686 Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3); 687 saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); 688 saslProps.put(Sasl.SERVER_AUTH, "true"); 689 saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); 690 return saslProps; 691 } 692 693 private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs, 694 String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise) { 695 try { 696 channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), 697 new ProtobufVarint32FrameDecoder(), 698 new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()), 699 new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise)); 700 } catch (SaslException e) { 701 saslPromise.tryFailure(e); 702 } 703 } 704 705 static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo, 706 int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken, 707 Promise<Void> saslPromise) throws IOException { 708 SaslDataTransferClient saslClient = client.getSaslDataTransferClient(); 709 SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient); 710 TrustedChannelResolver trustedChannelResolver = 711 SASL_ADAPTOR.getTrustedChannelResolver(saslClient); 712 AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient); 713 InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress(); 714 if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) { 715 saslPromise.trySuccess(null); 716 return; 717 } 718 DataEncryptionKey encryptionKey = client.newDataEncryptionKey(); 719 if (encryptionKey != null) { 720 if (LOG.isDebugEnabled()) { 721 LOG.debug( 722 "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo); 723 } 724 doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), 725 encryptionKeyToPassword(encryptionKey.encryptionKey), 726 createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise); 727 } else if (!UserGroupInformation.isSecurityEnabled()) { 728 if (LOG.isDebugEnabled()) { 729 LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr 730 + ", datanodeId = " + dnInfo); 731 } 732 saslPromise.trySuccess(null); 733 } else if (dnInfo.getXferPort() < 1024) { 734 if (LOG.isDebugEnabled()) { 735 LOG.debug("SASL client skipping handshake in secured configuration with " 736 + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo); 737 } 738 saslPromise.trySuccess(null); 739 } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { 740 if (LOG.isDebugEnabled()) { 741 LOG.debug("SASL client skipping handshake in secured configuration with " 742 + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo); 743 } 744 saslPromise.trySuccess(null); 745 } else if (saslPropsResolver != null) { 746 if (LOG.isDebugEnabled()) { 747 LOG.debug( 748 "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo); 749 } 750 doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), 751 buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise); 752 } else { 753 // It's a secured cluster using non-privileged ports, but no SASL. The only way this can 754 // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare 755 // edge case. 756 if (LOG.isDebugEnabled()) { 757 LOG.debug("SASL client skipping handshake in secured configuration with no SASL " 758 + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo); 759 } 760 saslPromise.trySuccess(null); 761 } 762 } 763 764 static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client) 765 throws IOException { 766 FileEncryptionInfo feInfo = stat.getFileEncryptionInfo(); 767 if (feInfo == null) { 768 return null; 769 } 770 return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client); 771 } 772}