001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.security; 020 021import java.io.BufferedInputStream; 022import java.io.BufferedOutputStream; 023import java.io.DataInputStream; 024import java.io.DataOutputStream; 025import java.io.FilterInputStream; 026import java.io.FilterOutputStream; 027import java.io.IOException; 028import java.io.InputStream; 029import java.io.OutputStream; 030import java.nio.ByteBuffer; 031 032import javax.security.sasl.Sasl; 033import javax.security.sasl.SaslException; 034 035import org.apache.hadoop.conf.Configuration; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 041import org.apache.hadoop.io.WritableUtils; 042import org.apache.hadoop.ipc.RemoteException; 043import org.apache.hadoop.security.SaslInputStream; 044import org.apache.hadoop.security.SaslOutputStream; 045import org.apache.hadoop.security.token.Token; 046import org.apache.hadoop.security.token.TokenIdentifier; 047 048/** 049 * A utility class that encapsulates SASL logic for RPC client. Copied from 050 * <code>org.apache.hadoop.security</code> 051 */ 052@InterfaceAudience.Private 053public class HBaseSaslRpcClient extends AbstractHBaseSaslRpcClient { 054 055 private static final Logger LOG = LoggerFactory.getLogger(HBaseSaslRpcClient.class); 056 private boolean cryptoAesEnable; 057 private CryptoAES cryptoAES; 058 private InputStream saslInputStream; 059 private InputStream cryptoInputStream; 060 private OutputStream saslOutputStream; 061 private OutputStream cryptoOutputStream; 062 private boolean initStreamForCrypto; 063 064 public HBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token, 065 String serverPrincipal, boolean fallbackAllowed) throws IOException { 066 super(method, token, serverPrincipal, fallbackAllowed); 067 } 068 069 public HBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token, 070 String serverPrincipal, boolean fallbackAllowed, String rpcProtection, 071 boolean initStreamForCrypto) throws IOException { 072 super(method, token, serverPrincipal, fallbackAllowed, rpcProtection); 073 this.initStreamForCrypto = initStreamForCrypto; 074 } 075 076 private static void readStatus(DataInputStream inStream) throws IOException { 077 int status = inStream.readInt(); // read status 078 if (status != SaslStatus.SUCCESS.state) { 079 throw new RemoteException(WritableUtils.readString(inStream), 080 WritableUtils.readString(inStream)); 081 } 082 } 083 084 /** 085 * Do client side SASL authentication with server via the given InputStream and OutputStream 086 * @param inS InputStream to use 087 * @param outS OutputStream to use 088 * @return true if connection is set up, or false if needs to switch to simple Auth. 089 * @throws IOException 090 */ 091 public boolean saslConnect(InputStream inS, OutputStream outS) throws IOException { 092 DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS)); 093 DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(outS)); 094 095 try { 096 byte[] saslToken = getInitialResponse(); 097 if (saslToken != null) { 098 outStream.writeInt(saslToken.length); 099 outStream.write(saslToken, 0, saslToken.length); 100 outStream.flush(); 101 if (LOG.isDebugEnabled()) { 102 LOG.debug("Have sent token of size " + saslToken.length + " from initSASLContext."); 103 } 104 } 105 if (!isComplete()) { 106 readStatus(inStream); 107 int len = inStream.readInt(); 108 if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { 109 if (!fallbackAllowed) { 110 throw new IOException("Server asks us to fall back to SIMPLE auth, " 111 + "but this client is configured to only allow secure connections."); 112 } 113 if (LOG.isDebugEnabled()) { 114 LOG.debug("Server asks us to fall back to simple auth."); 115 } 116 dispose(); 117 return false; 118 } 119 saslToken = new byte[len]; 120 if (LOG.isDebugEnabled()) { 121 LOG.debug("Will read input token of size " + saslToken.length 122 + " for processing by initSASLContext"); 123 } 124 inStream.readFully(saslToken); 125 } 126 127 while (!isComplete()) { 128 saslToken = evaluateChallenge(saslToken); 129 if (saslToken != null) { 130 if (LOG.isDebugEnabled()) { 131 LOG.debug("Will send token of size " + saslToken.length + " from initSASLContext."); 132 } 133 outStream.writeInt(saslToken.length); 134 outStream.write(saslToken, 0, saslToken.length); 135 outStream.flush(); 136 } 137 if (!isComplete()) { 138 readStatus(inStream); 139 saslToken = new byte[inStream.readInt()]; 140 if (LOG.isDebugEnabled()) { 141 LOG.debug("Will read input token of size " + saslToken.length 142 + " for processing by initSASLContext"); 143 } 144 inStream.readFully(saslToken); 145 } 146 } 147 if (LOG.isDebugEnabled()) { 148 LOG.debug("SASL client context established. Negotiated QoP: " 149 + saslClient.getNegotiatedProperty(Sasl.QOP)); 150 } 151 // initial the inputStream, outputStream for both Sasl encryption 152 // and Crypto AES encryption if necessary 153 // if Crypto AES encryption enabled, the saslInputStream/saslOutputStream is 154 // only responsible for connection header negotiation, 155 // cryptoInputStream/cryptoOutputStream is responsible for rpc encryption with Crypto AES 156 saslInputStream = new SaslInputStream(inS, saslClient); 157 saslOutputStream = new SaslOutputStream(outS, saslClient); 158 if (initStreamForCrypto) { 159 cryptoInputStream = new WrappedInputStream(inS); 160 cryptoOutputStream = new WrappedOutputStream(outS); 161 } 162 163 return true; 164 } catch (IOException e) { 165 try { 166 saslClient.dispose(); 167 } catch (SaslException ignored) { 168 // ignore further exceptions during cleanup 169 } 170 throw e; 171 } 172 } 173 174 public String getSaslQOP() { 175 return (String) saslClient.getNegotiatedProperty(Sasl.QOP); 176 } 177 178 public void initCryptoCipher(RPCProtos.CryptoCipherMeta cryptoCipherMeta, 179 Configuration conf) throws IOException { 180 // create SaslAES for client 181 cryptoAES = EncryptionUtil.createCryptoAES(cryptoCipherMeta, conf); 182 cryptoAesEnable = true; 183 } 184 185 /** 186 * Get a SASL wrapped InputStream. Can be called only after saslConnect() has been called. 187 * @return a SASL wrapped InputStream 188 * @throws IOException 189 */ 190 public InputStream getInputStream() throws IOException { 191 if (!saslClient.isComplete()) { 192 throw new IOException("Sasl authentication exchange hasn't completed yet"); 193 } 194 // If Crypto AES is enabled, return cryptoInputStream which unwrap the data with Crypto AES. 195 if (cryptoAesEnable && cryptoInputStream != null) { 196 return cryptoInputStream; 197 } 198 return saslInputStream; 199 } 200 201 class WrappedInputStream extends FilterInputStream { 202 private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0); 203 public WrappedInputStream(InputStream in) throws IOException { 204 super(in); 205 } 206 207 @Override 208 public int read() throws IOException { 209 byte[] b = new byte[1]; 210 int n = read(b, 0, 1); 211 return (n != -1) ? b[0] : -1; 212 } 213 214 @Override 215 public int read(byte b[]) throws IOException { 216 return read(b, 0, b.length); 217 } 218 219 @Override 220 public synchronized int read(byte[] buf, int off, int len) throws IOException { 221 // fill the buffer with the next RPC message 222 if (unwrappedRpcBuffer.remaining() == 0) { 223 readNextRpcPacket(); 224 } 225 // satisfy as much of the request as possible 226 int readLen = Math.min(len, unwrappedRpcBuffer.remaining()); 227 unwrappedRpcBuffer.get(buf, off, readLen); 228 return readLen; 229 } 230 231 // unwrap messages with Crypto AES 232 private void readNextRpcPacket() throws IOException { 233 LOG.debug("reading next wrapped RPC packet"); 234 DataInputStream dis = new DataInputStream(in); 235 int rpcLen = dis.readInt(); 236 byte[] rpcBuf = new byte[rpcLen]; 237 dis.readFully(rpcBuf); 238 239 // unwrap with Crypto AES 240 rpcBuf = cryptoAES.unwrap(rpcBuf, 0, rpcBuf.length); 241 if (LOG.isDebugEnabled()) { 242 LOG.debug("unwrapping token of length:" + rpcBuf.length); 243 } 244 unwrappedRpcBuffer = ByteBuffer.wrap(rpcBuf); 245 } 246 } 247 248 /** 249 * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has been called. 250 * @return a SASL wrapped OutputStream 251 * @throws IOException 252 */ 253 public OutputStream getOutputStream() throws IOException { 254 if (!saslClient.isComplete()) { 255 throw new IOException("Sasl authentication exchange hasn't completed yet"); 256 } 257 // If Crypto AES is enabled, return cryptoOutputStream which wrap the data with Crypto AES. 258 if (cryptoAesEnable && cryptoOutputStream != null) { 259 return cryptoOutputStream; 260 } 261 return saslOutputStream; 262 } 263 264 class WrappedOutputStream extends FilterOutputStream { 265 public WrappedOutputStream(OutputStream out) throws IOException { 266 super(out); 267 } 268 @Override 269 public void write(byte[] buf, int off, int len) throws IOException { 270 if (LOG.isDebugEnabled()) { 271 LOG.debug("wrapping token of length:" + len); 272 } 273 274 // wrap with Crypto AES 275 byte[] wrapped = cryptoAES.wrap(buf, off, len); 276 DataOutputStream dob = new DataOutputStream(out); 277 dob.writeInt(wrapped.length); 278 dob.write(wrapped, 0, wrapped.length); 279 dob.flush(); 280 } 281 } 282}