001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_HEADER;
021
022import io.opentelemetry.api.GlobalOpenTelemetry;
023import io.opentelemetry.api.trace.Span;
024import io.opentelemetry.context.Context;
025import io.opentelemetry.context.Scope;
026import io.opentelemetry.context.propagation.TextMapGetter;
027import java.io.Closeable;
028import java.io.DataOutputStream;
029import java.io.IOException;
030import java.net.InetAddress;
031import java.net.InetSocketAddress;
032import java.nio.ByteBuffer;
033import java.security.GeneralSecurityException;
034import java.security.cert.X509Certificate;
035import java.util.Collections;
036import java.util.Map;
037import java.util.Objects;
038import java.util.Properties;
039import org.apache.commons.crypto.cipher.CryptoCipherFactory;
040import org.apache.commons.crypto.random.CryptoRandom;
041import org.apache.commons.crypto.random.CryptoRandomFactory;
042import org.apache.hadoop.hbase.DoNotRetryIOException;
043import org.apache.hadoop.hbase.ExtendedCellScanner;
044import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint;
045import org.apache.hadoop.hbase.client.VersionInfoUtil;
046import org.apache.hadoop.hbase.codec.Codec;
047import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
048import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
049import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
050import org.apache.hadoop.hbase.nio.ByteBuff;
051import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
052import org.apache.hadoop.hbase.security.AccessDeniedException;
053import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
054import org.apache.hadoop.hbase.security.SaslStatus;
055import org.apache.hadoop.hbase.security.SaslUtil;
056import org.apache.hadoop.hbase.security.User;
057import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider;
058import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders;
059import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider;
060import org.apache.hadoop.hbase.trace.TraceUtil;
061import org.apache.hadoop.hbase.util.ByteBufferUtils;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.Pair;
064import org.apache.hadoop.io.IntWritable;
065import org.apache.hadoop.io.Writable;
066import org.apache.hadoop.io.WritableUtils;
067import org.apache.hadoop.io.compress.CompressionCodec;
068import org.apache.hadoop.security.UserGroupInformation;
069import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
070import org.apache.hadoop.security.authorize.AuthorizationException;
071import org.apache.hadoop.security.authorize.ProxyUsers;
072import org.apache.yetus.audience.InterfaceAudience;
073
074import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
075import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
076import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput;
077import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
078import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
079import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
080import org.apache.hbase.thirdparty.com.google.protobuf.Message;
081import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
082import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
083
084import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.SecurityPreamableResponse;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
095
096/** Reads calls from a connection and queues them for handling. */
097@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
098    justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
099@InterfaceAudience.Private
100abstract class ServerRpcConnection implements Closeable {
101
  // Getter handed to the OpenTelemetry text-map propagator when extracting the trace context from
  // the RPCTInfo carried in a request header.
  private static final TextMapGetter<RPCTInfo> getter = new RPCTInfoGetter();

  // The RPC server this connection was created for.
  protected final RpcServer rpcServer;
  // Whether the connection header has been read yet.
  protected boolean connectionHeaderRead = false;

  // Optional cleanup hook; run by callCleanupIfNeeded() and passed along to every created call.
  protected CallCleanup callCleanup;

  // Cache the remote host and port so that, even after the socket is
  // disconnected, we can still report where the connection came from.
  protected String hostAddress;
  protected int remotePort;
  protected InetAddress addr;
  // Parsed connection header; null until processConnectionHeader() has run.
  protected ConnectionHeader connectionHeader;
  // Copy of the attributes carried in the connection header (name -> raw bytes).
  protected Map<String, byte[]> connectionAttributes;

  /**
   * Codec the client asked us to use.
   */
  protected Codec codec;
  /**
   * Compression codec the client asked us to use.
   */
  protected CompressionCodec compressionCodec;
  // Service resolved from the service name in the connection header.
  protected BlockingService service;

  protected SaslServerAuthenticationProvider provider;
  protected boolean skipInitialSaslHandshake;
  protected boolean useSasl;
  // Created lazily by getOrCreateSaslServer(); reset to null by disposeSasl().
  protected HBaseSaslRpcServer saslServer;

  // Whether authentication was allowed through a fallback to simple auth.
  protected boolean authenticatedWithFallback;

  // Set from the client's version info while processing the connection header.
  protected boolean retryImmediatelySupported = false;

  // Created from ugi once the connection has been authorized.
  protected User user = null;
  protected UserGroupInformation ugi = null;
  protected SaslServerAuthenticationProviders saslProviders = null;
  // Passed to the coprocessor host's postAuthorizeConnection hook.
  protected X509Certificate[] clientCertificateChain = null;
142
143  public ServerRpcConnection(RpcServer rpcServer) {
144    this.rpcServer = rpcServer;
145    this.callCleanup = null;
146    this.saslProviders = SaslServerAuthenticationProviders.getInstance(rpcServer.getConf());
147  }
148
149  @Override
150  public String toString() {
151    return getHostAddress() + ":" + remotePort;
152  }
153
154  public String getHostAddress() {
155    return hostAddress;
156  }
157
158  public InetAddress getHostInetAddress() {
159    return addr;
160  }
161
162  public int getRemotePort() {
163    return remotePort;
164  }
165
166  public VersionInfo getVersionInfo() {
167    if (connectionHeader != null && connectionHeader.hasVersionInfo()) {
168      return connectionHeader.getVersionInfo();
169    }
170    return null;
171  }
172
173  private String getFatalConnectionString(final int version, final byte authByte) {
174    return "serverVersion=" + RpcServer.CURRENT_VERSION + ", clientVersion=" + version
175      + ", authMethod=" + authByte +
176      // The provider may be null if we failed to parse the header of the request
177      ", authName=" + (provider == null ? "unknown" : provider.getSaslAuthMethod().getName())
178      + " from " + toString();
179  }
180
181  /**
182   * Set up cell block codecs
183   */
184  private void setupCellBlockCodecs() throws FatalConnectionException {
185    // TODO: Plug in other supported decoders.
186    if (!connectionHeader.hasCellBlockCodecClass()) {
187      return;
188    }
189    String className = connectionHeader.getCellBlockCodecClass();
190    if (className == null || className.length() == 0) {
191      return;
192    }
193    try {
194      this.codec = (Codec) Class.forName(className).getDeclaredConstructor().newInstance();
195    } catch (Exception e) {
196      throw new UnsupportedCellCodecException(className, e);
197    }
198    if (!connectionHeader.hasCellBlockCompressorClass()) {
199      return;
200    }
201    className = connectionHeader.getCellBlockCompressorClass();
202    try {
203      this.compressionCodec =
204        (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance();
205    } catch (Exception e) {
206      throw new UnsupportedCompressionCodecException(className, e);
207    }
208  }
209
210  /**
211   * Set up cipher for rpc encryption with Apache Commons Crypto.
212   */
213  private Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> setupCryptoCipher()
214    throws FatalConnectionException {
215    // If simple auth, return
216    if (saslServer == null) {
217      return null;
218    }
219    // check if rpc encryption with Crypto AES
220    String qop = saslServer.getNegotiatedQop();
221    boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop().equalsIgnoreCase(qop);
222    boolean isCryptoAesEncryption = isEncryption
223      && this.rpcServer.conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false);
224    if (!isCryptoAesEncryption) {
225      return null;
226    }
227    if (!connectionHeader.hasRpcCryptoCipherTransformation()) {
228      return null;
229    }
230    String transformation = connectionHeader.getRpcCryptoCipherTransformation();
231    if (transformation == null || transformation.length() == 0) {
232      return null;
233    }
234    // Negotiates AES based on complete saslServer.
235    // The Crypto metadata need to be encrypted and send to client.
236    Properties properties = new Properties();
237    // the property for SecureRandomFactory
238    properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
239      this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
240        "org.apache.commons.crypto.random.JavaCryptoRandom"));
241    // the property for cipher class
242    properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
243      this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
244        "org.apache.commons.crypto.cipher.JceCipher"));
245
246    int cipherKeyBits =
247      this.rpcServer.conf.getInt("hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
248    // generate key and iv
249    if (cipherKeyBits % 8 != 0) {
250      throw new IllegalArgumentException(
251        "The AES cipher key size in bits" + " should be a multiple of byte");
252    }
253    int len = cipherKeyBits / 8;
254    byte[] inKey = new byte[len];
255    byte[] outKey = new byte[len];
256    byte[] inIv = new byte[len];
257    byte[] outIv = new byte[len];
258
259    CryptoAES cryptoAES;
260    try {
261      // generate the cipher meta data with SecureRandom
262      CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties);
263      secureRandom.nextBytes(inKey);
264      secureRandom.nextBytes(outKey);
265      secureRandom.nextBytes(inIv);
266      secureRandom.nextBytes(outIv);
267
268      // create CryptoAES for server
269      cryptoAES = new CryptoAES(transformation, properties, inKey, outKey, inIv, outIv);
270    } catch (GeneralSecurityException | IOException ex) {
271      throw new UnsupportedCryptoException(ex.getMessage(), ex);
272    }
273    // create SaslCipherMeta and send to client,
274    // for client, the [inKey, outKey], [inIv, outIv] should be reversed
275    RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder();
276    ccmBuilder.setTransformation(transformation);
277    ccmBuilder.setInIv(getByteString(outIv));
278    ccmBuilder.setInKey(getByteString(outKey));
279    ccmBuilder.setOutIv(getByteString(inIv));
280    ccmBuilder.setOutKey(getByteString(inKey));
281    RPCProtos.ConnectionHeaderResponse resp =
282      RPCProtos.ConnectionHeaderResponse.newBuilder().setCryptoCipherMeta(ccmBuilder).build();
283    return Pair.newPair(resp, cryptoAES);
284  }
285
286  private ByteString getByteString(byte[] bytes) {
287    // return singleton to reduce object allocation
288    return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
289  }
290
291  private UserGroupInformation createUser(ConnectionHeader head) {
292    UserGroupInformation ugi = null;
293
294    if (!head.hasUserInfo()) {
295      return null;
296    }
297    UserInformation userInfoProto = head.getUserInfo();
298    String effectiveUser = null;
299    if (userInfoProto.hasEffectiveUser()) {
300      effectiveUser = userInfoProto.getEffectiveUser();
301    }
302    String realUser = null;
303    if (userInfoProto.hasRealUser()) {
304      realUser = userInfoProto.getRealUser();
305    }
306    if (effectiveUser != null) {
307      if (realUser != null) {
308        UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(realUser);
309        ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
310      } else {
311        ugi = UserGroupInformation.createRemoteUser(effectiveUser);
312      }
313    }
314    return ugi;
315  }
316
317  protected final void disposeSasl() {
318    if (saslServer != null) {
319      saslServer.dispose();
320      saslServer = null;
321    }
322  }
323
324  /**
325   * No protobuf encoding of raw sasl messages
326   */
327  protected final void doRawSaslReply(SaslStatus status, Writable rv, String errorClass,
328    String error) throws IOException {
329    BufferChain bc;
330    // In my testing, have noticed that sasl messages are usually
331    // in the ballpark of 100-200. That's why the initial capacity is 256.
332    try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
333      DataOutputStream out = new DataOutputStream(saslResponse)) {
334      out.writeInt(status.state); // write status
335      if (status == SaslStatus.SUCCESS) {
336        rv.write(out);
337      } else {
338        WritableUtils.writeString(out, errorClass);
339        WritableUtils.writeString(out, error);
340      }
341      bc = new BufferChain(saslResponse.getByteBuffer());
342    }
343    doRespond(() -> bc);
344  }
345
346  HBaseSaslRpcServer getOrCreateSaslServer() throws IOException {
347    if (saslServer == null) {
348      saslServer = new HBaseSaslRpcServer(provider, rpcServer.saslProps, rpcServer.secretManager);
349    }
350    return saslServer;
351  }
352
353  void finishSaslNegotiation() throws IOException {
354    String qop = saslServer.getNegotiatedQop();
355    ugi = provider.getAuthorizedUgi(saslServer.getAuthorizationID(), this.rpcServer.secretManager);
356    RpcServer.LOG.debug(
357      "SASL server context established. Authenticated client: {}. Negotiated QoP is {}", ugi, qop);
358    rpcServer.metrics.authenticationSuccess();
359    RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
360  }
361
362  public void processOneRpc(ByteBuff buf) throws IOException, InterruptedException {
363    if (connectionHeaderRead) {
364      processRequest(buf);
365    } else {
366      processConnectionHeader(buf);
367      callCleanupIfNeeded();
368      this.connectionHeaderRead = true;
369      this.rpcServer.getRpcCoprocessorHost().preAuthorizeConnection(connectionHeader, addr);
370      if (rpcServer.needAuthorization() && !authorizeConnection()) {
371        // Throw FatalConnectionException wrapping ACE so client does right thing and closes
372        // down the connection instead of trying to read non-existent retun.
373        throw new AccessDeniedException("Connection from " + this + " for service "
374          + connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
375      }
376      this.user = this.rpcServer.userProvider.create(this.ugi);
377      this.rpcServer.getRpcCoprocessorHost().postAuthorizeConnection(
378        this.user != null ? this.user.getName() : null, this.clientCertificateChain);
379    }
380  }
381
382  private boolean authorizeConnection() throws IOException {
383    try {
384      // If auth method is DIGEST, the token was obtained by the
385      // real user for the effective user, therefore not required to
386      // authorize real user. doAs is allowed only for simple or kerberos
387      // authentication
388      if (ugi != null && ugi.getRealUser() != null && provider.supportsProtocolAuthentication()) {
389        ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf);
390      }
391      this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress());
392      this.rpcServer.metrics.authorizationSuccess();
393    } catch (AuthorizationException ae) {
394      if (RpcServer.LOG.isDebugEnabled()) {
395        RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
396      }
397      this.rpcServer.metrics.authorizationFailure();
398      doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae)));
399      return false;
400    }
401    return true;
402  }
403
404  private CodedInputStream createCis(ByteBuff buf) {
405    // Here we read in the header. We avoid having pb
406    // do its default 4k allocation for CodedInputStream. We force it to use
407    // backing array.
408    CodedInputStream cis;
409    if (buf.hasArray()) {
410      cis = UnsafeByteOperations
411        .unsafeWrap(buf.array(), buf.arrayOffset() + buf.position(), buf.limit()).newCodedInput();
412    } else {
413      cis = UnsafeByteOperations.unsafeWrap(new ByteBuffByteInput(buf, buf.limit()), 0, buf.limit())
414        .newCodedInput();
415    }
416    cis.enableAliasing(true);
417    return cis;
418  }
419
420  // Reads the connection header following version
421  private void processConnectionHeader(ByteBuff buf) throws IOException {
422    this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf));
423
424    // we want to copy the attributes prior to releasing the buffer so that they don't get corrupted
425    // eventually
426    if (connectionHeader.getAttributeList().isEmpty()) {
427      this.connectionAttributes = Collections.emptyMap();
428    } else {
429      this.connectionAttributes =
430        Maps.newHashMapWithExpectedSize(connectionHeader.getAttributeList().size());
431      for (HBaseProtos.NameBytesPair nameBytesPair : connectionHeader.getAttributeList()) {
432        this.connectionAttributes.put(nameBytesPair.getName(),
433          nameBytesPair.getValue().toByteArray());
434      }
435    }
436    String serviceName = connectionHeader.getServiceName();
437    if (serviceName == null) {
438      throw new EmptyServiceNameException();
439    }
440    this.service = RpcServer.getService(this.rpcServer.services, serviceName);
441    if (this.service == null) {
442      throw new UnknownServiceException(serviceName);
443    }
444    setupCellBlockCodecs();
445    sendConnectionHeaderResponseIfNeeded();
446    UserGroupInformation protocolUser = createUser(connectionHeader);
447    if (!useSasl) {
448      ugi = protocolUser;
449      if (ugi != null) {
450        ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE);
451      }
452      // audit logging for SASL authenticated users happens in saslReadAndProcess()
453      if (authenticatedWithFallback) {
454        RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for {} connecting from {}", ugi,
455          getHostAddress());
456      }
457    } else {
458      // user is authenticated
459      ugi.setAuthenticationMethod(provider.getSaslAuthMethod().getAuthMethod());
460      // Now we check if this is a proxy user case. If the protocol user is
461      // different from the 'user', it is a proxy user scenario. However,
462      // this is not allowed if user authenticated with DIGEST.
463      if ((protocolUser != null) && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
464        if (!provider.supportsProtocolAuthentication()) {
465          // Not allowed to doAs if token authentication is used
466          throw new AccessDeniedException("Authenticated user (" + ugi
467            + ") doesn't match what the client claims to be (" + protocolUser + ")");
468        } else {
469          // Effective user can be different from authenticated user
470          // for simple auth or kerberos auth
471          // The user is the real user. Now we create a proxy user
472          UserGroupInformation realUser = ugi;
473          ugi = UserGroupInformation.createProxyUser(protocolUser.getUserName(), realUser);
474          // Now the user is a proxy user, set Authentication method Proxy.
475          ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
476        }
477      }
478    }
479    String version;
480    if (this.connectionHeader.hasVersionInfo()) {
481      // see if this connection will support RetryImmediatelyException
482      this.retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
483      version = this.connectionHeader.getVersionInfo().getVersion();
484    } else {
485      version = "UNKNOWN";
486    }
487    RpcServer.AUDITLOG.info("Connection from {}:{}, version={}, sasl={}, ugi={}, service={}",
488      this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName);
489  }
490
491  /**
492   * Send the response for connection header
493   */
494  private void sendConnectionHeaderResponseIfNeeded() throws FatalConnectionException {
495    Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> pair = setupCryptoCipher();
496    // Response the connection header if Crypto AES is enabled
497    if (pair == null) {
498      return;
499    }
500    try {
501      int size = pair.getFirst().getSerializedSize();
502      BufferChain bc;
503      try (ByteBufferOutputStream bbOut = new ByteBufferOutputStream(4 + size);
504        DataOutputStream out = new DataOutputStream(bbOut)) {
505        out.writeInt(size);
506        pair.getFirst().writeTo(out);
507        bc = new BufferChain(bbOut.getByteBuffer());
508      }
509      doRespond(new RpcResponse() {
510
511        @Override
512        public BufferChain getResponse() {
513          return bc;
514        }
515
516        @Override
517        public void done() {
518          // must switch after sending the connection header response, as the client still uses the
519          // original SaslClient to unwrap the data we send back
520          saslServer.switchToCryptoAES(pair.getSecond());
521        }
522      });
523    } catch (IOException ex) {
524      throw new UnsupportedCryptoException(ex.getMessage(), ex);
525    }
526  }
527
  /**
   * Hands a response over for delivery to the client. How it is actually written is left to the
   * concrete connection implementation.
   * @param resp the response to send
   * @throws IOException if the response cannot be sent
   */
  protected abstract void doRespond(RpcResponse resp) throws IOException;
529
  /**
   * Processes a single request. The request header, the request param and the optional encoded
   * cell block are all contained in the one buffer passed in.
   * <p/>
   * The header and the param are each read as a varint length followed by that many bytes. On
   * success the call is handed to the scheduler; on any failure an error response is sent back for
   * the call instead.
   * <p/>
   * Will be overridden in tests.
   * @param buf the buffer holding the complete request
   * @throws IOException if the request header cannot be read or a response cannot be sent
   */
  protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
    long totalRequestSize = buf.limit();
    // Running position in buf; after header and param are consumed it marks where the cell block
    // (if any) starts.
    int offset = 0;
    // Here we read in the header. We avoid having pb
    // do its default 4k allocation for CodedInputStream. We force it to use
    // backing array.
    CodedInputStream cis = createCis(buf);
    int headerSize = cis.readRawVarint32();
    offset = cis.getTotalBytesRead();
    Message.Builder builder = RequestHeader.newBuilder();
    ProtobufUtil.mergeFrom(builder, cis, headerSize);
    RequestHeader header = (RequestHeader) builder.build();
    offset += headerSize;
    // Pick up the trace context the client propagated in the request header, if any.
    Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
      .extract(Context.current(), header.getTraceInfo(), getter);

    // n.b. Management of this Span instance is a little odd. Most exit paths from this try scope
    // are early-exits due to error cases. There's only one success path, the asynchronous call to
    // RpcScheduler#dispatch. The success path assumes ownership of the span, which is represented
    // by null-ing out the reference in this scope. All other paths end the span. Thus, and in
    // order to avoid accidentally orphaning the span, the call to Span#end happens in a finally
    // block iff the span is non-null.
    Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx);
    try (Scope ignored = span.makeCurrent()) {
      int id = header.getCallId();
      // HBASE-28128 - if server is aborting, don't bother trying to process. It will
      // fail at the handler layer, but worse might result in CallQueueTooBigException if the
      // queue is full but server is not properly processing requests. Better to throw an aborted
      // exception here so that the client can properly react.
      if (rpcServer.server != null && rpcServer.server.isAborted()) {
        RegionServerAbortedException serverIsAborted = new RegionServerAbortedException(
          "Server " + rpcServer.server.getServerName() + " aborting");
        this.rpcServer.metrics.exception(serverIsAborted);
        sendErrorResponseForCall(id, totalRequestSize, span, serverIsAborted.getMessage(),
          serverIsAborted);
        return;
      }

      if (RpcServer.LOG.isTraceEnabled()) {
        RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
          + " totalRequestSize: " + totalRequestSize + " bytes");
      }
      // Enforcing the call queue size, this triggers a retry in the client
      // This is a bit late to be doing this check - we have already read in the
      // total request.
      // NOTE(review): rpcServer.server is dereferenced below (and again in the dispatch-failure
      // path) without the null check used for the abort test above - confirm it cannot be null
      // on these paths.
      if (
        (totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum())
            > this.rpcServer.maxQueueSizeInBytes
      ) {
        this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
        sendErrorResponseForCall(id, totalRequestSize, span,
          "Call queue is full on " + this.rpcServer.server.getServerName()
            + ", is hbase.ipc.server.max.callqueue.size too small?",
          RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
        return;
      }
      MethodDescriptor md = null;
      Message param = null;
      ExtendedCellScanner cellScanner = null;
      try {
        if (header.hasRequestParam() && header.getRequestParam()) {
          md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
          if (md == null) {
            throw new UnsupportedOperationException(header.getMethodName());
          }
          builder = this.service.getRequestPrototype(md).newBuilderForType();
          // Reset the counter so getTotalBytesRead() below only covers the param's length varint.
          cis.resetSizeCounter();
          int paramSize = cis.readRawVarint32();
          offset += cis.getTotalBytesRead();
          if (builder != null) {
            ProtobufUtil.mergeFrom(builder, cis, paramSize);
            param = builder.build();
          }
          offset += paramSize;
        } else {
          // currently header must have request param, so we directly throw
          // exception here
          String msg = "Invalid request header: " + TextFormat.shortDebugString(header)
            + ", should have param set in it";
          RpcServer.LOG.warn(msg);
          throw new DoNotRetryIOException(msg);
        }
        if (header.hasCellBlockMeta()) {
          // The cell block follows the param; expose just that slice to the cell scanner.
          buf.position(offset);
          ByteBuff dup = buf.duplicate();
          dup.limit(offset + header.getCellBlockMeta().getLength());
          cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
            this.compressionCodec, dup);
        }
      } catch (Throwable thrown) {
        // Anything that goes wrong while decoding the call is reported back to the client as an
        // error response for this call rather than propagated.
        InetSocketAddress address = this.rpcServer.getListenerAddress();
        String msg = (address != null ? address : "(channel closed)")
          + " is unable to read call parameter from client " + getHostAddress();
        RpcServer.LOG.warn(msg, thrown);

        this.rpcServer.metrics.exception(thrown);

        final Throwable responseThrowable;
        if (thrown instanceof LinkageError) {
          // probably the hbase hadoop version does not match the running hadoop version
          responseThrowable = new DoNotRetryIOException(thrown);
        } else if (thrown instanceof UnsupportedOperationException) {
          // If the method is not present on the server, do not retry.
          responseThrowable = new DoNotRetryIOException(thrown);
        } else {
          responseThrowable = thrown;
        }

        sendErrorResponseForCall(id, totalRequestSize, span,
          msg + "; " + responseThrowable.getMessage(), responseThrowable);
        return;
      }

      // A client-supplied timeout is honoured but never allowed below the server's minimum; 0
      // is used when the client sent no positive timeout.
      int timeout = 0;
      if (header.hasTimeout() && header.getTimeout() > 0) {
        timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout());
      }
      ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner,
        totalRequestSize, this.addr, timeout, this.callCleanup);

      if (this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
        // unset span so that it's not closed in the finally block
        span = null;
      } else {
        // Dispatch was refused: undo the queue-size accounting for this call and tell the client
        // the call queue is full.
        this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
        this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
        call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
          "Call queue is full on " + this.rpcServer.server.getServerName()
            + ", too many items queued ?");
        TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
        call.sendResponseIfReady();
      }
    } finally {
      if (span != null) {
        span.end();
      }
    }
  }
674
675  private void sendErrorResponseForCall(int id, long totalRequestSize, Span span, String msg,
676    Throwable responseThrowable) throws IOException {
677    ServerCall<?> failedcall = createCall(id, this.service, null, null, null, null,
678      totalRequestSize, null, 0, this.callCleanup);
679    failedcall.setResponse(null, null, responseThrowable, msg);
680    TraceUtil.setError(span, responseThrowable);
681    failedcall.sendResponseIfReady();
682  }
683
684  protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException {
685    ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1);
686    ServerCall.setExceptionResponse(e, msg, headerBuilder);
687    ByteBuffer headerBuf =
688      ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null);
689    BufferChain buf = new BufferChain(headerBuf);
690    return () -> buf;
691  }
692
693  private void doBadPreambleHandling(String msg) throws IOException {
694    doBadPreambleHandling(msg, new FatalConnectionException(msg));
695  }
696
697  private void doBadPreambleHandling(String msg, Exception e) throws IOException {
698    RpcServer.LOG.warn(msg, e);
699    doRespond(getErrorResponse(msg, e));
700  }
701
702  private void doPreambleResponse(Message resp) throws IOException {
703    ResponseHeader header = ResponseHeader.newBuilder().setCallId(-1).build();
704    ByteBuffer buf = ServerCall.createHeaderAndMessageBytes(resp, header, 0, null);
705    BufferChain bufChain = new BufferChain(buf);
706    doRespond(() -> bufChain);
707  }
708
709  private boolean doConnectionRegistryResponse() throws IOException {
710    if (!(rpcServer.server instanceof ConnectionRegistryEndpoint)) {
711      // should be in tests or some scenarios where we should not reach here
712      return false;
713    }
714    // on backup masters, this request may be blocked since we need to fetch it from filesystem,
715    // but since it is just backup master, it is not a critical problem
716    String clusterId = ((ConnectionRegistryEndpoint) rpcServer.server).getClusterId();
717    RpcServer.LOG.debug("Response connection registry, clusterId = '{}'", clusterId);
718    if (clusterId == null) {
719      // should be in tests or some scenarios where we should not reach here
720      return false;
721    }
722    GetConnectionRegistryResponse resp =
723      GetConnectionRegistryResponse.newBuilder().setClusterId(clusterId).build();
724    doPreambleResponse(resp);
725    return true;
726  }
727
728  private void doSecurityPreambleResponse() throws IOException {
729    if (rpcServer.isSecurityEnabled) {
730      SecurityPreamableResponse resp = SecurityPreamableResponse.newBuilder()
731        .setServerPrincipal(rpcServer.serverPrincipal).build();
732      doPreambleResponse(resp);
733    } else {
734      // security is not enabled, do not need a principal when connecting, throw a special exception
735      // to let client know it should just use simple authentication
736      doRespond(getErrorResponse("security is not enabled", new SecurityNotEnabledException()));
737    }
738  }
739
740  protected final void callCleanupIfNeeded() {
741    if (callCleanup != null) {
742      callCleanup.run();
743      callCleanup = null;
744    }
745  }
746
  /**
   * Outcome of handling one preamble header, as returned by
   * {@link #processPreamble(ByteBuffer)}.
   */
  protected enum PreambleResponse {
    SUCCEED, // successfully processed the rpc preamble header
    CONTINUE, // the preamble header is for other purpose, wait for the rpc preamble header
    CLOSE // close the rpc connection
  }
752
  /**
   * Inspects a 6-byte connection preamble and decides how the connection should proceed.
   * <p>
   * The checks are applied in this order:
   * <ol>
   * <li>Connection registry header: if the 6 bytes match
   * {@code RpcClient.REGISTRY_PREAMBLE_HEADER} and a registry response could be sent, the result is
   * {@link PreambleResponse#CLOSE}. If no registry response could be sent, evaluation falls through
   * to the checks below.</li>
   * <li>Security header: if the 6 bytes match {@code RpcClient.SECURITY_PREAMBLE_HEADER}, a
   * security preamble response is sent and the result is {@link PreambleResponse#CONTINUE}.</li>
   * <li>RPC header: the first 4 bytes must equal {@code RPC_HEADER}, byte 4 must equal
   * {@code RpcServer.CURRENT_VERSION} and byte 5 must select a known authentication provider;
   * otherwise an error response is sent and the result is {@link PreambleResponse#CLOSE}.</li>
   * </ol>
   * On success the fields {@code provider} and {@code useSasl} are set, and depending on the
   * security configuration {@code authenticatedWithFallback} or {@code skipInitialSaslHandshake}
   * may be set as well.
   * @param preambleBuffer buffer asserted to have exactly 6 remaining bytes; it is read through
   *                       offsets relative to its current position
   * @return {@link PreambleResponse#SUCCEED} when the rpc preamble was accepted,
   *         {@link PreambleResponse#CONTINUE} when a security preamble was answered, and
   *         {@link PreambleResponse#CLOSE} when the connection should be closed
   * @throws IOException propagated from sending a response to the client
   */
  protected final PreambleResponse processPreamble(ByteBuffer preambleBuffer) throws IOException {
    assert preambleBuffer.remaining() == 6;
    // Registry preamble: the response is only attempted when the header matches (short-circuit
    // &&); if it could not be sent we fall through to the remaining header checks.
    if (
      ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6,
        RpcClient.REGISTRY_PREAMBLE_HEADER, 0, 6) && doConnectionRegistryResponse()
    ) {
      return PreambleResponse.CLOSE;
    }
    // Security preamble: answer it and keep the connection for the following rpc preamble.
    if (
      ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6,
        RpcClient.SECURITY_PREAMBLE_HEADER, 0, 6)
    ) {
      doSecurityPreambleResponse();
      return PreambleResponse.CONTINUE;
    }
    // Anything else must start with the 4-byte rpc header.
    if (!ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 4, RPC_HEADER, 0, 4)) {
      doBadPreambleHandling(
        "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER="
          + Bytes.toStringBinary(
            ByteBufferUtils.toBytes(preambleBuffer, preambleBuffer.position(), RPC_HEADER.length),
            0, RPC_HEADER.length)
          + " from " + toString());
      return PreambleResponse.CLOSE;
    }
    // Byte 4 is the protocol version (masked to an unsigned value), byte 5 the auth method code.
    int version = preambleBuffer.get(preambleBuffer.position() + 4) & 0xFF;
    byte authByte = preambleBuffer.get(preambleBuffer.position() + 5);
    if (version != RpcServer.CURRENT_VERSION) {
      String msg = getFatalConnectionString(version, authByte);
      doBadPreambleHandling(msg, new WrongVersionException(msg));
      return PreambleResponse.CLOSE;
    }

    // A null provider means the auth byte does not map to any provider known to saslProviders.
    this.provider = this.saslProviders.selectProvider(authByte);
    if (this.provider == null) {
      String msg = getFatalConnectionString(version, authByte);
      doBadPreambleHandling(msg, new BadAuthException(msg));
      return PreambleResponse.CLOSE;
    }
    // TODO this is a wart while simple auth'n doesn't go through sasl.
    // Secure server, client asked for simple auth: accept only if fallback is allowed, and
    // record that fallback was used; otherwise reject with AccessDeniedException.
    if (this.rpcServer.isSecurityEnabled && isSimpleAuthentication()) {
      if (this.rpcServer.allowFallbackToSimpleAuth) {
        this.rpcServer.metrics.authenticationFallback();
        authenticatedWithFallback = true;
      } else {
        AccessDeniedException ae = new AccessDeniedException("Authentication is required");
        doRespond(getErrorResponse(ae.getMessage(), ae));
        return PreambleResponse.CLOSE;
      }
    }
    // Insecure server, client asked for a non-simple method: tell the client to switch to simple
    // auth and replace the selected provider with the simple one.
    if (!this.rpcServer.isSecurityEnabled && !isSimpleAuthentication()) {
      doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
        null);
      provider = saslProviders.getSimpleProvider();
      // client has already sent the initial Sasl message and we
      // should ignore it. Both client and server should fall back
      // to simple auth from now on.
      skipInitialSaslHandshake = true;
    }
    // SASL is used for every provider except the simple one.
    useSasl = !(provider instanceof SimpleSaslServerAuthenticationProvider);
    return PreambleResponse.SUCCEED;
  }
814
815  boolean isSimpleAuthentication() {
816    return Objects.requireNonNull(provider) instanceof SimpleSaslServerAuthenticationProvider;
817  }
818
  /**
   * Implemented by the concrete connection classes.
   * <p>
   * NOTE(review): judging by the name, this reports whether the underlying connection is still
   * open; confirm against the implementations.
   */
  public abstract boolean isConnectionOpen();
820
  /**
   * Factory hook through which the concrete connection classes build the {@link ServerCall} for
   * one request.
   * <p>
   * Implementations should accept {@code null} for {@code md}, {@code header}, {@code param},
   * {@code cellScanner} and {@code remoteAddress}, and {@code 0} for {@code timeout}:
   * {@code sendErrorResponseForCall} in this class creates its placeholder call with exactly those
   * values.
   * @param id            call id
   * @param service       service the call is made against
   * @param md            descriptor of the invoked method; may be {@code null}
   * @param header        request header; may be {@code null}
   * @param param         request message; may be {@code null}
   * @param cellScanner   cells accompanying the request; may be {@code null}
   * @param size          size of the call; the error path passes the total request size here
   * @param remoteAddress address of the caller; may be {@code null}
   * @param timeout       call timeout (NOTE(review): unit is not visible here; confirm)
   * @param reqCleanup    cleanup callback for the request; the error path passes this connection's
   *                      {@code callCleanup}
   * @return the newly created call
   */
  public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md,
    RequestHeader header, Message param, ExtendedCellScanner cellScanner, long size,
    InetAddress remoteAddress, int timeout, CallCleanup reqCleanup);
824
825  private static class ByteBuffByteInput extends ByteInput {
826
827    private ByteBuff buf;
828    private int length;
829
830    ByteBuffByteInput(ByteBuff buf, int length) {
831      this.buf = buf;
832      this.length = length;
833    }
834
835    @Override
836    public byte read(int offset) {
837      return this.buf.get(offset);
838    }
839
840    @Override
841    public int read(int offset, byte[] out, int outOffset, int len) {
842      this.buf.get(offset, out, outOffset, len);
843      return len;
844    }
845
846    @Override
847    public int read(int offset, ByteBuffer out) {
848      int len = out.remaining();
849      this.buf.get(out, offset, len);
850      return len;
851    }
852
853    @Override
854    public int size() {
855      return this.length;
856    }
857  }
858}