001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
021
022import com.google.errorprone.annotations.RestrictedApi;
023import java.io.IOException;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.nio.ByteBuffer;
027import java.nio.channels.ReadableByteChannel;
028import java.nio.channels.WritableByteChannel;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Locale;
033import java.util.Map;
034import java.util.Optional;
035import java.util.concurrent.atomic.LongAdder;
036import org.apache.commons.lang3.StringUtils;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.CallQueueTooBigException;
039import org.apache.hadoop.hbase.CellScanner;
040import org.apache.hadoop.hbase.DoNotRetryIOException;
041import org.apache.hadoop.hbase.ExtendedCellScanner;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.Server;
044import org.apache.hadoop.hbase.conf.ConfigurationObserver;
045import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
046import org.apache.hadoop.hbase.io.ByteBuffAllocator;
047import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
048import org.apache.hadoop.hbase.monitoring.TaskMonitor;
049import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
050import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
051import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
052import org.apache.hadoop.hbase.regionserver.RSRpcServices;
053import org.apache.hadoop.hbase.security.HBasePolicyProvider;
054import org.apache.hadoop.hbase.security.SaslUtil;
055import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
056import org.apache.hadoop.hbase.security.User;
057import org.apache.hadoop.hbase.security.UserProvider;
058import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders;
059import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
060import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
062import org.apache.hadoop.hbase.util.GsonUtil;
063import org.apache.hadoop.hbase.util.Pair;
064import org.apache.hadoop.security.UserGroupInformation;
065import org.apache.hadoop.security.authorize.AuthorizationException;
066import org.apache.hadoop.security.authorize.PolicyProvider;
067import org.apache.hadoop.security.authorize.ProxyUsers;
068import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
069import org.apache.hadoop.security.token.SecretManager;
070import org.apache.hadoop.security.token.TokenIdentifier;
071import org.apache.yetus.audience.InterfaceAudience;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
076import org.apache.hbase.thirdparty.com.google.gson.Gson;
077import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
078import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
079import org.apache.hbase.thirdparty.com.google.protobuf.Message;
080import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
081import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
082
083import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
086
087/**
088 * An RPC server that hosts protobuf described Services.
089 */
090@InterfaceAudience.Private
091public abstract class RpcServer implements RpcServerInterface, ConfigurationObserver {
092  // LOG is being used in CallRunner and the log level is being changed in tests
093  public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class);
094  protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION =
095    new CallQueueTooBigException();
096
097  private static final String MULTI_GETS = "multi.gets";
098  private static final String MULTI_MUTATIONS = "multi.mutations";
099  private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
100
101  private final boolean authorize;
102  private volatile boolean isOnlineLogProviderEnabled;
103  protected boolean isSecurityEnabled;
104  protected final SaslServerAuthenticationProviders saslProviders;
105
106  public static final byte CURRENT_VERSION = 0;
107
108  /**
109   * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
110   */
111  public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
112    "hbase.ipc.server.fallback-to-simple-auth-allowed";
113
114  /**
115   * How many calls/handler are allowed in the queue.
116   */
117  protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
118
119  protected final CellBlockBuilder cellBlockBuilder;
120
121  protected static final String AUTH_FAILED_FOR = "Auth failed for ";
122  protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
123  protected static final Logger AUDITLOG =
124    LoggerFactory.getLogger("SecurityLogger." + Server.class.getName());
125  protected SecretManager<TokenIdentifier> secretManager;
126  protected final Map<String, String> saslProps;
127  protected final String serverPrincipal;
128
129  protected ServiceAuthorizationManager authManager;
130
131  /**
   * This is set to Call object before Handler invokes an RPC and reset after the call returns.
133   */
134  protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>();
135
136  /** Keeps MonitoredRPCHandler per handler thread. */
137  protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>();
138
139  protected final InetSocketAddress bindAddress;
140
141  protected MetricsHBaseServer metrics;
142
143  protected final Configuration conf;
144
145  /**
146   * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
147   * this size, then we will reject the call (after parsing it though). It will go back to the
148   * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
149   * call queue size gets incremented after we parse a call and before we add it to the queue of
150   * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
151   * size is kept in {@link #callQueueSizeInBytes}.
152   * @see #callQueueSizeInBytes
153   * @see #DEFAULT_MAX_CALLQUEUE_SIZE
154   */
155  protected final long maxQueueSizeInBytes;
156  protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
157
158  /**
159   * This is a running count of the size in bytes of all outstanding calls whether currently
160   * executing or queued waiting to be run.
161   */
162  protected final LongAdder callQueueSizeInBytes = new LongAdder();
163
164  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
165  protected final boolean tcpKeepAlive; // if T then use keepalives
166
167  /**
168   * This flag is used to indicate to sub threads when they should go down. When we call
169   * {@link #start()}, all threads started will consult this flag on whether they should keep going.
170   * It is set to false when {@link #stop()} is called.
171   */
172  volatile boolean running = true;
173
174  /**
175   * This flag is set to true after all threads are up and 'running' and the server is then opened
176   * for business by the call to {@link #start()}.
177   */
178  volatile boolean started = false;
179
180  protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
181
182  protected HBaseRPCErrorHandler errorHandler = null;
183
184  public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
185
186  protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
187  protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
188  protected static final String WARN_SCAN_RESPONSE_TIME = "hbase.ipc.warn.response.time.scan";
189  protected static final String WARN_SCAN_RESPONSE_SIZE = "hbase.ipc.warn.response.size.scan";
190
191  /**
192   * Minimum allowable timeout (in milliseconds) in rpc request's header. This configuration exists
193   * to prevent the rpc service regarding this request as timeout immediately.
194   */
195  protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
196  protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
197
198  /** Default value for above params */
199  public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
200  protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
201  protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
202
203  protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000;
204  protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length";
205  protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>";
206
207  protected static final Gson GSON = GsonUtil.createGsonWithDisableHtmlEscaping().create();
208
209  protected final int maxRequestSize;
210  protected volatile int warnResponseTime;
211  protected volatile int warnResponseSize;
212  protected volatile int warnScanResponseTime;
213  protected volatile int warnScanResponseSize;
214
215  protected final int minClientRequestTimeout;
216
217  protected final Server server;
218  protected final List<BlockingServiceAndInterface> services;
219
220  protected final RpcScheduler scheduler;
221
222  protected final UserProvider userProvider;
223
224  protected final ByteBuffAllocator bbAllocator;
225
226  protected volatile boolean allowFallbackToSimpleAuth;
227
228  volatile RpcCoprocessorHost cpHost;
229
230  /**
231   * Used to get details for scan with a scanner_id<br/>
232   * TODO try to figure out a better way and remove reference from regionserver package later.
233   */
234  private RSRpcServices rsRpcServices;
235
236  /**
237   * Use to add online slowlog responses
238   */
239  private NamedQueueRecorder namedQueueRecorder;
240
  /**
   * Callback with a single no-argument method, {@link #run()}, that returns nothing.
   * <p>
   * NOTE(review): the name suggests it is invoked to release resources once a call has finished;
   * no usage is visible in this part of the file, so confirm against the callers.
   */
  @FunctionalInterface
  protected interface CallCleanup {
    /** Performs the cleanup action. */
    void run();
  }
245
246  /**
247   * Datastructure for passing a {@link BlockingService} and its associated class of protobuf
248   * service interface. For example, a server that fielded what is defined in the client protobuf
249   * service would pass in an implementation of the client blocking service and then its
250   * ClientService.BlockingInterface.class. Used checking connection setup.
251   */
252  public static class BlockingServiceAndInterface {
253    private final BlockingService service;
254    private final Class<?> serviceInterface;
255
256    public BlockingServiceAndInterface(final BlockingService service,
257      final Class<?> serviceInterface) {
258      this.service = service;
259      this.serviceInterface = serviceInterface;
260    }
261
262    public Class<?> getServiceInterface() {
263      return this.serviceInterface;
264    }
265
266    public BlockingService getBlockingService() {
267      return this.service;
268    }
269  }
270
271  /**
272   * Constructs a server listening on the named port and address.
273   * @param server           hosting instance of {@link Server}. We will do authentications if an
274   *                         instance else pass null for no authentication check.
275   * @param name             Used keying this rpc servers' metrics and for naming the Listener
276   *                         thread.
277   * @param services         A list of services.
278   * @param bindAddress      Where to listen
279   * @param reservoirEnabled Enable ByteBufferPool or not.
280   */
281  public RpcServer(final Server server, final String name,
282    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
283    Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
284    this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
285    this.server = server;
286    this.services = services;
287    this.bindAddress = bindAddress;
288    this.conf = conf;
289    // See declaration above for documentation on what this size is.
290    this.maxQueueSizeInBytes =
291      this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
292
293    this.warnResponseTime = getWarnResponseTime(conf);
294    this.warnResponseSize = getWarnResponseSize(conf);
295    this.warnScanResponseTime = getWarnScanResponseTime(conf);
296    this.warnScanResponseSize = getWarnScanResponseSize(conf);
297    this.minClientRequestTimeout =
298      conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT, DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
299    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
300
301    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
302    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
303    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
304
305    this.cellBlockBuilder = new CellBlockBuilder(conf);
306
307    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
308    this.userProvider = UserProvider.instantiate(conf);
309    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
310    if (isSecurityEnabled) {
311      saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
312        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
313      serverPrincipal = Preconditions.checkNotNull(userProvider.getCurrentUserName(),
314        "can not get current user name when security is enabled");
315    } else {
316      saslProps = Collections.emptyMap();
317      serverPrincipal = HConstants.EMPTY_STRING;
318    }
319    this.saslProviders = new SaslServerAuthenticationProviders(conf);
320
321    this.isOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(conf);
322    this.scheduler = scheduler;
323
324    initializeCoprocessorHost(getConf());
325  }
326
327  @Override
328  public void onConfigurationChange(Configuration newConf) {
329    initReconfigurable(newConf);
330    if (scheduler instanceof ConfigurationObserver) {
331      ((ConfigurationObserver) scheduler).onConfigurationChange(newConf);
332    }
333    if (authorize) {
334      refreshAuthManager(newConf, new HBasePolicyProvider());
335    }
336    refreshSlowLogConfiguration(newConf);
337    if (
338      CoprocessorConfigurationUtil.checkConfigurationChange(this.cpHost, newConf,
339        CoprocessorHost.RPC_COPROCESSOR_CONF_KEY)
340    ) {
341      LOG.info("Update the RPC coprocessor(s) because the configuration has changed");
342      initializeCoprocessorHost(newConf);
343    }
344  }
345
346  private void refreshSlowLogConfiguration(Configuration newConf) {
347    boolean newIsOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(newConf);
348    if (isOnlineLogProviderEnabled != newIsOnlineLogProviderEnabled) {
349      isOnlineLogProviderEnabled = newIsOnlineLogProviderEnabled;
350    }
351    int newWarnResponseTime = getWarnResponseTime(newConf);
352    if (warnResponseTime != newWarnResponseTime) {
353      warnResponseTime = newWarnResponseTime;
354    }
355    int newWarnResponseSize = getWarnResponseSize(newConf);
356    if (warnResponseSize != newWarnResponseSize) {
357      warnResponseSize = newWarnResponseSize;
358    }
359    int newWarnResponseTimeScan = getWarnScanResponseTime(newConf);
360    if (warnScanResponseTime != newWarnResponseTimeScan) {
361      warnScanResponseTime = newWarnResponseTimeScan;
362    }
363    int newWarnScanResponseSize = getWarnScanResponseSize(newConf);
364    if (warnScanResponseSize != newWarnScanResponseSize) {
365      warnScanResponseSize = newWarnScanResponseSize;
366    }
367  }
368
369  private static boolean getIsOnlineLogProviderEnabled(Configuration conf) {
370    return conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
371      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
372  }
373
374  private static int getWarnResponseTime(Configuration conf) {
375    return conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
376  }
377
378  private static int getWarnResponseSize(Configuration conf) {
379    return conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
380  }
381
382  private static int getWarnScanResponseTime(Configuration conf) {
383    return conf.getInt(WARN_SCAN_RESPONSE_TIME, getWarnResponseTime(conf));
384  }
385
386  private static int getWarnScanResponseSize(Configuration conf) {
387    return conf.getInt(WARN_SCAN_RESPONSE_SIZE, getWarnResponseSize(conf));
388  }
389
390  protected void initReconfigurable(Configuration confToLoad) {
391    this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
392    if (isSecurityEnabled && allowFallbackToSimpleAuth) {
393      LOG.warn("********* WARNING! *********");
394      LOG.warn("This server is configured to allow connections from INSECURE clients");
395      LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
396      LOG.warn("While this option is enabled, client identities cannot be secured, and user");
397      LOG.warn("impersonation is possible!");
398      LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
399      LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
400      LOG.warn("****************************");
401    }
402  }
403
404  Configuration getConf() {
405    return conf;
406  }
407
408  @Override
409  public boolean isStarted() {
410    return this.started;
411  }
412
413  @Override
414  public synchronized void refreshAuthManager(Configuration conf, PolicyProvider pp) {
415    // Ignore warnings that this should be accessed in a static way instead of via an instance;
416    // it'll break if you go via static route.
417    System.setProperty("hadoop.policy.file", "hbase-policy.xml");
418    this.authManager.refresh(conf, pp);
419    LOG.info("Refreshed hbase-policy.xml successfully");
420    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
421    LOG.info("Refreshed super and proxy users successfully");
422  }
423
424  protected AuthenticationTokenSecretManager createSecretManager() {
425    if (!isSecurityEnabled) return null;
426    if (server == null) return null;
427    Configuration conf = server.getConfiguration();
428    long keyUpdateInterval = conf.getLong("hbase.auth.key.update.interval", 24 * 60 * 60 * 1000);
429    long maxAge = conf.getLong("hbase.auth.token.max.lifetime", 7 * 24 * 60 * 60 * 1000);
430    return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
431      server.getServerName().toString(), keyUpdateInterval, maxAge);
432  }
433
434  public SecretManager<? extends TokenIdentifier> getSecretManager() {
435    return this.secretManager;
436  }
437
438  @SuppressWarnings("unchecked")
439  public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
440    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
441  }
442
443  /**
444   * This is a server side method, which is invoked over RPC. On success the return response has
445   * protobuf response payload. On failure, the exception name and the stack trace are returned in
446   * the protobuf response.
447   */
448  @Override
449  public Pair<Message, ExtendedCellScanner> call(RpcCall call, MonitoredRPCHandler status)
450    throws IOException {
451    try {
452      MethodDescriptor md = call.getMethod();
453      Message param = call.getParam();
454      status.setRPC(md.getName(), new Object[] { param }, call.getReceiveTime());
455      // TODO: Review after we add in encoded data blocks.
456      status.setRPCPacket(param);
457      status.resume("Servicing call");
458      // get an instance of the method arg type
459      HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
460      controller.setCallTimeout(call.getTimeout());
461      Message result = call.getService().callBlockingMethod(md, controller, param);
462      long receiveTime = call.getReceiveTime();
463      long startTime = call.getStartTime();
464      long endTime = EnvironmentEdgeManager.currentTime();
465      int processingTime = (int) (endTime - startTime);
466      int qTime = (int) (startTime - receiveTime);
467      int totalTime = (int) (endTime - receiveTime);
468      long fsReadTime = ThreadLocalServerSideScanMetrics.getFsReadTimeCounter().get();
469      if (LOG.isTraceEnabled()) {
470        LOG.trace(
471          "{}, response: {}, receiveTime: {}, queueTime: {}, processingTime: {}, "
472            + "totalTime: {}, fsReadTime: {}",
473          CurCall.get().toString(), TextFormat.shortDebugString(result),
474          CurCall.get().getReceiveTime(), qTime, processingTime, totalTime, fsReadTime);
475      }
476      // Use the raw request call size for now.
477      long requestSize = call.getSize();
478      long responseSize = result.getSerializedSize();
479      long responseBlockSize = call.getBlockBytesScanned();
480      if (call.isClientCellBlockSupported()) {
481        // Include the payload size in HBaseRpcController
482        responseSize += call.getResponseCellSize();
483      }
484
485      metrics.dequeuedCall(qTime);
486      metrics.processedCall(processingTime);
487      metrics.totalCall(totalTime);
488      metrics.receivedRequest(requestSize);
489      metrics.sentResponse(responseSize);
490      // log any RPC responses that are slower than the configured warn
491      // response time or larger than configured warning size
492      boolean tooSlow = isTooSlow(call, processingTime);
493      boolean tooLarge = isTooLarge(call, responseSize, responseBlockSize);
494      if (tooSlow || tooLarge) {
495        final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY);
496        // when tagging, we let TooLarge trump TooSmall to keep output simple
497        // note that large responses will often also be slow.
498        logResponse(param, md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
499          tooLarge, tooSlow, status.getClient(), startTime, processingTime, qTime, responseSize,
500          responseBlockSize, fsReadTime, userName);
501        if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
502          // send logs to ring buffer owned by slowLogRecorder
503          final String className =
504            server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
505          this.namedQueueRecorder.addRecord(new RpcLogDetails(call, param, status.getClient(),
506            responseSize, responseBlockSize, fsReadTime, className, tooSlow, tooLarge));
507        }
508      }
509      return new Pair<>(result, controller.cellScanner());
510    } catch (Throwable e) {
511      // The above callBlockingMethod will always return a SE. Strip the SE wrapper before
512      // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't
513      // need to pass it over the wire.
514      if (e instanceof ServiceException) {
515        if (e.getCause() == null) {
516          LOG.debug("Caught a ServiceException with null cause", e);
517        } else {
518          e = e.getCause();
519        }
520      }
521
522      // increment the number of requests that were exceptions.
523      metrics.exception(e);
524
525      if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
526      if (e instanceof IOException) throw (IOException) e;
527      LOG.error("Unexpected throwable object ", e);
528      throw new IOException(e.getMessage(), e);
529    }
530  }
531
532  /**
533   * Logs an RPC response to the LOG file, producing valid JSON objects for client Operations.
534   * @param param             The parameters received in the call.
535   * @param methodName        The name of the method invoked
536   * @param call              The string representation of the call
537   * @param tooLarge          To indicate if the event is tooLarge
538   * @param tooSlow           To indicate if the event is tooSlow
539   * @param clientAddress     The address of the client who made this call.
540   * @param startTime         The time that the call was initiated, in ms.
541   * @param processingTime    The duration that the call took to run, in ms.
542   * @param qTime             The duration that the call spent on the queue prior to being
543   *                          initiated, in ms.
544   * @param responseSize      The size in bytes of the response buffer.
545   * @param blockBytesScanned The size of block bytes scanned to retrieve the response.
546   * @param userName          UserName of the current RPC Call
547   */
548  void logResponse(Message param, String methodName, String call, boolean tooLarge, boolean tooSlow,
549    String clientAddress, long startTime, int processingTime, int qTime, long responseSize,
550    long blockBytesScanned, long fsReadTime, String userName) {
551    final String className = server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
552    // base information that is reported regardless of type of call
553    Map<String, Object> responseInfo = new HashMap<>();
554    responseInfo.put("starttimems", startTime);
555    responseInfo.put("processingtimems", processingTime);
556    responseInfo.put("queuetimems", qTime);
557    responseInfo.put("responsesize", responseSize);
558    responseInfo.put("blockbytesscanned", blockBytesScanned);
559    responseInfo.put("fsreadtime", fsReadTime);
560    responseInfo.put("client", clientAddress);
561    responseInfo.put("class", className);
562    responseInfo.put("method", methodName);
563    responseInfo.put("call", call);
564    // The params could be really big, make sure they don't kill us at WARN
565    String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
566    if (stringifiedParam.length() > 150) {
567      // Truncate to 1000 chars if TRACE is on, else to 150 chars
568      stringifiedParam = truncateTraceLog(stringifiedParam);
569    }
570    responseInfo.put("param", stringifiedParam);
571    if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
572      ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
573      String scanDetails;
574      if (request.hasScannerId()) {
575        long scannerId = request.getScannerId();
576        scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
577      } else {
578        scanDetails = rsRpcServices.getScanDetailsWithRequest(request);
579      }
580      if (scanDetails != null) {
581        responseInfo.put("scandetails", scanDetails);
582      }
583    }
584    if (param instanceof ClientProtos.MultiRequest) {
585      int numGets = 0;
586      int numMutations = 0;
587      int numServiceCalls = 0;
588      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
589      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
590        for (ClientProtos.Action action : regionAction.getActionList()) {
591          if (action.hasMutation()) {
592            numMutations++;
593          }
594          if (action.hasGet()) {
595            numGets++;
596          }
597          if (action.hasServiceCall()) {
598            numServiceCalls++;
599          }
600        }
601      }
602      responseInfo.put(MULTI_GETS, numGets);
603      responseInfo.put(MULTI_MUTATIONS, numMutations);
604      responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls);
605    }
606    final String tag =
607      (tooLarge && tooSlow) ? "TooLarge & TooSlow" : (tooSlow ? "TooSlow" : "TooLarge");
608    LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo));
609  }
610
611  private boolean isTooSlow(RpcCall call, int processingTime) {
612    long warnResponseTime = call.getParam() instanceof ClientProtos.ScanRequest
613      ? warnScanResponseTime
614      : this.warnResponseTime;
615    return (processingTime > warnResponseTime && warnResponseTime > -1);
616  }
617
618  private boolean isTooLarge(RpcCall call, long responseSize, long responseBlockSize) {
619    long warnResponseSize = call.getParam() instanceof ClientProtos.ScanRequest
620      ? warnScanResponseSize
621      : this.warnResponseSize;
622    return (warnResponseSize > -1
623      && (responseSize > warnResponseSize || responseBlockSize > warnResponseSize));
624  }
625
626  /**
627   * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length if TRACE is on else
628   * to 150 chars Refer to Jira HBASE-20826 and HBASE-20942
629   * @param strParam stringifiedParam to be truncated
630   * @return truncated trace log string
631   */
632  String truncateTraceLog(String strParam) {
633    if (LOG.isTraceEnabled()) {
634      int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH);
635      int truncatedLength =
636        strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength;
637      String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED;
638      return strParam.subSequence(0, truncatedLength) + truncatedFlag;
639    }
640    return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED;
641  }
642
643  /**
644   * Set the handler for calling out of RPC for error conditions.
645   * @param handler the handler implementation
646   */
647  @Override
648  public void setErrorHandler(HBaseRPCErrorHandler handler) {
649    this.errorHandler = handler;
650  }
651
652  @Override
653  public HBaseRPCErrorHandler getErrorHandler() {
654    return this.errorHandler;
655  }
656
657  /**
658   * Returns the metrics instance for reporting RPC call statistics
659   */
660  @Override
661  public MetricsHBaseServer getMetrics() {
662    return metrics;
663  }
664
665  @Override
666  public void addCallSize(final long diff) {
667    this.callQueueSizeInBytes.add(diff);
668  }
669
670  /**
671   * Authorize the incoming client connection.
672   * @param user       client user
673   * @param connection incoming connection
674   * @param addr       InetAddress of incoming connection
675   * @throws AuthorizationException when the client isn't authorized to talk the protocol
676   */
677  public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
678    InetAddress addr) throws AuthorizationException {
679    if (authorize) {
680      Class<?> c = getServiceInterface(services, connection.getServiceName());
681      authManager.authorize(user, c, getConf(), addr);
682    }
683  }
684
685  /**
686   * When the read or write buffer size is larger than this limit, i/o will be done in chunks of
687   * this size. Most RPC requests and responses would be be smaller.
688   */
689  protected static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
690
691  /**
692   * This is a wrapper around
693   * {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}. If the amount of data
694   * is large, it writes to channel in smaller chunks. This is to avoid jdk from creating many
695   * direct buffers as the size of ByteBuffer increases. There should not be any performance
696   * degredation.
697   * @param channel writable byte channel to write on
698   * @param buffer  buffer to write
699   * @return number of bytes written
700   * @throws java.io.IOException e
701   * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
702   */
703  protected int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
704
705    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT)
706      ? channel.read(buffer)
707      : channelIO(channel, null, buffer);
708    if (count > 0) {
709      metrics.receivedBytes(count);
710    }
711    return count;
712  }
713
714  /**
715   * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
716   * Only one of readCh or writeCh should be non-null.
717   * @param readCh  read channel
718   * @param writeCh write channel
719   * @param buf     buffer to read or write into/out of
720   * @return bytes written
721   * @throws java.io.IOException e
722   * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
723   */
724  private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh,
725    ByteBuffer buf) throws IOException {
726
727    int originalLimit = buf.limit();
728    int initialRemaining = buf.remaining();
729    int ret = 0;
730
731    while (buf.remaining() > 0) {
732      try {
733        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
734        buf.limit(buf.position() + ioSize);
735
736        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
737
738        if (ret < ioSize) {
739          break;
740        }
741
742      } finally {
743        buf.limit(originalLimit);
744      }
745    }
746
747    int nBytes = initialRemaining - buf.remaining();
748    return (nBytes > 0) ? nBytes : ret;
749  }
750
751  /**
752   * Needed for features such as delayed calls. We need to be able to store the current call so that
753   * we can complete it later or ask questions of what is supported by the current ongoing call.
754   * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
755   */
756  public static Optional<RpcCall> getCurrentCall() {
757    return Optional.ofNullable(CurCall.get());
758  }
759
760  /**
761   * Just return the current rpc call if it is a {@link ServerCall} and also has {@link CellScanner}
762   * attached.
763   * <p/>
764   * Mainly used for reference counting as {@link CellScanner} may reference non heap memory.
765   */
766  public static Optional<ServerCall<?>> getCurrentServerCallWithCellScanner() {
767    return getCurrentCall().filter(c -> c instanceof ServerCall)
768      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall<?>) c);
769  }
770
771  public static boolean isInRpcCallContext() {
772    return CurCall.get() != null;
773  }
774
775  /**
776   * Used by {@link org.apache.hadoop.hbase.master.region.MasterRegion}, to avoid hit row lock
777   * timeout when updating master region in a rpc call. See HBASE-23895, HBASE-29251 and HBASE-29294
778   * for more details.
779   * @return the currently ongoing rpc call
780   */
781  public static Optional<RpcCall> unsetCurrentCall() {
782    Optional<RpcCall> rpcCall = getCurrentCall();
783    CurCall.set(null);
784    return rpcCall;
785  }
786
787  /**
788   * Used by {@link org.apache.hadoop.hbase.master.region.MasterRegion}. Set the rpc call back after
789   * mutate region.
790   */
791  public static void setCurrentCall(RpcCall rpcCall) {
792    CurCall.set(rpcCall);
793  }
794
795  /**
796   * Returns the user credentials associated with the current RPC request or not present if no
797   * credentials were provided.
798   * @return A User
799   */
800  public static Optional<User> getRequestUser() {
801    Optional<RpcCall> ctx = getCurrentCall();
802    return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty();
803  }
804
805  /**
806   * Returns the RPC connection attributes for the current RPC request. These attributes are sent by
807   * the client when initiating a new connection to the HBase server. The attributes are sent in
808   * {@code ConnectionHeader.attribute} protobuf message.
809   * @return the attribute map. It will be empty if the method is called outside of an RPC context.
810   */
811  public static Map<String, byte[]> getConnectionAttributes() {
812    return getCurrentCall().map(RpcCall::getConnectionAttributes).orElse(Map.of());
813  }
814
815  /**
816   * The number of open RPC conections
817   * @return the number of open rpc connections
818   */
819  abstract public int getNumOpenConnections();
820
821  /**
822   * Returns the username for any user associated with the current RPC request or not present if no
823   * user is set.
824   */
825  public static Optional<String> getRequestUserName() {
826    return getRequestUser().map(User::getShortName);
827  }
828
829  /**
830   * Returns the address of the remote client associated with the current RPC request or not present
831   * if no address is set.
832   */
833  public static Optional<InetAddress> getRemoteAddress() {
834    return getCurrentCall().map(RpcCall::getRemoteAddress);
835  }
836
837  /**
838   * @param serviceName Some arbitrary string that represents a 'service'.
839   * @param services    Available service instances
840   * @return Matching BlockingServiceAndInterface pair
841   */
842  protected static BlockingServiceAndInterface getServiceAndInterface(
843    final List<BlockingServiceAndInterface> services, final String serviceName) {
844    for (BlockingServiceAndInterface bs : services) {
845      if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
846        return bs;
847      }
848    }
849    return null;
850  }
851
852  /**
853   * @param serviceName Some arbitrary string that represents a 'service'.
854   * @param services    Available services and their service interfaces.
855   * @return Service interface class for <code>serviceName</code>
856   */
857  protected static Class<?> getServiceInterface(final List<BlockingServiceAndInterface> services,
858    final String serviceName) {
859    BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName);
860    return bsasi == null ? null : bsasi.getServiceInterface();
861  }
862
863  /**
864   * @param serviceName Some arbitrary string that represents a 'service'.
865   * @param services    Available services and their service interfaces.
866   * @return BlockingService that goes with the passed <code>serviceName</code>
867   */
868  protected static BlockingService getService(final List<BlockingServiceAndInterface> services,
869    final String serviceName) {
870    BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName);
871    return bsasi == null ? null : bsasi.getBlockingService();
872  }
873
874  protected static MonitoredRPCHandler getStatus() {
875    // It is ugly the way we park status up in RpcServer. Let it be for now. TODO.
876    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
877    if (status != null) {
878      return status;
879    }
880    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
881    status.pause("Waiting for a call");
882    RpcServer.MONITORED_RPC.set(status);
883    return status;
884  }
885
886  /**
887   * Returns the remote side ip address when invoked inside an RPC Returns null incase of an error.
888   */
889  public static InetAddress getRemoteIp() {
890    RpcCall call = CurCall.get();
891    if (call != null) {
892      return call.getRemoteAddress();
893    }
894    return null;
895  }
896
897  @Override
898  public RpcScheduler getScheduler() {
899    return scheduler;
900  }
901
902  @Override
903  public ByteBuffAllocator getByteBuffAllocator() {
904    return this.bbAllocator;
905  }
906
  /**
   * Stores the given {@link RSRpcServices} in the {@code rsRpcServices} field.
   * @param rsRpcServices the instance to store
   */
  @Override
  public void setRsRpcServices(RSRpcServices rsRpcServices) {
    this.rsRpcServices = rsRpcServices;
  }
911
  /**
   * Stores the given {@link NamedQueueRecorder} in the {@code namedQueueRecorder} field.
   * @param namedQueueRecorder the recorder to store
   */
  @Override
  public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
    this.namedQueueRecorder = namedQueueRecorder;
  }
916
917  protected boolean needAuthorization() {
918    return authorize;
919  }
920
921  @RestrictedApi(explanation = "Should only be called in tests", link = "",
922      allowedOnPath = ".*/src/test/.*")
923  public List<BlockingServiceAndInterface> getServices() {
924    return services;
925  }
926
927  private void initializeCoprocessorHost(Configuration conf) {
928    this.cpHost = new RpcCoprocessorHost(conf);
929  }
930
931  @Override
932  public RpcCoprocessorHost getRpcCoprocessorHost() {
933    return cpHost;
934  }
935}