001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_HEADER;
021
022import io.opentelemetry.api.GlobalOpenTelemetry;
023import io.opentelemetry.api.trace.Span;
024import io.opentelemetry.context.Context;
025import io.opentelemetry.context.Scope;
026import io.opentelemetry.context.propagation.TextMapGetter;
027import java.io.ByteArrayInputStream;
028import java.io.Closeable;
029import java.io.DataOutputStream;
030import java.io.IOException;
031import java.net.InetAddress;
032import java.net.InetSocketAddress;
033import java.nio.ByteBuffer;
034import java.nio.channels.Channels;
035import java.nio.channels.ReadableByteChannel;
036import java.security.GeneralSecurityException;
037import java.util.Objects;
038import java.util.Properties;
039import org.apache.commons.crypto.cipher.CryptoCipherFactory;
040import org.apache.commons.crypto.random.CryptoRandom;
041import org.apache.commons.crypto.random.CryptoRandomFactory;
042import org.apache.hadoop.hbase.CellScanner;
043import org.apache.hadoop.hbase.DoNotRetryIOException;
044import org.apache.hadoop.hbase.client.VersionInfoUtil;
045import org.apache.hadoop.hbase.codec.Codec;
046import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
047import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
048import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
049import org.apache.hadoop.hbase.nio.ByteBuff;
050import org.apache.hadoop.hbase.nio.SingleByteBuff;
051import org.apache.hadoop.hbase.security.AccessDeniedException;
052import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
053import org.apache.hadoop.hbase.security.SaslStatus;
054import org.apache.hadoop.hbase.security.SaslUtil;
055import org.apache.hadoop.hbase.security.User;
056import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider;
057import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders;
058import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider;
059import org.apache.hadoop.hbase.trace.TraceUtil;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.io.BytesWritable;
062import org.apache.hadoop.io.IntWritable;
063import org.apache.hadoop.io.Writable;
064import org.apache.hadoop.io.WritableUtils;
065import org.apache.hadoop.io.compress.CompressionCodec;
066import org.apache.hadoop.security.UserGroupInformation;
067import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
068import org.apache.hadoop.security.authorize.AuthorizationException;
069import org.apache.hadoop.security.authorize.ProxyUsers;
070import org.apache.hadoop.security.token.SecretManager.InvalidToken;
071import org.apache.yetus.audience.InterfaceAudience;
072
073import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
074import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput;
075import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
076import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
077import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
078import org.apache.hbase.thirdparty.com.google.protobuf.Message;
079import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
080import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
081
082import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
090
091/** Reads calls from a connection and queues them for handling. */
092@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
093    justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
094@InterfaceAudience.Private
095abstract class ServerRpcConnection implements Closeable {
096
  // Extracts trace context entries from the RPCTInfo carried in each request header; used by
  // processRequest().
  private static final TextMapGetter<RPCTInfo> getter = new RPCTInfoGetter();

  // The server this connection belongs to; assigned once in the constructor.
  protected final RpcServer rpcServer;
  // Whether the connection header has been read. Set to true by processOneRpc() right after
  // processConnectionHeader() returns.
  protected boolean connectionHeaderRead = false;

  // Cleanup callback handed to every ServerCall created in processRequest(). The constructor
  // initializes it to null.
  protected CallCleanup callCleanup;

  // Cache the remote host & port info so that even if the socket is
  // disconnected, we can say where it used to connect to.
  protected String hostAddress;
  protected int remotePort;
  protected InetAddress addr;
  // The connection header sent by the client; null until processConnectionHeader() parses it.
  protected ConnectionHeader connectionHeader;

  /**
   * Codec the client asked us to use. Instantiated in setupCellBlockCodecs() from the class name
   * in the connection header.
   */
  protected Codec codec;
  /**
   * Compression codec the client asked us to use. Instantiated in setupCellBlockCodecs() from the
   * class name in the connection header.
   */
  protected CompressionCodec compressionCodec;
  // Service resolved from the connection header's service name in processConnectionHeader().
  protected BlockingService service;

  // SASL authentication provider used to build the SASL server and to resolve the authorized
  // UGI. NOTE(review): not assigned anywhere in this part of the file; presumably set by the
  // preamble handling of the concrete connection - confirm.
  protected SaslServerAuthenticationProvider provider;
  // Set to true by saslReadAndProcess() once saslServer.isComplete() reports success.
  protected boolean saslContextEstablished;
  // NOTE(review): neither read nor written in this part of the file.
  protected boolean skipInitialSaslHandshake;
  // The unwrapped frame currently being assembled by processUnwrappedData(); reset to null after
  // each complete frame has been handed to processOneRpc().
  private ByteBuffer unwrappedData;
  // 4-byte scratch buffer that receives the length prefix of each unwrapped frame in
  // processUnwrappedData(). (Original note: "When is this set? FindBugs wants to know! Says NP")
  private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
  // Whether this connection authenticates through SASL; read by processConnectionHeader().
  // NOTE(review): not assigned in this part of the file.
  protected boolean useSasl;
  // Created lazily on the first token handled by saslReadAndProcess(); released by disposeSasl().
  protected HBaseSaslRpcServer saslServer;
  // Created by setupCryptoCipher() when Crypto AES encryption has been negotiated.
  protected CryptoAES cryptoAES;
  // True when the negotiated SASL QoP is anything other than "auth", i.e. payloads arrive wrapped.
  protected boolean useWrap = false;
  // True once setupCryptoCipher() has created cryptoAES; unwrapping then goes through cryptoAES
  // instead of saslServer.
  protected boolean useCryptoAesWrap = false;

  // was authentication allowed with a fallback to simple auth
  protected boolean authenticatedWithFallback;

  // Set in processConnectionHeader() when the client's version info is at least 1.2.
  protected boolean retryImmediatelySupported = false;

  // Created from ugi in processOneRpc() after the connection header has been processed.
  protected User user = null;
  // The identity of the remote caller: taken from the connection header for non-SASL
  // connections, or from the SASL provider once the handshake completes.
  protected UserGroupInformation ugi = null;
  // Looked up from the server configuration in the constructor.
  protected SaslServerAuthenticationProviders saslProviders = null;
142
143  public ServerRpcConnection(RpcServer rpcServer) {
144    this.rpcServer = rpcServer;
145    this.callCleanup = null;
146    this.saslProviders = SaslServerAuthenticationProviders.getInstance(rpcServer.getConf());
147  }
148
149  @Override
150  public String toString() {
151    return getHostAddress() + ":" + remotePort;
152  }
153
154  public String getHostAddress() {
155    return hostAddress;
156  }
157
158  public InetAddress getHostInetAddress() {
159    return addr;
160  }
161
162  public int getRemotePort() {
163    return remotePort;
164  }
165
166  public VersionInfo getVersionInfo() {
167    if (connectionHeader.hasVersionInfo()) {
168      return connectionHeader.getVersionInfo();
169    }
170    return null;
171  }
172
173  private String getFatalConnectionString(final int version, final byte authByte) {
174    return "serverVersion=" + RpcServer.CURRENT_VERSION + ", clientVersion=" + version
175      + ", authMethod=" + authByte +
176      // The provider may be null if we failed to parse the header of the request
177      ", authName=" + (provider == null ? "unknown" : provider.getSaslAuthMethod().getName())
178      + " from " + toString();
179  }
180
181  /**
182   * Set up cell block codecs n
183   */
184  private void setupCellBlockCodecs(final ConnectionHeader header) throws FatalConnectionException {
185    // TODO: Plug in other supported decoders.
186    if (!header.hasCellBlockCodecClass()) return;
187    String className = header.getCellBlockCodecClass();
188    if (className == null || className.length() == 0) return;
189    try {
190      this.codec = (Codec) Class.forName(className).getDeclaredConstructor().newInstance();
191    } catch (Exception e) {
192      throw new UnsupportedCellCodecException(className, e);
193    }
194    if (!header.hasCellBlockCompressorClass()) return;
195    className = header.getCellBlockCompressorClass();
196    try {
197      this.compressionCodec =
198        (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance();
199    } catch (Exception e) {
200      throw new UnsupportedCompressionCodecException(className, e);
201    }
202  }
203
204  /**
205   * Set up cipher for rpc encryption with Apache Commons Crypto n
206   */
207  private void setupCryptoCipher(final ConnectionHeader header,
208    RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) throws FatalConnectionException {
209    // If simple auth, return
210    if (saslServer == null) return;
211    // check if rpc encryption with Crypto AES
212    String qop = saslServer.getNegotiatedQop();
213    boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop().equalsIgnoreCase(qop);
214    boolean isCryptoAesEncryption = isEncryption
215      && this.rpcServer.conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false);
216    if (!isCryptoAesEncryption) return;
217    if (!header.hasRpcCryptoCipherTransformation()) return;
218    String transformation = header.getRpcCryptoCipherTransformation();
219    if (transformation == null || transformation.length() == 0) return;
220    // Negotiates AES based on complete saslServer.
221    // The Crypto metadata need to be encrypted and send to client.
222    Properties properties = new Properties();
223    // the property for SecureRandomFactory
224    properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
225      this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
226        "org.apache.commons.crypto.random.JavaCryptoRandom"));
227    // the property for cipher class
228    properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
229      this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
230        "org.apache.commons.crypto.cipher.JceCipher"));
231
232    int cipherKeyBits =
233      this.rpcServer.conf.getInt("hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
234    // generate key and iv
235    if (cipherKeyBits % 8 != 0) {
236      throw new IllegalArgumentException(
237        "The AES cipher key size in bits" + " should be a multiple of byte");
238    }
239    int len = cipherKeyBits / 8;
240    byte[] inKey = new byte[len];
241    byte[] outKey = new byte[len];
242    byte[] inIv = new byte[len];
243    byte[] outIv = new byte[len];
244
245    try {
246      // generate the cipher meta data with SecureRandom
247      CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties);
248      secureRandom.nextBytes(inKey);
249      secureRandom.nextBytes(outKey);
250      secureRandom.nextBytes(inIv);
251      secureRandom.nextBytes(outIv);
252
253      // create CryptoAES for server
254      cryptoAES = new CryptoAES(transformation, properties, inKey, outKey, inIv, outIv);
255      // create SaslCipherMeta and send to client,
256      // for client, the [inKey, outKey], [inIv, outIv] should be reversed
257      RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder();
258      ccmBuilder.setTransformation(transformation);
259      ccmBuilder.setInIv(getByteString(outIv));
260      ccmBuilder.setInKey(getByteString(outKey));
261      ccmBuilder.setOutIv(getByteString(inIv));
262      ccmBuilder.setOutKey(getByteString(inKey));
263      chrBuilder.setCryptoCipherMeta(ccmBuilder);
264      useCryptoAesWrap = true;
265    } catch (GeneralSecurityException | IOException ex) {
266      throw new UnsupportedCryptoException(ex.getMessage(), ex);
267    }
268  }
269
270  private ByteString getByteString(byte[] bytes) {
271    // return singleton to reduce object allocation
272    return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
273  }
274
275  private UserGroupInformation createUser(ConnectionHeader head) {
276    UserGroupInformation ugi = null;
277
278    if (!head.hasUserInfo()) {
279      return null;
280    }
281    UserInformation userInfoProto = head.getUserInfo();
282    String effectiveUser = null;
283    if (userInfoProto.hasEffectiveUser()) {
284      effectiveUser = userInfoProto.getEffectiveUser();
285    }
286    String realUser = null;
287    if (userInfoProto.hasRealUser()) {
288      realUser = userInfoProto.getRealUser();
289    }
290    if (effectiveUser != null) {
291      if (realUser != null) {
292        UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(realUser);
293        ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
294      } else {
295        ugi = UserGroupInformation.createRemoteUser(effectiveUser);
296      }
297    }
298    return ugi;
299  }
300
301  protected final void disposeSasl() {
302    if (saslServer != null) {
303      saslServer.dispose();
304      saslServer = null;
305    }
306  }
307
308  /**
309   * No protobuf encoding of raw sasl messages
310   */
311  protected final void doRawSaslReply(SaslStatus status, Writable rv, String errorClass,
312    String error) throws IOException {
313    BufferChain bc;
314    // In my testing, have noticed that sasl messages are usually
315    // in the ballpark of 100-200. That's why the initial capacity is 256.
316    try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
317      DataOutputStream out = new DataOutputStream(saslResponse)) {
318      out.writeInt(status.state); // write status
319      if (status == SaslStatus.SUCCESS) {
320        rv.write(out);
321      } else {
322        WritableUtils.writeString(out, errorClass);
323        WritableUtils.writeString(out, error);
324      }
325      bc = new BufferChain(saslResponse.getByteBuffer());
326    }
327    doRespond(() -> bc);
328  }
329
330  public void saslReadAndProcess(ByteBuff saslToken) throws IOException, InterruptedException {
331    if (saslContextEstablished) {
332      RpcServer.LOG.trace("Read input token of size={} for processing by saslServer.unwrap()",
333        saslToken.limit());
334      if (!useWrap) {
335        processOneRpc(saslToken);
336      } else {
337        byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
338        byte[] plaintextData;
339        if (useCryptoAesWrap) {
340          // unwrap with CryptoAES
341          plaintextData = cryptoAES.unwrap(b, 0, b.length);
342        } else {
343          plaintextData = saslServer.unwrap(b, 0, b.length);
344        }
345        // release the request buffer as we have already unwrapped all its content
346        callCleanupIfNeeded();
347        processUnwrappedData(plaintextData);
348      }
349    } else {
350      byte[] replyToken;
351      try {
352        if (saslServer == null) {
353          try {
354            saslServer =
355              new HBaseSaslRpcServer(provider, rpcServer.saslProps, rpcServer.secretManager);
356          } catch (Exception e) {
357            RpcServer.LOG.error("Error when trying to create instance of HBaseSaslRpcServer "
358              + "with sasl provider: " + provider, e);
359            throw e;
360          }
361          RpcServer.LOG.debug("Created SASL server with mechanism={}",
362            provider.getSaslAuthMethod().getAuthMethod());
363        }
364        RpcServer.LOG.debug(
365          "Read input token of size={} for processing by saslServer." + "evaluateResponse()",
366          saslToken.limit());
367        replyToken = saslServer
368          .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes());
369      } catch (IOException e) {
370        RpcServer.LOG.debug("Failed to execute SASL handshake", e);
371        IOException sendToClient = e;
372        Throwable cause = e;
373        while (cause != null) {
374          if (cause instanceof InvalidToken) {
375            sendToClient = (InvalidToken) cause;
376            break;
377          }
378          cause = cause.getCause();
379        }
380        doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
381          sendToClient.getLocalizedMessage());
382        this.rpcServer.metrics.authenticationFailure();
383        String clientIP = this.toString();
384        // attempting user could be null
385        RpcServer.AUDITLOG.warn("{} {}: {}", RpcServer.AUTH_FAILED_FOR, clientIP,
386          saslServer.getAttemptingUser());
387        throw e;
388      } finally {
389        // release the request buffer as we have already unwrapped all its content
390        callCleanupIfNeeded();
391      }
392      if (replyToken != null) {
393        if (RpcServer.LOG.isDebugEnabled()) {
394          RpcServer.LOG.debug("Will send token of size " + replyToken.length + " from saslServer.");
395        }
396        doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null, null);
397      }
398      if (saslServer.isComplete()) {
399        String qop = saslServer.getNegotiatedQop();
400        useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
401        ugi =
402          provider.getAuthorizedUgi(saslServer.getAuthorizationID(), this.rpcServer.secretManager);
403        RpcServer.LOG.debug(
404          "SASL server context established. Authenticated client: {}. Negotiated QoP is {}", ugi,
405          qop);
406        this.rpcServer.metrics.authenticationSuccess();
407        RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
408        saslContextEstablished = true;
409      }
410    }
411  }
412
413  private void processUnwrappedData(byte[] inBuf) throws IOException, InterruptedException {
414    ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
415    // Read all RPCs contained in the inBuf, even partial ones
416    while (true) {
417      int count;
418      if (unwrappedDataLengthBuffer.remaining() > 0) {
419        count = this.rpcServer.channelRead(ch, unwrappedDataLengthBuffer);
420        if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0) {
421          return;
422        }
423      }
424
425      if (unwrappedData == null) {
426        unwrappedDataLengthBuffer.flip();
427        int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
428
429        if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
430          if (RpcServer.LOG.isDebugEnabled()) RpcServer.LOG.debug("Received ping message");
431          unwrappedDataLengthBuffer.clear();
432          continue; // ping message
433        }
434        unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
435      }
436
437      count = this.rpcServer.channelRead(ch, unwrappedData);
438      if (count <= 0 || unwrappedData.remaining() > 0) {
439        return;
440      }
441
442      if (unwrappedData.remaining() == 0) {
443        unwrappedDataLengthBuffer.clear();
444        unwrappedData.flip();
445        processOneRpc(new SingleByteBuff(unwrappedData));
446        unwrappedData = null;
447      }
448    }
449  }
450
451  public void processOneRpc(ByteBuff buf) throws IOException, InterruptedException {
452    if (connectionHeaderRead) {
453      processRequest(buf);
454    } else {
455      processConnectionHeader(buf);
456      this.connectionHeaderRead = true;
457      if (rpcServer.needAuthorization() && !authorizeConnection()) {
458        // Throw FatalConnectionException wrapping ACE so client does right thing and closes
459        // down the connection instead of trying to read non-existent retun.
460        throw new AccessDeniedException("Connection from " + this + " for service "
461          + connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
462      }
463      this.user = this.rpcServer.userProvider.create(this.ugi);
464    }
465  }
466
467  private boolean authorizeConnection() throws IOException {
468    try {
469      // If auth method is DIGEST, the token was obtained by the
470      // real user for the effective user, therefore not required to
471      // authorize real user. doAs is allowed only for simple or kerberos
472      // authentication
473      if (ugi != null && ugi.getRealUser() != null && provider.supportsProtocolAuthentication()) {
474        ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf);
475      }
476      this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress());
477      this.rpcServer.metrics.authorizationSuccess();
478    } catch (AuthorizationException ae) {
479      if (RpcServer.LOG.isDebugEnabled()) {
480        RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
481      }
482      this.rpcServer.metrics.authorizationFailure();
483      doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae)));
484      return false;
485    }
486    return true;
487  }
488
489  // Reads the connection header following version
490  private void processConnectionHeader(ByteBuff buf) throws IOException {
491    if (buf.hasArray()) {
492      this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
493    } else {
494      CodedInputStream cis = UnsafeByteOperations
495        .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
496      cis.enableAliasing(true);
497      this.connectionHeader = ConnectionHeader.parseFrom(cis);
498    }
499    String serviceName = connectionHeader.getServiceName();
500    if (serviceName == null) throw new EmptyServiceNameException();
501    this.service = RpcServer.getService(this.rpcServer.services, serviceName);
502    if (this.service == null) throw new UnknownServiceException(serviceName);
503    setupCellBlockCodecs(this.connectionHeader);
504    RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
505      RPCProtos.ConnectionHeaderResponse.newBuilder();
506    setupCryptoCipher(this.connectionHeader, chrBuilder);
507    responseConnectionHeader(chrBuilder);
508    UserGroupInformation protocolUser = createUser(connectionHeader);
509    if (!useSasl) {
510      ugi = protocolUser;
511      if (ugi != null) {
512        ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE);
513      }
514      // audit logging for SASL authenticated users happens in saslReadAndProcess()
515      if (authenticatedWithFallback) {
516        RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for {} connecting from {}", ugi,
517          getHostAddress());
518      }
519    } else {
520      // user is authenticated
521      ugi.setAuthenticationMethod(provider.getSaslAuthMethod().getAuthMethod());
522      // Now we check if this is a proxy user case. If the protocol user is
523      // different from the 'user', it is a proxy user scenario. However,
524      // this is not allowed if user authenticated with DIGEST.
525      if ((protocolUser != null) && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
526        if (!provider.supportsProtocolAuthentication()) {
527          // Not allowed to doAs if token authentication is used
528          throw new AccessDeniedException("Authenticated user (" + ugi
529            + ") doesn't match what the client claims to be (" + protocolUser + ")");
530        } else {
531          // Effective user can be different from authenticated user
532          // for simple auth or kerberos auth
533          // The user is the real user. Now we create a proxy user
534          UserGroupInformation realUser = ugi;
535          ugi = UserGroupInformation.createProxyUser(protocolUser.getUserName(), realUser);
536          // Now the user is a proxy user, set Authentication method Proxy.
537          ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
538        }
539      }
540    }
541    String version;
542    if (this.connectionHeader.hasVersionInfo()) {
543      // see if this connection will support RetryImmediatelyException
544      this.retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
545      version = this.connectionHeader.getVersionInfo().getVersion();
546    } else {
547      version = "UNKNOWN";
548    }
549    RpcServer.AUDITLOG.info("Connection from {}:{}, version={}, sasl={}, ugi={}, service={}",
550      this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName);
551  }
552
553  /**
554   * Send the response for connection header
555   */
556  private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
557    throws FatalConnectionException {
558    // Response the connection header if Crypto AES is enabled
559    if (!chrBuilder.hasCryptoCipherMeta()) return;
560    try {
561      byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray();
562      // encrypt the Crypto AES cipher meta data with sasl server, and send to client
563      byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
564      Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
565      Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length);
566      byte[] wrapped = saslServer.wrap(unwrapped, 0, unwrapped.length);
567      BufferChain bc;
568      try (ByteBufferOutputStream response = new ByteBufferOutputStream(wrapped.length + 4);
569        DataOutputStream out = new DataOutputStream(response)) {
570        out.writeInt(wrapped.length);
571        out.write(wrapped);
572        bc = new BufferChain(response.getByteBuffer());
573      }
574      doRespond(() -> bc);
575    } catch (IOException ex) {
576      throw new UnsupportedCryptoException(ex.getMessage(), ex);
577    }
578  }
579
  /**
   * Delivers a response to the client of this connection. Left to the concrete connection
   * implementation; within this class it is used for raw SASL replies, the encrypted connection
   * header response and authorization error responses.
   * @param resp the response to deliver
   * @throws IOException if the response cannot be delivered
   */
  protected abstract void doRespond(RpcResponse resp) throws IOException;
581
582  /**
583   * n * Has the request header and the request param and optionally encoded data buffer all in this
584   * one array. nn
585   */
586  protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
587    long totalRequestSize = buf.limit();
588    int offset = 0;
589    // Here we read in the header. We avoid having pb
590    // do its default 4k allocation for CodedInputStream. We force it to use
591    // backing array.
592    CodedInputStream cis;
593    if (buf.hasArray()) {
594      cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit()).newCodedInput();
595    } else {
596      cis = UnsafeByteOperations
597        .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
598    }
599    cis.enableAliasing(true);
600    int headerSize = cis.readRawVarint32();
601    offset = cis.getTotalBytesRead();
602    Message.Builder builder = RequestHeader.newBuilder();
603    ProtobufUtil.mergeFrom(builder, cis, headerSize);
604    RequestHeader header = (RequestHeader) builder.build();
605    offset += headerSize;
606    Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
607      .extract(Context.current(), header.getTraceInfo(), getter);
608
609    // n.b. Management of this Span instance is a little odd. Most exit paths from this try scope
610    // are early-exits due to error cases. There's only one success path, the asynchronous call to
611    // RpcScheduler#dispatch. The success path assumes ownership of the span, which is represented
612    // by null-ing out the reference in this scope. All other paths end the span. Thus, and in
613    // order to avoid accidentally orphaning the span, the call to Span#end happens in a finally
614    // block iff the span is non-null.
615    Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx);
616    try (Scope ignored = span.makeCurrent()) {
617      int id = header.getCallId();
618      if (RpcServer.LOG.isTraceEnabled()) {
619        RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
620          + " totalRequestSize: " + totalRequestSize + " bytes");
621      }
622      // Enforcing the call queue size, this triggers a retry in the client
623      // This is a bit late to be doing this check - we have already read in the
624      // total request.
625      if (
626        (totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum())
627            > this.rpcServer.maxQueueSizeInBytes
628      ) {
629        final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null,
630          totalRequestSize, null, 0, this.callCleanup);
631        this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
632        callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
633          "Call queue is full on " + this.rpcServer.server.getServerName()
634            + ", is hbase.ipc.server.max.callqueue.size too small?");
635        TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
636        callTooBig.sendResponseIfReady();
637        return;
638      }
639      MethodDescriptor md = null;
640      Message param = null;
641      CellScanner cellScanner = null;
642      try {
643        if (header.hasRequestParam() && header.getRequestParam()) {
644          md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
645          if (md == null) {
646            throw new UnsupportedOperationException(header.getMethodName());
647          }
648          builder = this.service.getRequestPrototype(md).newBuilderForType();
649          cis.resetSizeCounter();
650          int paramSize = cis.readRawVarint32();
651          offset += cis.getTotalBytesRead();
652          if (builder != null) {
653            ProtobufUtil.mergeFrom(builder, cis, paramSize);
654            param = builder.build();
655          }
656          offset += paramSize;
657        } else {
658          // currently header must have request param, so we directly throw
659          // exception here
660          String msg = "Invalid request header: " + TextFormat.shortDebugString(header)
661            + ", should have param set in it";
662          RpcServer.LOG.warn(msg);
663          throw new DoNotRetryIOException(msg);
664        }
665        if (header.hasCellBlockMeta()) {
666          buf.position(offset);
667          ByteBuff dup = buf.duplicate();
668          dup.limit(offset + header.getCellBlockMeta().getLength());
669          cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
670            this.compressionCodec, dup);
671        }
672      } catch (Throwable thrown) {
673        InetSocketAddress address = this.rpcServer.getListenerAddress();
674        String msg = (address != null ? address : "(channel closed)")
675          + " is unable to read call parameter from client " + getHostAddress();
676        RpcServer.LOG.warn(msg, thrown);
677
678        this.rpcServer.metrics.exception(thrown);
679
680        final Throwable responseThrowable;
681        if (thrown instanceof LinkageError) {
682          // probably the hbase hadoop version does not match the running hadoop version
683          responseThrowable = new DoNotRetryIOException(thrown);
684        } else if (thrown instanceof UnsupportedOperationException) {
685          // If the method is not present on the server, do not retry.
686          responseThrowable = new DoNotRetryIOException(thrown);
687        } else {
688          responseThrowable = thrown;
689        }
690
691        ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null,
692          totalRequestSize, null, 0, this.callCleanup);
693        readParamsFailedCall.setResponse(null, null, responseThrowable,
694          msg + "; " + responseThrowable.getMessage());
695        TraceUtil.setError(span, responseThrowable);
696        readParamsFailedCall.sendResponseIfReady();
697        return;
698      }
699
700      int timeout = 0;
701      if (header.hasTimeout() && header.getTimeout() > 0) {
702        timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout());
703      }
704      ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner,
705        totalRequestSize, this.addr, timeout, this.callCleanup);
706
707      if (this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
        // unset span so that it's not closed in the finally block
709        span = null;
710      } else {
711        this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
712        this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
713        call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
714          "Call queue is full on " + this.rpcServer.server.getServerName()
715            + ", too many items queued ?");
716        TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
717        call.sendResponseIfReady();
718      }
719    } finally {
720      if (span != null) {
721        span.end();
722      }
723    }
724  }
725
726  protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException {
727    ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1);
728    ServerCall.setExceptionResponse(e, msg, headerBuilder);
729    ByteBuffer headerBuf =
730      ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null);
731    BufferChain buf = new BufferChain(headerBuf);
732    return () -> buf;
733  }
734
735  private void doBadPreambleHandling(String msg) throws IOException {
736    doBadPreambleHandling(msg, new FatalConnectionException(msg));
737  }
738
739  private void doBadPreambleHandling(String msg, Exception e) throws IOException {
740    SimpleRpcServer.LOG.warn(msg);
741    doRespond(getErrorResponse(msg, e));
742  }
743
744  protected final void callCleanupIfNeeded() {
745    if (callCleanup != null) {
746      callCleanup.run();
747      callCleanup = null;
748    }
749  }
750
751  protected final boolean processPreamble(ByteBuffer preambleBuffer) throws IOException {
752    assert preambleBuffer.remaining() == 6;
753    for (int i = 0; i < RPC_HEADER.length; i++) {
754      if (RPC_HEADER[i] != preambleBuffer.get()) {
755        doBadPreambleHandling(
756          "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER="
757            + Bytes.toStringBinary(preambleBuffer.array(), 0, RPC_HEADER.length) + " from "
758            + toString());
759        return false;
760      }
761    }
762    int version = preambleBuffer.get() & 0xFF;
763    byte authbyte = preambleBuffer.get();
764
765    if (version != SimpleRpcServer.CURRENT_VERSION) {
766      String msg = getFatalConnectionString(version, authbyte);
767      doBadPreambleHandling(msg, new WrongVersionException(msg));
768      return false;
769    }
770    this.provider = this.saslProviders.selectProvider(authbyte);
771    if (this.provider == null) {
772      String msg = getFatalConnectionString(version, authbyte);
773      doBadPreambleHandling(msg, new BadAuthException(msg));
774      return false;
775    }
776    // TODO this is a wart while simple auth'n doesn't go through sasl.
777    if (this.rpcServer.isSecurityEnabled && isSimpleAuthentication()) {
778      if (this.rpcServer.allowFallbackToSimpleAuth) {
779        this.rpcServer.metrics.authenticationFallback();
780        authenticatedWithFallback = true;
781      } else {
782        AccessDeniedException ae = new AccessDeniedException("Authentication is required");
783        doRespond(getErrorResponse(ae.getMessage(), ae));
784        return false;
785      }
786    }
787    if (!this.rpcServer.isSecurityEnabled && !isSimpleAuthentication()) {
788      doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
789        null);
790      provider = saslProviders.getSimpleProvider();
791      // client has already sent the initial Sasl message and we
792      // should ignore it. Both client and server should fall back
793      // to simple auth from now on.
794      skipInitialSaslHandshake = true;
795    }
796    useSasl = !(provider instanceof SimpleSaslServerAuthenticationProvider);
797    return true;
798  }
799
800  boolean isSimpleAuthentication() {
801    return Objects.requireNonNull(provider) instanceof SimpleSaslServerAuthenticationProvider;
802  }
803
  /**
   * Implemented by concrete connection subclasses.
   * @return presumably {@code true} while the underlying connection is still open; the exact
   *         semantics are defined by the implementation (confirm against subclasses)
   */
  public abstract boolean isConnectionOpen();
805
  /**
   * Creates the {@link ServerCall} object for a single request on this connection; implemented by
   * concrete connection subclasses.
   * <p>
   * Within this class it is invoked both for successfully parsed requests and, with {@code null}
   * for most arguments, to build a call used only to report a request-parsing failure.
   * @param id            identifier of the call
   * @param service       service the call is addressed to
   * @param md            descriptor of the invoked method; {@code null} on the parse-failure path
   * @param header        request header; {@code null} on the parse-failure path
   * @param param         parsed request message; {@code null} on the parse-failure path
   * @param cellScanner   scanner over the request's cell block; may be {@code null}
   * @param size          total size of the request
   * @param remoteAddress address of the client; {@code null} on the parse-failure path
   * @param timeout       timeout for the call, {@code 0} when the request header specifies none
   *                      (units not visible here; presumably those of the header's timeout field)
   * @param reqCleanup    cleanup hook associated with the request
   * @return the newly created call
   */
  public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md,
    RequestHeader header, Message param, CellScanner cellScanner, long size,
    InetAddress remoteAddress, int timeout, CallCleanup reqCleanup);
809
810  private static class ByteBuffByteInput extends ByteInput {
811
812    private ByteBuff buf;
813    private int offset;
814    private int length;
815
816    ByteBuffByteInput(ByteBuff buf, int offset, int length) {
817      this.buf = buf;
818      this.offset = offset;
819      this.length = length;
820    }
821
822    @Override
823    public byte read(int offset) {
824      return this.buf.get(getAbsoluteOffset(offset));
825    }
826
827    private int getAbsoluteOffset(int offset) {
828      return this.offset + offset;
829    }
830
831    @Override
832    public int read(int offset, byte[] out, int outOffset, int len) {
833      this.buf.get(getAbsoluteOffset(offset), out, outOffset, len);
834      return len;
835    }
836
837    @Override
838    public int read(int offset, ByteBuffer out) {
839      int len = out.remaining();
840      this.buf.get(out, getAbsoluteOffset(offset), len);
841      return len;
842    }
843
844    @Override
845    public int size() {
846      return this.length;
847    }
848  }
849}