001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_HEADER;
021
022import java.io.ByteArrayInputStream;
023import java.io.Closeable;
024import java.io.DataOutputStream;
025import java.io.IOException;
026import java.net.InetAddress;
027import java.net.InetSocketAddress;
028import java.nio.ByteBuffer;
029import java.nio.channels.Channels;
030import java.nio.channels.ReadableByteChannel;
031import java.security.GeneralSecurityException;
032import java.util.Properties;
033
034import org.apache.commons.crypto.cipher.CryptoCipherFactory;
035import org.apache.commons.crypto.random.CryptoRandom;
036import org.apache.commons.crypto.random.CryptoRandomFactory;
037import org.apache.hadoop.hbase.CellScanner;
038import org.apache.hadoop.hbase.DoNotRetryIOException;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.apache.hadoop.hbase.client.VersionInfoUtil;
041import org.apache.hadoop.hbase.codec.Codec;
042import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
043import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
044import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
045import org.apache.hadoop.hbase.nio.ByteBuff;
046import org.apache.hadoop.hbase.nio.SingleByteBuff;
047import org.apache.hadoop.hbase.security.AccessDeniedException;
048import org.apache.hadoop.hbase.security.AuthMethod;
049import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
050import org.apache.hadoop.hbase.security.SaslStatus;
051import org.apache.hadoop.hbase.security.SaslUtil;
052import org.apache.hadoop.hbase.security.User;
053import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
054import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput;
055import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
056import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
057import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
058import org.apache.hbase.thirdparty.com.google.protobuf.Message;
059import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
060import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
061import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.io.BytesWritable;
070import org.apache.hadoop.io.IntWritable;
071import org.apache.hadoop.io.Writable;
072import org.apache.hadoop.io.WritableUtils;
073import org.apache.hadoop.io.compress.CompressionCodec;
074import org.apache.hadoop.security.UserGroupInformation;
075import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
076import org.apache.hadoop.security.authorize.AuthorizationException;
077import org.apache.hadoop.security.authorize.ProxyUsers;
078import org.apache.hadoop.security.token.SecretManager.InvalidToken;
079import org.apache.hadoop.security.token.TokenIdentifier;
080
081/** Reads calls from a connection and queues them for handling. */
082@edu.umd.cs.findbugs.annotations.SuppressWarnings(
083    value="VO_VOLATILE_INCREMENT",
084    justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
085@InterfaceAudience.Private
086abstract class ServerRpcConnection implements Closeable {
087  /**  */
088  protected final RpcServer rpcServer;
089  // If the connection header has been read or not.
090  protected boolean connectionHeaderRead = false;
091
092  protected CallCleanup callCleanup;
093
094  // Cache the remote host & port info so that even if the socket is
095  // disconnected, we can say where it used to connect to.
096  protected String hostAddress;
097  protected int remotePort;
098  protected InetAddress addr;
099  protected ConnectionHeader connectionHeader;
100
101  /**
102   * Codec the client asked use.
103   */
104  protected Codec codec;
105  /**
106   * Compression codec the client asked us use.
107   */
108  protected CompressionCodec compressionCodec;
109  protected BlockingService service;
110
111  protected AuthMethod authMethod;
112  protected boolean saslContextEstablished;
113  protected boolean skipInitialSaslHandshake;
114  private ByteBuffer unwrappedData;
115  // When is this set? FindBugs wants to know! Says NP
116  private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
117  protected boolean useSasl;
118  protected HBaseSaslRpcServer saslServer;
119  protected CryptoAES cryptoAES;
120  protected boolean useWrap = false;
121  protected boolean useCryptoAesWrap = false;
122
123  // was authentication allowed with a fallback to simple auth
124  protected boolean authenticatedWithFallback;
125
126  protected boolean retryImmediatelySupported = false;
127
128  protected User user = null;
129  protected UserGroupInformation ugi = null;
130
131  public ServerRpcConnection(RpcServer rpcServer) {
132    this.rpcServer = rpcServer;
133    this.callCleanup = null;
134  }
135
136  @Override
137  public String toString() {
138    return getHostAddress() + ":" + remotePort;
139  }
140
141  public String getHostAddress() {
142    return hostAddress;
143  }
144
145  public InetAddress getHostInetAddress() {
146    return addr;
147  }
148
149  public int getRemotePort() {
150    return remotePort;
151  }
152
153  public VersionInfo getVersionInfo() {
154    if (connectionHeader.hasVersionInfo()) {
155      return connectionHeader.getVersionInfo();
156    }
157    return null;
158  }
159
160  private String getFatalConnectionString(final int version, final byte authByte) {
161    return "serverVersion=" + RpcServer.CURRENT_VERSION +
162    ", clientVersion=" + version + ", authMethod=" + authByte +
163    ", authSupported=" + (authMethod != null) + " from " + toString();
164  }
165
166  private UserGroupInformation getAuthorizedUgi(String authorizedId)
167      throws IOException {
168    UserGroupInformation authorizedUgi;
169    if (authMethod == AuthMethod.DIGEST) {
170      TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
171          this.rpcServer.secretManager);
172      authorizedUgi = tokenId.getUser();
173      if (authorizedUgi == null) {
174        throw new AccessDeniedException(
175            "Can't retrieve username from tokenIdentifier.");
176      }
177      authorizedUgi.addTokenIdentifier(tokenId);
178    } else {
179      authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
180    }
181    authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
182    return authorizedUgi;
183  }
184
185  /**
186   * Set up cell block codecs
187   * @throws FatalConnectionException
188   */
189  private void setupCellBlockCodecs(final ConnectionHeader header)
190      throws FatalConnectionException {
191    // TODO: Plug in other supported decoders.
192    if (!header.hasCellBlockCodecClass()) return;
193    String className = header.getCellBlockCodecClass();
194    if (className == null || className.length() == 0) return;
195    try {
196      this.codec = (Codec)Class.forName(className).getDeclaredConstructor().newInstance();
197    } catch (Exception e) {
198      throw new UnsupportedCellCodecException(className, e);
199    }
200    if (!header.hasCellBlockCompressorClass()) return;
201    className = header.getCellBlockCompressorClass();
202    try {
203      this.compressionCodec =
204          (CompressionCodec)Class.forName(className).getDeclaredConstructor().newInstance();
205    } catch (Exception e) {
206      throw new UnsupportedCompressionCodecException(className, e);
207    }
208  }
209
210  /**
211   * Set up cipher for rpc encryption with Apache Commons Crypto
212   *
213   * @throws FatalConnectionException
214   */
215  private void setupCryptoCipher(final ConnectionHeader header,
216      RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
217      throws FatalConnectionException {
218    // If simple auth, return
219    if (saslServer == null) return;
220    // check if rpc encryption with Crypto AES
221    String qop = saslServer.getNegotiatedQop();
222    boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY
223        .getSaslQop().equalsIgnoreCase(qop);
224    boolean isCryptoAesEncryption = isEncryption && this.rpcServer.conf.getBoolean(
225        "hbase.rpc.crypto.encryption.aes.enabled", false);
226    if (!isCryptoAesEncryption) return;
227    if (!header.hasRpcCryptoCipherTransformation()) return;
228    String transformation = header.getRpcCryptoCipherTransformation();
229    if (transformation == null || transformation.length() == 0) return;
230     // Negotiates AES based on complete saslServer.
231     // The Crypto metadata need to be encrypted and send to client.
232    Properties properties = new Properties();
233    // the property for SecureRandomFactory
234    properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
235        this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
236            "org.apache.commons.crypto.random.JavaCryptoRandom"));
237    // the property for cipher class
238    properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
239        this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
240            "org.apache.commons.crypto.cipher.JceCipher"));
241
242    int cipherKeyBits = this.rpcServer.conf.getInt(
243        "hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
244    // generate key and iv
245    if (cipherKeyBits % 8 != 0) {
246      throw new IllegalArgumentException("The AES cipher key size in bits" +
247          " should be a multiple of byte");
248    }
249    int len = cipherKeyBits / 8;
250    byte[] inKey = new byte[len];
251    byte[] outKey = new byte[len];
252    byte[] inIv = new byte[len];
253    byte[] outIv = new byte[len];
254
255    try {
256      // generate the cipher meta data with SecureRandom
257      CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties);
258      secureRandom.nextBytes(inKey);
259      secureRandom.nextBytes(outKey);
260      secureRandom.nextBytes(inIv);
261      secureRandom.nextBytes(outIv);
262
263      // create CryptoAES for server
264      cryptoAES = new CryptoAES(transformation, properties,
265          inKey, outKey, inIv, outIv);
266      // create SaslCipherMeta and send to client,
267      //  for client, the [inKey, outKey], [inIv, outIv] should be reversed
268      RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder();
269      ccmBuilder.setTransformation(transformation);
270      ccmBuilder.setInIv(getByteString(outIv));
271      ccmBuilder.setInKey(getByteString(outKey));
272      ccmBuilder.setOutIv(getByteString(inIv));
273      ccmBuilder.setOutKey(getByteString(inKey));
274      chrBuilder.setCryptoCipherMeta(ccmBuilder);
275      useCryptoAesWrap = true;
276    } catch (GeneralSecurityException | IOException ex) {
277      throw new UnsupportedCryptoException(ex.getMessage(), ex);
278    }
279  }
280
281  private ByteString getByteString(byte[] bytes) {
282    // return singleton to reduce object allocation
283    return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
284  }
285
286  private UserGroupInformation createUser(ConnectionHeader head) {
287    UserGroupInformation ugi = null;
288
289    if (!head.hasUserInfo()) {
290      return null;
291    }
292    UserInformation userInfoProto = head.getUserInfo();
293    String effectiveUser = null;
294    if (userInfoProto.hasEffectiveUser()) {
295      effectiveUser = userInfoProto.getEffectiveUser();
296    }
297    String realUser = null;
298    if (userInfoProto.hasRealUser()) {
299      realUser = userInfoProto.getRealUser();
300    }
301    if (effectiveUser != null) {
302      if (realUser != null) {
303        UserGroupInformation realUserUgi =
304            UserGroupInformation.createRemoteUser(realUser);
305        ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
306      } else {
307        ugi = UserGroupInformation.createRemoteUser(effectiveUser);
308      }
309    }
310    return ugi;
311  }
312
313  protected final void disposeSasl() {
314    if (saslServer != null) {
315      saslServer.dispose();
316      saslServer = null;
317    }
318  }
319
320  /**
321   * No protobuf encoding of raw sasl messages
322   */
323  protected final void doRawSaslReply(SaslStatus status, Writable rv,
324      String errorClass, String error) throws IOException {
325    BufferChain bc;
326    // In my testing, have noticed that sasl messages are usually
327    // in the ballpark of 100-200. That's why the initial capacity is 256.
328    try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
329        DataOutputStream  out = new DataOutputStream(saslResponse)) {
330      out.writeInt(status.state); // write status
331      if (status == SaslStatus.SUCCESS) {
332        rv.write(out);
333      } else {
334        WritableUtils.writeString(out, errorClass);
335        WritableUtils.writeString(out, error);
336      }
337      bc = new BufferChain(saslResponse.getByteBuffer());
338    }
339    doRespond(() -> bc);
340  }
341
342  public void saslReadAndProcess(ByteBuff saslToken) throws IOException,
343      InterruptedException {
344    if (saslContextEstablished) {
345      RpcServer.LOG.trace("Read input token of size={} for processing by saslServer.unwrap()",
346        saslToken.limit());
347      if (!useWrap) {
348        processOneRpc(saslToken);
349      } else {
350        byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
351        byte [] plaintextData;
352        if (useCryptoAesWrap) {
353          // unwrap with CryptoAES
354          plaintextData = cryptoAES.unwrap(b, 0, b.length);
355        } else {
356          plaintextData = saslServer.unwrap(b, 0, b.length);
357        }
358        processUnwrappedData(plaintextData);
359      }
360    } else {
361      byte[] replyToken;
362      try {
363        if (saslServer == null) {
364          saslServer =
365              new HBaseSaslRpcServer(authMethod, rpcServer.saslProps, rpcServer.secretManager);
366          RpcServer.LOG.debug("Created SASL server with mechanism={}",
367              authMethod.getMechanismName());
368        }
369        RpcServer.LOG.debug("Read input token of size={} for processing by saslServer." +
370            "evaluateResponse()", saslToken.limit());
371        replyToken = saslServer.evaluateResponse(saslToken.hasArray()?
372            saslToken.array() : saslToken.toBytes());
373      } catch (IOException e) {
374        IOException sendToClient = e;
375        Throwable cause = e;
376        while (cause != null) {
377          if (cause instanceof InvalidToken) {
378            sendToClient = (InvalidToken) cause;
379            break;
380          }
381          cause = cause.getCause();
382        }
383        doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
384          sendToClient.getLocalizedMessage());
385        this.rpcServer.metrics.authenticationFailure();
386        String clientIP = this.toString();
387        // attempting user could be null
388        RpcServer.AUDITLOG
389            .warn(RpcServer.AUTH_FAILED_FOR + clientIP + ":" + saslServer.getAttemptingUser());
390        throw e;
391      }
392      if (replyToken != null) {
393        if (RpcServer.LOG.isDebugEnabled()) {
394          RpcServer.LOG.debug("Will send token of size " + replyToken.length
395              + " from saslServer.");
396        }
397        doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
398            null);
399      }
400      if (saslServer.isComplete()) {
401        String qop = saslServer.getNegotiatedQop();
402        useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
403        ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
404        if (RpcServer.LOG.isDebugEnabled()) {
405          RpcServer.LOG.debug("SASL server context established. Authenticated client: " + ugi +
406              ". Negotiated QoP is " + qop);
407        }
408        this.rpcServer.metrics.authenticationSuccess();
409        RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
410        saslContextEstablished = true;
411      }
412    }
413  }
414
415  private void processUnwrappedData(byte[] inBuf) throws IOException, InterruptedException {
416    ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
417    // Read all RPCs contained in the inBuf, even partial ones
418    while (true) {
419      int count;
420      if (unwrappedDataLengthBuffer.remaining() > 0) {
421        count = this.rpcServer.channelRead(ch, unwrappedDataLengthBuffer);
422        if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
423          return;
424      }
425
426      if (unwrappedData == null) {
427        unwrappedDataLengthBuffer.flip();
428        int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
429
430        if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
431          if (RpcServer.LOG.isDebugEnabled())
432            RpcServer.LOG.debug("Received ping message");
433          unwrappedDataLengthBuffer.clear();
434          continue; // ping message
435        }
436        unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
437      }
438
439      count = this.rpcServer.channelRead(ch, unwrappedData);
440      if (count <= 0 || unwrappedData.remaining() > 0)
441        return;
442
443      if (unwrappedData.remaining() == 0) {
444        unwrappedDataLengthBuffer.clear();
445        unwrappedData.flip();
446        processOneRpc(new SingleByteBuff(unwrappedData));
447        unwrappedData = null;
448      }
449    }
450  }
451
452  public void processOneRpc(ByteBuff buf) throws IOException,
453      InterruptedException {
454    if (connectionHeaderRead) {
455      processRequest(buf);
456    } else {
457      processConnectionHeader(buf);
458      this.connectionHeaderRead = true;
459      if (!authorizeConnection()) {
460        // Throw FatalConnectionException wrapping ACE so client does right thing and closes
461        // down the connection instead of trying to read non-existent retun.
462        throw new AccessDeniedException("Connection from " + this + " for service " +
463          connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
464      }
465      this.user = this.rpcServer.userProvider.create(this.ugi);
466    }
467  }
468
469  private boolean authorizeConnection() throws IOException {
470    try {
471      // If auth method is DIGEST, the token was obtained by the
472      // real user for the effective user, therefore not required to
473      // authorize real user. doAs is allowed only for simple or kerberos
474      // authentication
475      if (ugi != null && ugi.getRealUser() != null
476          && (authMethod != AuthMethod.DIGEST)) {
477        ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf);
478      }
479      this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress());
480      this.rpcServer.metrics.authorizationSuccess();
481    } catch (AuthorizationException ae) {
482      if (RpcServer.LOG.isDebugEnabled()) {
483        RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
484      }
485      this.rpcServer.metrics.authorizationFailure();
486      doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae)));
487      return false;
488    }
489    return true;
490  }
491
492  // Reads the connection header following version
493  private void processConnectionHeader(ByteBuff buf) throws IOException {
494    if (buf.hasArray()) {
495      this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
496    } else {
497      CodedInputStream cis = UnsafeByteOperations.unsafeWrap(
498          new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
499      cis.enableAliasing(true);
500      this.connectionHeader = ConnectionHeader.parseFrom(cis);
501    }
502    String serviceName = connectionHeader.getServiceName();
503    if (serviceName == null) throw new EmptyServiceNameException();
504    this.service = RpcServer.getService(this.rpcServer.services, serviceName);
505    if (this.service == null) throw new UnknownServiceException(serviceName);
506    setupCellBlockCodecs(this.connectionHeader);
507    RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
508        RPCProtos.ConnectionHeaderResponse.newBuilder();
509    setupCryptoCipher(this.connectionHeader, chrBuilder);
510    responseConnectionHeader(chrBuilder);
511    UserGroupInformation protocolUser = createUser(connectionHeader);
512    if (!useSasl) {
513      ugi = protocolUser;
514      if (ugi != null) {
515        ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
516      }
517      // audit logging for SASL authenticated users happens in saslReadAndProcess()
518      if (authenticatedWithFallback) {
519        RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for {} connecting from {}",
520            ugi, getHostAddress());
521      }
522    } else {
523      // user is authenticated
524      ugi.setAuthenticationMethod(authMethod.authenticationMethod);
525      //Now we check if this is a proxy user case. If the protocol user is
526      //different from the 'user', it is a proxy user scenario. However,
527      //this is not allowed if user authenticated with DIGEST.
528      if ((protocolUser != null)
529          && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
530        if (authMethod == AuthMethod.DIGEST) {
531          // Not allowed to doAs if token authentication is used
532          throw new AccessDeniedException("Authenticated user (" + ugi
533              + ") doesn't match what the client claims to be ("
534              + protocolUser + ")");
535        } else {
536          // Effective user can be different from authenticated user
537          // for simple auth or kerberos auth
538          // The user is the real user. Now we create a proxy user
539          UserGroupInformation realUser = ugi;
540          ugi = UserGroupInformation.createProxyUser(protocolUser
541              .getUserName(), realUser);
542          // Now the user is a proxy user, set Authentication method Proxy.
543          ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
544        }
545      }
546    }
547    String version;
548    if (this.connectionHeader.hasVersionInfo()) {
549      // see if this connection will support RetryImmediatelyException
550      this.retryImmediatelySupported =
551          VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
552      version = this.connectionHeader.getVersionInfo().getVersion();
553    } else {
554      version = "UNKNOWN";
555    }
556    RpcServer.AUDITLOG.info("Connection from {}:{}, version={}, sasl={}, ugi={}, service={}",
557        this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName);
558  }
559
560  /**
561   * Send the response for connection header
562   */
563  private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
564      throws FatalConnectionException {
565    // Response the connection header if Crypto AES is enabled
566    if (!chrBuilder.hasCryptoCipherMeta()) return;
567    try {
568      byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray();
569      // encrypt the Crypto AES cipher meta data with sasl server, and send to client
570      byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
571      Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
572      Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length);
573      byte[] wrapped = saslServer.wrap(unwrapped, 0, unwrapped.length);
574      BufferChain bc;
575      try (ByteBufferOutputStream response = new ByteBufferOutputStream(wrapped.length + 4);
576          DataOutputStream out = new DataOutputStream(response)) {
577        out.writeInt(wrapped.length);
578        out.write(wrapped);
579        bc = new BufferChain(response.getByteBuffer());
580      }
581      doRespond(() -> bc);
582    } catch (IOException ex) {
583      throw new UnsupportedCryptoException(ex.getMessage(), ex);
584    }
585  }
586
587  protected abstract void doRespond(RpcResponse resp) throws IOException;
588
589  /**
590   * @param buf
591   *          Has the request header and the request param and optionally
592   *          encoded data buffer all in this one array.
593   * @throws IOException
594   * @throws InterruptedException
595   */
596  protected void processRequest(ByteBuff buf) throws IOException,
597      InterruptedException {
598    long totalRequestSize = buf.limit();
599    int offset = 0;
600    // Here we read in the header. We avoid having pb
601    // do its default 4k allocation for CodedInputStream. We force it to use
602    // backing array.
603    CodedInputStream cis;
604    if (buf.hasArray()) {
605      cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit()).newCodedInput();
606    } else {
607      cis = UnsafeByteOperations
608          .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
609    }
610    cis.enableAliasing(true);
611    int headerSize = cis.readRawVarint32();
612    offset = cis.getTotalBytesRead();
613    Message.Builder builder = RequestHeader.newBuilder();
614    ProtobufUtil.mergeFrom(builder, cis, headerSize);
615    RequestHeader header = (RequestHeader) builder.build();
616    offset += headerSize;
617    int id = header.getCallId();
618    if (RpcServer.LOG.isTraceEnabled()) {
619      RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
620          + " totalRequestSize: " + totalRequestSize + " bytes");
621    }
622    // Enforcing the call queue size, this triggers a retry in the client
623    // This is a bit late to be doing this check - we have already read in the
624    // total request.
625    if ((totalRequestSize +
626        this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) {
627      final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null,
628        totalRequestSize, null, 0, this.callCleanup);
629      this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
630      callTooBig.setResponse(null, null,  RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
631        "Call queue is full on " + this.rpcServer.server.getServerName() +
632        ", is hbase.ipc.server.max.callqueue.size too small?");
633      callTooBig.sendResponseIfReady();
634      return;
635    }
636    MethodDescriptor md = null;
637    Message param = null;
638    CellScanner cellScanner = null;
639    try {
640      if (header.hasRequestParam() && header.getRequestParam()) {
641        md = this.service.getDescriptorForType().findMethodByName(
642            header.getMethodName());
643        if (md == null)
644          throw new UnsupportedOperationException(header.getMethodName());
645        builder = this.service.getRequestPrototype(md).newBuilderForType();
646        cis.resetSizeCounter();
647        int paramSize = cis.readRawVarint32();
648        offset += cis.getTotalBytesRead();
649        if (builder != null) {
650          ProtobufUtil.mergeFrom(builder, cis, paramSize);
651          param = builder.build();
652        }
653        offset += paramSize;
654      } else {
655        // currently header must have request param, so we directly throw
656        // exception here
657        String msg = "Invalid request header: "
658            + TextFormat.shortDebugString(header)
659            + ", should have param set in it";
660        RpcServer.LOG.warn(msg);
661        throw new DoNotRetryIOException(msg);
662      }
663      if (header.hasCellBlockMeta()) {
664        buf.position(offset);
665        ByteBuff dup = buf.duplicate();
666        dup.limit(offset + header.getCellBlockMeta().getLength());
667        cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(
668            this.codec, this.compressionCodec, dup);
669      }
670    } catch (Throwable t) {
671      InetSocketAddress address = this.rpcServer.getListenerAddress();
672      String msg = (address != null ? address : "(channel closed)")
673          + " is unable to read call parameter from client "
674          + getHostAddress();
675      RpcServer.LOG.warn(msg, t);
676
677      this.rpcServer.metrics.exception(t);
678
679      // probably the hbase hadoop version does not match the running hadoop
680      // version
681      if (t instanceof LinkageError) {
682        t = new DoNotRetryIOException(t);
683      }
684      // If the method is not present on the server, do not retry.
685      if (t instanceof UnsupportedOperationException) {
686        t = new DoNotRetryIOException(t);
687      }
688
689      ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null,
690        totalRequestSize, null, 0, this.callCleanup);
691      readParamsFailedCall.setResponse(null, null, t, msg + "; " + t.getMessage());
692      readParamsFailedCall.sendResponseIfReady();
693      return;
694    }
695
696    int timeout = 0;
697    if (header.hasTimeout() && header.getTimeout() > 0) {
698      timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout());
699    }
700    ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner, totalRequestSize,
701      this.addr, timeout, this.callCleanup);
702
703    if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
704      this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
705      this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
706      call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
707        "Call queue is full on " + this.rpcServer.server.getServerName() +
708            ", too many items queued ?");
709      call.sendResponseIfReady();
710    }
711  }
712
713  protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException {
714    ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1);
715    ServerCall.setExceptionResponse(e, msg, headerBuilder);
716    ByteBuffer headerBuf =
717        ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null);
718    BufferChain buf = new BufferChain(headerBuf);
719    return () -> buf;
720  }
721
722  private void doBadPreambleHandling(String msg) throws IOException {
723    doBadPreambleHandling(msg, new FatalConnectionException(msg));
724  }
725
726  private void doBadPreambleHandling(String msg, Exception e) throws IOException {
727    SimpleRpcServer.LOG.warn(msg);
728    doRespond(getErrorResponse(msg, e));
729  }
730
731  protected final boolean processPreamble(ByteBuffer preambleBuffer) throws IOException {
732    assert preambleBuffer.remaining() == 6;
733    for (int i = 0; i < RPC_HEADER.length; i++) {
734      if (RPC_HEADER[i] != preambleBuffer.get()) {
735        doBadPreambleHandling(
736          "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER=" +
737              Bytes.toStringBinary(preambleBuffer.array(), 0, RPC_HEADER.length) + " from " +
738              toString());
739        return false;
740      }
741    }
742    int version = preambleBuffer.get() & 0xFF;
743    byte authbyte = preambleBuffer.get();
744    this.authMethod = AuthMethod.valueOf(authbyte);
745    if (version != SimpleRpcServer.CURRENT_VERSION) {
746      String msg = getFatalConnectionString(version, authbyte);
747      doBadPreambleHandling(msg, new WrongVersionException(msg));
748      return false;
749    }
750    if (authMethod == null) {
751      String msg = getFatalConnectionString(version, authbyte);
752      doBadPreambleHandling(msg, new BadAuthException(msg));
753      return false;
754    }
755    if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
756      if (this.rpcServer.allowFallbackToSimpleAuth) {
757        this.rpcServer.metrics.authenticationFallback();
758        authenticatedWithFallback = true;
759      } else {
760        AccessDeniedException ae = new AccessDeniedException("Authentication is required");
761        doRespond(getErrorResponse(ae.getMessage(), ae));
762        return false;
763      }
764    }
765    if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
766      doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
767        null);
768      authMethod = AuthMethod.SIMPLE;
769      // client has already sent the initial Sasl message and we
770      // should ignore it. Both client and server should fall back
771      // to simple auth from now on.
772      skipInitialSaslHandshake = true;
773    }
774    if (authMethod != AuthMethod.SIMPLE) {
775      useSasl = true;
776    }
777    return true;
778  }
779
780  public abstract boolean isConnectionOpen();
781
782  public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md,
783      RequestHeader header, Message param, CellScanner cellScanner, long size,
784      InetAddress remoteAddress, int timeout, CallCleanup reqCleanup);
785
786  private static class ByteBuffByteInput extends ByteInput {
787
788    private ByteBuff buf;
789    private int offset;
790    private int length;
791
792    ByteBuffByteInput(ByteBuff buf, int offset, int length) {
793      this.buf = buf;
794      this.offset = offset;
795      this.length = length;
796    }
797
798    @Override
799    public byte read(int offset) {
800      return this.buf.get(getAbsoluteOffset(offset));
801    }
802
803    private int getAbsoluteOffset(int offset) {
804      return this.offset + offset;
805    }
806
807    @Override
808    public int read(int offset, byte[] out, int outOffset, int len) {
809      this.buf.get(getAbsoluteOffset(offset), out, outOffset, len);
810      return len;
811    }
812
813    @Override
814    public int read(int offset, ByteBuffer out) {
815      int len = out.remaining();
816      this.buf.get(out, getAbsoluteOffset(offset), len);
817      return len;
818    }
819
820    @Override
821    public int size() {
822      return this.length;
823    }
824  }
825}