001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_HEADER;
021
022import io.opentelemetry.api.GlobalOpenTelemetry;
023import io.opentelemetry.api.trace.Span;
024import io.opentelemetry.context.Context;
025import io.opentelemetry.context.Scope;
026import io.opentelemetry.context.propagation.TextMapGetter;
027import java.io.Closeable;
028import java.io.DataOutputStream;
029import java.io.IOException;
030import java.net.InetAddress;
031import java.net.InetSocketAddress;
032import java.nio.ByteBuffer;
033import java.security.GeneralSecurityException;
034import java.security.cert.X509Certificate;
035import java.util.Collections;
036import java.util.Map;
037import java.util.Objects;
038import java.util.Properties;
039import org.apache.commons.crypto.cipher.CryptoCipherFactory;
040import org.apache.commons.crypto.random.CryptoRandom;
041import org.apache.commons.crypto.random.CryptoRandomFactory;
042import org.apache.hadoop.hbase.DoNotRetryIOException;
043import org.apache.hadoop.hbase.ExtendedCellScanner;
044import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint;
045import org.apache.hadoop.hbase.client.VersionInfoUtil;
046import org.apache.hadoop.hbase.codec.Codec;
047import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
048import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
049import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
050import org.apache.hadoop.hbase.nio.ByteBuff;
051import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
052import org.apache.hadoop.hbase.security.AccessDeniedException;
053import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
054import org.apache.hadoop.hbase.security.SaslStatus;
055import org.apache.hadoop.hbase.security.SaslUtil;
056import org.apache.hadoop.hbase.security.User;
057import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider;
058import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider;
059import org.apache.hadoop.hbase.trace.TraceUtil;
060import org.apache.hadoop.hbase.util.ByteBufferUtils;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.util.Pair;
063import org.apache.hadoop.io.IntWritable;
064import org.apache.hadoop.io.Writable;
065import org.apache.hadoop.io.WritableUtils;
066import org.apache.hadoop.io.compress.CompressionCodec;
067import org.apache.hadoop.security.UserGroupInformation;
068import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
069import org.apache.hadoop.security.authorize.AuthorizationException;
070import org.apache.hadoop.security.authorize.ProxyUsers;
071import org.apache.yetus.audience.InterfaceAudience;
072
073import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
074import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
075import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput;
076import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
077import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
078import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
079import org.apache.hbase.thirdparty.com.google.protobuf.Message;
080import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
081import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
082
083import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.SecurityPreamableResponse;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
094
095/** Reads calls from a connection and queues them for handling. */
096@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
097    justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
098@InterfaceAudience.Private
099public abstract class ServerRpcConnection implements Closeable {
100
101  private static final TextMapGetter<RPCTInfo> getter = new RPCTInfoGetter();
102
103  protected final RpcServer rpcServer;
104  // If the connection header has been read or not.
105  protected boolean connectionHeaderRead = false;
106
107  protected CallCleanup callCleanup;
108
109  // Cache the remote host & port info so that even if the socket is
110  // disconnected, we can say where it used to connect to.
111  protected String hostAddress;
112  protected int remotePort;
113  protected InetAddress addr;
114  protected ConnectionHeader connectionHeader;
115  protected Map<String, byte[]> connectionAttributes;
116
117  /**
118   * Codec the client asked use.
119   */
120  protected Codec codec;
121  /**
122   * Compression codec the client asked us use.
123   */
124  protected CompressionCodec compressionCodec;
125  protected BlockingService service;
126
127  protected SaslServerAuthenticationProvider provider;
128  protected boolean skipInitialSaslHandshake;
129  protected boolean useSasl;
130  protected HBaseSaslRpcServer saslServer;
131
132  // was authentication allowed with a fallback to simple auth
133  protected boolean authenticatedWithFallback;
134
135  protected boolean retryImmediatelySupported = false;
136
137  protected User user = null;
138  protected UserGroupInformation ugi = null;
139  protected X509Certificate[] clientCertificateChain = null;
140
141  public ServerRpcConnection(RpcServer rpcServer) {
142    this.rpcServer = rpcServer;
143    this.callCleanup = null;
144  }
145
146  @Override
147  public String toString() {
148    return getHostAddress() + ":" + remotePort;
149  }
150
151  public String getHostAddress() {
152    return hostAddress;
153  }
154
155  public InetAddress getHostInetAddress() {
156    return addr;
157  }
158
159  public int getRemotePort() {
160    return remotePort;
161  }
162
163  public VersionInfo getVersionInfo() {
164    if (connectionHeader != null && connectionHeader.hasVersionInfo()) {
165      return connectionHeader.getVersionInfo();
166    }
167    return null;
168  }
169
170  private String getFatalConnectionString(final int version, final byte authByte) {
171    return "serverVersion=" + RpcServer.CURRENT_VERSION + ", clientVersion=" + version
172      + ", authMethod=" + authByte +
173      // The provider may be null if we failed to parse the header of the request
174      ", authName=" + (provider == null ? "unknown" : provider.getSaslAuthMethod().getName())
175      + " from " + toString();
176  }
177
178  /**
179   * Set up cell block codecs
180   */
181  private void setupCellBlockCodecs() throws FatalConnectionException {
182    // TODO: Plug in other supported decoders.
183    if (!connectionHeader.hasCellBlockCodecClass()) {
184      return;
185    }
186    String className = connectionHeader.getCellBlockCodecClass();
187    if (className == null || className.length() == 0) {
188      return;
189    }
190    try {
191      this.codec = (Codec) Class.forName(className).getDeclaredConstructor().newInstance();
192    } catch (Exception e) {
193      throw new UnsupportedCellCodecException(className, e);
194    }
195    if (!connectionHeader.hasCellBlockCompressorClass()) {
196      return;
197    }
198    className = connectionHeader.getCellBlockCompressorClass();
199    try {
200      this.compressionCodec =
201        (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance();
202    } catch (Exception e) {
203      throw new UnsupportedCompressionCodecException(className, e);
204    }
205  }
206
207  /**
208   * Set up cipher for rpc encryption with Apache Commons Crypto.
209   */
210  private Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> setupCryptoCipher()
211    throws FatalConnectionException {
212    // If simple auth, return
213    if (saslServer == null) {
214      return null;
215    }
216    // check if rpc encryption with Crypto AES
217    String qop = saslServer.getNegotiatedQop();
218    boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop().equalsIgnoreCase(qop);
219    boolean isCryptoAesEncryption = isEncryption
220      && this.rpcServer.conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false);
221    if (!isCryptoAesEncryption) {
222      return null;
223    }
224    if (!connectionHeader.hasRpcCryptoCipherTransformation()) {
225      return null;
226    }
227    String transformation = connectionHeader.getRpcCryptoCipherTransformation();
228    if (transformation == null || transformation.length() == 0) {
229      return null;
230    }
231    // Negotiates AES based on complete saslServer.
232    // The Crypto metadata need to be encrypted and send to client.
233    Properties properties = new Properties();
234    // the property for SecureRandomFactory
235    properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
236      this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
237        "org.apache.commons.crypto.random.JavaCryptoRandom"));
238    // the property for cipher class
239    properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
240      this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
241        "org.apache.commons.crypto.cipher.JceCipher"));
242
243    int cipherKeyBits =
244      this.rpcServer.conf.getInt("hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
245    // generate key and iv
246    if (cipherKeyBits % 8 != 0) {
247      throw new IllegalArgumentException(
248        "The AES cipher key size in bits" + " should be a multiple of byte");
249    }
250    int len = cipherKeyBits / 8;
251    byte[] inKey = new byte[len];
252    byte[] outKey = new byte[len];
253    byte[] inIv = new byte[len];
254    byte[] outIv = new byte[len];
255
256    CryptoAES cryptoAES;
257    try {
258      // generate the cipher meta data with SecureRandom
259      CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties);
260      secureRandom.nextBytes(inKey);
261      secureRandom.nextBytes(outKey);
262      secureRandom.nextBytes(inIv);
263      secureRandom.nextBytes(outIv);
264
265      // create CryptoAES for server
266      cryptoAES = new CryptoAES(transformation, properties, inKey, outKey, inIv, outIv);
267    } catch (GeneralSecurityException | IOException ex) {
268      throw new UnsupportedCryptoException(ex.getMessage(), ex);
269    }
270    // create SaslCipherMeta and send to client,
271    // for client, the [inKey, outKey], [inIv, outIv] should be reversed
272    RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder();
273    ccmBuilder.setTransformation(transformation);
274    ccmBuilder.setInIv(getByteString(outIv));
275    ccmBuilder.setInKey(getByteString(outKey));
276    ccmBuilder.setOutIv(getByteString(inIv));
277    ccmBuilder.setOutKey(getByteString(inKey));
278    RPCProtos.ConnectionHeaderResponse resp =
279      RPCProtos.ConnectionHeaderResponse.newBuilder().setCryptoCipherMeta(ccmBuilder).build();
280    return Pair.newPair(resp, cryptoAES);
281  }
282
283  private ByteString getByteString(byte[] bytes) {
284    // return singleton to reduce object allocation
285    return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
286  }
287
288  private UserGroupInformation createUser(ConnectionHeader head) {
289    UserGroupInformation ugi = null;
290
291    if (!head.hasUserInfo()) {
292      return null;
293    }
294    UserInformation userInfoProto = head.getUserInfo();
295    String effectiveUser = null;
296    if (userInfoProto.hasEffectiveUser()) {
297      effectiveUser = userInfoProto.getEffectiveUser();
298    }
299    String realUser = null;
300    if (userInfoProto.hasRealUser()) {
301      realUser = userInfoProto.getRealUser();
302    }
303    if (effectiveUser != null) {
304      if (realUser != null) {
305        UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(realUser);
306        ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
307      } else {
308        ugi = UserGroupInformation.createRemoteUser(effectiveUser);
309      }
310    }
311    return ugi;
312  }
313
314  protected final void disposeSasl() {
315    if (saslServer != null) {
316      saslServer.dispose();
317      saslServer = null;
318    }
319  }
320
321  /**
322   * No protobuf encoding of raw sasl messages
323   */
324  protected final void doRawSaslReply(SaslStatus status, Writable rv, String errorClass,
325    String error) throws IOException {
326    BufferChain bc;
327    // In my testing, have noticed that sasl messages are usually
328    // in the ballpark of 100-200. That's why the initial capacity is 256.
329    try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
330      DataOutputStream out = new DataOutputStream(saslResponse)) {
331      out.writeInt(status.state); // write status
332      if (status == SaslStatus.SUCCESS) {
333        rv.write(out);
334      } else {
335        WritableUtils.writeString(out, errorClass);
336        WritableUtils.writeString(out, error);
337      }
338      bc = new BufferChain(saslResponse.getByteBuffer());
339    }
340    doRespond(() -> bc);
341  }
342
343  HBaseSaslRpcServer getOrCreateSaslServer() throws IOException {
344    if (saslServer == null) {
345      saslServer = new HBaseSaslRpcServer(provider, rpcServer.saslProps, rpcServer.secretManager);
346    }
347    return saslServer;
348  }
349
350  void finishSaslNegotiation() throws IOException {
351    String negotiatedQop = saslServer.getNegotiatedQop();
352    SaslUtil.verifyNegotiatedQop(saslServer.getRequestedQop(), negotiatedQop);
353    ugi = provider.getAuthorizedUgi(saslServer.getAuthorizationID(), this.rpcServer.secretManager);
354    RpcServer.LOG.debug(
355      "SASL server context established. Authenticated client: {}. Negotiated QoP is {}", ugi,
356      negotiatedQop);
357    rpcServer.metrics.authenticationSuccess();
358    RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
359  }
360
361  public void processOneRpc(ByteBuff buf) throws IOException, InterruptedException {
362    if (connectionHeaderRead) {
363      processRequest(buf);
364    } else {
365      processConnectionHeader(buf);
366      callCleanupIfNeeded();
367      this.connectionHeaderRead = true;
368      this.rpcServer.getRpcCoprocessorHost().preAuthorizeConnection(connectionHeader, addr);
369      if (rpcServer.needAuthorization() && !authorizeConnection()) {
370        // Throw FatalConnectionException wrapping ACE so client does right thing and closes
371        // down the connection instead of trying to read non-existent retun.
372        throw new AccessDeniedException("Connection from " + this + " for service "
373          + connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
374      }
375      this.user = this.rpcServer.userProvider.create(this.ugi);
376      this.rpcServer.getRpcCoprocessorHost().postAuthorizeConnection(
377        this.user != null ? this.user.getName() : null, this.clientCertificateChain);
378    }
379  }
380
381  private boolean authorizeConnection() throws IOException {
382    try {
383      // If auth method is DIGEST, the token was obtained by the
384      // real user for the effective user, therefore not required to
385      // authorize real user. doAs is allowed only for simple or kerberos
386      // authentication
387      if (ugi != null && ugi.getRealUser() != null && provider.supportsProtocolAuthentication()) {
388        ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf);
389      }
390      this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress());
391      this.rpcServer.metrics.authorizationSuccess();
392    } catch (AuthorizationException ae) {
393      if (RpcServer.LOG.isDebugEnabled()) {
394        RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
395      }
396      this.rpcServer.metrics.authorizationFailure();
397      doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae)));
398      return false;
399    }
400    return true;
401  }
402
403  private CodedInputStream createCis(ByteBuff buf) {
404    // Here we read in the header. We avoid having pb
405    // do its default 4k allocation for CodedInputStream. We force it to use
406    // backing array.
407    CodedInputStream cis;
408    if (buf.hasArray()) {
409      cis = UnsafeByteOperations
410        .unsafeWrap(buf.array(), buf.arrayOffset() + buf.position(), buf.limit()).newCodedInput();
411    } else {
412      cis = UnsafeByteOperations.unsafeWrap(new ByteBuffByteInput(buf, buf.limit()), 0, buf.limit())
413        .newCodedInput();
414    }
415    cis.enableAliasing(true);
416    return cis;
417  }
418
419  // Reads the connection header following version
420  private void processConnectionHeader(ByteBuff buf) throws IOException {
421    this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf));
422
423    // we want to copy the attributes prior to releasing the buffer so that they don't get corrupted
424    // eventually
425    if (connectionHeader.getAttributeList().isEmpty()) {
426      this.connectionAttributes = Collections.emptyMap();
427    } else {
428      this.connectionAttributes =
429        Maps.newHashMapWithExpectedSize(connectionHeader.getAttributeList().size());
430      for (HBaseProtos.NameBytesPair nameBytesPair : connectionHeader.getAttributeList()) {
431        this.connectionAttributes.put(nameBytesPair.getName(),
432          nameBytesPair.getValue().toByteArray());
433      }
434    }
435    String serviceName = connectionHeader.getServiceName();
436    if (serviceName == null) {
437      throw new EmptyServiceNameException();
438    }
439    this.service = RpcServer.getService(this.rpcServer.services, serviceName);
440    if (this.service == null) {
441      throw new UnknownServiceException(serviceName);
442    }
443    setupCellBlockCodecs();
444    sendConnectionHeaderResponseIfNeeded();
445    UserGroupInformation protocolUser = createUser(connectionHeader);
446    if (!useSasl) {
447      ugi = protocolUser;
448      if (ugi != null) {
449        ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE);
450      }
451      // audit logging for SASL authenticated users happens in saslReadAndProcess()
452      if (authenticatedWithFallback) {
453        RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for {} connecting from {}", ugi,
454          getHostAddress());
455      }
456    } else {
457      // user is authenticated
458      ugi.setAuthenticationMethod(provider.getSaslAuthMethod().getAuthMethod());
459      // Now we check if this is a proxy user case. If the protocol user is
460      // different from the 'user', it is a proxy user scenario. However,
461      // this is not allowed if user authenticated with DIGEST.
462      if ((protocolUser != null) && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
463        if (!provider.supportsProtocolAuthentication()) {
464          // Not allowed to doAs if token authentication is used
465          throw new AccessDeniedException("Authenticated user (" + ugi
466            + ") doesn't match what the client claims to be (" + protocolUser + ")");
467        } else {
468          // Effective user can be different from authenticated user
469          // for simple auth or kerberos auth
470          // The user is the real user. Now we create a proxy user
471          UserGroupInformation realUser = ugi;
472          ugi = UserGroupInformation.createProxyUser(protocolUser.getUserName(), realUser);
473          // Now the user is a proxy user, set Authentication method Proxy.
474          ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
475        }
476      }
477    }
478    String version;
479    if (this.connectionHeader.hasVersionInfo()) {
480      // see if this connection will support RetryImmediatelyException
481      this.retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
482      version = this.connectionHeader.getVersionInfo().getVersion();
483    } else {
484      version = "UNKNOWN";
485    }
486    RpcServer.AUDITLOG.info("Connection from {}:{}, version={}, sasl={}, ugi={}, service={}",
487      this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName);
488  }
489
490  public ConnectionHeader getConnectionHeader() {
491    return connectionHeader;
492  }
493
494  /**
495   * Send the response for connection header
496   */
497  private void sendConnectionHeaderResponseIfNeeded() throws FatalConnectionException {
498    Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> pair = setupCryptoCipher();
499    // Response the connection header if Crypto AES is enabled
500    if (pair == null) {
501      return;
502    }
503    try {
504      int size = pair.getFirst().getSerializedSize();
505      BufferChain bc;
506      try (ByteBufferOutputStream bbOut = new ByteBufferOutputStream(4 + size);
507        DataOutputStream out = new DataOutputStream(bbOut)) {
508        out.writeInt(size);
509        pair.getFirst().writeTo(out);
510        bc = new BufferChain(bbOut.getByteBuffer());
511      }
512      doRespond(new RpcResponse() {
513
514        @Override
515        public BufferChain getResponse() {
516          return bc;
517        }
518
519        @Override
520        public void done() {
521          // must switch after sending the connection header response, as the client still uses the
522          // original SaslClient to unwrap the data we send back
523          saslServer.switchToCryptoAES(pair.getSecond());
524        }
525      });
526    } catch (IOException ex) {
527      throw new UnsupportedCryptoException(ex.getMessage(), ex);
528    }
529  }
530
531  protected abstract void doRespond(RpcResponse resp) throws IOException;
532
533  /**
534   * Has the request header and the request param and optionally encoded data buffer all in this one
535   * array.
536   * <p/>
537   * Will be overridden in tests.
538   */
539  protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
540    long totalRequestSize = buf.limit();
541    int offset = 0;
542    // Here we read in the header. We avoid having pb
543    // do its default 4k allocation for CodedInputStream. We force it to use
544    // backing array.
545    CodedInputStream cis = createCis(buf);
546    int headerSize = cis.readRawVarint32();
547    offset = cis.getTotalBytesRead();
548    Message.Builder builder = RequestHeader.newBuilder();
549    ProtobufUtil.mergeFrom(builder, cis, headerSize);
550    RequestHeader header = (RequestHeader) builder.build();
551    offset += headerSize;
552    Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
553      .extract(Context.current(), header.getTraceInfo(), getter);
554
555    // n.b. Management of this Span instance is a little odd. Most exit paths from this try scope
556    // are early-exits due to error cases. There's only one success path, the asynchronous call to
557    // RpcScheduler#dispatch. The success path assumes ownership of the span, which is represented
558    // by null-ing out the reference in this scope. All other paths end the span. Thus, and in
559    // order to avoid accidentally orphaning the span, the call to Span#end happens in a finally
560    // block iff the span is non-null.
561    Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx);
562    try (Scope ignored = span.makeCurrent()) {
563      int id = header.getCallId();
564      // HBASE-28128 - if server is aborting, don't bother trying to process. It will
565      // fail at the handler layer, but worse might result in CallQueueTooBigException if the
566      // queue is full but server is not properly processing requests. Better to throw an aborted
567      // exception here so that the client can properly react.
568      if (rpcServer.server != null && rpcServer.server.isAborted()) {
569        RegionServerAbortedException serverIsAborted = new RegionServerAbortedException(
570          "Server " + rpcServer.server.getServerName() + " aborting");
571        this.rpcServer.metrics.exception(serverIsAborted);
572        sendErrorResponseForCall(id, totalRequestSize, span, serverIsAborted.getMessage(),
573          serverIsAborted);
574        return;
575      }
576
577      if (RpcServer.LOG.isTraceEnabled()) {
578        RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
579          + " totalRequestSize: " + totalRequestSize + " bytes");
580      }
581      // Enforcing the call queue size, this triggers a retry in the client
582      // This is a bit late to be doing this check - we have already read in the
583      // total request.
584      if (
585        (totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum())
586            > this.rpcServer.maxQueueSizeInBytes
587      ) {
588        this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
589        sendErrorResponseForCall(id, totalRequestSize, span,
590          "Call queue is full on " + this.rpcServer.server.getServerName()
591            + ", is hbase.ipc.server.max.callqueue.size too small?",
592          RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
593        return;
594      }
595      MethodDescriptor md = null;
596      Message param = null;
597      ExtendedCellScanner cellScanner = null;
598      try {
599        if (header.hasRequestParam() && header.getRequestParam()) {
600          md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
601          if (md == null) {
602            throw new UnsupportedOperationException(header.getMethodName());
603          }
604          builder = this.service.getRequestPrototype(md).newBuilderForType();
605          cis.resetSizeCounter();
606          int paramSize = cis.readRawVarint32();
607          offset += cis.getTotalBytesRead();
608          if (builder != null) {
609            ProtobufUtil.mergeFrom(builder, cis, paramSize);
610            param = builder.build();
611          }
612          offset += paramSize;
613        } else {
614          // currently header must have request param, so we directly throw
615          // exception here
616          String msg = "Invalid request header: " + TextFormat.shortDebugString(header)
617            + ", should have param set in it";
618          RpcServer.LOG.warn(msg);
619          throw new DoNotRetryIOException(msg);
620        }
621        if (header.hasCellBlockMeta()) {
622          buf.position(offset);
623          ByteBuff dup = buf.duplicate();
624          dup.limit(offset + header.getCellBlockMeta().getLength());
625          cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
626            this.compressionCodec, dup);
627        }
628      } catch (Throwable thrown) {
629        InetSocketAddress address = this.rpcServer.getListenerAddress();
630        String msg = (address != null ? address : "(channel closed)")
631          + " is unable to read call parameter from client " + getHostAddress();
632        RpcServer.LOG.warn(msg, thrown);
633
634        this.rpcServer.metrics.exception(thrown);
635
636        final Throwable responseThrowable;
637        if (thrown instanceof LinkageError) {
638          // probably the hbase hadoop version does not match the running hadoop version
639          responseThrowable = new DoNotRetryIOException(thrown);
640        } else if (thrown instanceof UnsupportedOperationException) {
641          // If the method is not present on the server, do not retry.
642          responseThrowable = new DoNotRetryIOException(thrown);
643        } else {
644          responseThrowable = thrown;
645        }
646
647        sendErrorResponseForCall(id, totalRequestSize, span,
648          msg + "; " + responseThrowable.getMessage(), responseThrowable);
649        return;
650      }
651
652      int timeout = 0;
653      if (header.hasTimeout() && header.getTimeout() > 0) {
654        timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout());
655      }
656      ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner,
657        totalRequestSize, this.addr, timeout, this.callCleanup);
658
659      if (this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
660        // unset span do that it's not closed in the finally block
661        span = null;
662      } else {
663        this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
664        this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
665        call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
666          "Call queue is full on " + this.rpcServer.server.getServerName()
667            + ", too many items queued ?");
668        TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
669        call.sendResponseIfReady();
670      }
671    } finally {
672      if (span != null) {
673        span.end();
674      }
675    }
676  }
677
678  private void sendErrorResponseForCall(int id, long totalRequestSize, Span span, String msg,
679    Throwable responseThrowable) throws IOException {
680    ServerCall<?> failedcall = createCall(id, this.service, null, null, null, null,
681      totalRequestSize, null, 0, this.callCleanup);
682    failedcall.setResponse(null, null, responseThrowable, msg);
683    TraceUtil.setError(span, responseThrowable);
684    failedcall.sendResponseIfReady();
685  }
686
687  protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException {
688    ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1);
689    ServerCall.setExceptionResponse(e, msg, headerBuilder);
690    ByteBuffer headerBuf =
691      ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null);
692    BufferChain buf = new BufferChain(headerBuf);
693    return () -> buf;
694  }
695
696  private void doBadPreambleHandling(String msg) throws IOException {
697    doBadPreambleHandling(msg, new FatalConnectionException(msg));
698  }
699
700  private void doBadPreambleHandling(String msg, Exception e) throws IOException {
701    RpcServer.LOG.warn(msg, e);
702    doRespond(getErrorResponse(msg, e));
703  }
704
705  private void doPreambleResponse(Message resp) throws IOException {
706    ResponseHeader header = ResponseHeader.newBuilder().setCallId(-1).build();
707    ByteBuffer buf = ServerCall.createHeaderAndMessageBytes(resp, header, 0, null);
708    BufferChain bufChain = new BufferChain(buf);
709    doRespond(() -> bufChain);
710  }
711
712  private boolean doConnectionRegistryResponse() throws IOException {
713    if (!(rpcServer.server instanceof ConnectionRegistryEndpoint)) {
714      // should be in tests or some scenarios where we should not reach here
715      return false;
716    }
717    // on backup masters, this request may be blocked since we need to fetch it from filesystem,
718    // but since it is just backup master, it is not a critical problem
719    String clusterId = ((ConnectionRegistryEndpoint) rpcServer.server).getClusterId();
720    RpcServer.LOG.debug("Response connection registry, clusterId = '{}'", clusterId);
721    if (clusterId == null) {
722      // should be in tests or some scenarios where we should not reach here
723      return false;
724    }
725    GetConnectionRegistryResponse resp =
726      GetConnectionRegistryResponse.newBuilder().setClusterId(clusterId).build();
727    doPreambleResponse(resp);
728    return true;
729  }
730
731  private void doSecurityPreambleResponse() throws IOException {
732    if (rpcServer.isSecurityEnabled) {
733      SecurityPreamableResponse resp = SecurityPreamableResponse.newBuilder()
734        .setServerPrincipal(rpcServer.serverPrincipal).build();
735      doPreambleResponse(resp);
736    } else {
737      // security is not enabled, do not need a principal when connecting, throw a special exception
738      // to let client know it should just use simple authentication
739      doRespond(getErrorResponse("security is not enabled", new SecurityNotEnabledException()));
740    }
741  }
742
743  protected final void callCleanupIfNeeded() {
744    if (callCleanup != null) {
745      callCleanup.run();
746      callCleanup = null;
747    }
748  }
749
750  protected enum PreambleResponse {
751    SUCCEED, // successfully processed the rpc preamble header
752    CONTINUE, // the preamble header is for other purpose, wait for the rpc preamble header
753    CLOSE // close the rpc connection
754  }
755
756  protected final PreambleResponse processPreamble(ByteBuffer preambleBuffer) throws IOException {
757    assert preambleBuffer.remaining() == 6;
758    if (
759      ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6,
760        RpcClient.REGISTRY_PREAMBLE_HEADER, 0, 6) && doConnectionRegistryResponse()
761    ) {
762      return PreambleResponse.CLOSE;
763    }
764    if (
765      ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6,
766        RpcClient.SECURITY_PREAMBLE_HEADER, 0, 6)
767    ) {
768      doSecurityPreambleResponse();
769      return PreambleResponse.CONTINUE;
770    }
771    if (!ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 4, RPC_HEADER, 0, 4)) {
772      doBadPreambleHandling(
773        "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER="
774          + Bytes.toStringBinary(
775            ByteBufferUtils.toBytes(preambleBuffer, preambleBuffer.position(), RPC_HEADER.length),
776            0, RPC_HEADER.length)
777          + " from " + toString());
778      return PreambleResponse.CLOSE;
779    }
780    int version = preambleBuffer.get(preambleBuffer.position() + 4) & 0xFF;
781    byte authByte = preambleBuffer.get(preambleBuffer.position() + 5);
782    if (version != RpcServer.CURRENT_VERSION) {
783      String msg = getFatalConnectionString(version, authByte);
784      doBadPreambleHandling(msg, new WrongVersionException(msg));
785      return PreambleResponse.CLOSE;
786    }
787
788    this.provider = rpcServer.saslProviders.selectProvider(authByte);
789    if (this.provider == null) {
790      String msg = getFatalConnectionString(version, authByte);
791      doBadPreambleHandling(msg, new BadAuthException(msg));
792      return PreambleResponse.CLOSE;
793    }
794    // TODO this is a wart while simple auth'n doesn't go through sasl.
795    if (this.rpcServer.isSecurityEnabled && isSimpleAuthentication()) {
796      if (this.rpcServer.allowFallbackToSimpleAuth) {
797        this.rpcServer.metrics.authenticationFallback();
798        authenticatedWithFallback = true;
799      } else {
800        AccessDeniedException ae = new AccessDeniedException("Authentication is required");
801        doRespond(getErrorResponse(ae.getMessage(), ae));
802        return PreambleResponse.CLOSE;
803      }
804    }
805    if (!this.rpcServer.isSecurityEnabled && !isSimpleAuthentication()) {
806      doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
807        null);
808      provider = rpcServer.saslProviders.getSimpleProvider();
809      // client has already sent the initial Sasl message and we
810      // should ignore it. Both client and server should fall back
811      // to simple auth from now on.
812      skipInitialSaslHandshake = true;
813    }
814    useSasl = !(provider instanceof SimpleSaslServerAuthenticationProvider);
815    return PreambleResponse.SUCCEED;
816  }
817
818  boolean isSimpleAuthentication() {
819    return Objects.requireNonNull(provider) instanceof SimpleSaslServerAuthenticationProvider;
820  }
821
822  public abstract boolean isConnectionOpen();
823
824  public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md,
825    RequestHeader header, Message param, ExtendedCellScanner cellScanner, long size,
826    InetAddress remoteAddress, int timeout, CallCleanup reqCleanup);
827
828  private static class ByteBuffByteInput extends ByteInput {
829
830    private ByteBuff buf;
831    private int length;
832
833    ByteBuffByteInput(ByteBuff buf, int length) {
834      this.buf = buf;
835      this.length = length;
836    }
837
838    @Override
839    public byte read(int offset) {
840      return this.buf.get(offset);
841    }
842
843    @Override
844    public int read(int offset, byte[] out, int outOffset, int len) {
845      this.buf.get(offset, out, outOffset, len);
846      return len;
847    }
848
849    @Override
850    public int read(int offset, ByteBuffer out) {
851      int len = out.remaining();
852      this.buf.get(out, offset, len);
853      return len;
854    }
855
856    @Override
857    public long readLong(int offset) {
858      return this.buf.getLong(offset);
859    }
860
861    @Override
862    public int size() {
863      return this.length;
864    }
865
866  }
867}