001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; 021import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; 022 023import com.google.protobuf.CodedOutputStream; 024import java.io.IOException; 025import java.lang.reflect.Constructor; 026import java.lang.reflect.Field; 027import java.lang.reflect.InvocationTargetException; 028import java.lang.reflect.Method; 029import java.net.InetAddress; 030import java.net.InetSocketAddress; 031import java.nio.ByteBuffer; 032import java.security.GeneralSecurityException; 033import java.util.Arrays; 034import java.util.Base64; 035import java.util.Collections; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicBoolean; 041import javax.security.auth.callback.Callback; 042import javax.security.auth.callback.CallbackHandler; 043import javax.security.auth.callback.NameCallback; 044import javax.security.auth.callback.PasswordCallback; 045import javax.security.auth.callback.UnsupportedCallbackException; 046import javax.security.sasl.RealmCallback; 047import javax.security.sasl.RealmChoiceCallback; 048import javax.security.sasl.Sasl; 049import javax.security.sasl.SaslClient; 050import javax.security.sasl.SaslException; 051import org.apache.commons.lang3.StringUtils; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.crypto.CipherOption; 054import org.apache.hadoop.crypto.CipherSuite; 055import org.apache.hadoop.crypto.CryptoCodec; 056import org.apache.hadoop.crypto.Decryptor; 057import org.apache.hadoop.crypto.Encryptor; 058import org.apache.hadoop.crypto.key.KeyProvider; 059import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; 060import org.apache.hadoop.fs.FileEncryptionInfo; 061import org.apache.hadoop.hdfs.DFSClient; 062import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 063import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 064import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; 065import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; 066import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; 067import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; 068import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; 069import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; 070import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 071import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; 072import org.apache.hadoop.security.SaslPropertiesResolver; 073import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; 074import org.apache.hadoop.security.UserGroupInformation; 075import org.apache.hadoop.security.token.Token; 076import org.apache.yetus.audience.InterfaceAudience; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 081import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 082import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 083import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 084import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 085import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf; 086import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; 087import org.apache.hbase.thirdparty.io.netty.channel.Channel; 088import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler; 089import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 090import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter; 091import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 092import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; 093import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 094import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; 095import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder; 096import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 097import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; 098import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; 099import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 100 101/** 102 * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}. 103 */ 104@InterfaceAudience.Private 105public final class FanOutOneBlockAsyncDFSOutputSaslHelper { 106 private static final Logger LOG = 107 LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class); 108 109 private FanOutOneBlockAsyncDFSOutputSaslHelper() { 110 } 111 112 private static final String SERVER_NAME = "0"; 113 private static final String PROTOCOL = "hdfs"; 114 private static final String MECHANISM = "DIGEST-MD5"; 115 private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; 116 private static final String NAME_DELIMITER = " "; 117 118 private interface SaslAdaptor { 119 120 TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient); 121 122 SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient); 123 124 AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient); 125 } 126 127 private static final SaslAdaptor SASL_ADAPTOR; 128 129 private interface TransparentCryptoHelper { 130 131 Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client) 132 throws IOException; 133 } 134 135 private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER; 136 137 private static SaslAdaptor createSaslAdaptor() 138 throws NoSuchFieldException, NoSuchMethodException { 139 Field saslPropsResolverField = 140 SaslDataTransferClient.class.getDeclaredField("saslPropsResolver"); 141 saslPropsResolverField.setAccessible(true); 142 Field trustedChannelResolverField = 143 SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver"); 144 trustedChannelResolverField.setAccessible(true); 145 Field fallbackToSimpleAuthField = 146 SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth"); 147 fallbackToSimpleAuthField.setAccessible(true); 148 return new SaslAdaptor() { 149 150 @Override 151 public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) { 152 try { 153 return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient); 154 } catch (IllegalAccessException e) { 155 throw new RuntimeException(e); 156 } 157 } 158 159 @Override 160 public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) { 161 try { 162 return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient); 163 } catch (IllegalAccessException e) { 164 throw new RuntimeException(e); 165 } 166 } 167 168 @Override 169 public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) { 170 try { 171 return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient); 172 } catch (IllegalAccessException e) { 173 throw new RuntimeException(e); 174 } 175 } 176 }; 177 } 178 179 private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396() 180 throws NoSuchMethodException { 181 Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class 182 .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class); 183 decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); 184 return new TransparentCryptoHelper() { 185 186 @Override 187 public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, 188 DFSClient client) throws IOException { 189 try { 190 KeyVersion decryptedKey = 191 (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo); 192 CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); 193 Encryptor encryptor = cryptoCodec.createEncryptor(); 194 encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); 195 return encryptor; 196 } catch (InvocationTargetException e) { 197 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 198 throw new RuntimeException(e.getTargetException()); 199 } catch (GeneralSecurityException e) { 200 throw new IOException(e); 201 } catch (IllegalAccessException e) { 202 throw new RuntimeException(e); 203 } 204 } 205 }; 206 } 207 208 private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396() 209 throws ClassNotFoundException, NoSuchMethodException { 210 Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil"); 211 Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod( 212 "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class); 213 decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); 214 return new TransparentCryptoHelper() { 215 216 @Override 217 public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, 218 DFSClient client) throws IOException { 219 try { 220 KeyVersion decryptedKey = (KeyVersion) decryptEncryptedDataEncryptionKeyMethod 221 .invoke(null, feInfo, client.getKeyProvider()); 222 CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); 223 Encryptor encryptor = cryptoCodec.createEncryptor(); 224 encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); 225 return encryptor; 226 } catch (InvocationTargetException e) { 227 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 228 throw new RuntimeException(e.getTargetException()); 229 } catch (GeneralSecurityException e) { 230 throw new IOException(e); 231 } catch (IllegalAccessException e) { 232 throw new RuntimeException(e); 233 } 234 } 235 }; 236 } 237 238 private static TransparentCryptoHelper createTransparentCryptoHelper() 239 throws NoSuchMethodException, ClassNotFoundException { 240 try { 241 return createTransparentCryptoHelperWithoutHDFS12396(); 242 } catch (NoSuchMethodException e) { 243 LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," 244 + " should be hadoop version with HDFS-12396", e); 245 } 246 return createTransparentCryptoHelperWithHDFS12396(); 247 } 248 249 static { 250 try { 251 SASL_ADAPTOR = createSaslAdaptor(); 252 TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper(); 253 } catch (Exception e) { 254 String msg = "Couldn't properly initialize access to HDFS internals. Please " 255 + "update your WAL Provider to not make use of the 'asyncfs' provider. See " 256 + "HBASE-16110 for more information."; 257 LOG.error(msg, e); 258 throw new Error(msg, e); 259 } 260 } 261 262 /** 263 * Sets user name and password when asked by the client-side SASL object. 264 */ 265 private static final class SaslClientCallbackHandler implements CallbackHandler { 266 267 private final char[] password; 268 private final String userName; 269 270 /** 271 * Creates a new SaslClientCallbackHandler. 272 * @param userName SASL user name 273 * @param password SASL password 274 */ 275 public SaslClientCallbackHandler(String userName, char[] password) { 276 this.password = password; 277 this.userName = userName; 278 } 279 280 @Override 281 public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { 282 NameCallback nc = null; 283 PasswordCallback pc = null; 284 RealmCallback rc = null; 285 for (Callback callback : callbacks) { 286 if (callback instanceof RealmChoiceCallback) { 287 continue; 288 } else if (callback instanceof NameCallback) { 289 nc = (NameCallback) callback; 290 } else if (callback instanceof PasswordCallback) { 291 pc = (PasswordCallback) callback; 292 } else if (callback instanceof RealmCallback) { 293 rc = (RealmCallback) callback; 294 } else { 295 throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); 296 } 297 } 298 if (nc != null) { 299 nc.setName(userName); 300 } 301 if (pc != null) { 302 pc.setPassword(password); 303 } 304 if (rc != null) { 305 rc.setText(rc.getDefaultText()); 306 } 307 } 308 } 309 310 private static final class SaslNegotiateHandler extends ChannelDuplexHandler { 311 312 private final Configuration conf; 313 314 private final Map<String, String> saslProps; 315 316 private final SaslClient saslClient; 317 318 private final int timeoutMs; 319 320 private final Promise<Void> promise; 321 322 private final DFSClient dfsClient; 323 324 private int step = 0; 325 326 public SaslNegotiateHandler(Configuration conf, String username, char[] password, 327 Map<String, String> saslProps, int timeoutMs, Promise<Void> promise, DFSClient dfsClient) 328 throws SaslException { 329 this.conf = conf; 330 this.saslProps = saslProps; 331 this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, 332 SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password)); 333 this.timeoutMs = timeoutMs; 334 this.promise = promise; 335 this.dfsClient = dfsClient; 336 } 337 338 private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException { 339 sendSaslMessage(ctx, payload, null); 340 } 341 342 private List<CipherOption> getCipherOptions() throws IOException { 343 // Negotiate cipher suites if configured. Currently, the only supported 344 // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple 345 // values for future expansion. 346 String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); 347 if (StringUtils.isBlank(cipherSuites)) { 348 return null; 349 } 350 if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) { 351 throw new IOException(String.format("Invalid cipher suite, %s=%s", 352 DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); 353 } 354 return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING)); 355 } 356 357 /** 358 * The asyncfs subsystem emulates a HDFS client by sending protobuf messages via netty. After 359 * Hadoop 3.3.0, the protobuf classes are relocated to org.apache.hadoop.thirdparty.protobuf.*. 360 * Use Reflection to check which ones to use. 361 */ 362 private static class BuilderPayloadSetter { 363 private static Method setPayloadMethod; 364 private static Constructor<?> constructor; 365 366 /** 367 * Create a ByteString from byte array without copying (wrap), and then set it as the payload 368 * for the builder. 369 * @param builder builder for HDFS DataTransferEncryptorMessage. 370 * @param payload byte array of payload. n 371 */ 372 static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder, 373 byte[] payload) throws IOException { 374 Object byteStringObject; 375 try { 376 // byteStringObject = new LiteralByteString(payload); 377 byteStringObject = constructor.newInstance(payload); 378 // builder.setPayload(byteStringObject); 379 setPayloadMethod.invoke(builder, constructor.getDeclaringClass().cast(byteStringObject)); 380 } catch (IllegalAccessException | InstantiationException e) { 381 throw new RuntimeException(e); 382 383 } catch (InvocationTargetException e) { 384 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 385 throw new RuntimeException(e.getTargetException()); 386 } 387 } 388 389 static { 390 Class<?> builderClass = DataTransferEncryptorMessageProto.Builder.class; 391 392 // Try the unrelocated ByteString 393 Class<?> byteStringClass = com.google.protobuf.ByteString.class; 394 try { 395 // See if it can load the relocated ByteString, which comes from hadoop-thirdparty. 396 byteStringClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString"); 397 LOG.debug("Found relocated ByteString class from hadoop-thirdparty." 398 + " Assuming this is Hadoop 3.3.0+."); 399 } catch (ClassNotFoundException e) { 400 LOG.debug("Did not find relocated ByteString class from hadoop-thirdparty." 401 + " Assuming this is below Hadoop 3.3.0", e); 402 } 403 404 // LiteralByteString is a package private class in protobuf. Make it accessible. 405 Class<?> literalByteStringClass; 406 try { 407 literalByteStringClass = 408 Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString$LiteralByteString"); 409 LOG.debug("Shaded LiteralByteString from hadoop-thirdparty is found."); 410 } catch (ClassNotFoundException e) { 411 try { 412 literalByteStringClass = Class.forName("com.google.protobuf.LiteralByteString"); 413 LOG.debug("com.google.protobuf.LiteralByteString found."); 414 } catch (ClassNotFoundException ex) { 415 throw new RuntimeException(ex); 416 } 417 } 418 419 try { 420 constructor = literalByteStringClass.getDeclaredConstructor(byte[].class); 421 constructor.setAccessible(true); 422 } catch (NoSuchMethodException e) { 423 throw new RuntimeException(e); 424 } 425 426 try { 427 setPayloadMethod = builderClass.getMethod("setPayload", byteStringClass); 428 } catch (NoSuchMethodException e) { 429 // if either method is not found, we are in big trouble. Abort. 430 throw new RuntimeException(e); 431 } 432 } 433 } 434 435 private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, 436 List<CipherOption> options) throws IOException { 437 DataTransferEncryptorMessageProto.Builder builder = 438 DataTransferEncryptorMessageProto.newBuilder(); 439 builder.setStatus(DataTransferEncryptorStatus.SUCCESS); 440 if (payload != null) { 441 BuilderPayloadSetter.wrapAndSetPayload(builder, payload); 442 } 443 if (options != null) { 444 builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options)); 445 } 446 DataTransferEncryptorMessageProto proto = builder.build(); 447 int size = proto.getSerializedSize(); 448 size += CodedOutputStream.computeRawVarint32Size(size); 449 ByteBuf buf = ctx.alloc().buffer(size); 450 proto.writeDelimitedTo(new ByteBufOutputStream(buf)); 451 ctx.write(buf); 452 } 453 454 @Override 455 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 456 ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); 457 sendSaslMessage(ctx, new byte[0]); 458 ctx.flush(); 459 step++; 460 } 461 462 @Override 463 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 464 saslClient.dispose(); 465 } 466 467 private void check(DataTransferEncryptorMessageProto proto) throws IOException { 468 if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { 469 dfsClient.clearDataEncryptionKey(); 470 throw new InvalidEncryptionKeyException(proto.getMessage()); 471 } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { 472 throw new IOException(proto.getMessage()); 473 } 474 } 475 476 private String getNegotiatedQop() { 477 return (String) saslClient.getNegotiatedProperty(Sasl.QOP); 478 } 479 480 private boolean isNegotiatedQopPrivacy() { 481 String qop = getNegotiatedQop(); 482 return qop != null && "auth-conf".equalsIgnoreCase(qop); 483 } 484 485 private boolean requestedQopContainsPrivacy() { 486 Set<String> requestedQop = 487 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); 488 return requestedQop.contains("auth-conf"); 489 } 490 491 private void checkSaslComplete() throws IOException { 492 if (!saslClient.isComplete()) { 493 throw new IOException("Failed to complete SASL handshake"); 494 } 495 Set<String> requestedQop = 496 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); 497 String negotiatedQop = getNegotiatedQop(); 498 LOG.debug( 499 "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop); 500 if (!requestedQop.contains(negotiatedQop)) { 501 throw new IOException(String.format("SASL handshake completed, but " 502 + "channel does not have acceptable quality of protection, " 503 + "requested = %s, negotiated = %s", requestedQop, negotiatedQop)); 504 } 505 } 506 507 private boolean useWrap() { 508 String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); 509 return qop != null && !"auth".equalsIgnoreCase(qop); 510 } 511 512 private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException { 513 byte[] inKey = option.getInKey(); 514 if (inKey != null) { 515 inKey = saslClient.unwrap(inKey, 0, inKey.length); 516 } 517 byte[] outKey = option.getOutKey(); 518 if (outKey != null) { 519 outKey = saslClient.unwrap(outKey, 0, outKey.length); 520 } 521 return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey, 522 option.getOutIv()); 523 } 524 525 private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto, 526 boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { 527 List<CipherOption> cipherOptions = 528 PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList()); 529 if (cipherOptions == null || cipherOptions.isEmpty()) { 530 return null; 531 } 532 CipherOption cipherOption = cipherOptions.get(0); 533 return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption; 534 } 535 536 @Override 537 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 538 if (msg instanceof DataTransferEncryptorMessageProto) { 539 DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg; 540 check(proto); 541 byte[] challenge = proto.getPayload().toByteArray(); 542 byte[] response = saslClient.evaluateChallenge(challenge); 543 switch (step) { 544 case 1: { 545 List<CipherOption> cipherOptions = null; 546 if (requestedQopContainsPrivacy()) { 547 cipherOptions = getCipherOptions(); 548 } 549 sendSaslMessage(ctx, response, cipherOptions); 550 ctx.flush(); 551 step++; 552 break; 553 } 554 case 2: { 555 assert response == null; 556 checkSaslComplete(); 557 CipherOption cipherOption = 558 getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); 559 ChannelPipeline p = ctx.pipeline(); 560 while (p.first() != null) { 561 p.removeFirst(); 562 } 563 if (cipherOption != null) { 564 CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite()); 565 p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()), 566 new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv())); 567 } else { 568 if (useWrap()) { 569 p.addLast(new SaslWrapHandler(saslClient), 570 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), 571 new SaslUnwrapHandler(saslClient)); 572 } 573 } 574 promise.trySuccess(null); 575 break; 576 } 577 default: 578 throw new IllegalArgumentException("Unrecognized negotiation step: " + step); 579 } 580 } else { 581 ctx.fireChannelRead(msg); 582 } 583 } 584 585 @Override 586 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 587 promise.tryFailure(cause); 588 } 589 590 @Override 591 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 592 if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { 593 promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); 594 } else { 595 super.userEventTriggered(ctx, evt); 596 } 597 } 598 } 599 600 private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> { 601 602 private final SaslClient saslClient; 603 604 public SaslUnwrapHandler(SaslClient saslClient) { 605 this.saslClient = saslClient; 606 } 607 608 @Override 609 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 610 saslClient.dispose(); 611 } 612 613 @Override 614 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 615 msg.skipBytes(4); 616 byte[] b = new byte[msg.readableBytes()]; 617 msg.readBytes(b); 618 ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length))); 619 } 620 } 621 622 private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter { 623 624 private final SaslClient saslClient; 625 626 private CompositeByteBuf cBuf; 627 628 public SaslWrapHandler(SaslClient saslClient) { 629 this.saslClient = saslClient; 630 } 631 632 @Override 633 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 634 cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE); 635 } 636 637 @Override 638 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 639 throws Exception { 640 if (msg instanceof ByteBuf) { 641 ByteBuf buf = (ByteBuf) msg; 642 cBuf.addComponent(buf); 643 cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes()); 644 } else { 645 ctx.write(msg); 646 } 647 } 648 649 @Override 650 public void flush(ChannelHandlerContext ctx) throws Exception { 651 if (cBuf.isReadable()) { 652 byte[] b = new byte[cBuf.readableBytes()]; 653 cBuf.readBytes(b); 654 cBuf.discardReadComponents(); 655 byte[] wrapped = saslClient.wrap(b, 0, b.length); 656 ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length); 657 buf.writeInt(wrapped.length); 658 buf.writeBytes(wrapped); 659 ctx.write(buf); 660 } 661 ctx.flush(); 662 } 663 664 @Override 665 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { 666 cBuf.release(); 667 cBuf = null; 668 } 669 } 670 671 private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> { 672 673 private final Decryptor decryptor; 674 675 public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv) 676 throws GeneralSecurityException, IOException { 677 this.decryptor = codec.createDecryptor(); 678 this.decryptor.init(key, Arrays.copyOf(iv, iv.length)); 679 } 680 681 @Override 682 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 683 ByteBuf inBuf; 684 boolean release = false; 685 if (msg.nioBufferCount() == 1) { 686 inBuf = msg; 687 } else { 688 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); 689 msg.readBytes(inBuf); 690 release = true; 691 } 692 ByteBuffer inBuffer = inBuf.nioBuffer(); 693 ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes()); 694 ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes()); 695 decryptor.decrypt(inBuffer, outBuffer); 696 outBuf.writerIndex(inBuf.readableBytes()); 697 if (release) { 698 inBuf.release(); 699 } 700 ctx.fireChannelRead(outBuf); 701 } 702 } 703 704 private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> { 705 706 private final Encryptor encryptor; 707 708 public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv) 709 throws GeneralSecurityException, IOException { 710 this.encryptor = codec.createEncryptor(); 711 this.encryptor.init(key, Arrays.copyOf(iv, iv.length)); 712 } 713 714 @Override 715 protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) 716 throws Exception { 717 if (preferDirect) { 718 return ctx.alloc().directBuffer(msg.readableBytes()); 719 } else { 720 return ctx.alloc().buffer(msg.readableBytes()); 721 } 722 } 723 724 @Override 725 protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { 726 ByteBuf inBuf; 727 boolean release = false; 728 if (msg.nioBufferCount() == 1) { 729 inBuf = msg; 730 } else { 731 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); 732 msg.readBytes(inBuf); 733 release = true; 734 } 735 ByteBuffer inBuffer = inBuf.nioBuffer(); 736 ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes()); 737 encryptor.encrypt(inBuffer, outBuffer); 738 out.writerIndex(inBuf.readableBytes()); 739 if (release) { 740 inBuf.release(); 741 } 742 } 743 } 744 745 private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) { 746 return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER 747 + Base64.getEncoder().encodeToString(encryptionKey.nonce); 748 } 749 750 private static char[] encryptionKeyToPassword(byte[] encryptionKey) { 751 return Base64.getEncoder().encodeToString(encryptionKey).toCharArray(); 752 } 753 754 private static String buildUsername(Token<BlockTokenIdentifier> blockToken) { 755 return Base64.getEncoder().encodeToString(blockToken.getIdentifier()); 756 } 757 758 private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) { 759 return Base64.getEncoder().encodeToString(blockToken.getPassword()).toCharArray(); 760 } 761 762 private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) { 763 Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3); 764 saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); 765 saslProps.put(Sasl.SERVER_AUTH, "true"); 766 saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); 767 return saslProps; 768 } 769 770 private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs, 771 String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise, 772 DFSClient dfsClient) { 773 try { 774 channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), 775 new ProtobufVarint32FrameDecoder(), 776 new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()), 777 new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise, 778 dfsClient)); 779 } catch (SaslException e) { 780 saslPromise.tryFailure(e); 781 } 782 } 783 784 static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo, 785 int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken, 786 Promise<Void> saslPromise) throws IOException { 787 SaslDataTransferClient saslClient = client.getSaslDataTransferClient(); 788 SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient); 789 TrustedChannelResolver trustedChannelResolver = 790 SASL_ADAPTOR.getTrustedChannelResolver(saslClient); 791 AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient); 792 InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress(); 793 if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) { 794 saslPromise.trySuccess(null); 795 return; 796 } 797 DataEncryptionKey encryptionKey = client.newDataEncryptionKey(); 798 if (encryptionKey != null) { 799 if (LOG.isDebugEnabled()) { 800 LOG.debug( 801 "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo); 802 } 803 doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), 804 encryptionKeyToPassword(encryptionKey.encryptionKey), 805 createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise, client); 806 } else if (!UserGroupInformation.isSecurityEnabled()) { 807 if (LOG.isDebugEnabled()) { 808 LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr 809 + ", datanodeId = " + dnInfo); 810 } 811 saslPromise.trySuccess(null); 812 } else if (dnInfo.getXferPort() < 1024) { 813 if (LOG.isDebugEnabled()) { 814 LOG.debug("SASL client skipping handshake in secured configuration with " 815 + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo); 816 } 817 saslPromise.trySuccess(null); 818 } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { 819 if (LOG.isDebugEnabled()) { 820 LOG.debug("SASL client skipping handshake in secured configuration with " 821 + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo); 822 } 823 saslPromise.trySuccess(null); 824 } else if (saslPropsResolver != null) { 825 if (LOG.isDebugEnabled()) { 826 LOG.debug( 827 "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo); 828 } 829 doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), 830 buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise, 831 client); 832 } else { 833 // It's a secured cluster using non-privileged ports, but no SASL. The only way this can 834 // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare 835 // edge case. 836 if (LOG.isDebugEnabled()) { 837 LOG.debug("SASL client skipping handshake in secured configuration with no SASL " 838 + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo); 839 } 840 saslPromise.trySuccess(null); 841 } 842 } 843 844 static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client) 845 throws IOException { 846 FileEncryptionInfo feInfo = stat.getFileEncryptionInfo(); 847 if (feInfo == null) { 848 return null; 849 } 850 return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client); 851 } 852}