001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite; 021import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; 022import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; 023 024import com.google.protobuf.CodedOutputStream; 025import java.io.IOException; 026import java.lang.reflect.Constructor; 027import java.lang.reflect.Field; 028import java.lang.reflect.InvocationTargetException; 029import java.lang.reflect.Method; 030import java.net.InetAddress; 031import java.net.InetSocketAddress; 032import java.nio.ByteBuffer; 033import java.security.GeneralSecurityException; 034import java.util.Arrays; 035import java.util.Base64; 036import java.util.Collections; 037import java.util.List; 038import java.util.Map; 039import java.util.Set; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import javax.security.auth.callback.Callback; 043import javax.security.auth.callback.CallbackHandler; 044import javax.security.auth.callback.NameCallback; 045import javax.security.auth.callback.PasswordCallback; 046import javax.security.auth.callback.UnsupportedCallbackException; 047import javax.security.sasl.RealmCallback; 048import javax.security.sasl.RealmChoiceCallback; 049import javax.security.sasl.Sasl; 050import javax.security.sasl.SaslClient; 051import javax.security.sasl.SaslException; 052import org.apache.commons.lang3.StringUtils; 053import org.apache.hadoop.conf.Configuration; 054import org.apache.hadoop.crypto.CipherOption; 055import org.apache.hadoop.crypto.CipherSuite; 056import org.apache.hadoop.crypto.CryptoCodec; 057import org.apache.hadoop.crypto.Decryptor; 058import org.apache.hadoop.crypto.Encryptor; 059import org.apache.hadoop.crypto.key.KeyProvider; 060import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; 061import org.apache.hadoop.fs.FileEncryptionInfo; 062import org.apache.hadoop.hdfs.DFSClient; 063import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 064import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 065import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; 066import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; 067import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; 068import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; 069import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; 070import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; 071import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 072import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; 073import org.apache.hadoop.security.SaslPropertiesResolver; 074import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; 075import org.apache.hadoop.security.UserGroupInformation; 076import org.apache.hadoop.security.token.Token; 077import org.apache.yetus.audience.InterfaceAudience; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 082import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 083import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 084import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 085import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 086import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf; 087import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; 088import org.apache.hbase.thirdparty.io.netty.channel.Channel; 089import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler; 090import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 091import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter; 092import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 093import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; 094import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 095import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; 096import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder; 097import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 098import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; 099import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; 100import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 101 102/** 103 * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}. 104 */ 105@InterfaceAudience.Private 106public final class FanOutOneBlockAsyncDFSOutputSaslHelper { 107 private static final Logger LOG = 108 LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class); 109 110 private FanOutOneBlockAsyncDFSOutputSaslHelper() { 111 } 112 113 private static final String SERVER_NAME = "0"; 114 private static final String PROTOCOL = "hdfs"; 115 private static final String MECHANISM = "DIGEST-MD5"; 116 private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; 117 private static final String NAME_DELIMITER = " "; 118 119 private interface SaslAdaptor { 120 121 TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient); 122 123 SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient); 124 125 AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient); 126 } 127 128 private static final SaslAdaptor SASL_ADAPTOR; 129 130 private interface TransparentCryptoHelper { 131 132 Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client) 133 throws IOException; 134 } 135 136 private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER; 137 138 private static SaslAdaptor createSaslAdaptor() 139 throws NoSuchFieldException, NoSuchMethodException { 140 Field saslPropsResolverField = 141 SaslDataTransferClient.class.getDeclaredField("saslPropsResolver"); 142 saslPropsResolverField.setAccessible(true); 143 Field trustedChannelResolverField = 144 SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver"); 145 trustedChannelResolverField.setAccessible(true); 146 Field fallbackToSimpleAuthField = 147 SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth"); 148 fallbackToSimpleAuthField.setAccessible(true); 149 return new SaslAdaptor() { 150 151 @Override 152 public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) { 153 try { 154 return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient); 155 } catch (IllegalAccessException e) { 156 throw new RuntimeException(e); 157 } 158 } 159 160 @Override 161 public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) { 162 try { 163 return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient); 164 } catch (IllegalAccessException e) { 165 throw new RuntimeException(e); 166 } 167 } 168 169 @Override 170 public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) { 171 try { 172 return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient); 173 } catch (IllegalAccessException e) { 174 throw new RuntimeException(e); 175 } 176 } 177 }; 178 } 179 180 private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396() 181 throws NoSuchMethodException { 182 Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class 183 .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class); 184 decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); 185 return new TransparentCryptoHelper() { 186 187 @Override 188 public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, 189 DFSClient client) throws IOException { 190 try { 191 KeyVersion decryptedKey = 192 (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo); 193 CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); 194 Encryptor encryptor = cryptoCodec.createEncryptor(); 195 encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); 196 return encryptor; 197 } catch (InvocationTargetException e) { 198 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 199 throw new RuntimeException(e.getTargetException()); 200 } catch (GeneralSecurityException e) { 201 throw new IOException(e); 202 } catch (IllegalAccessException e) { 203 throw new RuntimeException(e); 204 } 205 } 206 }; 207 } 208 209 private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396() 210 throws ClassNotFoundException, NoSuchMethodException { 211 Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil"); 212 Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod( 213 "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class); 214 decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); 215 return new TransparentCryptoHelper() { 216 217 @Override 218 public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, 219 DFSClient client) throws IOException { 220 try { 221 KeyVersion decryptedKey = (KeyVersion) decryptEncryptedDataEncryptionKeyMethod 222 .invoke(null, feInfo, client.getKeyProvider()); 223 CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); 224 Encryptor encryptor = cryptoCodec.createEncryptor(); 225 encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); 226 return encryptor; 227 } catch (InvocationTargetException e) { 228 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 229 throw new RuntimeException(e.getTargetException()); 230 } catch (GeneralSecurityException e) { 231 throw new IOException(e); 232 } catch (IllegalAccessException e) { 233 throw new RuntimeException(e); 234 } 235 } 236 }; 237 } 238 239 private static TransparentCryptoHelper createTransparentCryptoHelper() 240 throws NoSuchMethodException, ClassNotFoundException { 241 try { 242 return createTransparentCryptoHelperWithoutHDFS12396(); 243 } catch (NoSuchMethodException e) { 244 LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," 245 + " should be hadoop version with HDFS-12396", e); 246 } 247 return createTransparentCryptoHelperWithHDFS12396(); 248 } 249 250 static { 251 try { 252 SASL_ADAPTOR = createSaslAdaptor(); 253 TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper(); 254 } catch (Exception e) { 255 String msg = "Couldn't properly initialize access to HDFS internals. Please " 256 + "update your WAL Provider to not make use of the 'asyncfs' provider. See " 257 + "HBASE-16110 for more information."; 258 LOG.error(msg, e); 259 throw new Error(msg, e); 260 } 261 } 262 263 /** 264 * Sets user name and password when asked by the client-side SASL object. 265 */ 266 private static final class SaslClientCallbackHandler implements CallbackHandler { 267 268 private final char[] password; 269 private final String userName; 270 271 /** 272 * Creates a new SaslClientCallbackHandler. 273 * @param userName SASL user name 274 * @param password SASL password 275 */ 276 public SaslClientCallbackHandler(String userName, char[] password) { 277 this.password = password; 278 this.userName = userName; 279 } 280 281 @Override 282 public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { 283 NameCallback nc = null; 284 PasswordCallback pc = null; 285 RealmCallback rc = null; 286 for (Callback callback : callbacks) { 287 if (callback instanceof RealmChoiceCallback) { 288 continue; 289 } else if (callback instanceof NameCallback) { 290 nc = (NameCallback) callback; 291 } else if (callback instanceof PasswordCallback) { 292 pc = (PasswordCallback) callback; 293 } else if (callback instanceof RealmCallback) { 294 rc = (RealmCallback) callback; 295 } else { 296 throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); 297 } 298 } 299 if (nc != null) { 300 nc.setName(userName); 301 } 302 if (pc != null) { 303 pc.setPassword(password); 304 } 305 if (rc != null) { 306 rc.setText(rc.getDefaultText()); 307 } 308 } 309 } 310 311 private static final class SaslNegotiateHandler extends ChannelDuplexHandler { 312 313 private final Configuration conf; 314 315 private final Map<String, String> saslProps; 316 317 private final SaslClient saslClient; 318 319 private final int timeoutMs; 320 321 private final Promise<Void> promise; 322 323 private final DFSClient dfsClient; 324 325 private int step = 0; 326 327 public SaslNegotiateHandler(Configuration conf, String username, char[] password, 328 Map<String, String> saslProps, int timeoutMs, Promise<Void> promise, DFSClient dfsClient) 329 throws SaslException { 330 this.conf = conf; 331 this.saslProps = saslProps; 332 this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, 333 SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password)); 334 this.timeoutMs = timeoutMs; 335 this.promise = promise; 336 this.dfsClient = dfsClient; 337 } 338 339 private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException { 340 sendSaslMessage(ctx, payload, null); 341 } 342 343 private List<CipherOption> getCipherOptions() throws IOException { 344 // Negotiate cipher suites if configured. Currently, the only supported 345 // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple 346 // values for future expansion. 347 String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); 348 if (StringUtils.isBlank(cipherSuites)) { 349 return null; 350 } 351 if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) { 352 throw new IOException(String.format("Invalid cipher suite, %s=%s", 353 DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); 354 } 355 return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING)); 356 } 357 358 /** 359 * The asyncfs subsystem emulates a HDFS client by sending protobuf messages via netty. After 360 * Hadoop 3.3.0, the protobuf classes are relocated to org.apache.hadoop.thirdparty.protobuf.*. 361 * Use Reflection to check which ones to use. 362 */ 363 private static class BuilderPayloadSetter { 364 private static Method setPayloadMethod; 365 private static Constructor<?> constructor; 366 367 /** 368 * Create a ByteString from byte array without copying (wrap), and then set it as the payload 369 * for the builder. 370 * @param builder builder for HDFS DataTransferEncryptorMessage. 371 * @param payload byte array of payload. n 372 */ 373 static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder, 374 byte[] payload) throws IOException { 375 Object byteStringObject; 376 try { 377 // byteStringObject = new LiteralByteString(payload); 378 byteStringObject = constructor.newInstance(payload); 379 // builder.setPayload(byteStringObject); 380 setPayloadMethod.invoke(builder, constructor.getDeclaringClass().cast(byteStringObject)); 381 } catch (IllegalAccessException | InstantiationException e) { 382 throw new RuntimeException(e); 383 384 } catch (InvocationTargetException e) { 385 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 386 throw new RuntimeException(e.getTargetException()); 387 } 388 } 389 390 static { 391 Class<?> builderClass = DataTransferEncryptorMessageProto.Builder.class; 392 393 // Try the unrelocated ByteString 394 Class<?> byteStringClass = com.google.protobuf.ByteString.class; 395 try { 396 // See if it can load the relocated ByteString, which comes from hadoop-thirdparty. 397 byteStringClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString"); 398 LOG.debug("Found relocated ByteString class from hadoop-thirdparty." 399 + " Assuming this is Hadoop 3.3.0+."); 400 } catch (ClassNotFoundException e) { 401 LOG.debug("Did not find relocated ByteString class from hadoop-thirdparty." 402 + " Assuming this is below Hadoop 3.3.0", e); 403 } 404 405 // LiteralByteString is a package private class in protobuf. Make it accessible. 406 Class<?> literalByteStringClass; 407 try { 408 literalByteStringClass = 409 Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString$LiteralByteString"); 410 LOG.debug("Shaded LiteralByteString from hadoop-thirdparty is found."); 411 } catch (ClassNotFoundException e) { 412 try { 413 literalByteStringClass = Class.forName("com.google.protobuf.LiteralByteString"); 414 LOG.debug("com.google.protobuf.LiteralByteString found."); 415 } catch (ClassNotFoundException ex) { 416 throw new RuntimeException(ex); 417 } 418 } 419 420 try { 421 constructor = literalByteStringClass.getDeclaredConstructor(byte[].class); 422 constructor.setAccessible(true); 423 } catch (NoSuchMethodException e) { 424 throw new RuntimeException(e); 425 } 426 427 try { 428 setPayloadMethod = builderClass.getMethod("setPayload", byteStringClass); 429 } catch (NoSuchMethodException e) { 430 // if either method is not found, we are in big trouble. Abort. 431 throw new RuntimeException(e); 432 } 433 } 434 } 435 436 private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, 437 List<CipherOption> options) throws IOException { 438 DataTransferEncryptorMessageProto.Builder builder = 439 DataTransferEncryptorMessageProto.newBuilder(); 440 builder.setStatus(DataTransferEncryptorStatus.SUCCESS); 441 if (payload != null) { 442 BuilderPayloadSetter.wrapAndSetPayload(builder, payload); 443 } 444 if (options != null) { 445 builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options)); 446 } 447 DataTransferEncryptorMessageProto proto = builder.build(); 448 int size = proto.getSerializedSize(); 449 size += CodedOutputStream.computeRawVarint32Size(size); 450 ByteBuf buf = ctx.alloc().buffer(size); 451 proto.writeDelimitedTo(new ByteBufOutputStream(buf)); 452 safeWrite(ctx, buf); 453 } 454 455 @Override 456 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 457 safeWrite(ctx, ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); 458 sendSaslMessage(ctx, new byte[0]); 459 ctx.flush(); 460 step++; 461 } 462 463 @Override 464 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 465 saslClient.dispose(); 466 } 467 468 private void check(DataTransferEncryptorMessageProto proto) throws IOException { 469 if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { 470 dfsClient.clearDataEncryptionKey(); 471 throw new InvalidEncryptionKeyException(proto.getMessage()); 472 } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { 473 throw new IOException(proto.getMessage()); 474 } 475 } 476 477 private String getNegotiatedQop() { 478 return (String) saslClient.getNegotiatedProperty(Sasl.QOP); 479 } 480 481 private boolean isNegotiatedQopPrivacy() { 482 String qop = getNegotiatedQop(); 483 return qop != null && "auth-conf".equalsIgnoreCase(qop); 484 } 485 486 private boolean requestedQopContainsPrivacy() { 487 Set<String> requestedQop = 488 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); 489 return requestedQop.contains("auth-conf"); 490 } 491 492 private void checkSaslComplete() throws IOException { 493 if (!saslClient.isComplete()) { 494 throw new IOException("Failed to complete SASL handshake"); 495 } 496 Set<String> requestedQop = 497 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); 498 String negotiatedQop = getNegotiatedQop(); 499 LOG.debug( 500 "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop); 501 if (!requestedQop.contains(negotiatedQop)) { 502 throw new IOException(String.format("SASL handshake completed, but " 503 + "channel does not have acceptable quality of protection, " 504 + "requested = %s, negotiated = %s", requestedQop, negotiatedQop)); 505 } 506 } 507 508 private boolean useWrap() { 509 String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); 510 return qop != null && !"auth".equalsIgnoreCase(qop); 511 } 512 513 private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException { 514 byte[] inKey = option.getInKey(); 515 if (inKey != null) { 516 inKey = saslClient.unwrap(inKey, 0, inKey.length); 517 } 518 byte[] outKey = option.getOutKey(); 519 if (outKey != null) { 520 outKey = saslClient.unwrap(outKey, 0, outKey.length); 521 } 522 return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey, 523 option.getOutIv()); 524 } 525 526 private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto, 527 boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { 528 List<CipherOption> cipherOptions = 529 PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList()); 530 if (cipherOptions == null || cipherOptions.isEmpty()) { 531 return null; 532 } 533 CipherOption cipherOption = cipherOptions.get(0); 534 return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption; 535 } 536 537 @Override 538 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 539 if (msg instanceof DataTransferEncryptorMessageProto) { 540 DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg; 541 check(proto); 542 byte[] challenge = proto.getPayload().toByteArray(); 543 byte[] response = saslClient.evaluateChallenge(challenge); 544 switch (step) { 545 case 1: { 546 List<CipherOption> cipherOptions = null; 547 if (requestedQopContainsPrivacy()) { 548 cipherOptions = getCipherOptions(); 549 } 550 sendSaslMessage(ctx, response, cipherOptions); 551 ctx.flush(); 552 step++; 553 break; 554 } 555 case 2: { 556 assert response == null; 557 checkSaslComplete(); 558 CipherOption cipherOption = 559 getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); 560 ChannelPipeline p = ctx.pipeline(); 561 while (p.first() != null) { 562 p.removeFirst(); 563 } 564 if (cipherOption != null) { 565 CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite()); 566 p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()), 567 new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv())); 568 } else { 569 if (useWrap()) { 570 p.addLast(new SaslWrapHandler(saslClient), 571 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), 572 new SaslUnwrapHandler(saslClient)); 573 } 574 } 575 promise.trySuccess(null); 576 break; 577 } 578 default: 579 throw new IllegalArgumentException("Unrecognized negotiation step: " + step); 580 } 581 } else { 582 ctx.fireChannelRead(msg); 583 } 584 } 585 586 @Override 587 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 588 promise.tryFailure(cause); 589 } 590 591 @Override 592 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 593 if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { 594 promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); 595 } else { 596 super.userEventTriggered(ctx, evt); 597 } 598 } 599 } 600 601 private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> { 602 603 private final SaslClient saslClient; 604 605 public SaslUnwrapHandler(SaslClient saslClient) { 606 this.saslClient = saslClient; 607 } 608 609 @Override 610 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 611 saslClient.dispose(); 612 } 613 614 @Override 615 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 616 msg.skipBytes(4); 617 byte[] b = new byte[msg.readableBytes()]; 618 msg.readBytes(b); 619 ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length))); 620 } 621 } 622 623 private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter { 624 625 private final SaslClient saslClient; 626 627 private CompositeByteBuf cBuf; 628 629 public SaslWrapHandler(SaslClient saslClient) { 630 this.saslClient = saslClient; 631 } 632 633 @Override 634 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 635 cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE); 636 } 637 638 @Override 639 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 640 throws Exception { 641 if (msg instanceof ByteBuf) { 642 ByteBuf buf = (ByteBuf) msg; 643 cBuf.addComponent(buf); 644 cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes()); 645 } else { 646 safeWrite(ctx, msg); 647 } 648 } 649 650 @Override 651 public void flush(ChannelHandlerContext ctx) throws Exception { 652 if (cBuf.isReadable()) { 653 byte[] b = new byte[cBuf.readableBytes()]; 654 cBuf.readBytes(b); 655 cBuf.discardReadComponents(); 656 byte[] wrapped = saslClient.wrap(b, 0, b.length); 657 ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length); 658 buf.writeInt(wrapped.length); 659 buf.writeBytes(wrapped); 660 safeWrite(ctx, buf); 661 } 662 ctx.flush(); 663 } 664 665 @Override 666 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { 667 // Release buffer on removal. 668 cBuf.release(); 669 cBuf = null; 670 } 671 } 672 673 private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> { 674 675 private final Decryptor decryptor; 676 677 public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv) 678 throws GeneralSecurityException, IOException { 679 this.decryptor = codec.createDecryptor(); 680 this.decryptor.init(key, Arrays.copyOf(iv, iv.length)); 681 } 682 683 @Override 684 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 685 ByteBuf inBuf; 686 boolean release = false; 687 if (msg.nioBufferCount() == 1) { 688 inBuf = msg; 689 } else { 690 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); 691 msg.readBytes(inBuf); 692 release = true; 693 } 694 ByteBuffer inBuffer = inBuf.nioBuffer(); 695 ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes()); 696 ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes()); 697 decryptor.decrypt(inBuffer, outBuffer); 698 outBuf.writerIndex(inBuf.readableBytes()); 699 if (release) { 700 inBuf.release(); 701 } 702 ctx.fireChannelRead(outBuf); 703 } 704 } 705 706 private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> { 707 708 private final Encryptor encryptor; 709 710 public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv) 711 throws GeneralSecurityException, IOException { 712 this.encryptor = codec.createEncryptor(); 713 this.encryptor.init(key, Arrays.copyOf(iv, iv.length)); 714 } 715 716 @Override 717 protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) 718 throws Exception { 719 if (preferDirect) { 720 return ctx.alloc().directBuffer(msg.readableBytes()); 721 } else { 722 return ctx.alloc().buffer(msg.readableBytes()); 723 } 724 } 725 726 @Override 727 protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { 728 ByteBuf inBuf; 729 boolean release = false; 730 if (msg.nioBufferCount() == 1) { 731 inBuf = msg; 732 } else { 733 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); 734 msg.readBytes(inBuf); 735 release = true; 736 } 737 ByteBuffer inBuffer = inBuf.nioBuffer(); 738 ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes()); 739 encryptor.encrypt(inBuffer, outBuffer); 740 out.writerIndex(inBuf.readableBytes()); 741 if (release) { 742 inBuf.release(); 743 } 744 } 745 } 746 747 private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) { 748 return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER 749 + Base64.getEncoder().encodeToString(encryptionKey.nonce); 750 } 751 752 private static char[] encryptionKeyToPassword(byte[] encryptionKey) { 753 return Base64.getEncoder().encodeToString(encryptionKey).toCharArray(); 754 } 755 756 private static String buildUsername(Token<BlockTokenIdentifier> blockToken) { 757 return Base64.getEncoder().encodeToString(blockToken.getIdentifier()); 758 } 759 760 private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) { 761 return Base64.getEncoder().encodeToString(blockToken.getPassword()).toCharArray(); 762 } 763 764 private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) { 765 Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3); 766 saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); 767 saslProps.put(Sasl.SERVER_AUTH, "true"); 768 saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); 769 return saslProps; 770 } 771 772 private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs, 773 String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise, 774 DFSClient dfsClient) { 775 try { 776 channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), 777 new ProtobufVarint32FrameDecoder(), 778 new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()), 779 new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise, 780 dfsClient)); 781 } catch (SaslException e) { 782 saslPromise.tryFailure(e); 783 } 784 } 785 786 static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo, 787 int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken, 788 Promise<Void> saslPromise) throws IOException { 789 SaslDataTransferClient saslClient = client.getSaslDataTransferClient(); 790 SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient); 791 TrustedChannelResolver trustedChannelResolver = 792 SASL_ADAPTOR.getTrustedChannelResolver(saslClient); 793 AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient); 794 InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress(); 795 if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) { 796 saslPromise.trySuccess(null); 797 return; 798 } 799 DataEncryptionKey encryptionKey = client.newDataEncryptionKey(); 800 if (encryptionKey != null) { 801 if (LOG.isDebugEnabled()) { 802 LOG.debug( 803 "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo); 804 } 805 doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), 806 encryptionKeyToPassword(encryptionKey.encryptionKey), 807 createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise, client); 808 } else if (!UserGroupInformation.isSecurityEnabled()) { 809 if (LOG.isDebugEnabled()) { 810 LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr 811 + ", datanodeId = " + dnInfo); 812 } 813 saslPromise.trySuccess(null); 814 } else if (dnInfo.getXferPort() < 1024) { 815 if (LOG.isDebugEnabled()) { 816 LOG.debug("SASL client skipping handshake in secured configuration with " 817 + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo); 818 } 819 saslPromise.trySuccess(null); 820 } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { 821 if (LOG.isDebugEnabled()) { 822 LOG.debug("SASL client skipping handshake in secured configuration with " 823 + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo); 824 } 825 saslPromise.trySuccess(null); 826 } else if (saslPropsResolver != null) { 827 if (LOG.isDebugEnabled()) { 828 LOG.debug( 829 "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo); 830 } 831 doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), 832 buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise, 833 client); 834 } else { 835 // It's a secured cluster using non-privileged ports, but no SASL. The only way this can 836 // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare 837 // edge case. 838 if (LOG.isDebugEnabled()) { 839 LOG.debug("SASL client skipping handshake in secured configuration with no SASL " 840 + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo); 841 } 842 saslPromise.trySuccess(null); 843 } 844 } 845 846 static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client) 847 throws IOException { 848 FileEncryptionInfo feInfo = stat.getFileEncryptionInfo(); 849 if (feInfo == null) { 850 return null; 851 } 852 return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client); 853 } 854}