001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security;
020
021import java.io.BufferedInputStream;
022import java.io.BufferedOutputStream;
023import java.io.DataInputStream;
024import java.io.DataOutputStream;
025import java.io.FilterInputStream;
026import java.io.FilterOutputStream;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.OutputStream;
030import java.nio.ByteBuffer;
031
032import javax.security.sasl.Sasl;
033import javax.security.sasl.SaslException;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
041import org.apache.hadoop.io.WritableUtils;
042import org.apache.hadoop.ipc.RemoteException;
043import org.apache.hadoop.security.SaslInputStream;
044import org.apache.hadoop.security.SaslOutputStream;
045import org.apache.hadoop.security.token.Token;
046import org.apache.hadoop.security.token.TokenIdentifier;
047
048/**
049 * A utility class that encapsulates SASL logic for RPC client. Copied from
050 * <code>org.apache.hadoop.security</code>
051 */
052@InterfaceAudience.Private
053public class HBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
054
055  private static final Logger LOG = LoggerFactory.getLogger(HBaseSaslRpcClient.class);
056  private boolean cryptoAesEnable;
057  private CryptoAES cryptoAES;
058  private InputStream saslInputStream;
059  private InputStream cryptoInputStream;
060  private OutputStream saslOutputStream;
061  private OutputStream cryptoOutputStream;
062  private boolean initStreamForCrypto;
063
064  public HBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
065      String serverPrincipal, boolean fallbackAllowed) throws IOException {
066    super(method, token, serverPrincipal, fallbackAllowed);
067  }
068
069  public HBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
070      String serverPrincipal, boolean fallbackAllowed, String rpcProtection,
071      boolean initStreamForCrypto) throws IOException {
072    super(method, token, serverPrincipal, fallbackAllowed, rpcProtection);
073    this.initStreamForCrypto = initStreamForCrypto;
074  }
075
076  private static void readStatus(DataInputStream inStream) throws IOException {
077    int status = inStream.readInt(); // read status
078    if (status != SaslStatus.SUCCESS.state) {
079      throw new RemoteException(WritableUtils.readString(inStream),
080          WritableUtils.readString(inStream));
081    }
082  }
083
084  /**
085   * Do client side SASL authentication with server via the given InputStream and OutputStream
086   * @param inS InputStream to use
087   * @param outS OutputStream to use
088   * @return true if connection is set up, or false if needs to switch to simple Auth.
089   * @throws IOException
090   */
091  public boolean saslConnect(InputStream inS, OutputStream outS) throws IOException {
092    DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
093    DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(outS));
094
095    try {
096      byte[] saslToken = getInitialResponse();
097      if (saslToken != null) {
098        outStream.writeInt(saslToken.length);
099        outStream.write(saslToken, 0, saslToken.length);
100        outStream.flush();
101        if (LOG.isDebugEnabled()) {
102          LOG.debug("Have sent token of size " + saslToken.length + " from initSASLContext.");
103        }
104      }
105      if (!isComplete()) {
106        readStatus(inStream);
107        int len = inStream.readInt();
108        if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
109          if (!fallbackAllowed) {
110            throw new IOException("Server asks us to fall back to SIMPLE auth, "
111                + "but this client is configured to only allow secure connections.");
112          }
113          if (LOG.isDebugEnabled()) {
114            LOG.debug("Server asks us to fall back to simple auth.");
115          }
116          dispose();
117          return false;
118        }
119        saslToken = new byte[len];
120        if (LOG.isDebugEnabled()) {
121          LOG.debug("Will read input token of size " + saslToken.length
122              + " for processing by initSASLContext");
123        }
124        inStream.readFully(saslToken);
125      }
126
127      while (!isComplete()) {
128        saslToken = evaluateChallenge(saslToken);
129        if (saslToken != null) {
130          if (LOG.isDebugEnabled()) {
131            LOG.debug("Will send token of size " + saslToken.length + " from initSASLContext.");
132          }
133          outStream.writeInt(saslToken.length);
134          outStream.write(saslToken, 0, saslToken.length);
135          outStream.flush();
136        }
137        if (!isComplete()) {
138          readStatus(inStream);
139          saslToken = new byte[inStream.readInt()];
140          if (LOG.isDebugEnabled()) {
141            LOG.debug("Will read input token of size " + saslToken.length
142                + " for processing by initSASLContext");
143          }
144          inStream.readFully(saslToken);
145        }
146      }
147      if (LOG.isDebugEnabled()) {
148        LOG.debug("SASL client context established. Negotiated QoP: "
149            + saslClient.getNegotiatedProperty(Sasl.QOP));
150      }
151      // initial the inputStream, outputStream for both Sasl encryption
152      // and Crypto AES encryption if necessary
153      // if Crypto AES encryption enabled, the saslInputStream/saslOutputStream is
154      // only responsible for connection header negotiation,
155      // cryptoInputStream/cryptoOutputStream is responsible for rpc encryption with Crypto AES
156      saslInputStream = new SaslInputStream(inS, saslClient);
157      saslOutputStream = new SaslOutputStream(outS, saslClient);
158      if (initStreamForCrypto) {
159        cryptoInputStream = new WrappedInputStream(inS);
160        cryptoOutputStream = new WrappedOutputStream(outS);
161      }
162
163      return true;
164    } catch (IOException e) {
165      try {
166        saslClient.dispose();
167      } catch (SaslException ignored) {
168        // ignore further exceptions during cleanup
169      }
170      throw e;
171    }
172  }
173
174  public String getSaslQOP() {
175    return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
176  }
177
178  public void initCryptoCipher(RPCProtos.CryptoCipherMeta cryptoCipherMeta,
179      Configuration conf) throws IOException {
180    // create SaslAES for client
181    cryptoAES = EncryptionUtil.createCryptoAES(cryptoCipherMeta, conf);
182    cryptoAesEnable = true;
183  }
184
185  /**
186   * Get a SASL wrapped InputStream. Can be called only after saslConnect() has been called.
187   * @return a SASL wrapped InputStream
188   * @throws IOException
189   */
190  public InputStream getInputStream() throws IOException {
191    if (!saslClient.isComplete()) {
192      throw new IOException("Sasl authentication exchange hasn't completed yet");
193    }
194    // If Crypto AES is enabled, return cryptoInputStream which unwrap the data with Crypto AES.
195    if (cryptoAesEnable && cryptoInputStream != null) {
196      return cryptoInputStream;
197    }
198    return saslInputStream;
199  }
200
201  class WrappedInputStream extends FilterInputStream {
202    private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0);
203    public WrappedInputStream(InputStream in) throws IOException {
204      super(in);
205    }
206
207    @Override
208    public int read() throws IOException {
209      byte[] b = new byte[1];
210      int n = read(b, 0, 1);
211      return (n != -1) ? b[0] : -1;
212    }
213
214    @Override
215    public int read(byte b[]) throws IOException {
216      return read(b, 0, b.length);
217    }
218
219    @Override
220    public synchronized int read(byte[] buf, int off, int len) throws IOException {
221      // fill the buffer with the next RPC message
222      if (unwrappedRpcBuffer.remaining() == 0) {
223        readNextRpcPacket();
224      }
225      // satisfy as much of the request as possible
226      int readLen = Math.min(len, unwrappedRpcBuffer.remaining());
227      unwrappedRpcBuffer.get(buf, off, readLen);
228      return readLen;
229    }
230
231    // unwrap messages with Crypto AES
232    private void readNextRpcPacket() throws IOException {
233      LOG.debug("reading next wrapped RPC packet");
234      DataInputStream dis = new DataInputStream(in);
235      int rpcLen = dis.readInt();
236      byte[] rpcBuf = new byte[rpcLen];
237      dis.readFully(rpcBuf);
238
239      // unwrap with Crypto AES
240      rpcBuf = cryptoAES.unwrap(rpcBuf, 0, rpcBuf.length);
241      if (LOG.isDebugEnabled()) {
242        LOG.debug("unwrapping token of length:" + rpcBuf.length);
243      }
244      unwrappedRpcBuffer = ByteBuffer.wrap(rpcBuf);
245    }
246  }
247
248  /**
249   * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has been called.
250   * @return a SASL wrapped OutputStream
251   * @throws IOException
252   */
253  public OutputStream getOutputStream() throws IOException {
254    if (!saslClient.isComplete()) {
255      throw new IOException("Sasl authentication exchange hasn't completed yet");
256    }
257    // If Crypto AES is enabled, return cryptoOutputStream which wrap the data with Crypto AES.
258    if (cryptoAesEnable && cryptoOutputStream != null) {
259      return cryptoOutputStream;
260    }
261    return saslOutputStream;
262  }
263
264  class WrappedOutputStream extends FilterOutputStream {
265    public WrappedOutputStream(OutputStream out) throws IOException {
266      super(out);
267    }
268    @Override
269    public void write(byte[] buf, int off, int len) throws IOException {
270      if (LOG.isDebugEnabled()) {
271        LOG.debug("wrapping token of length:" + len);
272      }
273
274      // wrap with Crypto AES
275      byte[] wrapped = cryptoAES.wrap(buf, off, len);
276      DataOutputStream dob = new DataOutputStream(out);
277      dob.writeInt(wrapped.length);
278      dob.write(wrapped, 0, wrapped.length);
279      dob.flush();
280    }
281  }
282}