001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_HEADER;
021
022import java.io.ByteArrayInputStream;
023import java.io.Closeable;
024import java.io.DataOutputStream;
025import java.io.IOException;
026import java.net.InetAddress;
027import java.net.InetSocketAddress;
028import java.nio.ByteBuffer;
029import java.nio.channels.Channels;
030import java.nio.channels.ReadableByteChannel;
031import java.security.GeneralSecurityException;
032import java.util.Objects;
033import java.util.Properties;
034
035import org.apache.commons.crypto.cipher.CryptoCipherFactory;
036import org.apache.commons.crypto.random.CryptoRandom;
037import org.apache.commons.crypto.random.CryptoRandomFactory;
038import org.apache.hadoop.hbase.CellScanner;
039import org.apache.hadoop.hbase.DoNotRetryIOException;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.apache.hadoop.hbase.client.VersionInfoUtil;
042import org.apache.hadoop.hbase.codec.Codec;
043import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
044import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
045import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
046import org.apache.hadoop.hbase.nio.ByteBuff;
047import org.apache.hadoop.hbase.nio.SingleByteBuff;
048import org.apache.hadoop.hbase.security.AccessDeniedException;
049import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
050import org.apache.hadoop.hbase.security.SaslStatus;
051import org.apache.hadoop.hbase.security.SaslUtil;
052import org.apache.hadoop.hbase.security.User;
053import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider;
054import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders;
055import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider;
056import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
057import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput;
058import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
059import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
060import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
061import org.apache.hbase.thirdparty.com.google.protobuf.Message;
062import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
063import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
064import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
070import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
071import org.apache.hadoop.hbase.util.Bytes;
072import org.apache.hadoop.io.BytesWritable;
073import org.apache.hadoop.io.IntWritable;
074import org.apache.hadoop.io.Writable;
075import org.apache.hadoop.io.WritableUtils;
076import org.apache.hadoop.io.compress.CompressionCodec;
077import org.apache.hadoop.security.UserGroupInformation;
078import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
079import org.apache.hadoop.security.authorize.AuthorizationException;
080import org.apache.hadoop.security.authorize.ProxyUsers;
081import org.apache.hadoop.security.token.SecretManager.InvalidToken;
082
083/** Reads calls from a connection and queues them for handling. */
084@edu.umd.cs.findbugs.annotations.SuppressWarnings(
085    value="VO_VOLATILE_INCREMENT",
086    justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
087@InterfaceAudience.Private
088abstract class ServerRpcConnection implements Closeable {
  /** The {@link RpcServer} this connection was created for. */
090  protected final RpcServer rpcServer;
091  // If the connection header has been read or not.
092  protected boolean connectionHeaderRead = false;
093
094  protected CallCleanup callCleanup;
095
096  // Cache the remote host & port info so that even if the socket is
097  // disconnected, we can say where it used to connect to.
098  protected String hostAddress;
099  protected int remotePort;
100  protected InetAddress addr;
101  protected ConnectionHeader connectionHeader;
102
103  /**
   * Codec the client asked us to use.
105   */
106  protected Codec codec;
107  /**
   * Compression codec the client asked us to use.
109   */
110  protected CompressionCodec compressionCodec;
111  protected BlockingService service;
112
113  protected SaslServerAuthenticationProvider provider;
114  protected boolean saslContextEstablished;
115  protected boolean skipInitialSaslHandshake;
116  private ByteBuffer unwrappedData;
117  // When is this set? FindBugs wants to know! Says NP
118  private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
119  protected boolean useSasl;
120  protected HBaseSaslRpcServer saslServer;
121  protected CryptoAES cryptoAES;
122  protected boolean useWrap = false;
123  protected boolean useCryptoAesWrap = false;
124
125  // was authentication allowed with a fallback to simple auth
126  protected boolean authenticatedWithFallback;
127
128  protected boolean retryImmediatelySupported = false;
129
130  protected User user = null;
131  protected UserGroupInformation ugi = null;
132  protected SaslServerAuthenticationProviders saslProviders = null;
133
134  public ServerRpcConnection(RpcServer rpcServer) {
135    this.rpcServer = rpcServer;
136    this.callCleanup = null;
137    this.saslProviders = SaslServerAuthenticationProviders.getInstance(rpcServer.getConf());
138  }
139
140  @Override
141  public String toString() {
142    return getHostAddress() + ":" + remotePort;
143  }
144
145  public String getHostAddress() {
146    return hostAddress;
147  }
148
149  public InetAddress getHostInetAddress() {
150    return addr;
151  }
152
153  public int getRemotePort() {
154    return remotePort;
155  }
156
157  public VersionInfo getVersionInfo() {
158    if (connectionHeader.hasVersionInfo()) {
159      return connectionHeader.getVersionInfo();
160    }
161    return null;
162  }
163
164  private String getFatalConnectionString(final int version, final byte authByte) {
165    return "serverVersion=" + RpcServer.CURRENT_VERSION +
166        ", clientVersion=" + version + ", authMethod=" + authByte +
167        // The provider may be null if we failed to parse the header of the request
168        ", authName=" + (provider == null ? "unknown" : provider.getSaslAuthMethod().getName()) +
169        " from " + toString();
170  }
171
172  /**
173   * Set up cell block codecs
174   * @throws FatalConnectionException
175   */
176  private void setupCellBlockCodecs(final ConnectionHeader header)
177      throws FatalConnectionException {
178    // TODO: Plug in other supported decoders.
179    if (!header.hasCellBlockCodecClass()) return;
180    String className = header.getCellBlockCodecClass();
181    if (className == null || className.length() == 0) return;
182    try {
183      this.codec = (Codec)Class.forName(className).getDeclaredConstructor().newInstance();
184    } catch (Exception e) {
185      throw new UnsupportedCellCodecException(className, e);
186    }
187    if (!header.hasCellBlockCompressorClass()) return;
188    className = header.getCellBlockCompressorClass();
189    try {
190      this.compressionCodec =
191          (CompressionCodec)Class.forName(className).getDeclaredConstructor().newInstance();
192    } catch (Exception e) {
193      throw new UnsupportedCompressionCodecException(className, e);
194    }
195  }
196
197  /**
198   * Set up cipher for rpc encryption with Apache Commons Crypto
199   *
200   * @throws FatalConnectionException
201   */
202  private void setupCryptoCipher(final ConnectionHeader header,
203      RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
204      throws FatalConnectionException {
205    // If simple auth, return
206    if (saslServer == null) return;
207    // check if rpc encryption with Crypto AES
208    String qop = saslServer.getNegotiatedQop();
209    boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY
210        .getSaslQop().equalsIgnoreCase(qop);
211    boolean isCryptoAesEncryption = isEncryption && this.rpcServer.conf.getBoolean(
212        "hbase.rpc.crypto.encryption.aes.enabled", false);
213    if (!isCryptoAesEncryption) return;
214    if (!header.hasRpcCryptoCipherTransformation()) return;
215    String transformation = header.getRpcCryptoCipherTransformation();
216    if (transformation == null || transformation.length() == 0) return;
217     // Negotiates AES based on complete saslServer.
218     // The Crypto metadata need to be encrypted and send to client.
219    Properties properties = new Properties();
220    // the property for SecureRandomFactory
221    properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
222        this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
223            "org.apache.commons.crypto.random.JavaCryptoRandom"));
224    // the property for cipher class
225    properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
226        this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
227            "org.apache.commons.crypto.cipher.JceCipher"));
228
229    int cipherKeyBits = this.rpcServer.conf.getInt(
230        "hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
231    // generate key and iv
232    if (cipherKeyBits % 8 != 0) {
233      throw new IllegalArgumentException("The AES cipher key size in bits" +
234          " should be a multiple of byte");
235    }
236    int len = cipherKeyBits / 8;
237    byte[] inKey = new byte[len];
238    byte[] outKey = new byte[len];
239    byte[] inIv = new byte[len];
240    byte[] outIv = new byte[len];
241
242    try {
243      // generate the cipher meta data with SecureRandom
244      CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties);
245      secureRandom.nextBytes(inKey);
246      secureRandom.nextBytes(outKey);
247      secureRandom.nextBytes(inIv);
248      secureRandom.nextBytes(outIv);
249
250      // create CryptoAES for server
251      cryptoAES = new CryptoAES(transformation, properties,
252          inKey, outKey, inIv, outIv);
253      // create SaslCipherMeta and send to client,
254      //  for client, the [inKey, outKey], [inIv, outIv] should be reversed
255      RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder();
256      ccmBuilder.setTransformation(transformation);
257      ccmBuilder.setInIv(getByteString(outIv));
258      ccmBuilder.setInKey(getByteString(outKey));
259      ccmBuilder.setOutIv(getByteString(inIv));
260      ccmBuilder.setOutKey(getByteString(inKey));
261      chrBuilder.setCryptoCipherMeta(ccmBuilder);
262      useCryptoAesWrap = true;
263    } catch (GeneralSecurityException | IOException ex) {
264      throw new UnsupportedCryptoException(ex.getMessage(), ex);
265    }
266  }
267
268  private ByteString getByteString(byte[] bytes) {
269    // return singleton to reduce object allocation
270    return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
271  }
272
273  private UserGroupInformation createUser(ConnectionHeader head) {
274    UserGroupInformation ugi = null;
275
276    if (!head.hasUserInfo()) {
277      return null;
278    }
279    UserInformation userInfoProto = head.getUserInfo();
280    String effectiveUser = null;
281    if (userInfoProto.hasEffectiveUser()) {
282      effectiveUser = userInfoProto.getEffectiveUser();
283    }
284    String realUser = null;
285    if (userInfoProto.hasRealUser()) {
286      realUser = userInfoProto.getRealUser();
287    }
288    if (effectiveUser != null) {
289      if (realUser != null) {
290        UserGroupInformation realUserUgi =
291            UserGroupInformation.createRemoteUser(realUser);
292        ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
293      } else {
294        ugi = UserGroupInformation.createRemoteUser(effectiveUser);
295      }
296    }
297    return ugi;
298  }
299
300  protected final void disposeSasl() {
301    if (saslServer != null) {
302      saslServer.dispose();
303      saslServer = null;
304    }
305  }
306
307  /**
308   * No protobuf encoding of raw sasl messages
309   */
310  protected final void doRawSaslReply(SaslStatus status, Writable rv,
311      String errorClass, String error) throws IOException {
312    BufferChain bc;
313    // In my testing, have noticed that sasl messages are usually
314    // in the ballpark of 100-200. That's why the initial capacity is 256.
315    try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
316        DataOutputStream  out = new DataOutputStream(saslResponse)) {
317      out.writeInt(status.state); // write status
318      if (status == SaslStatus.SUCCESS) {
319        rv.write(out);
320      } else {
321        WritableUtils.writeString(out, errorClass);
322        WritableUtils.writeString(out, error);
323      }
324      bc = new BufferChain(saslResponse.getByteBuffer());
325    }
326    doRespond(() -> bc);
327  }
328
329  public void saslReadAndProcess(ByteBuff saslToken) throws IOException,
330      InterruptedException {
331    if (saslContextEstablished) {
332      RpcServer.LOG.trace("Read input token of size={} for processing by saslServer.unwrap()",
333        saslToken.limit());
334      if (!useWrap) {
335        processOneRpc(saslToken);
336      } else {
337        byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
338        byte [] plaintextData;
339        if (useCryptoAesWrap) {
340          // unwrap with CryptoAES
341          plaintextData = cryptoAES.unwrap(b, 0, b.length);
342        } else {
343          plaintextData = saslServer.unwrap(b, 0, b.length);
344        }
345        processUnwrappedData(plaintextData);
346      }
347    } else {
348      byte[] replyToken;
349      try {
350        if (saslServer == null) {
351          try {
352            saslServer =
353              new HBaseSaslRpcServer(provider, rpcServer.saslProps, rpcServer.secretManager);
354          } catch (Exception e){
355            RpcServer.LOG.error("Error when trying to create instance of HBaseSaslRpcServer "
356              + "with sasl provider: " + provider, e);
357            throw e;
358          }
359          RpcServer.LOG.debug("Created SASL server with mechanism={}",
360              provider.getSaslAuthMethod().getAuthMethod());
361        }
362        RpcServer.LOG.debug("Read input token of size={} for processing by saslServer." +
363            "evaluateResponse()", saslToken.limit());
364        replyToken = saslServer.evaluateResponse(saslToken.hasArray()?
365            saslToken.array() : saslToken.toBytes());
366      } catch (IOException e) {
367        IOException sendToClient = e;
368        Throwable cause = e;
369        while (cause != null) {
370          if (cause instanceof InvalidToken) {
371            sendToClient = (InvalidToken) cause;
372            break;
373          }
374          cause = cause.getCause();
375        }
376        doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
377          sendToClient.getLocalizedMessage());
378        this.rpcServer.metrics.authenticationFailure();
379        String clientIP = this.toString();
380        // attempting user could be null
381        RpcServer.AUDITLOG
382            .warn("{} {}: {}", RpcServer.AUTH_FAILED_FOR, clientIP, saslServer.getAttemptingUser());
383        throw e;
384      }
385      if (replyToken != null) {
386        if (RpcServer.LOG.isDebugEnabled()) {
387          RpcServer.LOG.debug("Will send token of size " + replyToken.length
388              + " from saslServer.");
389        }
390        doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
391            null);
392      }
393      if (saslServer.isComplete()) {
394        String qop = saslServer.getNegotiatedQop();
395        useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
396        ugi = provider.getAuthorizedUgi(saslServer.getAuthorizationID(),
397            this.rpcServer.secretManager);
398        RpcServer.LOG.debug(
399            "SASL server context established. Authenticated client: {}. Negotiated QoP is {}",
400            ugi, qop);
401        this.rpcServer.metrics.authenticationSuccess();
402        RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
403        saslContextEstablished = true;
404      }
405    }
406  }
407
408  private void processUnwrappedData(byte[] inBuf) throws IOException, InterruptedException {
409    ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
410    // Read all RPCs contained in the inBuf, even partial ones
411    while (true) {
412      int count;
413      if (unwrappedDataLengthBuffer.remaining() > 0) {
414        count = this.rpcServer.channelRead(ch, unwrappedDataLengthBuffer);
415        if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
416          return;
417      }
418
419      if (unwrappedData == null) {
420        unwrappedDataLengthBuffer.flip();
421        int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
422
423        if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
424          if (RpcServer.LOG.isDebugEnabled())
425            RpcServer.LOG.debug("Received ping message");
426          unwrappedDataLengthBuffer.clear();
427          continue; // ping message
428        }
429        unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
430      }
431
432      count = this.rpcServer.channelRead(ch, unwrappedData);
433      if (count <= 0 || unwrappedData.remaining() > 0)
434        return;
435
436      if (unwrappedData.remaining() == 0) {
437        unwrappedDataLengthBuffer.clear();
438        unwrappedData.flip();
439        processOneRpc(new SingleByteBuff(unwrappedData));
440        unwrappedData = null;
441      }
442    }
443  }
444
445  public void processOneRpc(ByteBuff buf) throws IOException,
446      InterruptedException {
447    if (connectionHeaderRead) {
448      processRequest(buf);
449    } else {
450      processConnectionHeader(buf);
451      this.connectionHeaderRead = true;
452      if (!authorizeConnection()) {
453        // Throw FatalConnectionException wrapping ACE so client does right thing and closes
454        // down the connection instead of trying to read non-existent retun.
455        throw new AccessDeniedException("Connection from " + this + " for service " +
456          connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
457      }
458      this.user = this.rpcServer.userProvider.create(this.ugi);
459    }
460  }
461
462  private boolean authorizeConnection() throws IOException {
463    try {
464      // If auth method is DIGEST, the token was obtained by the
465      // real user for the effective user, therefore not required to
466      // authorize real user. doAs is allowed only for simple or kerberos
467      // authentication
468      if (ugi != null && ugi.getRealUser() != null
469          && provider.supportsProtocolAuthentication()) {
470        ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf);
471      }
472      this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress());
473      this.rpcServer.metrics.authorizationSuccess();
474    } catch (AuthorizationException ae) {
475      if (RpcServer.LOG.isDebugEnabled()) {
476        RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
477      }
478      this.rpcServer.metrics.authorizationFailure();
479      doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae)));
480      return false;
481    }
482    return true;
483  }
484
485  // Reads the connection header following version
486  private void processConnectionHeader(ByteBuff buf) throws IOException {
487    if (buf.hasArray()) {
488      this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
489    } else {
490      CodedInputStream cis = UnsafeByteOperations.unsafeWrap(
491          new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
492      cis.enableAliasing(true);
493      this.connectionHeader = ConnectionHeader.parseFrom(cis);
494    }
495    String serviceName = connectionHeader.getServiceName();
496    if (serviceName == null) throw new EmptyServiceNameException();
497    this.service = RpcServer.getService(this.rpcServer.services, serviceName);
498    if (this.service == null) throw new UnknownServiceException(serviceName);
499    setupCellBlockCodecs(this.connectionHeader);
500    RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
501        RPCProtos.ConnectionHeaderResponse.newBuilder();
502    setupCryptoCipher(this.connectionHeader, chrBuilder);
503    responseConnectionHeader(chrBuilder);
504    UserGroupInformation protocolUser = createUser(connectionHeader);
505    if (!useSasl) {
506      ugi = protocolUser;
507      if (ugi != null) {
508        ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE);
509      }
510      // audit logging for SASL authenticated users happens in saslReadAndProcess()
511      if (authenticatedWithFallback) {
512        RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for {} connecting from {}",
513            ugi, getHostAddress());
514      }
515    } else {
516      // user is authenticated
517      ugi.setAuthenticationMethod(provider.getSaslAuthMethod().getAuthMethod());
518      //Now we check if this is a proxy user case. If the protocol user is
519      //different from the 'user', it is a proxy user scenario. However,
520      //this is not allowed if user authenticated with DIGEST.
521      if ((protocolUser != null)
522          && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
523        if (!provider.supportsProtocolAuthentication()) {
524          // Not allowed to doAs if token authentication is used
525          throw new AccessDeniedException("Authenticated user (" + ugi
526              + ") doesn't match what the client claims to be ("
527              + protocolUser + ")");
528        } else {
529          // Effective user can be different from authenticated user
530          // for simple auth or kerberos auth
531          // The user is the real user. Now we create a proxy user
532          UserGroupInformation realUser = ugi;
533          ugi = UserGroupInformation.createProxyUser(protocolUser
534              .getUserName(), realUser);
535          // Now the user is a proxy user, set Authentication method Proxy.
536          ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
537        }
538      }
539    }
540    String version;
541    if (this.connectionHeader.hasVersionInfo()) {
542      // see if this connection will support RetryImmediatelyException
543      this.retryImmediatelySupported =
544          VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
545      version = this.connectionHeader.getVersionInfo().getVersion();
546    } else {
547      version = "UNKNOWN";
548    }
549    RpcServer.AUDITLOG.info("Connection from {}:{}, version={}, sasl={}, ugi={}, service={}",
550        this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName);
551  }
552
553  /**
554   * Send the response for connection header
555   */
556  private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
557      throws FatalConnectionException {
558    // Response the connection header if Crypto AES is enabled
559    if (!chrBuilder.hasCryptoCipherMeta()) return;
560    try {
561      byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray();
562      // encrypt the Crypto AES cipher meta data with sasl server, and send to client
563      byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
564      Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
565      Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length);
566      byte[] wrapped = saslServer.wrap(unwrapped, 0, unwrapped.length);
567      BufferChain bc;
568      try (ByteBufferOutputStream response = new ByteBufferOutputStream(wrapped.length + 4);
569          DataOutputStream out = new DataOutputStream(response)) {
570        out.writeInt(wrapped.length);
571        out.write(wrapped);
572        bc = new BufferChain(response.getByteBuffer());
573      }
574      doRespond(() -> bc);
575    } catch (IOException ex) {
576      throw new UnsupportedCryptoException(ex.getMessage(), ex);
577    }
578  }
579
  /**
   * Hands a response to the connection for delivery to the client. Within this class it is
   * used for raw SASL replies, the connection header response and error responses.
   * NOTE(review): how and when the response is actually written is up to the subclass;
   * confirm against the concrete implementations.
   *
   * @param resp the response to deliver
   * @throws IOException presumably when the response cannot be delivered; verify against
   *           implementations
   */
  protected abstract void doRespond(RpcResponse resp) throws IOException;
581
582  /**
583   * @param buf
584   *          Has the request header and the request param and optionally
585   *          encoded data buffer all in this one array.
586   * @throws IOException
587   * @throws InterruptedException
588   */
589  protected void processRequest(ByteBuff buf) throws IOException,
590      InterruptedException {
591    long totalRequestSize = buf.limit();
592    int offset = 0;
593    // Here we read in the header. We avoid having pb
594    // do its default 4k allocation for CodedInputStream. We force it to use
595    // backing array.
596    CodedInputStream cis;
597    if (buf.hasArray()) {
598      cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit()).newCodedInput();
599    } else {
600      cis = UnsafeByteOperations
601          .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
602    }
603    cis.enableAliasing(true);
604    int headerSize = cis.readRawVarint32();
605    offset = cis.getTotalBytesRead();
606    Message.Builder builder = RequestHeader.newBuilder();
607    ProtobufUtil.mergeFrom(builder, cis, headerSize);
608    RequestHeader header = (RequestHeader) builder.build();
609    offset += headerSize;
610    int id = header.getCallId();
611    if (RpcServer.LOG.isTraceEnabled()) {
612      RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
613          + " totalRequestSize: " + totalRequestSize + " bytes");
614    }
615    // Enforcing the call queue size, this triggers a retry in the client
616    // This is a bit late to be doing this check - we have already read in the
617    // total request.
618    if ((totalRequestSize +
619        this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) {
620      final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null,
621        totalRequestSize, null, 0, this.callCleanup);
622      this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
623      callTooBig.setResponse(null, null,  RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
624        "Call queue is full on " + this.rpcServer.server.getServerName() +
625        ", is hbase.ipc.server.max.callqueue.size too small?");
626      callTooBig.sendResponseIfReady();
627      return;
628    }
629    MethodDescriptor md = null;
630    Message param = null;
631    CellScanner cellScanner = null;
632    try {
633      if (header.hasRequestParam() && header.getRequestParam()) {
634        md = this.service.getDescriptorForType().findMethodByName(
635            header.getMethodName());
636        if (md == null)
637          throw new UnsupportedOperationException(header.getMethodName());
638        builder = this.service.getRequestPrototype(md).newBuilderForType();
639        cis.resetSizeCounter();
640        int paramSize = cis.readRawVarint32();
641        offset += cis.getTotalBytesRead();
642        if (builder != null) {
643          ProtobufUtil.mergeFrom(builder, cis, paramSize);
644          param = builder.build();
645        }
646        offset += paramSize;
647      } else {
648        // currently header must have request param, so we directly throw
649        // exception here
650        String msg = "Invalid request header: "
651            + TextFormat.shortDebugString(header)
652            + ", should have param set in it";
653        RpcServer.LOG.warn(msg);
654        throw new DoNotRetryIOException(msg);
655      }
656      if (header.hasCellBlockMeta()) {
657        buf.position(offset);
658        ByteBuff dup = buf.duplicate();
659        dup.limit(offset + header.getCellBlockMeta().getLength());
660        cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(
661            this.codec, this.compressionCodec, dup);
662      }
663    } catch (Throwable t) {
664      InetSocketAddress address = this.rpcServer.getListenerAddress();
665      String msg = (address != null ? address : "(channel closed)")
666          + " is unable to read call parameter from client "
667          + getHostAddress();
668      RpcServer.LOG.warn(msg, t);
669
670      this.rpcServer.metrics.exception(t);
671
672      // probably the hbase hadoop version does not match the running hadoop
673      // version
674      if (t instanceof LinkageError) {
675        t = new DoNotRetryIOException(t);
676      }
677      // If the method is not present on the server, do not retry.
678      if (t instanceof UnsupportedOperationException) {
679        t = new DoNotRetryIOException(t);
680      }
681
682      ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null,
683        totalRequestSize, null, 0, this.callCleanup);
684      readParamsFailedCall.setResponse(null, null, t, msg + "; " + t.getMessage());
685      readParamsFailedCall.sendResponseIfReady();
686      return;
687    }
688
689    int timeout = 0;
690    if (header.hasTimeout() && header.getTimeout() > 0) {
691      timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout());
692    }
693    ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner, totalRequestSize,
694      this.addr, timeout, this.callCleanup);
695
696    if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
697      this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
698      this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
699      call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
700        "Call queue is full on " + this.rpcServer.server.getServerName() +
701            ", too many items queued ?");
702      call.sendResponseIfReady();
703    }
704  }
705
706  protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException {
707    ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1);
708    ServerCall.setExceptionResponse(e, msg, headerBuilder);
709    ByteBuffer headerBuf =
710        ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null);
711    BufferChain buf = new BufferChain(headerBuf);
712    return () -> buf;
713  }
714
715  private void doBadPreambleHandling(String msg) throws IOException {
716    doBadPreambleHandling(msg, new FatalConnectionException(msg));
717  }
718
719  private void doBadPreambleHandling(String msg, Exception e) throws IOException {
720    SimpleRpcServer.LOG.warn(msg);
721    doRespond(getErrorResponse(msg, e));
722  }
723
724  protected final boolean processPreamble(ByteBuffer preambleBuffer) throws IOException {
725    assert preambleBuffer.remaining() == 6;
726    for (int i = 0; i < RPC_HEADER.length; i++) {
727      if (RPC_HEADER[i] != preambleBuffer.get()) {
728        doBadPreambleHandling(
729          "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER=" +
730              Bytes.toStringBinary(preambleBuffer.array(), 0, RPC_HEADER.length) + " from " +
731              toString());
732        return false;
733      }
734    }
735    int version = preambleBuffer.get() & 0xFF;
736    byte authbyte = preambleBuffer.get();
737
738    if (version != SimpleRpcServer.CURRENT_VERSION) {
739      String msg = getFatalConnectionString(version, authbyte);
740      doBadPreambleHandling(msg, new WrongVersionException(msg));
741      return false;
742    }
743    this.provider = this.saslProviders.selectProvider(authbyte);
744    if (this.provider == null) {
745      String msg = getFatalConnectionString(version, authbyte);
746      doBadPreambleHandling(msg, new BadAuthException(msg));
747      return false;
748    }
749    // TODO this is a wart while simple auth'n doesn't go through sasl.
750    if (this.rpcServer.isSecurityEnabled && isSimpleAuthentication()) {
751      if (this.rpcServer.allowFallbackToSimpleAuth) {
752        this.rpcServer.metrics.authenticationFallback();
753        authenticatedWithFallback = true;
754      } else {
755        AccessDeniedException ae = new AccessDeniedException("Authentication is required");
756        doRespond(getErrorResponse(ae.getMessage(), ae));
757        return false;
758      }
759    }
760    if (!this.rpcServer.isSecurityEnabled && !isSimpleAuthentication()) {
761      doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
762        null);
763      provider = saslProviders.getSimpleProvider();
764      // client has already sent the initial Sasl message and we
765      // should ignore it. Both client and server should fall back
766      // to simple auth from now on.
767      skipInitialSaslHandshake = true;
768    }
769    useSasl = !(provider instanceof SimpleSaslServerAuthenticationProvider);
770    return true;
771  }
772
773  boolean isSimpleAuthentication() {
774    return Objects.requireNonNull(provider) instanceof SimpleSaslServerAuthenticationProvider;
775  }
776
  /**
   * Implemented by concrete connection classes.
   * <p>
   * NOTE(review): presumably reports whether the underlying connection to the client is still
   * open; the exact semantics are defined by each implementation -- confirm there.
   * @return the open/closed state as determined by the implementation
   */
  public abstract boolean isConnectionOpen();
778
  /**
   * Creates the {@link ServerCall} object for one request on this connection; implemented by
   * concrete connection classes.
   * <p>
   * Within this class it is invoked in two ways: with all arguments for a request that is then
   * wrapped in a {@code CallRunner} and dispatched, and with {@code null} for everything except
   * {@code id}, {@code service}, {@code size} and {@code reqCleanup} (and a {@code timeout} of 0)
   * to obtain a call that only carries an error response.
   * @param id call id of the request
   * @param service service the call is made against; callers here pass this connection's service
   * @param md descriptor of the invoked method, may be {@code null}
   * @param header request header, may be {@code null}
   * @param param request parameter message, may be {@code null}
   * @param cellScanner cell scanner accompanying the request, may be {@code null}
   * @param size callers here pass the total request size
   * @param remoteAddress callers here pass this connection's address, may be {@code null}
   * @param timeout callers here pass 0 when the request header carries no positive timeout
   * @param reqCleanup callers here pass this connection's call cleanup
   * @return the call object created by the implementation
   */
  public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md,
      RequestHeader header, Message param, CellScanner cellScanner, long size,
      InetAddress remoteAddress, int timeout, CallCleanup reqCleanup);
782
783  private static class ByteBuffByteInput extends ByteInput {
784
785    private ByteBuff buf;
786    private int offset;
787    private int length;
788
789    ByteBuffByteInput(ByteBuff buf, int offset, int length) {
790      this.buf = buf;
791      this.offset = offset;
792      this.length = length;
793    }
794
795    @Override
796    public byte read(int offset) {
797      return this.buf.get(getAbsoluteOffset(offset));
798    }
799
800    private int getAbsoluteOffset(int offset) {
801      return this.offset + offset;
802    }
803
804    @Override
805    public int read(int offset, byte[] out, int outOffset, int len) {
806      this.buf.get(getAbsoluteOffset(offset), out, outOffset, len);
807      return len;
808    }
809
810    @Override
811    public int read(int offset, ByteBuffer out) {
812      int len = out.remaining();
813      this.buf.get(out, getAbsoluteOffset(offset), len);
814      return len;
815    }
816
817    @Override
818    public int size() {
819      return this.length;
820    }
821  }
822}