001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite; 021import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; 022import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; 023 024import java.io.IOException; 025import java.lang.reflect.Constructor; 026import java.lang.reflect.Field; 027import java.lang.reflect.InvocationTargetException; 028import java.lang.reflect.Method; 029import java.net.InetAddress; 030import java.net.InetSocketAddress; 031import java.nio.ByteBuffer; 032import java.security.GeneralSecurityException; 033import java.util.Arrays; 034import java.util.Base64; 035import java.util.Collections; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicBoolean; 041import javax.security.auth.callback.Callback; 042import javax.security.auth.callback.CallbackHandler; 043import javax.security.auth.callback.NameCallback; 044import javax.security.auth.callback.PasswordCallback; 045import javax.security.auth.callback.UnsupportedCallbackException; 046import javax.security.sasl.RealmCallback; 047import javax.security.sasl.RealmChoiceCallback; 048import javax.security.sasl.Sasl; 049import javax.security.sasl.SaslClient; 050import javax.security.sasl.SaslException; 051import org.apache.commons.lang3.StringUtils; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.crypto.CipherOption; 054import org.apache.hadoop.crypto.CipherSuite; 055import org.apache.hadoop.crypto.CryptoCodec; 056import org.apache.hadoop.crypto.Decryptor; 057import org.apache.hadoop.crypto.Encryptor; 058import org.apache.hadoop.crypto.key.KeyProvider; 059import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; 060import org.apache.hadoop.fs.FileEncryptionInfo; 061import org.apache.hadoop.hdfs.DFSClient; 062import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 063import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 064import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; 065import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; 066import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; 067import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; 068import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; 069import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; 070import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 071import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; 072import org.apache.hadoop.security.SaslPropertiesResolver; 073import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; 074import org.apache.hadoop.security.UserGroupInformation; 075import org.apache.hadoop.security.token.Token; 076import org.apache.yetus.audience.InterfaceAudience; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 081import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 082import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 083import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; 084import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 085import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 086import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf; 087import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; 088import org.apache.hbase.thirdparty.io.netty.channel.Channel; 089import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler; 090import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 091import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter; 092import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 093import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; 094import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 095import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; 096import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder; 097import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 098import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; 099import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; 100import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 101 102/** 103 * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}. 104 */ 105@InterfaceAudience.Private 106public final class FanOutOneBlockAsyncDFSOutputSaslHelper { 107 private static final Logger LOG = 108 LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class); 109 110 private FanOutOneBlockAsyncDFSOutputSaslHelper() { 111 } 112 113 private static final String SERVER_NAME = "0"; 114 private static final String PROTOCOL = "hdfs"; 115 private static final String MECHANISM = "DIGEST-MD5"; 116 private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; 117 private static final String NAME_DELIMITER = " "; 118 119 private interface SaslAdaptor { 120 121 TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient); 122 123 SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient); 124 125 AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient); 126 } 127 128 private static final SaslAdaptor SASL_ADAPTOR; 129 130 private interface TransparentCryptoHelper { 131 132 Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client) 133 throws IOException; 134 } 135 136 private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER; 137 138 private static SaslAdaptor createSaslAdaptor() 139 throws NoSuchFieldException, NoSuchMethodException { 140 Field saslPropsResolverField = 141 SaslDataTransferClient.class.getDeclaredField("saslPropsResolver"); 142 saslPropsResolverField.setAccessible(true); 143 Field trustedChannelResolverField = 144 SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver"); 145 trustedChannelResolverField.setAccessible(true); 146 Field fallbackToSimpleAuthField = 147 SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth"); 148 fallbackToSimpleAuthField.setAccessible(true); 149 return new SaslAdaptor() { 150 151 @Override 152 public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) { 153 try { 154 return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient); 155 } catch (IllegalAccessException e) { 156 throw new RuntimeException(e); 157 } 158 } 159 160 @Override 161 public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) { 162 try { 163 return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient); 164 } catch (IllegalAccessException e) { 165 throw new RuntimeException(e); 166 } 167 } 168 169 @Override 170 public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) { 171 try { 172 return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient); 173 } catch (IllegalAccessException e) { 174 throw new RuntimeException(e); 175 } 176 } 177 }; 178 } 179 180 private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396() 181 throws NoSuchMethodException { 182 Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class 183 .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class); 184 decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); 185 return new TransparentCryptoHelper() { 186 187 @Override 188 public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, 189 DFSClient client) throws IOException { 190 try { 191 KeyVersion decryptedKey = 192 (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo); 193 CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); 194 Encryptor encryptor = cryptoCodec.createEncryptor(); 195 encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); 196 return encryptor; 197 } catch (InvocationTargetException e) { 198 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 199 throw new RuntimeException(e.getTargetException()); 200 } catch (GeneralSecurityException e) { 201 throw new IOException(e); 202 } catch (IllegalAccessException e) { 203 throw new RuntimeException(e); 204 } 205 } 206 }; 207 } 208 209 private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396() 210 throws ClassNotFoundException, NoSuchMethodException { 211 Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil"); 212 Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod( 213 "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class); 214 decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); 215 return new TransparentCryptoHelper() { 216 217 @Override 218 public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, 219 DFSClient client) throws IOException { 220 try { 221 KeyVersion decryptedKey = (KeyVersion) decryptEncryptedDataEncryptionKeyMethod 222 .invoke(null, feInfo, client.getKeyProvider()); 223 CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); 224 Encryptor encryptor = cryptoCodec.createEncryptor(); 225 encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); 226 return encryptor; 227 } catch (InvocationTargetException e) { 228 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 229 throw new RuntimeException(e.getTargetException()); 230 } catch (GeneralSecurityException e) { 231 throw new IOException(e); 232 } catch (IllegalAccessException e) { 233 throw new RuntimeException(e); 234 } 235 } 236 }; 237 } 238 239 private static TransparentCryptoHelper createTransparentCryptoHelper() 240 throws NoSuchMethodException, ClassNotFoundException { 241 try { 242 return createTransparentCryptoHelperWithoutHDFS12396(); 243 } catch (NoSuchMethodException e) { 244 LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," 245 + " should be hadoop version with HDFS-12396", e); 246 } 247 return createTransparentCryptoHelperWithHDFS12396(); 248 } 249 250 static { 251 try { 252 SASL_ADAPTOR = createSaslAdaptor(); 253 TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper(); 254 } catch (Exception e) { 255 String msg = "Couldn't properly initialize access to HDFS internals. Please " 256 + "update your WAL Provider to not make use of the 'asyncfs' provider. See " 257 + "HBASE-16110 for more information."; 258 LOG.error(msg, e); 259 throw new Error(msg, e); 260 } 261 } 262 263 /** 264 * Sets user name and password when asked by the client-side SASL object. 265 */ 266 private static final class SaslClientCallbackHandler implements CallbackHandler { 267 268 private final char[] password; 269 private final String userName; 270 271 /** 272 * Creates a new SaslClientCallbackHandler. 273 * @param userName SASL user name 274 * @param password SASL password 275 */ 276 public SaslClientCallbackHandler(String userName, char[] password) { 277 this.password = password; 278 this.userName = userName; 279 } 280 281 @Override 282 public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { 283 NameCallback nc = null; 284 PasswordCallback pc = null; 285 RealmCallback rc = null; 286 for (Callback callback : callbacks) { 287 if (callback instanceof RealmChoiceCallback) { 288 continue; 289 } else if (callback instanceof NameCallback) { 290 nc = (NameCallback) callback; 291 } else if (callback instanceof PasswordCallback) { 292 pc = (PasswordCallback) callback; 293 } else if (callback instanceof RealmCallback) { 294 rc = (RealmCallback) callback; 295 } else { 296 throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); 297 } 298 } 299 if (nc != null) { 300 nc.setName(userName); 301 } 302 if (pc != null) { 303 pc.setPassword(password); 304 } 305 if (rc != null) { 306 rc.setText(rc.getDefaultText()); 307 } 308 } 309 } 310 311 private static final class SaslNegotiateHandler extends ChannelDuplexHandler { 312 313 private final Configuration conf; 314 315 private final Map<String, String> saslProps; 316 317 private final SaslClient saslClient; 318 319 private final int timeoutMs; 320 321 private final Promise<Void> promise; 322 323 private final DFSClient dfsClient; 324 325 private int step = 0; 326 327 public SaslNegotiateHandler(Configuration conf, String username, char[] password, 328 Map<String, String> saslProps, int timeoutMs, Promise<Void> promise, DFSClient dfsClient) 329 throws SaslException { 330 this.conf = conf; 331 this.saslProps = saslProps; 332 this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, 333 SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password)); 334 this.timeoutMs = timeoutMs; 335 this.promise = promise; 336 this.dfsClient = dfsClient; 337 } 338 339 private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException { 340 sendSaslMessage(ctx, payload, null); 341 } 342 343 private List<CipherOption> getCipherOptions() throws IOException { 344 // Negotiate cipher suites if configured. Currently, the only supported 345 // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple 346 // values for future expansion. 347 String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); 348 if (StringUtils.isBlank(cipherSuites)) { 349 return null; 350 } 351 if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) { 352 throw new IOException(String.format("Invalid cipher suite, %s=%s", 353 DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); 354 } 355 return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING)); 356 } 357 358 /** 359 * The asyncfs subsystem emulates a HDFS client by sending protobuf messages via netty. After 360 * Hadoop 3.3.0, the protobuf classes are relocated to org.apache.hadoop.thirdparty.protobuf.*. 361 * Use Reflection to check which ones to use. 362 */ 363 private static class BuilderPayloadSetter { 364 private static Method setPayloadMethod; 365 private static Constructor<?> constructor; 366 367 /** 368 * Create a ByteString from byte array without copying (wrap), and then set it as the payload 369 * for the builder. 370 * @param builder builder for HDFS DataTransferEncryptorMessage. 371 * @param payload byte array of payload. 372 */ 373 static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder, 374 byte[] payload) throws IOException { 375 Object byteStringObject; 376 try { 377 // byteStringObject = new LiteralByteString(payload); 378 byteStringObject = constructor.newInstance(payload); 379 // builder.setPayload(byteStringObject); 380 setPayloadMethod.invoke(builder, constructor.getDeclaringClass().cast(byteStringObject)); 381 } catch (IllegalAccessException | InstantiationException e) { 382 throw new RuntimeException(e); 383 384 } catch (InvocationTargetException e) { 385 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 386 throw new RuntimeException(e.getTargetException()); 387 } 388 } 389 390 static { 391 Class<?> builderClass = DataTransferEncryptorMessageProto.Builder.class; 392 393 // Try the unrelocated ByteString 394 Class<?> byteStringClass; 395 try { 396 // See if it can load the relocated ByteString, which comes from hadoop-thirdparty. 397 byteStringClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString"); 398 LOG.debug("Found relocated ByteString class from hadoop-thirdparty." 399 + " Assuming this is Hadoop 3.3.0+."); 400 } catch (ClassNotFoundException e) { 401 LOG.debug("Did not find relocated ByteString class from hadoop-thirdparty." 402 + " Assuming this is below Hadoop 3.3.0", e); 403 try { 404 byteStringClass = Class.forName("com.google.protobuf.ByteString"); 405 LOG.debug("com.google.protobuf.ByteString found."); 406 } catch (ClassNotFoundException ex) { 407 throw new RuntimeException(ex); 408 } 409 } 410 411 // LiteralByteString is a package private class in protobuf. Make it accessible. 412 Class<?> literalByteStringClass; 413 try { 414 literalByteStringClass = 415 Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString$LiteralByteString"); 416 LOG.debug("Shaded LiteralByteString from hadoop-thirdparty is found."); 417 } catch (ClassNotFoundException e) { 418 try { 419 literalByteStringClass = Class.forName("com.google.protobuf.LiteralByteString"); 420 LOG.debug("com.google.protobuf.LiteralByteString found."); 421 } catch (ClassNotFoundException ex) { 422 throw new RuntimeException(ex); 423 } 424 } 425 426 try { 427 constructor = literalByteStringClass.getDeclaredConstructor(byte[].class); 428 constructor.setAccessible(true); 429 } catch (NoSuchMethodException e) { 430 throw new RuntimeException(e); 431 } 432 433 try { 434 setPayloadMethod = builderClass.getMethod("setPayload", byteStringClass); 435 } catch (NoSuchMethodException e) { 436 // if either method is not found, we are in big trouble. Abort. 437 throw new RuntimeException(e); 438 } 439 } 440 } 441 442 private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, 443 List<CipherOption> options) throws IOException { 444 DataTransferEncryptorMessageProto.Builder builder = 445 DataTransferEncryptorMessageProto.newBuilder(); 446 builder.setStatus(DataTransferEncryptorStatus.SUCCESS); 447 if (payload != null) { 448 BuilderPayloadSetter.wrapAndSetPayload(builder, payload); 449 } 450 if (options != null) { 451 builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options)); 452 } 453 DataTransferEncryptorMessageProto proto = builder.build(); 454 int size = proto.getSerializedSize(); 455 size += CodedOutputStream.computeUInt32SizeNoTag(size); 456 ByteBuf buf = ctx.alloc().buffer(size); 457 proto.writeDelimitedTo(new ByteBufOutputStream(buf)); 458 safeWrite(ctx, buf); 459 } 460 461 @Override 462 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 463 safeWrite(ctx, ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); 464 sendSaslMessage(ctx, new byte[0]); 465 ctx.flush(); 466 step++; 467 } 468 469 @Override 470 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 471 saslClient.dispose(); 472 } 473 474 private void check(DataTransferEncryptorMessageProto proto) throws IOException { 475 if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { 476 dfsClient.clearDataEncryptionKey(); 477 throw new InvalidEncryptionKeyException(proto.getMessage()); 478 } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { 479 throw new IOException(proto.getMessage()); 480 } 481 } 482 483 private String getNegotiatedQop() { 484 return (String) saslClient.getNegotiatedProperty(Sasl.QOP); 485 } 486 487 private boolean isNegotiatedQopPrivacy() { 488 String qop = getNegotiatedQop(); 489 return qop != null && "auth-conf".equalsIgnoreCase(qop); 490 } 491 492 private boolean requestedQopContainsPrivacy() { 493 Set<String> requestedQop = 494 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); 495 return requestedQop.contains("auth-conf"); 496 } 497 498 private void checkSaslComplete() throws IOException { 499 if (!saslClient.isComplete()) { 500 throw new IOException("Failed to complete SASL handshake"); 501 } 502 Set<String> requestedQop = 503 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); 504 String negotiatedQop = getNegotiatedQop(); 505 LOG.debug( 506 "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop); 507 if (!requestedQop.contains(negotiatedQop)) { 508 throw new IOException(String.format("SASL handshake completed, but " 509 + "channel does not have acceptable quality of protection, " 510 + "requested = %s, negotiated = %s", requestedQop, negotiatedQop)); 511 } 512 } 513 514 private boolean useWrap() { 515 String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); 516 return qop != null && !"auth".equalsIgnoreCase(qop); 517 } 518 519 private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException { 520 byte[] inKey = option.getInKey(); 521 if (inKey != null) { 522 inKey = saslClient.unwrap(inKey, 0, inKey.length); 523 } 524 byte[] outKey = option.getOutKey(); 525 if (outKey != null) { 526 outKey = saslClient.unwrap(outKey, 0, outKey.length); 527 } 528 return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey, 529 option.getOutIv()); 530 } 531 532 private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto, 533 boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { 534 List<CipherOption> cipherOptions = 535 PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList()); 536 if (cipherOptions == null || cipherOptions.isEmpty()) { 537 return null; 538 } 539 CipherOption cipherOption = cipherOptions.get(0); 540 return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption; 541 } 542 543 @Override 544 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 545 if (msg instanceof DataTransferEncryptorMessageProto) { 546 DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg; 547 check(proto); 548 byte[] challenge = proto.getPayload().toByteArray(); 549 byte[] response = saslClient.evaluateChallenge(challenge); 550 switch (step) { 551 case 1: { 552 List<CipherOption> cipherOptions = null; 553 if (requestedQopContainsPrivacy()) { 554 cipherOptions = getCipherOptions(); 555 } 556 sendSaslMessage(ctx, response, cipherOptions); 557 ctx.flush(); 558 step++; 559 break; 560 } 561 case 2: { 562 assert response == null; 563 checkSaslComplete(); 564 CipherOption cipherOption = 565 getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); 566 ChannelPipeline p = ctx.pipeline(); 567 while (p.first() != null) { 568 p.removeFirst(); 569 } 570 if (cipherOption != null) { 571 CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite()); 572 p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()), 573 new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv())); 574 } else { 575 if (useWrap()) { 576 p.addLast(new SaslWrapHandler(saslClient), 577 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), 578 new SaslUnwrapHandler(saslClient)); 579 } 580 } 581 promise.trySuccess(null); 582 break; 583 } 584 default: 585 throw new IllegalArgumentException("Unrecognized negotiation step: " + step); 586 } 587 } else { 588 ctx.fireChannelRead(msg); 589 } 590 } 591 592 @Override 593 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 594 promise.tryFailure(cause); 595 } 596 597 @Override 598 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 599 if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { 600 promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); 601 } else { 602 super.userEventTriggered(ctx, evt); 603 } 604 } 605 } 606 607 private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> { 608 609 private final SaslClient saslClient; 610 611 public SaslUnwrapHandler(SaslClient saslClient) { 612 this.saslClient = saslClient; 613 } 614 615 @Override 616 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 617 saslClient.dispose(); 618 } 619 620 @Override 621 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 622 msg.skipBytes(4); 623 byte[] b = new byte[msg.readableBytes()]; 624 msg.readBytes(b); 625 ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length))); 626 } 627 } 628 629 private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter { 630 631 private final SaslClient saslClient; 632 633 private CompositeByteBuf cBuf; 634 635 public SaslWrapHandler(SaslClient saslClient) { 636 this.saslClient = saslClient; 637 } 638 639 @Override 640 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 641 cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE); 642 } 643 644 @Override 645 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 646 throws Exception { 647 if (msg instanceof ByteBuf) { 648 ByteBuf buf = (ByteBuf) msg; 649 cBuf.addComponent(buf); 650 cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes()); 651 } else { 652 safeWrite(ctx, msg); 653 } 654 } 655 656 @Override 657 public void flush(ChannelHandlerContext ctx) throws Exception { 658 if (cBuf.isReadable()) { 659 byte[] b = new byte[cBuf.readableBytes()]; 660 cBuf.readBytes(b); 661 cBuf.discardReadComponents(); 662 byte[] wrapped = saslClient.wrap(b, 0, b.length); 663 ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length); 664 buf.writeInt(wrapped.length); 665 buf.writeBytes(wrapped); 666 safeWrite(ctx, buf); 667 } 668 ctx.flush(); 669 } 670 671 @Override 672 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { 673 // Release buffer on removal. 674 cBuf.release(); 675 cBuf = null; 676 } 677 } 678 679 private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> { 680 681 private final Decryptor decryptor; 682 683 public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv) 684 throws GeneralSecurityException, IOException { 685 this.decryptor = codec.createDecryptor(); 686 this.decryptor.init(key, Arrays.copyOf(iv, iv.length)); 687 } 688 689 @Override 690 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 691 ByteBuf inBuf; 692 boolean release = false; 693 if (msg.nioBufferCount() == 1) { 694 inBuf = msg; 695 } else { 696 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); 697 msg.readBytes(inBuf); 698 release = true; 699 } 700 ByteBuffer inBuffer = inBuf.nioBuffer(); 701 ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes()); 702 ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes()); 703 decryptor.decrypt(inBuffer, outBuffer); 704 outBuf.writerIndex(inBuf.readableBytes()); 705 if (release) { 706 inBuf.release(); 707 } 708 ctx.fireChannelRead(outBuf); 709 } 710 } 711 712 private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> { 713 714 private final Encryptor encryptor; 715 716 public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv) 717 throws GeneralSecurityException, IOException { 718 this.encryptor = codec.createEncryptor(); 719 this.encryptor.init(key, Arrays.copyOf(iv, iv.length)); 720 } 721 722 @Override 723 protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) 724 throws Exception { 725 if (preferDirect) { 726 return ctx.alloc().directBuffer(msg.readableBytes()); 727 } else { 728 return ctx.alloc().buffer(msg.readableBytes()); 729 } 730 } 731 732 @Override 733 protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { 734 ByteBuf inBuf; 735 boolean release = false; 736 if (msg.nioBufferCount() == 1) { 737 inBuf = msg; 738 } else { 739 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); 740 msg.readBytes(inBuf); 741 release = true; 742 } 743 ByteBuffer inBuffer = inBuf.nioBuffer(); 744 ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes()); 745 encryptor.encrypt(inBuffer, outBuffer); 746 out.writerIndex(inBuf.readableBytes()); 747 if (release) { 748 inBuf.release(); 749 } 750 } 751 } 752 753 private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) { 754 return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER 755 + Base64.getEncoder().encodeToString(encryptionKey.nonce); 756 } 757 758 private static char[] encryptionKeyToPassword(byte[] encryptionKey) { 759 return Base64.getEncoder().encodeToString(encryptionKey).toCharArray(); 760 } 761 762 private static String buildUsername(Token<BlockTokenIdentifier> blockToken) { 763 return Base64.getEncoder().encodeToString(blockToken.getIdentifier()); 764 } 765 766 private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) { 767 return Base64.getEncoder().encodeToString(blockToken.getPassword()).toCharArray(); 768 } 769 770 private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) { 771 Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3); 772 saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); 773 saslProps.put(Sasl.SERVER_AUTH, "true"); 774 saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); 775 return saslProps; 776 } 777 778 private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs, 779 String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise, 780 DFSClient dfsClient) { 781 try { 782 channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), 783 new ProtobufVarint32FrameDecoder(), 784 new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()), 785 new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise, 786 dfsClient)); 787 } catch (SaslException e) { 788 saslPromise.tryFailure(e); 789 } 790 } 791 792 static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo, 793 int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken, 794 Promise<Void> saslPromise) throws IOException { 795 SaslDataTransferClient saslClient = client.getSaslDataTransferClient(); 796 SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient); 797 TrustedChannelResolver trustedChannelResolver = 798 SASL_ADAPTOR.getTrustedChannelResolver(saslClient); 799 AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient); 800 InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress(); 801 if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) { 802 saslPromise.trySuccess(null); 803 return; 804 } 805 DataEncryptionKey encryptionKey = client.newDataEncryptionKey(); 806 if (encryptionKey != null) { 807 if (LOG.isDebugEnabled()) { 808 LOG.debug( 809 "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo); 810 } 811 doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), 812 encryptionKeyToPassword(encryptionKey.encryptionKey), 813 createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise, client); 814 } else if (!UserGroupInformation.isSecurityEnabled()) { 815 if (LOG.isDebugEnabled()) { 816 LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr 817 + ", datanodeId = " + dnInfo); 818 } 819 saslPromise.trySuccess(null); 820 } else if (dnInfo.getXferPort() < 1024) { 821 if (LOG.isDebugEnabled()) { 822 LOG.debug("SASL client skipping handshake in secured configuration with " 823 + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo); 824 } 825 saslPromise.trySuccess(null); 826 } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { 827 if (LOG.isDebugEnabled()) { 828 LOG.debug("SASL client skipping handshake in secured configuration with " 829 + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo); 830 } 831 saslPromise.trySuccess(null); 832 } else if (saslPropsResolver != null) { 833 if (LOG.isDebugEnabled()) { 834 LOG.debug( 835 "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo); 836 } 837 doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), 838 buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise, 839 client); 840 } else { 841 // It's a secured cluster using non-privileged ports, but no SASL. The only way this can 842 // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare 843 // edge case. 844 if (LOG.isDebugEnabled()) { 845 LOG.debug("SASL client skipping handshake in secured configuration with no SASL " 846 + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo); 847 } 848 saslPromise.trySuccess(null); 849 } 850 } 851 852 static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client) 853 throws IOException { 854 FileEncryptionInfo feInfo = stat.getFileEncryptionInfo(); 855 if (feInfo == null) { 856 return null; 857 } 858 return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client); 859 } 860}