001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_HEADER;
021
022import io.opentelemetry.api.GlobalOpenTelemetry;
023import io.opentelemetry.api.trace.Span;
024import io.opentelemetry.context.Context;
025import io.opentelemetry.context.Scope;
026import io.opentelemetry.context.propagation.TextMapGetter;
027import java.io.Closeable;
028import java.io.DataOutputStream;
029import java.io.IOException;
030import java.net.InetAddress;
031import java.net.InetSocketAddress;
032import java.nio.ByteBuffer;
033import java.security.GeneralSecurityException;
034import java.util.Objects;
035import java.util.Properties;
036import org.apache.commons.crypto.cipher.CryptoCipherFactory;
037import org.apache.commons.crypto.random.CryptoRandom;
038import org.apache.commons.crypto.random.CryptoRandomFactory;
039import org.apache.hadoop.hbase.CellScanner;
040import org.apache.hadoop.hbase.DoNotRetryIOException;
041import org.apache.hadoop.hbase.client.VersionInfoUtil;
042import org.apache.hadoop.hbase.codec.Codec;
043import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
044import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
045import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
046import org.apache.hadoop.hbase.nio.ByteBuff;
047import org.apache.hadoop.hbase.security.AccessDeniedException;
048import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
049import org.apache.hadoop.hbase.security.SaslStatus;
050import org.apache.hadoop.hbase.security.SaslUtil;
051import org.apache.hadoop.hbase.security.User;
052import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider;
053import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders;
054import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider;
055import org.apache.hadoop.hbase.trace.TraceUtil;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.util.Pair;
058import org.apache.hadoop.io.IntWritable;
059import org.apache.hadoop.io.Writable;
060import org.apache.hadoop.io.WritableUtils;
061import org.apache.hadoop.io.compress.CompressionCodec;
062import org.apache.hadoop.security.UserGroupInformation;
063import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
064import org.apache.hadoop.security.authorize.AuthorizationException;
065import org.apache.hadoop.security.authorize.ProxyUsers;
066import org.apache.yetus.audience.InterfaceAudience;
067
068import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
069import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput;
070import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
071import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
072import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
073import org.apache.hbase.thirdparty.com.google.protobuf.Message;
074import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
075import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
076
077import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
085
086/** Reads calls from a connection and queues them for handling. */
087@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
088    justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
089@InterfaceAudience.Private
090abstract class ServerRpcConnection implements Closeable {
091
  // Extracts the propagated trace context from the RPCTInfo carried in a request header.
  private static final TextMapGetter<RPCTInfo> getter = new RPCTInfoGetter();

  // Owning server; supplies conf, metrics, scheduler, services.
  protected final RpcServer rpcServer;
  // If the connection header has been read or not.
  protected boolean connectionHeaderRead = false;

  // Cleanup to release the current request buffer; null when nothing is pending.
  protected CallCleanup callCleanup;

  // Cache the remote host & port info so that even if the socket is
  // disconnected, we can say where it used to connect to.
  protected String hostAddress;
  protected int remotePort;
  protected InetAddress addr;
  protected ConnectionHeader connectionHeader;

  /**
   * Codec the client asked us to use.
   */
  protected Codec codec;
  /**
   * Compression codec the client asked us to use.
   */
  protected CompressionCodec compressionCodec;
  // Service resolved from the connection header's service name.
  protected BlockingService service;

  // SASL provider selected from the preamble's auth byte; drives the handshake.
  protected SaslServerAuthenticationProvider provider;
  // Set when the client sent an initial SASL message we must ignore after falling back to SIMPLE.
  protected boolean skipInitialSaslHandshake;
  // True unless the selected provider is the SIMPLE one.
  protected boolean useSasl;
  protected HBaseSaslRpcServer saslServer;

  // was authentication allowed with a fallback to simple auth
  protected boolean authenticatedWithFallback;

  // True when the client version is new enough to understand RetryImmediatelyException.
  protected boolean retryImmediatelySupported = false;

  protected User user = null;
  protected UserGroupInformation ugi = null;
  protected SaslServerAuthenticationProviders saslProviders = null;
130
131  public ServerRpcConnection(RpcServer rpcServer) {
132    this.rpcServer = rpcServer;
133    this.callCleanup = null;
134    this.saslProviders = SaslServerAuthenticationProviders.getInstance(rpcServer.getConf());
135  }
136
137  @Override
138  public String toString() {
139    return getHostAddress() + ":" + remotePort;
140  }
141
142  public String getHostAddress() {
143    return hostAddress;
144  }
145
146  public InetAddress getHostInetAddress() {
147    return addr;
148  }
149
150  public int getRemotePort() {
151    return remotePort;
152  }
153
154  public VersionInfo getVersionInfo() {
155    if (connectionHeader != null && connectionHeader.hasVersionInfo()) {
156      return connectionHeader.getVersionInfo();
157    }
158    return null;
159  }
160
161  private String getFatalConnectionString(final int version, final byte authByte) {
162    return "serverVersion=" + RpcServer.CURRENT_VERSION + ", clientVersion=" + version
163      + ", authMethod=" + authByte +
164      // The provider may be null if we failed to parse the header of the request
165      ", authName=" + (provider == null ? "unknown" : provider.getSaslAuthMethod().getName())
166      + " from " + toString();
167  }
168
169  /**
170   * Set up cell block codecs
171   */
172  private void setupCellBlockCodecs() throws FatalConnectionException {
173    // TODO: Plug in other supported decoders.
174    if (!connectionHeader.hasCellBlockCodecClass()) {
175      return;
176    }
177    String className = connectionHeader.getCellBlockCodecClass();
178    if (className == null || className.length() == 0) {
179      return;
180    }
181    try {
182      this.codec = (Codec) Class.forName(className).getDeclaredConstructor().newInstance();
183    } catch (Exception e) {
184      throw new UnsupportedCellCodecException(className, e);
185    }
186    if (!connectionHeader.hasCellBlockCompressorClass()) {
187      return;
188    }
189    className = connectionHeader.getCellBlockCompressorClass();
190    try {
191      this.compressionCodec =
192        (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance();
193    } catch (Exception e) {
194      throw new UnsupportedCompressionCodecException(className, e);
195    }
196  }
197
198  /**
199   * Set up cipher for rpc encryption with Apache Commons Crypto.
200   */
201  private Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> setupCryptoCipher()
202    throws FatalConnectionException {
203    // If simple auth, return
204    if (saslServer == null) {
205      return null;
206    }
207    // check if rpc encryption with Crypto AES
208    String qop = saslServer.getNegotiatedQop();
209    boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop().equalsIgnoreCase(qop);
210    boolean isCryptoAesEncryption = isEncryption
211      && this.rpcServer.conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false);
212    if (!isCryptoAesEncryption) {
213      return null;
214    }
215    if (!connectionHeader.hasRpcCryptoCipherTransformation()) {
216      return null;
217    }
218    String transformation = connectionHeader.getRpcCryptoCipherTransformation();
219    if (transformation == null || transformation.length() == 0) {
220      return null;
221    }
222    // Negotiates AES based on complete saslServer.
223    // The Crypto metadata need to be encrypted and send to client.
224    Properties properties = new Properties();
225    // the property for SecureRandomFactory
226    properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
227      this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
228        "org.apache.commons.crypto.random.JavaCryptoRandom"));
229    // the property for cipher class
230    properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
231      this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
232        "org.apache.commons.crypto.cipher.JceCipher"));
233
234    int cipherKeyBits =
235      this.rpcServer.conf.getInt("hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
236    // generate key and iv
237    if (cipherKeyBits % 8 != 0) {
238      throw new IllegalArgumentException(
239        "The AES cipher key size in bits" + " should be a multiple of byte");
240    }
241    int len = cipherKeyBits / 8;
242    byte[] inKey = new byte[len];
243    byte[] outKey = new byte[len];
244    byte[] inIv = new byte[len];
245    byte[] outIv = new byte[len];
246
247    CryptoAES cryptoAES;
248    try {
249      // generate the cipher meta data with SecureRandom
250      CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties);
251      secureRandom.nextBytes(inKey);
252      secureRandom.nextBytes(outKey);
253      secureRandom.nextBytes(inIv);
254      secureRandom.nextBytes(outIv);
255
256      // create CryptoAES for server
257      cryptoAES = new CryptoAES(transformation, properties, inKey, outKey, inIv, outIv);
258    } catch (GeneralSecurityException | IOException ex) {
259      throw new UnsupportedCryptoException(ex.getMessage(), ex);
260    }
261    // create SaslCipherMeta and send to client,
262    // for client, the [inKey, outKey], [inIv, outIv] should be reversed
263    RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder();
264    ccmBuilder.setTransformation(transformation);
265    ccmBuilder.setInIv(getByteString(outIv));
266    ccmBuilder.setInKey(getByteString(outKey));
267    ccmBuilder.setOutIv(getByteString(inIv));
268    ccmBuilder.setOutKey(getByteString(inKey));
269    RPCProtos.ConnectionHeaderResponse resp =
270      RPCProtos.ConnectionHeaderResponse.newBuilder().setCryptoCipherMeta(ccmBuilder).build();
271    return Pair.newPair(resp, cryptoAES);
272  }
273
274  private ByteString getByteString(byte[] bytes) {
275    // return singleton to reduce object allocation
276    return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
277  }
278
279  private UserGroupInformation createUser(ConnectionHeader head) {
280    UserGroupInformation ugi = null;
281
282    if (!head.hasUserInfo()) {
283      return null;
284    }
285    UserInformation userInfoProto = head.getUserInfo();
286    String effectiveUser = null;
287    if (userInfoProto.hasEffectiveUser()) {
288      effectiveUser = userInfoProto.getEffectiveUser();
289    }
290    String realUser = null;
291    if (userInfoProto.hasRealUser()) {
292      realUser = userInfoProto.getRealUser();
293    }
294    if (effectiveUser != null) {
295      if (realUser != null) {
296        UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(realUser);
297        ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
298      } else {
299        ugi = UserGroupInformation.createRemoteUser(effectiveUser);
300      }
301    }
302    return ugi;
303  }
304
305  protected final void disposeSasl() {
306    if (saslServer != null) {
307      saslServer.dispose();
308      saslServer = null;
309    }
310  }
311
312  /**
313   * No protobuf encoding of raw sasl messages
314   */
315  protected final void doRawSaslReply(SaslStatus status, Writable rv, String errorClass,
316    String error) throws IOException {
317    BufferChain bc;
318    // In my testing, have noticed that sasl messages are usually
319    // in the ballpark of 100-200. That's why the initial capacity is 256.
320    try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
321      DataOutputStream out = new DataOutputStream(saslResponse)) {
322      out.writeInt(status.state); // write status
323      if (status == SaslStatus.SUCCESS) {
324        rv.write(out);
325      } else {
326        WritableUtils.writeString(out, errorClass);
327        WritableUtils.writeString(out, error);
328      }
329      bc = new BufferChain(saslResponse.getByteBuffer());
330    }
331    doRespond(() -> bc);
332  }
333
334  HBaseSaslRpcServer getOrCreateSaslServer() throws IOException {
335    if (saslServer == null) {
336      saslServer = new HBaseSaslRpcServer(provider, rpcServer.saslProps, rpcServer.secretManager);
337    }
338    return saslServer;
339  }
340
341  void finishSaslNegotiation() throws IOException {
342    String qop = saslServer.getNegotiatedQop();
343    ugi = provider.getAuthorizedUgi(saslServer.getAuthorizationID(), this.rpcServer.secretManager);
344    RpcServer.LOG.debug(
345      "SASL server context established. Authenticated client: {}. Negotiated QoP is {}", ugi, qop);
346    rpcServer.metrics.authenticationSuccess();
347    RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
348  }
349
350  public void processOneRpc(ByteBuff buf) throws IOException, InterruptedException {
351    if (connectionHeaderRead) {
352      processRequest(buf);
353    } else {
354      processConnectionHeader(buf);
355      callCleanupIfNeeded();
356      this.connectionHeaderRead = true;
357      if (rpcServer.needAuthorization() && !authorizeConnection()) {
358        // Throw FatalConnectionException wrapping ACE so client does right thing and closes
359        // down the connection instead of trying to read non-existent retun.
360        throw new AccessDeniedException("Connection from " + this + " for service "
361          + connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
362      }
363      this.user = this.rpcServer.userProvider.create(this.ugi);
364    }
365  }
366
  /**
   * Authorize the connection: proxy-user (doAs) authorization when applicable, then the
   * service-level authorization via the server.
   * @return true when authorized; false when an AuthorizationException was caught — in that case
   *         an error response has already been queued for the client
   * @throws IOException if queuing the error response fails
   */
  private boolean authorizeConnection() throws IOException {
    try {
      // If auth method is DIGEST, the token was obtained by the
      // real user for the effective user, therefore not required to
      // authorize real user. doAs is allowed only for simple or kerberos
      // authentication
      if (ugi != null && ugi.getRealUser() != null && provider.supportsProtocolAuthentication()) {
        ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf);
      }
      this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress());
      this.rpcServer.metrics.authorizationSuccess();
    } catch (AuthorizationException ae) {
      if (RpcServer.LOG.isDebugEnabled()) {
        RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
      }
      this.rpcServer.metrics.authorizationFailure();
      // Tell the client why it was rejected; caller closes the connection on false.
      doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae)));
      return false;
    }
    return true;
  }
388
389  private CodedInputStream createCis(ByteBuff buf) {
390    // Here we read in the header. We avoid having pb
391    // do its default 4k allocation for CodedInputStream. We force it to use
392    // backing array.
393    CodedInputStream cis;
394    if (buf.hasArray()) {
395      cis = UnsafeByteOperations
396        .unsafeWrap(buf.array(), buf.arrayOffset() + buf.position(), buf.limit()).newCodedInput();
397    } else {
398      cis = UnsafeByteOperations.unsafeWrap(new ByteBuffByteInput(buf, buf.limit()), 0, buf.limit())
399        .newCodedInput();
400    }
401    cis.enableAliasing(true);
402    return cis;
403  }
404
  /**
   * Reads the connection header that follows the preamble: resolves the requested service, sets
   * up cell block codecs, optionally responds with crypto cipher metadata, and establishes the
   * effective user — including proxy-user (doAs) handling for SASL connections.
   * @throws IOException on parse failure or when a claimed proxy user is not permitted
   */
  private void processConnectionHeader(ByteBuff buf) throws IOException {
    this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf));
    String serviceName = connectionHeader.getServiceName();
    if (serviceName == null) {
      throw new EmptyServiceNameException();
    }
    this.service = RpcServer.getService(this.rpcServer.services, serviceName);
    if (this.service == null) {
      throw new UnknownServiceException(serviceName);
    }
    setupCellBlockCodecs();
    // Answers with cipher metadata (and later switches SASL to Crypto AES) when negotiated.
    sendConnectionHeaderResponseIfNeeded();
    UserGroupInformation protocolUser = createUser(connectionHeader);
    if (!useSasl) {
      // Without SASL, the client-claimed user is taken at face value.
      ugi = protocolUser;
      if (ugi != null) {
        ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE);
      }
      // audit logging for SASL authenticated users happens in saslReadAndProcess()
      if (authenticatedWithFallback) {
        RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for {} connecting from {}", ugi,
          getHostAddress());
      }
    } else {
      // user is authenticated
      ugi.setAuthenticationMethod(provider.getSaslAuthMethod().getAuthMethod());
      // Now we check if this is a proxy user case. If the protocol user is
      // different from the 'user', it is a proxy user scenario. However,
      // this is not allowed if user authenticated with DIGEST.
      if ((protocolUser != null) && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
        if (!provider.supportsProtocolAuthentication()) {
          // Not allowed to doAs if token authentication is used
          throw new AccessDeniedException("Authenticated user (" + ugi
            + ") doesn't match what the client claims to be (" + protocolUser + ")");
        } else {
          // Effective user can be different from authenticated user
          // for simple auth or kerberos auth
          // The user is the real user. Now we create a proxy user
          UserGroupInformation realUser = ugi;
          ugi = UserGroupInformation.createProxyUser(protocolUser.getUserName(), realUser);
          // Now the user is a proxy user, set Authentication method Proxy.
          ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
        }
      }
    }
    String version;
    if (this.connectionHeader.hasVersionInfo()) {
      // see if this connection will support RetryImmediatelyException
      this.retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
      version = this.connectionHeader.getVersionInfo().getVersion();
    } else {
      version = "UNKNOWN";
    }
    RpcServer.AUDITLOG.info("Connection from {}:{}, version={}, sasl={}, ugi={}, service={}",
      this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName);
  }
462
463  /**
464   * Send the response for connection header
465   */
466  private void sendConnectionHeaderResponseIfNeeded() throws FatalConnectionException {
467    Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> pair = setupCryptoCipher();
468    // Response the connection header if Crypto AES is enabled
469    if (pair == null) {
470      return;
471    }
472    try {
473      int size = pair.getFirst().getSerializedSize();
474      BufferChain bc;
475      try (ByteBufferOutputStream bbOut = new ByteBufferOutputStream(4 + size);
476        DataOutputStream out = new DataOutputStream(bbOut)) {
477        out.writeInt(size);
478        pair.getFirst().writeTo(out);
479        bc = new BufferChain(bbOut.getByteBuffer());
480      }
481      doRespond(new RpcResponse() {
482
483        @Override
484        public BufferChain getResponse() {
485          return bc;
486        }
487
488        @Override
489        public void done() {
490          // must switch after sending the connection header response, as the client still uses the
491          // original SaslClient to unwrap the data we send back
492          saslServer.switchToCryptoAES(pair.getSecond());
493        }
494      });
495    } catch (IOException ex) {
496      throw new UnsupportedCryptoException(ex.getMessage(), ex);
497    }
498  }
499
  /** Queue {@code resp} to be sent back to the client; the mechanics are transport-specific. */
  protected abstract void doRespond(RpcResponse resp) throws IOException;
501
502  /**
503   * Has the request header and the request param and optionally encoded data buffer all in this one
504   * array.
505   * <p/>
506   * Will be overridden in tests.
507   */
508  protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
509    long totalRequestSize = buf.limit();
510    int offset = 0;
511    // Here we read in the header. We avoid having pb
512    // do its default 4k allocation for CodedInputStream. We force it to use
513    // backing array.
514    CodedInputStream cis = createCis(buf);
515    int headerSize = cis.readRawVarint32();
516    offset = cis.getTotalBytesRead();
517    Message.Builder builder = RequestHeader.newBuilder();
518    ProtobufUtil.mergeFrom(builder, cis, headerSize);
519    RequestHeader header = (RequestHeader) builder.build();
520    offset += headerSize;
521    Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
522      .extract(Context.current(), header.getTraceInfo(), getter);
523
524    // n.b. Management of this Span instance is a little odd. Most exit paths from this try scope
525    // are early-exits due to error cases. There's only one success path, the asynchronous call to
526    // RpcScheduler#dispatch. The success path assumes ownership of the span, which is represented
527    // by null-ing out the reference in this scope. All other paths end the span. Thus, and in
528    // order to avoid accidentally orphaning the span, the call to Span#end happens in a finally
529    // block iff the span is non-null.
530    Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx);
531    try (Scope ignored = span.makeCurrent()) {
532      int id = header.getCallId();
533      if (RpcServer.LOG.isTraceEnabled()) {
534        RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
535          + " totalRequestSize: " + totalRequestSize + " bytes");
536      }
537      // Enforcing the call queue size, this triggers a retry in the client
538      // This is a bit late to be doing this check - we have already read in the
539      // total request.
540      if (
541        (totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum())
542            > this.rpcServer.maxQueueSizeInBytes
543      ) {
544        final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null,
545          totalRequestSize, null, 0, this.callCleanup);
546        this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
547        callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
548          "Call queue is full on " + this.rpcServer.server.getServerName()
549            + ", is hbase.ipc.server.max.callqueue.size too small?");
550        TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
551        callTooBig.sendResponseIfReady();
552        return;
553      }
554      MethodDescriptor md = null;
555      Message param = null;
556      CellScanner cellScanner = null;
557      try {
558        if (header.hasRequestParam() && header.getRequestParam()) {
559          md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
560          if (md == null) {
561            throw new UnsupportedOperationException(header.getMethodName());
562          }
563          builder = this.service.getRequestPrototype(md).newBuilderForType();
564          cis.resetSizeCounter();
565          int paramSize = cis.readRawVarint32();
566          offset += cis.getTotalBytesRead();
567          if (builder != null) {
568            ProtobufUtil.mergeFrom(builder, cis, paramSize);
569            param = builder.build();
570          }
571          offset += paramSize;
572        } else {
573          // currently header must have request param, so we directly throw
574          // exception here
575          String msg = "Invalid request header: " + TextFormat.shortDebugString(header)
576            + ", should have param set in it";
577          RpcServer.LOG.warn(msg);
578          throw new DoNotRetryIOException(msg);
579        }
580        if (header.hasCellBlockMeta()) {
581          buf.position(offset);
582          ByteBuff dup = buf.duplicate();
583          dup.limit(offset + header.getCellBlockMeta().getLength());
584          cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
585            this.compressionCodec, dup);
586        }
587      } catch (Throwable thrown) {
588        InetSocketAddress address = this.rpcServer.getListenerAddress();
589        String msg = (address != null ? address : "(channel closed)")
590          + " is unable to read call parameter from client " + getHostAddress();
591        RpcServer.LOG.warn(msg, thrown);
592
593        this.rpcServer.metrics.exception(thrown);
594
595        final Throwable responseThrowable;
596        if (thrown instanceof LinkageError) {
597          // probably the hbase hadoop version does not match the running hadoop version
598          responseThrowable = new DoNotRetryIOException(thrown);
599        } else if (thrown instanceof UnsupportedOperationException) {
600          // If the method is not present on the server, do not retry.
601          responseThrowable = new DoNotRetryIOException(thrown);
602        } else {
603          responseThrowable = thrown;
604        }
605
606        ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null,
607          totalRequestSize, null, 0, this.callCleanup);
608        readParamsFailedCall.setResponse(null, null, responseThrowable,
609          msg + "; " + responseThrowable.getMessage());
610        TraceUtil.setError(span, responseThrowable);
611        readParamsFailedCall.sendResponseIfReady();
612        return;
613      }
614
615      int timeout = 0;
616      if (header.hasTimeout() && header.getTimeout() > 0) {
617        timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout());
618      }
619      ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner,
620        totalRequestSize, this.addr, timeout, this.callCleanup);
621
622      if (this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
623        // unset span do that it's not closed in the finally block
624        span = null;
625      } else {
626        this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
627        this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
628        call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
629          "Call queue is full on " + this.rpcServer.server.getServerName()
630            + ", too many items queued ?");
631        TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
632        call.sendResponseIfReady();
633      }
634    } finally {
635      if (span != null) {
636        span.end();
637      }
638    }
639  }
640
641  protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException {
642    ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1);
643    ServerCall.setExceptionResponse(e, msg, headerBuilder);
644    ByteBuffer headerBuf =
645      ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null);
646    BufferChain buf = new BufferChain(headerBuf);
647    return () -> buf;
648  }
649
  /** Reject a bad preamble with a generic {@link FatalConnectionException} built from msg. */
  private void doBadPreambleHandling(String msg) throws IOException {
    doBadPreambleHandling(msg, new FatalConnectionException(msg));
  }
653
  /** Logs the preamble problem, then queues {@code e} to the client as an error response. */
  private void doBadPreambleHandling(String msg, Exception e) throws IOException {
    RpcServer.LOG.warn(msg);
    doRespond(getErrorResponse(msg, e));
  }
658
659  protected final void callCleanupIfNeeded() {
660    if (callCleanup != null) {
661      callCleanup.run();
662      callCleanup = null;
663    }
664  }
665
  /**
   * Validate the fixed-size connection preamble: the RPC_HEADER magic bytes followed by one
   * version byte and one auth byte. Selects the SASL authentication provider matching the auth
   * byte, reconciles it with whether security is enabled on the server, and sets
   * {@link #useSasl}.
   * @param preambleBuffer buffer holding exactly the 6 preamble bytes
   * @return true to continue processing the connection; false when an error response has already
   *         been queued and the connection should be closed
   */
  protected final boolean processPreamble(ByteBuffer preambleBuffer) throws IOException {
    assert preambleBuffer.remaining() == 6;
    for (int i = 0; i < RPC_HEADER.length; i++) {
      if (RPC_HEADER[i] != preambleBuffer.get()) {
        // NOTE(review): toStringBinary reads array() from index 0 — this assumes a heap buffer
        // whose arrayOffset is 0 and whose header bytes start there; confirm for pooled buffers.
        doBadPreambleHandling(
          "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER="
            + Bytes.toStringBinary(preambleBuffer.array(), 0, RPC_HEADER.length) + " from "
            + toString());
        return false;
      }
    }
    int version = preambleBuffer.get() & 0xFF;
    byte authbyte = preambleBuffer.get();

    if (version != RpcServer.CURRENT_VERSION) {
      String msg = getFatalConnectionString(version, authbyte);
      doBadPreambleHandling(msg, new WrongVersionException(msg));
      return false;
    }
    this.provider = this.saslProviders.selectProvider(authbyte);
    if (this.provider == null) {
      String msg = getFatalConnectionString(version, authbyte);
      doBadPreambleHandling(msg, new BadAuthException(msg));
      return false;
    }
    // TODO this is a wart while simple auth'n doesn't go through sasl.
    // Security is on but the client asked for SIMPLE: only proceed if fallback is allowed.
    if (this.rpcServer.isSecurityEnabled && isSimpleAuthentication()) {
      if (this.rpcServer.allowFallbackToSimpleAuth) {
        this.rpcServer.metrics.authenticationFallback();
        authenticatedWithFallback = true;
      } else {
        AccessDeniedException ae = new AccessDeniedException("Authentication is required");
        doRespond(getErrorResponse(ae.getMessage(), ae));
        return false;
      }
    }
    // Security is off but the client asked for SASL: tell it to switch to SIMPLE.
    if (!this.rpcServer.isSecurityEnabled && !isSimpleAuthentication()) {
      doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
        null);
      provider = saslProviders.getSimpleProvider();
      // client has already sent the initial Sasl message and we
      // should ignore it. Both client and server should fall back
      // to simple auth from now on.
      skipInitialSaslHandshake = true;
    }
    useSasl = !(provider instanceof SimpleSaslServerAuthenticationProvider);
    return true;
  }
714
715  boolean isSimpleAuthentication() {
716    return Objects.requireNonNull(provider) instanceof SimpleSaslServerAuthenticationProvider;
717  }
718
  /** Returns whether the underlying transport for this connection is still open. */
  public abstract boolean isConnectionOpen();

  /**
   * Build a transport-specific {@link ServerCall} for one request.
   * @param id client-assigned call id from the request header
   * @param service service the call targets
   * @param md method descriptor; null for error placeholder calls
   * @param header parsed request header; null for error placeholder calls
   * @param param decoded request message; null for error placeholder calls
   * @param cellScanner scanner over the request's cell block, may be null
   * @param size total request size in bytes, used for call queue accounting
   * @param remoteAddress client address, may be null
   * @param timeout call timeout (0 means none)
   * @param reqCleanup cleanup to release the request buffer when the call completes
   */
  public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md,
    RequestHeader header, Message param, CellScanner cellScanner, long size,
    InetAddress remoteAddress, int timeout, CallCleanup reqCleanup);
724
725  private static class ByteBuffByteInput extends ByteInput {
726
727    private ByteBuff buf;
728    private int length;
729
730    ByteBuffByteInput(ByteBuff buf, int length) {
731      this.buf = buf;
732      this.length = length;
733    }
734
735    @Override
736    public byte read(int offset) {
737      return this.buf.get(offset);
738    }
739
740    @Override
741    public int read(int offset, byte[] out, int outOffset, int len) {
742      this.buf.get(offset, out, outOffset, len);
743      return len;
744    }
745
746    @Override
747    public int read(int offset, ByteBuffer out) {
748      int len = out.remaining();
749      this.buf.get(out, offset, len);
750      return len;
751    }
752
753    @Override
754    public int size() {
755      return this.length;
756    }
757  }
758}