001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; 021import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; 022 023import com.google.protobuf.ByteString; 024import com.google.protobuf.CodedOutputStream; 025import java.io.IOException; 026import java.lang.reflect.Field; 027import java.lang.reflect.InvocationTargetException; 028import java.lang.reflect.Method; 029import java.net.InetAddress; 030import java.net.InetSocketAddress; 031import java.nio.ByteBuffer; 032import java.security.GeneralSecurityException; 033import java.util.Arrays; 034import java.util.Collections; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicBoolean; 040import javax.security.auth.callback.Callback; 041import javax.security.auth.callback.CallbackHandler; 042import javax.security.auth.callback.NameCallback; 043import javax.security.auth.callback.PasswordCallback; 044import javax.security.auth.callback.UnsupportedCallbackException; 045import javax.security.sasl.RealmCallback; 046import javax.security.sasl.RealmChoiceCallback; 047import javax.security.sasl.Sasl; 048import javax.security.sasl.SaslClient; 049import javax.security.sasl.SaslException; 050import org.apache.commons.codec.binary.Base64; 051import org.apache.commons.lang3.StringUtils; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.crypto.CipherOption; 054import org.apache.hadoop.crypto.CipherSuite; 055import org.apache.hadoop.crypto.CryptoCodec; 056import org.apache.hadoop.crypto.Decryptor; 057import org.apache.hadoop.crypto.Encryptor; 058import org.apache.hadoop.crypto.key.KeyProvider; 059import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; 060import org.apache.hadoop.fs.FileEncryptionInfo; 061import org.apache.hadoop.hdfs.DFSClient; 062import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 063import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 064import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; 065import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; 066import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; 067import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; 068import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; 069import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto; 070import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 071import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; 072import org.apache.hadoop.security.SaslPropertiesResolver; 073import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; 074import org.apache.hadoop.security.UserGroupInformation; 075import org.apache.hadoop.security.token.Token; 076import org.apache.yetus.audience.InterfaceAudience; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080import org.apache.hbase.thirdparty.com.google.common.base.Charsets; 081import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 082import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 083import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 084import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 085import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 086import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf; 087import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; 088import org.apache.hbase.thirdparty.io.netty.channel.Channel; 089import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler; 090import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 091import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter; 092import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 093import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; 094import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 095import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; 096import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder; 097import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder; 098import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 099import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; 100import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; 101import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 102 103/** 104 * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}. 105 */ 106@InterfaceAudience.Private 107public final class FanOutOneBlockAsyncDFSOutputSaslHelper { 108 private static final Logger LOG = 109 LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class); 110 111 private FanOutOneBlockAsyncDFSOutputSaslHelper() { 112 } 113 114 private static final String SERVER_NAME = "0"; 115 private static final String PROTOCOL = "hdfs"; 116 private static final String MECHANISM = "DIGEST-MD5"; 117 private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; 118 private static final String NAME_DELIMITER = " "; 119 120 private interface SaslAdaptor { 121 122 TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient); 123 124 SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient); 125 126 AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient); 127 } 128 129 private static final SaslAdaptor SASL_ADAPTOR; 130 131 // helper class for convert protos. 132 private interface PBHelper { 133 134 List<CipherOptionProto> convertCipherOptions(List<CipherOption> options); 135 136 List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options); 137 } 138 139 private static final PBHelper PB_HELPER; 140 141 private interface TransparentCryptoHelper { 142 143 Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client) 144 throws IOException; 145 } 146 147 private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER; 148 149 private static SaslAdaptor createSaslAdaptor() 150 throws NoSuchFieldException, NoSuchMethodException { 151 Field saslPropsResolverField = 152 SaslDataTransferClient.class.getDeclaredField("saslPropsResolver"); 153 saslPropsResolverField.setAccessible(true); 154 Field trustedChannelResolverField = 155 SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver"); 156 trustedChannelResolverField.setAccessible(true); 157 Field fallbackToSimpleAuthField = 158 SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth"); 159 fallbackToSimpleAuthField.setAccessible(true); 160 return new SaslAdaptor() { 161 162 @Override 163 public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) { 164 try { 165 return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient); 166 } catch (IllegalAccessException e) { 167 throw new RuntimeException(e); 168 } 169 } 170 171 @Override 172 public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) { 173 try { 174 return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient); 175 } catch (IllegalAccessException e) { 176 throw new RuntimeException(e); 177 } 178 } 179 180 @Override 181 public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) { 182 try { 183 return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient); 184 } catch (IllegalAccessException e) { 185 throw new RuntimeException(e); 186 } 187 } 188 }; 189 } 190 191 private static PBHelper createPBHelper() throws NoSuchMethodException { 192 Class<?> helperClass; 193 try { 194 helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient"); 195 } catch (ClassNotFoundException e) { 196 LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e); 197 helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class; 198 } 199 Method convertCipherOptionsMethod = helperClass.getMethod("convertCipherOptions", List.class); 200 Method convertCipherOptionProtosMethod = 201 helperClass.getMethod("convertCipherOptionProtos", List.class); 202 return new PBHelper() { 203 204 @SuppressWarnings("unchecked") 205 @Override 206 public List<CipherOptionProto> convertCipherOptions(List<CipherOption> options) { 207 try { 208 return (List<CipherOptionProto>) convertCipherOptionsMethod.invoke(null, options); 209 } catch (IllegalAccessException | InvocationTargetException e) { 210 throw new RuntimeException(e); 211 } 212 } 213 214 @SuppressWarnings("unchecked") 215 @Override 216 public List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options) { 217 try { 218 return (List<CipherOption>) convertCipherOptionProtosMethod.invoke(null, options); 219 } catch (IllegalAccessException | InvocationTargetException e) { 220 throw new RuntimeException(e); 221 } 222 } 223 }; 224 } 225 226 private static TransparentCryptoHelper createTransparentCryptoHelper27() 227 throws NoSuchMethodException { 228 Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class 229 .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class); 230 decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); 231 return new TransparentCryptoHelper() { 232 233 @Override 234 public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, 235 DFSClient client) throws IOException { 236 try { 237 KeyVersion decryptedKey = 238 (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo); 239 CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); 240 Encryptor encryptor = cryptoCodec.createEncryptor(); 241 encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); 242 return encryptor; 243 } catch (InvocationTargetException e) { 244 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 245 throw new RuntimeException(e.getTargetException()); 246 } catch (GeneralSecurityException e) { 247 throw new IOException(e); 248 } catch (IllegalAccessException e) { 249 throw new RuntimeException(e); 250 } 251 } 252 }; 253 } 254 255 private static TransparentCryptoHelper createTransparentCryptoHelper28() 256 throws ClassNotFoundException, NoSuchMethodException { 257 Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil"); 258 Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod( 259 "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class); 260 decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); 261 return new TransparentCryptoHelper() { 262 263 @Override 264 public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, 265 DFSClient client) throws IOException { 266 try { 267 KeyVersion decryptedKey = (KeyVersion) decryptEncryptedDataEncryptionKeyMethod 268 .invoke(null, feInfo, client.getKeyProvider()); 269 CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); 270 Encryptor encryptor = cryptoCodec.createEncryptor(); 271 encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); 272 return encryptor; 273 } catch (InvocationTargetException e) { 274 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 275 throw new RuntimeException(e.getTargetException()); 276 } catch (GeneralSecurityException e) { 277 throw new IOException(e); 278 } catch (IllegalAccessException e) { 279 throw new RuntimeException(e); 280 } 281 } 282 }; 283 } 284 285 private static TransparentCryptoHelper createTransparentCryptoHelper() 286 throws NoSuchMethodException, ClassNotFoundException { 287 try { 288 return createTransparentCryptoHelper27(); 289 } catch (NoSuchMethodException e) { 290 LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient, should be hadoop 2.8+", 291 e); 292 } 293 return createTransparentCryptoHelper28(); 294 } 295 296 static { 297 try { 298 SASL_ADAPTOR = createSaslAdaptor(); 299 PB_HELPER = createPBHelper(); 300 TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper(); 301 } catch (Exception e) { 302 String msg = "Couldn't properly initialize access to HDFS internals. Please " 303 + "update your WAL Provider to not make use of the 'asyncfs' provider. See " 304 + "HBASE-16110 for more information."; 305 LOG.error(msg, e); 306 throw new Error(msg, e); 307 } 308 } 309 310 /** 311 * Sets user name and password when asked by the client-side SASL object. 312 */ 313 private static final class SaslClientCallbackHandler implements CallbackHandler { 314 315 private final char[] password; 316 private final String userName; 317 318 /** 319 * Creates a new SaslClientCallbackHandler. 320 * @param userName SASL user name 321 * @param password SASL password 322 */ 323 public SaslClientCallbackHandler(String userName, char[] password) { 324 this.password = password; 325 this.userName = userName; 326 } 327 328 @Override 329 public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { 330 NameCallback nc = null; 331 PasswordCallback pc = null; 332 RealmCallback rc = null; 333 for (Callback callback : callbacks) { 334 if (callback instanceof RealmChoiceCallback) { 335 continue; 336 } else if (callback instanceof NameCallback) { 337 nc = (NameCallback) callback; 338 } else if (callback instanceof PasswordCallback) { 339 pc = (PasswordCallback) callback; 340 } else if (callback instanceof RealmCallback) { 341 rc = (RealmCallback) callback; 342 } else { 343 throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); 344 } 345 } 346 if (nc != null) { 347 nc.setName(userName); 348 } 349 if (pc != null) { 350 pc.setPassword(password); 351 } 352 if (rc != null) { 353 rc.setText(rc.getDefaultText()); 354 } 355 } 356 } 357 358 private static final class SaslNegotiateHandler extends ChannelDuplexHandler { 359 360 private final Configuration conf; 361 362 private final Map<String, String> saslProps; 363 364 private final SaslClient saslClient; 365 366 private final int timeoutMs; 367 368 private final Promise<Void> promise; 369 370 private final DFSClient dfsClient; 371 372 private int step = 0; 373 374 public SaslNegotiateHandler(Configuration conf, String username, char[] password, 375 Map<String, String> saslProps, int timeoutMs, Promise<Void> promise, 376 DFSClient dfsClient) throws SaslException { 377 this.conf = conf; 378 this.saslProps = saslProps; 379 this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, 380 SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password)); 381 this.timeoutMs = timeoutMs; 382 this.promise = promise; 383 this.dfsClient = dfsClient; 384 } 385 386 private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException { 387 sendSaslMessage(ctx, payload, null); 388 } 389 390 private List<CipherOption> getCipherOptions() throws IOException { 391 // Negotiate cipher suites if configured. Currently, the only supported 392 // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple 393 // values for future expansion. 394 String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); 395 if (StringUtils.isBlank(cipherSuites)) { 396 return null; 397 } 398 if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) { 399 throw new IOException(String.format("Invalid cipher suite, %s=%s", 400 DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); 401 } 402 return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING)); 403 } 404 405 private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, 406 List<CipherOption> options) throws IOException { 407 DataTransferEncryptorMessageProto.Builder builder = 408 DataTransferEncryptorMessageProto.newBuilder(); 409 builder.setStatus(DataTransferEncryptorStatus.SUCCESS); 410 if (payload != null) { 411 // Was ByteStringer; fix w/o using ByteStringer. Its in hbase-protocol 412 // and we want to keep that out of hbase-server. 413 builder.setPayload(ByteString.copyFrom(payload)); 414 } 415 if (options != null) { 416 builder.addAllCipherOption(PB_HELPER.convertCipherOptions(options)); 417 } 418 DataTransferEncryptorMessageProto proto = builder.build(); 419 int size = proto.getSerializedSize(); 420 size += CodedOutputStream.computeRawVarint32Size(size); 421 ByteBuf buf = ctx.alloc().buffer(size); 422 proto.writeDelimitedTo(new ByteBufOutputStream(buf)); 423 ctx.write(buf); 424 } 425 426 @Override 427 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 428 ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); 429 sendSaslMessage(ctx, new byte[0]); 430 ctx.flush(); 431 step++; 432 } 433 434 @Override 435 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 436 saslClient.dispose(); 437 } 438 439 private void check(DataTransferEncryptorMessageProto proto) throws IOException { 440 if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { 441 dfsClient.clearDataEncryptionKey(); 442 throw new InvalidEncryptionKeyException(proto.getMessage()); 443 } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { 444 throw new IOException(proto.getMessage()); 445 } 446 } 447 448 private String getNegotiatedQop() { 449 return (String) saslClient.getNegotiatedProperty(Sasl.QOP); 450 } 451 452 private boolean isNegotiatedQopPrivacy() { 453 String qop = getNegotiatedQop(); 454 return qop != null && "auth-conf".equalsIgnoreCase(qop); 455 } 456 457 private boolean requestedQopContainsPrivacy() { 458 Set<String> requestedQop = 459 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); 460 return requestedQop.contains("auth-conf"); 461 } 462 463 private void checkSaslComplete() throws IOException { 464 if (!saslClient.isComplete()) { 465 throw new IOException("Failed to complete SASL handshake"); 466 } 467 Set<String> requestedQop = 468 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); 469 String negotiatedQop = getNegotiatedQop(); 470 LOG.debug( 471 "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop); 472 if (!requestedQop.contains(negotiatedQop)) { 473 throw new IOException(String.format("SASL handshake completed, but " 474 + "channel does not have acceptable quality of protection, " 475 + "requested = %s, negotiated = %s", 476 requestedQop, negotiatedQop)); 477 } 478 } 479 480 private boolean useWrap() { 481 String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); 482 return qop != null && !"auth".equalsIgnoreCase(qop); 483 } 484 485 private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException { 486 byte[] inKey = option.getInKey(); 487 if (inKey != null) { 488 inKey = saslClient.unwrap(inKey, 0, inKey.length); 489 } 490 byte[] outKey = option.getOutKey(); 491 if (outKey != null) { 492 outKey = saslClient.unwrap(outKey, 0, outKey.length); 493 } 494 return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey, 495 option.getOutIv()); 496 } 497 498 private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto, 499 boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { 500 List<CipherOption> cipherOptions = 501 PB_HELPER.convertCipherOptionProtos(proto.getCipherOptionList()); 502 if (cipherOptions == null || cipherOptions.isEmpty()) { 503 return null; 504 } 505 CipherOption cipherOption = cipherOptions.get(0); 506 return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption; 507 } 508 509 @Override 510 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 511 if (msg instanceof DataTransferEncryptorMessageProto) { 512 DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg; 513 check(proto); 514 byte[] challenge = proto.getPayload().toByteArray(); 515 byte[] response = saslClient.evaluateChallenge(challenge); 516 switch (step) { 517 case 1: { 518 List<CipherOption> cipherOptions = null; 519 if (requestedQopContainsPrivacy()) { 520 cipherOptions = getCipherOptions(); 521 } 522 sendSaslMessage(ctx, response, cipherOptions); 523 ctx.flush(); 524 step++; 525 break; 526 } 527 case 2: { 528 assert response == null; 529 checkSaslComplete(); 530 CipherOption cipherOption = 531 getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); 532 ChannelPipeline p = ctx.pipeline(); 533 while (p.first() != null) { 534 p.removeFirst(); 535 } 536 if (cipherOption != null) { 537 CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite()); 538 p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()), 539 new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv())); 540 } else { 541 if (useWrap()) { 542 p.addLast(new SaslWrapHandler(saslClient), 543 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), 544 new SaslUnwrapHandler(saslClient)); 545 } 546 } 547 promise.trySuccess(null); 548 break; 549 } 550 default: 551 throw new IllegalArgumentException("Unrecognized negotiation step: " + step); 552 } 553 } else { 554 ctx.fireChannelRead(msg); 555 } 556 } 557 558 @Override 559 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 560 promise.tryFailure(cause); 561 } 562 563 @Override 564 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 565 if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { 566 promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); 567 } else { 568 super.userEventTriggered(ctx, evt); 569 } 570 } 571 } 572 573 private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> { 574 575 private final SaslClient saslClient; 576 577 public SaslUnwrapHandler(SaslClient saslClient) { 578 this.saslClient = saslClient; 579 } 580 581 @Override 582 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 583 saslClient.dispose(); 584 } 585 586 @Override 587 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 588 msg.skipBytes(4); 589 byte[] b = new byte[msg.readableBytes()]; 590 msg.readBytes(b); 591 ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length))); 592 } 593 } 594 595 private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter { 596 597 private final SaslClient saslClient; 598 599 private CompositeByteBuf cBuf; 600 601 public SaslWrapHandler(SaslClient saslClient) { 602 this.saslClient = saslClient; 603 } 604 605 @Override 606 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 607 cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE); 608 } 609 610 @Override 611 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 612 throws Exception { 613 if (msg instanceof ByteBuf) { 614 ByteBuf buf = (ByteBuf) msg; 615 cBuf.addComponent(buf); 616 cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes()); 617 } else { 618 ctx.write(msg); 619 } 620 } 621 622 @Override 623 public void flush(ChannelHandlerContext ctx) throws Exception { 624 if (cBuf.isReadable()) { 625 byte[] b = new byte[cBuf.readableBytes()]; 626 cBuf.readBytes(b); 627 cBuf.discardReadComponents(); 628 byte[] wrapped = saslClient.wrap(b, 0, b.length); 629 ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length); 630 buf.writeInt(wrapped.length); 631 buf.writeBytes(wrapped); 632 ctx.write(buf); 633 } 634 ctx.flush(); 635 } 636 637 @Override 638 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { 639 cBuf.release(); 640 cBuf = null; 641 } 642 } 643 644 private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> { 645 646 private final Decryptor decryptor; 647 648 public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv) 649 throws GeneralSecurityException, IOException { 650 this.decryptor = codec.createDecryptor(); 651 this.decryptor.init(key, Arrays.copyOf(iv, iv.length)); 652 } 653 654 @Override 655 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 656 ByteBuf inBuf; 657 boolean release = false; 658 if (msg.nioBufferCount() == 1) { 659 inBuf = msg; 660 } else { 661 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); 662 msg.readBytes(inBuf); 663 release = true; 664 } 665 ByteBuffer inBuffer = inBuf.nioBuffer(); 666 ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes()); 667 ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes()); 668 decryptor.decrypt(inBuffer, outBuffer); 669 outBuf.writerIndex(inBuf.readableBytes()); 670 if (release) { 671 inBuf.release(); 672 } 673 ctx.fireChannelRead(outBuf); 674 } 675 } 676 677 private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> { 678 679 private final Encryptor encryptor; 680 681 public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv) 682 throws GeneralSecurityException, IOException { 683 this.encryptor = codec.createEncryptor(); 684 this.encryptor.init(key, Arrays.copyOf(iv, iv.length)); 685 } 686 687 @Override 688 protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) 689 throws Exception { 690 if (preferDirect) { 691 return ctx.alloc().directBuffer(msg.readableBytes()); 692 } else { 693 return ctx.alloc().buffer(msg.readableBytes()); 694 } 695 } 696 697 @Override 698 protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { 699 ByteBuf inBuf; 700 boolean release = false; 701 if (msg.nioBufferCount() == 1) { 702 inBuf = msg; 703 } else { 704 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); 705 msg.readBytes(inBuf); 706 release = true; 707 } 708 ByteBuffer inBuffer = inBuf.nioBuffer(); 709 ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes()); 710 encryptor.encrypt(inBuffer, outBuffer); 711 out.writerIndex(inBuf.readableBytes()); 712 if (release) { 713 inBuf.release(); 714 } 715 } 716 } 717 718 private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) { 719 return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER 720 + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8); 721 } 722 723 private static char[] encryptionKeyToPassword(byte[] encryptionKey) { 724 return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray(); 725 } 726 727 private static String buildUsername(Token<BlockTokenIdentifier> blockToken) { 728 return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8); 729 } 730 731 private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) { 732 return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8) 733 .toCharArray(); 734 } 735 736 private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) { 737 Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3); 738 saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); 739 saslProps.put(Sasl.SERVER_AUTH, "true"); 740 saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); 741 return saslProps; 742 } 743 744 private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs, 745 String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise, 746 DFSClient dfsClient) { 747 try { 748 channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), 749 new ProtobufVarint32FrameDecoder(), 750 new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()), 751 new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise, 752 dfsClient)); 753 } catch (SaslException e) { 754 saslPromise.tryFailure(e); 755 } 756 } 757 758 static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo, 759 int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken, 760 Promise<Void> saslPromise) throws IOException { 761 SaslDataTransferClient saslClient = client.getSaslDataTransferClient(); 762 SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient); 763 TrustedChannelResolver trustedChannelResolver = 764 SASL_ADAPTOR.getTrustedChannelResolver(saslClient); 765 AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient); 766 InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress(); 767 if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) { 768 saslPromise.trySuccess(null); 769 return; 770 } 771 DataEncryptionKey encryptionKey = client.newDataEncryptionKey(); 772 if (encryptionKey != null) { 773 if (LOG.isDebugEnabled()) { 774 LOG.debug( 775 "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo); 776 } 777 doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), 778 encryptionKeyToPassword(encryptionKey.encryptionKey), 779 createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise, 780 client); 781 } else if (!UserGroupInformation.isSecurityEnabled()) { 782 if (LOG.isDebugEnabled()) { 783 LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr 784 + ", datanodeId = " + dnInfo); 785 } 786 saslPromise.trySuccess(null); 787 } else if (dnInfo.getXferPort() < 1024) { 788 if (LOG.isDebugEnabled()) { 789 LOG.debug("SASL client skipping handshake in secured configuration with " 790 + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo); 791 } 792 saslPromise.trySuccess(null); 793 } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { 794 if (LOG.isDebugEnabled()) { 795 LOG.debug("SASL client skipping handshake in secured configuration with " 796 + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo); 797 } 798 saslPromise.trySuccess(null); 799 } else if (saslPropsResolver != null) { 800 if (LOG.isDebugEnabled()) { 801 LOG.debug( 802 "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo); 803 } 804 doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), 805 buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise, 806 client); 807 } else { 808 // It's a secured cluster using non-privileged ports, but no SASL. The only way this can 809 // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare 810 // edge case. 811 if (LOG.isDebugEnabled()) { 812 LOG.debug("SASL client skipping handshake in secured configuration with no SASL " 813 + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo); 814 } 815 saslPromise.trySuccess(null); 816 } 817 } 818 819 static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client) 820 throws IOException { 821 FileEncryptionInfo feInfo = stat.getFileEncryptionInfo(); 822 if (feInfo == null) { 823 return null; 824 } 825 return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client); 826 } 827}