001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; 021import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; 022 023import com.google.protobuf.CodedOutputStream; 024import java.io.IOException; 025import java.lang.reflect.Constructor; 026import java.lang.reflect.Field; 027import java.lang.reflect.InvocationTargetException; 028import java.lang.reflect.Method; 029import java.net.InetAddress; 030import java.net.InetSocketAddress; 031import java.nio.ByteBuffer; 032import java.security.GeneralSecurityException; 033import java.util.Arrays; 034import java.util.Base64; 035import java.util.Collections; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicBoolean; 041import javax.security.auth.callback.Callback; 042import javax.security.auth.callback.CallbackHandler; 043import javax.security.auth.callback.NameCallback; 044import javax.security.auth.callback.PasswordCallback; 045import javax.security.auth.callback.UnsupportedCallbackException; 046import javax.security.sasl.RealmCallback; 047import javax.security.sasl.RealmChoiceCallback; 048import javax.security.sasl.Sasl; 049import javax.security.sasl.SaslClient; 050import javax.security.sasl.SaslException; 051import org.apache.commons.lang3.StringUtils; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.crypto.CipherOption; 054import org.apache.hadoop.crypto.CipherSuite; 055import org.apache.hadoop.crypto.CryptoCodec; 056import org.apache.hadoop.crypto.Decryptor; 057import org.apache.hadoop.crypto.Encryptor; 058import org.apache.hadoop.crypto.key.KeyProvider; 059import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; 060import org.apache.hadoop.fs.FileEncryptionInfo; 061import org.apache.hadoop.hdfs.DFSClient; 062import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 063import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 064import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; 065import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; 066import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; 067import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; 068import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; 069import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; 070import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 071import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; 072import org.apache.hadoop.security.SaslPropertiesResolver; 073import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; 074import org.apache.hadoop.security.UserGroupInformation; 075import org.apache.hadoop.security.token.Token; 076import org.apache.yetus.audience.InterfaceAudience; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 081import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 082import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 083import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 084import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 085import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf; 086import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; 087import org.apache.hbase.thirdparty.io.netty.channel.Channel; 088import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler; 089import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 090import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter; 091import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 092import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; 093import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 094import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; 095import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder; 096import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 097import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; 098import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; 099import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 100 101/** 102 * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}. 103 */ 104@InterfaceAudience.Private 105public final class FanOutOneBlockAsyncDFSOutputSaslHelper { 106 private static final Logger LOG = 107 LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class); 108 109 private FanOutOneBlockAsyncDFSOutputSaslHelper() { 110 } 111 112 private static final String SERVER_NAME = "0"; 113 private static final String PROTOCOL = "hdfs"; 114 private static final String MECHANISM = "DIGEST-MD5"; 115 private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; 116 private static final String NAME_DELIMITER = " "; 117 118 private interface SaslAdaptor { 119 120 TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient); 121 122 SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient); 123 124 AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient); 125 } 126 127 private static final SaslAdaptor SASL_ADAPTOR; 128 129 private interface TransparentCryptoHelper { 130 131 Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client) 132 throws IOException; 133 } 134 135 private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER; 136 137 private static SaslAdaptor createSaslAdaptor() 138 throws NoSuchFieldException, NoSuchMethodException { 139 Field saslPropsResolverField = 140 SaslDataTransferClient.class.getDeclaredField("saslPropsResolver"); 141 saslPropsResolverField.setAccessible(true); 142 Field trustedChannelResolverField = 143 SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver"); 144 trustedChannelResolverField.setAccessible(true); 145 Field fallbackToSimpleAuthField = 146 SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth"); 147 fallbackToSimpleAuthField.setAccessible(true); 148 return new SaslAdaptor() { 149 150 @Override 151 public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) { 152 try { 153 return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient); 154 } catch (IllegalAccessException e) { 155 throw new RuntimeException(e); 156 } 157 } 158 159 @Override 160 public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) { 161 try { 162 return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient); 163 } catch (IllegalAccessException e) { 164 throw new RuntimeException(e); 165 } 166 } 167 168 @Override 169 public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) { 170 try { 171 return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient); 172 } catch (IllegalAccessException e) { 173 throw new RuntimeException(e); 174 } 175 } 176 }; 177 } 178 179 private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396() 180 throws NoSuchMethodException { 181 Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class 182 .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class); 183 decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); 184 return new TransparentCryptoHelper() { 185 186 @Override 187 public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, 188 DFSClient client) throws IOException { 189 try { 190 KeyVersion decryptedKey = 191 (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo); 192 CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); 193 Encryptor encryptor = cryptoCodec.createEncryptor(); 194 encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); 195 return encryptor; 196 } catch (InvocationTargetException e) { 197 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 198 throw new RuntimeException(e.getTargetException()); 199 } catch (GeneralSecurityException e) { 200 throw new IOException(e); 201 } catch (IllegalAccessException e) { 202 throw new RuntimeException(e); 203 } 204 } 205 }; 206 } 207 208 private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396() 209 throws ClassNotFoundException, NoSuchMethodException { 210 Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil"); 211 Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod( 212 "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class); 213 decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); 214 return new TransparentCryptoHelper() { 215 216 @Override 217 public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, 218 DFSClient client) throws IOException { 219 try { 220 KeyVersion decryptedKey = (KeyVersion) decryptEncryptedDataEncryptionKeyMethod 221 .invoke(null, feInfo, client.getKeyProvider()); 222 CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); 223 Encryptor encryptor = cryptoCodec.createEncryptor(); 224 encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); 225 return encryptor; 226 } catch (InvocationTargetException e) { 227 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 228 throw new RuntimeException(e.getTargetException()); 229 } catch (GeneralSecurityException e) { 230 throw new IOException(e); 231 } catch (IllegalAccessException e) { 232 throw new RuntimeException(e); 233 } 234 } 235 }; 236 } 237 238 private static TransparentCryptoHelper createTransparentCryptoHelper() 239 throws NoSuchMethodException, ClassNotFoundException { 240 try { 241 return createTransparentCryptoHelperWithoutHDFS12396(); 242 } catch (NoSuchMethodException e) { 243 LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," + 244 " should be hadoop version with HDFS-12396", e); 245 } 246 return createTransparentCryptoHelperWithHDFS12396(); 247 } 248 249 static { 250 try { 251 SASL_ADAPTOR = createSaslAdaptor(); 252 TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper(); 253 } catch (Exception e) { 254 String msg = "Couldn't properly initialize access to HDFS internals. Please " 255 + "update your WAL Provider to not make use of the 'asyncfs' provider. See " 256 + "HBASE-16110 for more information."; 257 LOG.error(msg, e); 258 throw new Error(msg, e); 259 } 260 } 261 262 /** 263 * Sets user name and password when asked by the client-side SASL object. 264 */ 265 private static final class SaslClientCallbackHandler implements CallbackHandler { 266 267 private final char[] password; 268 private final String userName; 269 270 /** 271 * Creates a new SaslClientCallbackHandler. 272 * @param userName SASL user name 273 * @param password SASL password 274 */ 275 public SaslClientCallbackHandler(String userName, char[] password) { 276 this.password = password; 277 this.userName = userName; 278 } 279 280 @Override 281 public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { 282 NameCallback nc = null; 283 PasswordCallback pc = null; 284 RealmCallback rc = null; 285 for (Callback callback : callbacks) { 286 if (callback instanceof RealmChoiceCallback) { 287 continue; 288 } else if (callback instanceof NameCallback) { 289 nc = (NameCallback) callback; 290 } else if (callback instanceof PasswordCallback) { 291 pc = (PasswordCallback) callback; 292 } else if (callback instanceof RealmCallback) { 293 rc = (RealmCallback) callback; 294 } else { 295 throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); 296 } 297 } 298 if (nc != null) { 299 nc.setName(userName); 300 } 301 if (pc != null) { 302 pc.setPassword(password); 303 } 304 if (rc != null) { 305 rc.setText(rc.getDefaultText()); 306 } 307 } 308 } 309 310 private static final class SaslNegotiateHandler extends ChannelDuplexHandler { 311 312 private final Configuration conf; 313 314 private final Map<String, String> saslProps; 315 316 private final SaslClient saslClient; 317 318 private final int timeoutMs; 319 320 private final Promise<Void> promise; 321 322 private final DFSClient dfsClient; 323 324 private int step = 0; 325 326 public SaslNegotiateHandler(Configuration conf, String username, char[] password, 327 Map<String, String> saslProps, int timeoutMs, Promise<Void> promise, 328 DFSClient dfsClient) throws SaslException { 329 this.conf = conf; 330 this.saslProps = saslProps; 331 this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, 332 SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password)); 333 this.timeoutMs = timeoutMs; 334 this.promise = promise; 335 this.dfsClient = dfsClient; 336 } 337 338 private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException { 339 sendSaslMessage(ctx, payload, null); 340 } 341 342 private List<CipherOption> getCipherOptions() throws IOException { 343 // Negotiate cipher suites if configured. Currently, the only supported 344 // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple 345 // values for future expansion. 346 String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); 347 if (StringUtils.isBlank(cipherSuites)) { 348 return null; 349 } 350 if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) { 351 throw new IOException(String.format("Invalid cipher suite, %s=%s", 352 DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); 353 } 354 return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING)); 355 } 356 357 /** 358 * The asyncfs subsystem emulates a HDFS client by sending protobuf messages via netty. 359 * After Hadoop 3.3.0, the protobuf classes are relocated to org.apache.hadoop.thirdparty.protobuf.*. 360 * Use Reflection to check which ones to use. 361 */ 362 private static class BuilderPayloadSetter { 363 private static Method setPayloadMethod; 364 private static Constructor<?> constructor; 365 366 /** 367 * Create a ByteString from byte array without copying (wrap), and then set it as the payload 368 * for the builder. 369 * 370 * @param builder builder for HDFS DataTransferEncryptorMessage. 371 * @param payload byte array of payload. 372 * @throws IOException 373 */ 374 static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder, byte[] payload) 375 throws IOException { 376 Object byteStringObject; 377 try { 378 // byteStringObject = new LiteralByteString(payload); 379 byteStringObject = constructor.newInstance(payload); 380 // builder.setPayload(byteStringObject); 381 setPayloadMethod.invoke(builder, constructor.getDeclaringClass().cast(byteStringObject)); 382 } catch (IllegalAccessException | InstantiationException e) { 383 throw new RuntimeException(e); 384 385 } catch (InvocationTargetException e) { 386 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 387 throw new RuntimeException(e.getTargetException()); 388 } 389 } 390 391 static { 392 Class<?> builderClass = DataTransferEncryptorMessageProto.Builder.class; 393 394 // Try the unrelocated ByteString 395 Class<?> byteStringClass = com.google.protobuf.ByteString.class; 396 try { 397 // See if it can load the relocated ByteString, which comes from hadoop-thirdparty. 398 byteStringClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString"); 399 LOG.debug("Found relocated ByteString class from hadoop-thirdparty." + 400 " Assuming this is Hadoop 3.3.0+."); 401 } catch (ClassNotFoundException e) { 402 LOG.debug("Did not find relocated ByteString class from hadoop-thirdparty." + 403 " Assuming this is below Hadoop 3.3.0", e); 404 } 405 406 // LiteralByteString is a package private class in protobuf. Make it accessible. 407 Class<?> literalByteStringClass; 408 try { 409 literalByteStringClass = Class.forName( 410 "org.apache.hadoop.thirdparty.protobuf.ByteString$LiteralByteString"); 411 LOG.debug("Shaded LiteralByteString from hadoop-thirdparty is found."); 412 } catch (ClassNotFoundException e) { 413 try { 414 literalByteStringClass = Class.forName("com.google.protobuf.LiteralByteString"); 415 LOG.debug("com.google.protobuf.LiteralByteString found."); 416 } catch (ClassNotFoundException ex) { 417 throw new RuntimeException(ex); 418 } 419 } 420 421 try { 422 constructor = literalByteStringClass.getDeclaredConstructor(byte[].class); 423 constructor.setAccessible(true); 424 } catch (NoSuchMethodException e) { 425 throw new RuntimeException(e); 426 } 427 428 try { 429 setPayloadMethod = builderClass.getMethod("setPayload", byteStringClass); 430 } catch (NoSuchMethodException e) { 431 // if either method is not found, we are in big trouble. Abort. 432 throw new RuntimeException(e); 433 } 434 } 435 } 436 437 private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, 438 List<CipherOption> options) throws IOException { 439 DataTransferEncryptorMessageProto.Builder builder = 440 DataTransferEncryptorMessageProto.newBuilder(); 441 builder.setStatus(DataTransferEncryptorStatus.SUCCESS); 442 if (payload != null) { 443 BuilderPayloadSetter.wrapAndSetPayload(builder, payload); 444 } 445 if (options != null) { 446 builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options)); 447 } 448 DataTransferEncryptorMessageProto proto = builder.build(); 449 int size = proto.getSerializedSize(); 450 size += CodedOutputStream.computeRawVarint32Size(size); 451 ByteBuf buf = ctx.alloc().buffer(size); 452 proto.writeDelimitedTo(new ByteBufOutputStream(buf)); 453 ctx.write(buf); 454 } 455 456 @Override 457 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 458 ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); 459 sendSaslMessage(ctx, new byte[0]); 460 ctx.flush(); 461 step++; 462 } 463 464 @Override 465 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 466 saslClient.dispose(); 467 } 468 469 private void check(DataTransferEncryptorMessageProto proto) throws IOException { 470 if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { 471 dfsClient.clearDataEncryptionKey(); 472 throw new InvalidEncryptionKeyException(proto.getMessage()); 473 } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { 474 throw new IOException(proto.getMessage()); 475 } 476 } 477 478 private String getNegotiatedQop() { 479 return (String) saslClient.getNegotiatedProperty(Sasl.QOP); 480 } 481 482 private boolean isNegotiatedQopPrivacy() { 483 String qop = getNegotiatedQop(); 484 return qop != null && "auth-conf".equalsIgnoreCase(qop); 485 } 486 487 private boolean requestedQopContainsPrivacy() { 488 Set<String> requestedQop = 489 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); 490 return requestedQop.contains("auth-conf"); 491 } 492 493 private void checkSaslComplete() throws IOException { 494 if (!saslClient.isComplete()) { 495 throw new IOException("Failed to complete SASL handshake"); 496 } 497 Set<String> requestedQop = 498 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); 499 String negotiatedQop = getNegotiatedQop(); 500 LOG.debug( 501 "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop); 502 if (!requestedQop.contains(negotiatedQop)) { 503 throw new IOException(String.format("SASL handshake completed, but " 504 + "channel does not have acceptable quality of protection, " 505 + "requested = %s, negotiated = %s", 506 requestedQop, negotiatedQop)); 507 } 508 } 509 510 private boolean useWrap() { 511 String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); 512 return qop != null && !"auth".equalsIgnoreCase(qop); 513 } 514 515 private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException { 516 byte[] inKey = option.getInKey(); 517 if (inKey != null) { 518 inKey = saslClient.unwrap(inKey, 0, inKey.length); 519 } 520 byte[] outKey = option.getOutKey(); 521 if (outKey != null) { 522 outKey = saslClient.unwrap(outKey, 0, outKey.length); 523 } 524 return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey, 525 option.getOutIv()); 526 } 527 528 private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto, 529 boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { 530 List<CipherOption> cipherOptions = 531 PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList()); 532 if (cipherOptions == null || cipherOptions.isEmpty()) { 533 return null; 534 } 535 CipherOption cipherOption = cipherOptions.get(0); 536 return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption; 537 } 538 539 @Override 540 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 541 if (msg instanceof DataTransferEncryptorMessageProto) { 542 DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg; 543 check(proto); 544 byte[] challenge = proto.getPayload().toByteArray(); 545 byte[] response = saslClient.evaluateChallenge(challenge); 546 switch (step) { 547 case 1: { 548 List<CipherOption> cipherOptions = null; 549 if (requestedQopContainsPrivacy()) { 550 cipherOptions = getCipherOptions(); 551 } 552 sendSaslMessage(ctx, response, cipherOptions); 553 ctx.flush(); 554 step++; 555 break; 556 } 557 case 2: { 558 assert response == null; 559 checkSaslComplete(); 560 CipherOption cipherOption = 561 getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); 562 ChannelPipeline p = ctx.pipeline(); 563 while (p.first() != null) { 564 p.removeFirst(); 565 } 566 if (cipherOption != null) { 567 CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite()); 568 p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()), 569 new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv())); 570 } else { 571 if (useWrap()) { 572 p.addLast(new SaslWrapHandler(saslClient), 573 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), 574 new SaslUnwrapHandler(saslClient)); 575 } 576 } 577 promise.trySuccess(null); 578 break; 579 } 580 default: 581 throw new IllegalArgumentException("Unrecognized negotiation step: " + step); 582 } 583 } else { 584 ctx.fireChannelRead(msg); 585 } 586 } 587 588 @Override 589 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 590 promise.tryFailure(cause); 591 } 592 593 @Override 594 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 595 if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { 596 promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); 597 } else { 598 super.userEventTriggered(ctx, evt); 599 } 600 } 601 } 602 603 private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> { 604 605 private final SaslClient saslClient; 606 607 public SaslUnwrapHandler(SaslClient saslClient) { 608 this.saslClient = saslClient; 609 } 610 611 @Override 612 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 613 saslClient.dispose(); 614 } 615 616 @Override 617 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 618 msg.skipBytes(4); 619 byte[] b = new byte[msg.readableBytes()]; 620 msg.readBytes(b); 621 ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length))); 622 } 623 } 624 625 private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter { 626 627 private final SaslClient saslClient; 628 629 private CompositeByteBuf cBuf; 630 631 public SaslWrapHandler(SaslClient saslClient) { 632 this.saslClient = saslClient; 633 } 634 635 @Override 636 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 637 cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE); 638 } 639 640 @Override 641 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 642 throws Exception { 643 if (msg instanceof ByteBuf) { 644 ByteBuf buf = (ByteBuf) msg; 645 cBuf.addComponent(buf); 646 cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes()); 647 } else { 648 ctx.write(msg); 649 } 650 } 651 652 @Override 653 public void flush(ChannelHandlerContext ctx) throws Exception { 654 if (cBuf.isReadable()) { 655 byte[] b = new byte[cBuf.readableBytes()]; 656 cBuf.readBytes(b); 657 cBuf.discardReadComponents(); 658 byte[] wrapped = saslClient.wrap(b, 0, b.length); 659 ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length); 660 buf.writeInt(wrapped.length); 661 buf.writeBytes(wrapped); 662 ctx.write(buf); 663 } 664 ctx.flush(); 665 } 666 667 @Override 668 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { 669 cBuf.release(); 670 cBuf = null; 671 } 672 } 673 674 private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> { 675 676 private final Decryptor decryptor; 677 678 public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv) 679 throws GeneralSecurityException, IOException { 680 this.decryptor = codec.createDecryptor(); 681 this.decryptor.init(key, Arrays.copyOf(iv, iv.length)); 682 } 683 684 @Override 685 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 686 ByteBuf inBuf; 687 boolean release = false; 688 if (msg.nioBufferCount() == 1) { 689 inBuf = msg; 690 } else { 691 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); 692 msg.readBytes(inBuf); 693 release = true; 694 } 695 ByteBuffer inBuffer = inBuf.nioBuffer(); 696 ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes()); 697 ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes()); 698 decryptor.decrypt(inBuffer, outBuffer); 699 outBuf.writerIndex(inBuf.readableBytes()); 700 if (release) { 701 inBuf.release(); 702 } 703 ctx.fireChannelRead(outBuf); 704 } 705 } 706 707 private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> { 708 709 private final Encryptor encryptor; 710 711 public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv) 712 throws GeneralSecurityException, IOException { 713 this.encryptor = codec.createEncryptor(); 714 this.encryptor.init(key, Arrays.copyOf(iv, iv.length)); 715 } 716 717 @Override 718 protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) 719 throws Exception { 720 if (preferDirect) { 721 return ctx.alloc().directBuffer(msg.readableBytes()); 722 } else { 723 return ctx.alloc().buffer(msg.readableBytes()); 724 } 725 } 726 727 @Override 728 protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { 729 ByteBuf inBuf; 730 boolean release = false; 731 if (msg.nioBufferCount() == 1) { 732 inBuf = msg; 733 } else { 734 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); 735 msg.readBytes(inBuf); 736 release = true; 737 } 738 ByteBuffer inBuffer = inBuf.nioBuffer(); 739 ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes()); 740 encryptor.encrypt(inBuffer, outBuffer); 741 out.writerIndex(inBuf.readableBytes()); 742 if (release) { 743 inBuf.release(); 744 } 745 } 746 } 747 748 private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) { 749 return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER 750 + Base64.getEncoder().encodeToString(encryptionKey.nonce); 751 } 752 753 private static char[] encryptionKeyToPassword(byte[] encryptionKey) { 754 return Base64.getEncoder().encodeToString(encryptionKey).toCharArray(); 755 } 756 757 private static String buildUsername(Token<BlockTokenIdentifier> blockToken) { 758 return Base64.getEncoder().encodeToString(blockToken.getIdentifier()); 759 } 760 761 private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) { 762 return Base64.getEncoder().encodeToString(blockToken.getPassword()).toCharArray(); 763 } 764 765 private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) { 766 Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3); 767 saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); 768 saslProps.put(Sasl.SERVER_AUTH, "true"); 769 saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); 770 return saslProps; 771 } 772 773 private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs, 774 String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise, 775 DFSClient dfsClient) { 776 try { 777 channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), 778 new ProtobufVarint32FrameDecoder(), 779 new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()), 780 new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise, 781 dfsClient)); 782 } catch (SaslException e) { 783 saslPromise.tryFailure(e); 784 } 785 } 786 787 static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo, 788 int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken, 789 Promise<Void> saslPromise) throws IOException { 790 SaslDataTransferClient saslClient = client.getSaslDataTransferClient(); 791 SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient); 792 TrustedChannelResolver trustedChannelResolver = 793 SASL_ADAPTOR.getTrustedChannelResolver(saslClient); 794 AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient); 795 InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress(); 796 if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) { 797 saslPromise.trySuccess(null); 798 return; 799 } 800 DataEncryptionKey encryptionKey = client.newDataEncryptionKey(); 801 if (encryptionKey != null) { 802 if (LOG.isDebugEnabled()) { 803 LOG.debug( 804 "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo); 805 } 806 doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), 807 encryptionKeyToPassword(encryptionKey.encryptionKey), 808 createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise, 809 client); 810 } else if (!UserGroupInformation.isSecurityEnabled()) { 811 if (LOG.isDebugEnabled()) { 812 LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr 813 + ", datanodeId = " + dnInfo); 814 } 815 saslPromise.trySuccess(null); 816 } else if (dnInfo.getXferPort() < 1024) { 817 if (LOG.isDebugEnabled()) { 818 LOG.debug("SASL client skipping handshake in secured configuration with " 819 + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo); 820 } 821 saslPromise.trySuccess(null); 822 } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { 823 if (LOG.isDebugEnabled()) { 824 LOG.debug("SASL client skipping handshake in secured configuration with " 825 + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo); 826 } 827 saslPromise.trySuccess(null); 828 } else if (saslPropsResolver != null) { 829 if (LOG.isDebugEnabled()) { 830 LOG.debug( 831 "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo); 832 } 833 doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), 834 buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise, 835 client); 836 } else { 837 // It's a secured cluster using non-privileged ports, but no SASL. The only way this can 838 // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare 839 // edge case. 840 if (LOG.isDebugEnabled()) { 841 LOG.debug("SASL client skipping handshake in secured configuration with no SASL " 842 + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo); 843 } 844 saslPromise.trySuccess(null); 845 } 846 } 847 848 static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client) 849 throws IOException { 850 FileEncryptionInfo feInfo = stat.getFileEncryptionInfo(); 851 if (feInfo == null) { 852 return null; 853 } 854 return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client); 855 } 856}