001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
021
022import java.io.IOException;
023import java.net.InetAddress;
024import java.net.InetSocketAddress;
025import java.nio.ByteBuffer;
026import java.nio.channels.ReadableByteChannel;
027import java.nio.channels.WritableByteChannel;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Locale;
032import java.util.Map;
033import java.util.Optional;
034import java.util.concurrent.atomic.LongAdder;
035import org.apache.commons.lang3.StringUtils;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.CallQueueTooBigException;
038import org.apache.hadoop.hbase.CellScanner;
039import org.apache.hadoop.hbase.DoNotRetryIOException;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.Server;
042import org.apache.hadoop.hbase.conf.ConfigurationObserver;
043import org.apache.hadoop.hbase.io.ByteBuffAllocator;
044import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
045import org.apache.hadoop.hbase.monitoring.TaskMonitor;
046import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
047import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
048import org.apache.hadoop.hbase.regionserver.RSRpcServices;
049import org.apache.hadoop.hbase.security.HBasePolicyProvider;
050import org.apache.hadoop.hbase.security.SaslUtil;
051import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
052import org.apache.hadoop.hbase.security.User;
053import org.apache.hadoop.hbase.security.UserProvider;
054import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
056import org.apache.hadoop.hbase.util.GsonUtil;
057import org.apache.hadoop.hbase.util.Pair;
058import org.apache.hadoop.security.UserGroupInformation;
059import org.apache.hadoop.security.authorize.AuthorizationException;
060import org.apache.hadoop.security.authorize.PolicyProvider;
061import org.apache.hadoop.security.authorize.ProxyUsers;
062import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
063import org.apache.hadoop.security.token.SecretManager;
064import org.apache.hadoop.security.token.TokenIdentifier;
065import org.apache.yetus.audience.InterfaceAudience;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import org.apache.hbase.thirdparty.com.google.gson.Gson;
070import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
071import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
072import org.apache.hbase.thirdparty.com.google.protobuf.Message;
073import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
074import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
075
076import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
079
080/**
081 * An RPC server that hosts protobuf described Services.
082 */
083@InterfaceAudience.Private
084public abstract class RpcServer implements RpcServerInterface, ConfigurationObserver {
085  // LOG is being used in CallRunner and the log level is being changed in tests
086  public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class);
087  protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION =
088    new CallQueueTooBigException();
089
090  private static final String MULTI_GETS = "multi.gets";
091  private static final String MULTI_MUTATIONS = "multi.mutations";
092  private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
093
094  private final boolean authorize;
095  private volatile boolean isOnlineLogProviderEnabled;
096  protected boolean isSecurityEnabled;
097
098  public static final byte CURRENT_VERSION = 0;
099
100  /**
101   * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
102   */
103  public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
104    "hbase.ipc.server.fallback-to-simple-auth-allowed";
105
106  /**
107   * How many calls/handler are allowed in the queue.
108   */
109  protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
110
111  protected final CellBlockBuilder cellBlockBuilder;
112
113  protected static final String AUTH_FAILED_FOR = "Auth failed for ";
114  protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
115  protected static final Logger AUDITLOG =
116    LoggerFactory.getLogger("SecurityLogger." + Server.class.getName());
117  protected SecretManager<TokenIdentifier> secretManager;
118  protected final Map<String, String> saslProps;
119
120  protected ServiceAuthorizationManager authManager;
121
122  /**
123   * This is set to Call object before Handler invokes an RPC and ybdie after the call returns.
124   */
125  protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>();
126
127  /** Keeps MonitoredRPCHandler per handler thread. */
128  protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>();
129
130  protected final InetSocketAddress bindAddress;
131
132  protected MetricsHBaseServer metrics;
133
134  protected final Configuration conf;
135
136  /**
137   * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
138   * this size, then we will reject the call (after parsing it though). It will go back to the
139   * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
140   * call queue size gets incremented after we parse a call and before we add it to the queue of
141   * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
142   * size is kept in {@link #callQueueSizeInBytes}.
143   * @see #callQueueSizeInBytes
144   * @see #DEFAULT_MAX_CALLQUEUE_SIZE
145   */
146  protected final long maxQueueSizeInBytes;
147  protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
148
149  /**
150   * This is a running count of the size in bytes of all outstanding calls whether currently
151   * executing or queued waiting to be run.
152   */
153  protected final LongAdder callQueueSizeInBytes = new LongAdder();
154
155  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
156  protected final boolean tcpKeepAlive; // if T then use keepalives
157
158  /**
159   * This flag is used to indicate to sub threads when they should go down. When we call
160   * {@link #start()}, all threads started will consult this flag on whether they should keep going.
161   * It is set to false when {@link #stop()} is called.
162   */
163  volatile boolean running = true;
164
165  /**
166   * This flag is set to true after all threads are up and 'running' and the server is then opened
167   * for business by the call to {@link #start()}.
168   */
169  volatile boolean started = false;
170
171  protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
172
173  protected HBaseRPCErrorHandler errorHandler = null;
174
175  public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
176
177  protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
178  protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
179  protected static final String WARN_SCAN_RESPONSE_TIME = "hbase.ipc.warn.response.time.scan";
180  protected static final String WARN_SCAN_RESPONSE_SIZE = "hbase.ipc.warn.response.size.scan";
181
182  /**
183   * Minimum allowable timeout (in milliseconds) in rpc request's header. This configuration exists
184   * to prevent the rpc service regarding this request as timeout immediately.
185   */
186  protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
187  protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
188
189  /** Default value for above params */
190  public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
191  protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
192  protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
193
194  protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000;
195  protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length";
196  protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>";
197
198  protected static final Gson GSON = GsonUtil.createGsonWithDisableHtmlEscaping().create();
199
200  protected final int maxRequestSize;
201  protected volatile int warnResponseTime;
202  protected volatile int warnResponseSize;
203  protected volatile int warnScanResponseTime;
204  protected volatile int warnScanResponseSize;
205
206  protected final int minClientRequestTimeout;
207
208  protected final Server server;
209  protected final List<BlockingServiceAndInterface> services;
210
211  protected final RpcScheduler scheduler;
212
213  protected UserProvider userProvider;
214
215  protected final ByteBuffAllocator bbAllocator;
216
217  protected volatile boolean allowFallbackToSimpleAuth;
218
219  /**
220   * Used to get details for scan with a scanner_id<br/>
221   * TODO try to figure out a better way and remove reference from regionserver package later.
222   */
223  private RSRpcServices rsRpcServices;
224
225  /**
226   * Use to add online slowlog responses
227   */
228  private NamedQueueRecorder namedQueueRecorder;
229
230  @FunctionalInterface
231  protected interface CallCleanup {
232    void run();
233  }
234
235  /**
236   * Datastructure for passing a {@link BlockingService} and its associated class of protobuf
237   * service interface. For example, a server that fielded what is defined in the client protobuf
238   * service would pass in an implementation of the client blocking service and then its
239   * ClientService.BlockingInterface.class. Used checking connection setup.
240   */
241  public static class BlockingServiceAndInterface {
242    private final BlockingService service;
243    private final Class<?> serviceInterface;
244
245    public BlockingServiceAndInterface(final BlockingService service,
246      final Class<?> serviceInterface) {
247      this.service = service;
248      this.serviceInterface = serviceInterface;
249    }
250
251    public Class<?> getServiceInterface() {
252      return this.serviceInterface;
253    }
254
255    public BlockingService getBlockingService() {
256      return this.service;
257    }
258  }
259
260  /**
261   * Constructs a server listening on the named port and address.
262   * @param server           hosting instance of {@link Server}. We will do authentications if an
263   *                         instance else pass null for no authentication check.
264   * @param name             Used keying this rpc servers' metrics and for naming the Listener
265   *                         thread.
266   * @param services         A list of services.
267   * @param bindAddress      Where to listen
268   * @param reservoirEnabled Enable ByteBufferPool or not.
269   */
270  public RpcServer(final Server server, final String name,
271    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
272    Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
273    this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
274    this.server = server;
275    this.services = services;
276    this.bindAddress = bindAddress;
277    this.conf = conf;
278    // See declaration above for documentation on what this size is.
279    this.maxQueueSizeInBytes =
280      this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
281
282    this.warnResponseTime = getWarnResponseTime(conf);
283    this.warnResponseSize = getWarnResponseSize(conf);
284    this.warnScanResponseTime = getWarnScanResponseTime(conf);
285    this.warnScanResponseSize = getWarnScanResponseSize(conf);
286    this.minClientRequestTimeout =
287      conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT, DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
288    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
289
290    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
291    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
292    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
293
294    this.cellBlockBuilder = new CellBlockBuilder(conf);
295
296    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
297    this.userProvider = UserProvider.instantiate(conf);
298    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
299    if (isSecurityEnabled) {
300      saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
301        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
302    } else {
303      saslProps = Collections.emptyMap();
304    }
305
306    this.isOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(conf);
307    this.scheduler = scheduler;
308  }
309
310  @Override
311  public void onConfigurationChange(Configuration newConf) {
312    initReconfigurable(newConf);
313    if (scheduler instanceof ConfigurationObserver) {
314      ((ConfigurationObserver) scheduler).onConfigurationChange(newConf);
315    }
316    if (authorize) {
317      refreshAuthManager(newConf, new HBasePolicyProvider());
318    }
319    refreshSlowLogConfiguration(newConf);
320  }
321
322  private void refreshSlowLogConfiguration(Configuration newConf) {
323    boolean newIsOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(newConf);
324    if (isOnlineLogProviderEnabled != newIsOnlineLogProviderEnabled) {
325      isOnlineLogProviderEnabled = newIsOnlineLogProviderEnabled;
326    }
327    int newWarnResponseTime = getWarnResponseTime(newConf);
328    if (warnResponseTime != newWarnResponseTime) {
329      warnResponseTime = newWarnResponseTime;
330    }
331    int newWarnResponseSize = getWarnResponseSize(newConf);
332    if (warnResponseSize != newWarnResponseSize) {
333      warnResponseSize = newWarnResponseSize;
334    }
335    int newWarnResponseTimeScan = getWarnScanResponseTime(newConf);
336    if (warnScanResponseTime != newWarnResponseTimeScan) {
337      warnScanResponseTime = newWarnResponseTimeScan;
338    }
339    int newWarnScanResponseSize = getWarnScanResponseSize(newConf);
340    if (warnScanResponseSize != newWarnScanResponseSize) {
341      warnScanResponseSize = newWarnScanResponseSize;
342    }
343  }
344
345  private static boolean getIsOnlineLogProviderEnabled(Configuration conf) {
346    return conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
347      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
348  }
349
350  private static int getWarnResponseTime(Configuration conf) {
351    return conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
352  }
353
354  private static int getWarnResponseSize(Configuration conf) {
355    return conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
356  }
357
358  private static int getWarnScanResponseTime(Configuration conf) {
359    return conf.getInt(WARN_SCAN_RESPONSE_TIME, getWarnResponseTime(conf));
360  }
361
362  private static int getWarnScanResponseSize(Configuration conf) {
363    return conf.getInt(WARN_SCAN_RESPONSE_SIZE, getWarnResponseSize(conf));
364  }
365
366  protected void initReconfigurable(Configuration confToLoad) {
367    this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
368    if (isSecurityEnabled && allowFallbackToSimpleAuth) {
369      LOG.warn("********* WARNING! *********");
370      LOG.warn("This server is configured to allow connections from INSECURE clients");
371      LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
372      LOG.warn("While this option is enabled, client identities cannot be secured, and user");
373      LOG.warn("impersonation is possible!");
374      LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
375      LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
376      LOG.warn("****************************");
377    }
378  }
379
380  Configuration getConf() {
381    return conf;
382  }
383
384  @Override
385  public boolean isStarted() {
386    return this.started;
387  }
388
389  @Override
390  public synchronized void refreshAuthManager(Configuration conf, PolicyProvider pp) {
391    // Ignore warnings that this should be accessed in a static way instead of via an instance;
392    // it'll break if you go via static route.
393    System.setProperty("hadoop.policy.file", "hbase-policy.xml");
394    this.authManager.refresh(conf, pp);
395    LOG.info("Refreshed hbase-policy.xml successfully");
396    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
397    LOG.info("Refreshed super and proxy users successfully");
398  }
399
400  protected AuthenticationTokenSecretManager createSecretManager() {
401    if (!isSecurityEnabled) return null;
402    if (server == null) return null;
403    Configuration conf = server.getConfiguration();
404    long keyUpdateInterval = conf.getLong("hbase.auth.key.update.interval", 24 * 60 * 60 * 1000);
405    long maxAge = conf.getLong("hbase.auth.token.max.lifetime", 7 * 24 * 60 * 60 * 1000);
406    return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
407      server.getServerName().toString(), keyUpdateInterval, maxAge);
408  }
409
410  public SecretManager<? extends TokenIdentifier> getSecretManager() {
411    return this.secretManager;
412  }
413
414  @SuppressWarnings("unchecked")
415  public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
416    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
417  }
418
419  /**
420   * This is a server side method, which is invoked over RPC. On success the return response has
421   * protobuf response payload. On failure, the exception name and the stack trace are returned in
422   * the protobuf response.
423   */
424  @Override
425  public Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status)
426    throws IOException {
427    try {
428      MethodDescriptor md = call.getMethod();
429      Message param = call.getParam();
430      status.setRPC(md.getName(), new Object[] { param }, call.getReceiveTime());
431      // TODO: Review after we add in encoded data blocks.
432      status.setRPCPacket(param);
433      status.resume("Servicing call");
434      // get an instance of the method arg type
435      HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
436      controller.setCallTimeout(call.getTimeout());
437      Message result = call.getService().callBlockingMethod(md, controller, param);
438      long receiveTime = call.getReceiveTime();
439      long startTime = call.getStartTime();
440      long endTime = EnvironmentEdgeManager.currentTime();
441      int processingTime = (int) (endTime - startTime);
442      int qTime = (int) (startTime - receiveTime);
443      int totalTime = (int) (endTime - receiveTime);
444      if (LOG.isTraceEnabled()) {
445        LOG.trace(
446          "{}, response: {}, receiveTime: {}, queueTime: {}, processingTime: {}, totalTime: {}",
447          CurCall.get().toString(), TextFormat.shortDebugString(result),
448          CurCall.get().getReceiveTime(), qTime, processingTime, totalTime);
449      }
450      // Use the raw request call size for now.
451      long requestSize = call.getSize();
452      long responseSize = result.getSerializedSize();
453      long responseBlockSize = call.getBlockBytesScanned();
454      if (call.isClientCellBlockSupported()) {
455        // Include the payload size in HBaseRpcController
456        responseSize += call.getResponseCellSize();
457      }
458
459      metrics.dequeuedCall(qTime);
460      metrics.processedCall(processingTime);
461      metrics.totalCall(totalTime);
462      metrics.receivedRequest(requestSize);
463      metrics.sentResponse(responseSize);
464      // log any RPC responses that are slower than the configured warn
465      // response time or larger than configured warning size
466      boolean tooSlow = isTooSlow(call, processingTime);
467      boolean tooLarge = isTooLarge(call, responseSize, responseBlockSize);
468      if (tooSlow || tooLarge) {
469        final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY);
470        // when tagging, we let TooLarge trump TooSmall to keep output simple
471        // note that large responses will often also be slow.
472        logResponse(param, md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
473          tooLarge, tooSlow, status.getClient(), startTime, processingTime, qTime, responseSize,
474          responseBlockSize, userName);
475        if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
476          // send logs to ring buffer owned by slowLogRecorder
477          final String className =
478            server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
479          this.namedQueueRecorder.addRecord(new RpcLogDetails(call, param, status.getClient(),
480            responseSize, responseBlockSize, className, tooSlow, tooLarge));
481        }
482      }
483      return new Pair<>(result, controller.cellScanner());
484    } catch (Throwable e) {
485      // The above callBlockingMethod will always return a SE. Strip the SE wrapper before
486      // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't
487      // need to pass it over the wire.
488      if (e instanceof ServiceException) {
489        if (e.getCause() == null) {
490          LOG.debug("Caught a ServiceException with null cause", e);
491        } else {
492          e = e.getCause();
493        }
494      }
495
496      // increment the number of requests that were exceptions.
497      metrics.exception(e);
498
499      if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
500      if (e instanceof IOException) throw (IOException) e;
501      LOG.error("Unexpected throwable object ", e);
502      throw new IOException(e.getMessage(), e);
503    }
504  }
505
506  /**
507   * Logs an RPC response to the LOG file, producing valid JSON objects for client Operations.
508   * @param param             The parameters received in the call.
509   * @param methodName        The name of the method invoked
510   * @param call              The string representation of the call
511   * @param tooLarge          To indicate if the event is tooLarge
512   * @param tooSlow           To indicate if the event is tooSlow
513   * @param clientAddress     The address of the client who made this call.
514   * @param startTime         The time that the call was initiated, in ms.
515   * @param processingTime    The duration that the call took to run, in ms.
516   * @param qTime             The duration that the call spent on the queue prior to being
517   *                          initiated, in ms.
518   * @param responseSize      The size in bytes of the response buffer.
519   * @param blockBytesScanned The size of block bytes scanned to retrieve the response.
520   * @param userName          UserName of the current RPC Call
521   */
522  void logResponse(Message param, String methodName, String call, boolean tooLarge, boolean tooSlow,
523    String clientAddress, long startTime, int processingTime, int qTime, long responseSize,
524    long blockBytesScanned, String userName) {
525    final String className = server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
526    // base information that is reported regardless of type of call
527    Map<String, Object> responseInfo = new HashMap<>();
528    responseInfo.put("starttimems", startTime);
529    responseInfo.put("processingtimems", processingTime);
530    responseInfo.put("queuetimems", qTime);
531    responseInfo.put("responsesize", responseSize);
532    responseInfo.put("blockbytesscanned", blockBytesScanned);
533    responseInfo.put("client", clientAddress);
534    responseInfo.put("class", className);
535    responseInfo.put("method", methodName);
536    responseInfo.put("call", call);
537    // The params could be really big, make sure they don't kill us at WARN
538    String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
539    if (stringifiedParam.length() > 150) {
540      // Truncate to 1000 chars if TRACE is on, else to 150 chars
541      stringifiedParam = truncateTraceLog(stringifiedParam);
542    }
543    responseInfo.put("param", stringifiedParam);
544    if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
545      ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
546      String scanDetails;
547      if (request.hasScannerId()) {
548        long scannerId = request.getScannerId();
549        scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
550      } else {
551        scanDetails = rsRpcServices.getScanDetailsWithRequest(request);
552      }
553      if (scanDetails != null) {
554        responseInfo.put("scandetails", scanDetails);
555      }
556    }
557    if (param instanceof ClientProtos.MultiRequest) {
558      int numGets = 0;
559      int numMutations = 0;
560      int numServiceCalls = 0;
561      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
562      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
563        for (ClientProtos.Action action : regionAction.getActionList()) {
564          if (action.hasMutation()) {
565            numMutations++;
566          }
567          if (action.hasGet()) {
568            numGets++;
569          }
570          if (action.hasServiceCall()) {
571            numServiceCalls++;
572          }
573        }
574      }
575      responseInfo.put(MULTI_GETS, numGets);
576      responseInfo.put(MULTI_MUTATIONS, numMutations);
577      responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls);
578    }
579    final String tag =
580      (tooLarge && tooSlow) ? "TooLarge & TooSlow" : (tooSlow ? "TooSlow" : "TooLarge");
581    LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo));
582  }
583
584  private boolean isTooSlow(RpcCall call, int processingTime) {
585    long warnResponseTime = call.getParam() instanceof ClientProtos.ScanRequest
586      ? warnScanResponseTime
587      : this.warnResponseTime;
588    return (processingTime > warnResponseTime && warnResponseTime > -1);
589  }
590
591  private boolean isTooLarge(RpcCall call, long responseSize, long responseBlockSize) {
592    long warnResponseSize = call.getParam() instanceof ClientProtos.ScanRequest
593      ? warnScanResponseSize
594      : this.warnResponseSize;
595    return (warnResponseSize > -1
596      && (responseSize > warnResponseSize || responseBlockSize > warnResponseSize));
597  }
598
599  /**
600   * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length if TRACE is on else
601   * to 150 chars Refer to Jira HBASE-20826 and HBASE-20942
602   * @param strParam stringifiedParam to be truncated
603   * @return truncated trace log string
604   */
605  String truncateTraceLog(String strParam) {
606    if (LOG.isTraceEnabled()) {
607      int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH);
608      int truncatedLength =
609        strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength;
610      String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED;
611      return strParam.subSequence(0, truncatedLength) + truncatedFlag;
612    }
613    return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED;
614  }
615
616  /**
617   * Set the handler for calling out of RPC for error conditions.
618   * @param handler the handler implementation
619   */
620  @Override
621  public void setErrorHandler(HBaseRPCErrorHandler handler) {
622    this.errorHandler = handler;
623  }
624
625  @Override
626  public HBaseRPCErrorHandler getErrorHandler() {
627    return this.errorHandler;
628  }
629
630  /**
631   * Returns the metrics instance for reporting RPC call statistics
632   */
633  @Override
634  public MetricsHBaseServer getMetrics() {
635    return metrics;
636  }
637
638  @Override
639  public void addCallSize(final long diff) {
640    this.callQueueSizeInBytes.add(diff);
641  }
642
643  /**
644   * Authorize the incoming client connection.
645   * @param user       client user
646   * @param connection incoming connection
647   * @param addr       InetAddress of incoming connection
648   * @throws AuthorizationException when the client isn't authorized to talk the protocol
649   */
650  public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
651    InetAddress addr) throws AuthorizationException {
652    if (authorize) {
653      Class<?> c = getServiceInterface(services, connection.getServiceName());
654      authManager.authorize(user, c, getConf(), addr);
655    }
656  }
657
658  /**
659   * When the read or write buffer size is larger than this limit, i/o will be done in chunks of
660   * this size. Most RPC requests and responses would be be smaller.
661   */
662  protected static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
663
664  /**
665   * This is a wrapper around
666   * {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}. If the amount of data
667   * is large, it writes to channel in smaller chunks. This is to avoid jdk from creating many
668   * direct buffers as the size of ByteBuffer increases. There should not be any performance
669   * degredation.
670   * @param channel writable byte channel to write on
671   * @param buffer  buffer to write
672   * @return number of bytes written
673   * @throws java.io.IOException e
674   * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
675   */
676  protected int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
677
678    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT)
679      ? channel.read(buffer)
680      : channelIO(channel, null, buffer);
681    if (count > 0) {
682      metrics.receivedBytes(count);
683    }
684    return count;
685  }
686
687  /**
688   * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
689   * Only one of readCh or writeCh should be non-null.
690   * @param readCh  read channel
691   * @param writeCh write channel
692   * @param buf     buffer to read or write into/out of
693   * @return bytes written
694   * @throws java.io.IOException e
695   * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
696   */
697  private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh,
698    ByteBuffer buf) throws IOException {
699
700    int originalLimit = buf.limit();
701    int initialRemaining = buf.remaining();
702    int ret = 0;
703
704    while (buf.remaining() > 0) {
705      try {
706        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
707        buf.limit(buf.position() + ioSize);
708
709        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
710
711        if (ret < ioSize) {
712          break;
713        }
714
715      } finally {
716        buf.limit(originalLimit);
717      }
718    }
719
720    int nBytes = initialRemaining - buf.remaining();
721    return (nBytes > 0) ? nBytes : ret;
722  }
723
724  /**
725   * Needed for features such as delayed calls. We need to be able to store the current call so that
726   * we can complete it later or ask questions of what is supported by the current ongoing call.
727   * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
728   */
729  public static Optional<RpcCall> getCurrentCall() {
730    return Optional.ofNullable(CurCall.get());
731  }
732
733  /**
734   * Just return the current rpc call if it is a {@link ServerCall} and also has {@link CellScanner}
735   * attached.
736   * <p/>
737   * Mainly used for reference counting as {@link CellScanner} may reference non heap memory.
738   */
739  public static Optional<ServerCall<?>> getCurrentServerCallWithCellScanner() {
740    return getCurrentCall().filter(c -> c instanceof ServerCall)
741      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall<?>) c);
742  }
743
744  public static boolean isInRpcCallContext() {
745    return CurCall.get() != null;
746  }
747
748  /**
749   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For
750   * master's rpc call, it may generate new procedure and mutate the region which store procedure.
751   * There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc
752   * call to avoid the rpc check.
753   * @return the currently ongoing rpc call
754   */
755  public static Optional<RpcCall> unsetCurrentCall() {
756    Optional<RpcCall> rpcCall = getCurrentCall();
757    CurCall.set(null);
758    return rpcCall;
759  }
760
761  /**
762   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
763   * rpc call back after mutate region.
764   */
765  public static void setCurrentCall(RpcCall rpcCall) {
766    CurCall.set(rpcCall);
767  }
768
769  /**
770   * Returns the user credentials associated with the current RPC request or not present if no
771   * credentials were provided.
772   * @return A User
773   */
774  public static Optional<User> getRequestUser() {
775    Optional<RpcCall> ctx = getCurrentCall();
776    return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty();
777  }
778
779  /**
780   * The number of open RPC conections
781   * @return the number of open rpc connections
782   */
783  abstract public int getNumOpenConnections();
784
785  /**
786   * Returns the username for any user associated with the current RPC request or not present if no
787   * user is set.
788   */
789  public static Optional<String> getRequestUserName() {
790    return getRequestUser().map(User::getShortName);
791  }
792
793  /** Returns Address of remote client if a request is ongoing, else null */
794  public static Optional<InetAddress> getRemoteAddress() {
795    return getCurrentCall().map(RpcCall::getRemoteAddress);
796  }
797
798  /**
799   * @param serviceName Some arbitrary string that represents a 'service'.
800   * @param services    Available service instances
801   * @return Matching BlockingServiceAndInterface pair
802   */
803  protected static BlockingServiceAndInterface getServiceAndInterface(
804    final List<BlockingServiceAndInterface> services, final String serviceName) {
805    for (BlockingServiceAndInterface bs : services) {
806      if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
807        return bs;
808      }
809    }
810    return null;
811  }
812
813  /**
814   * @param serviceName Some arbitrary string that represents a 'service'.
815   * @param services    Available services and their service interfaces.
816   * @return Service interface class for <code>serviceName</code>
817   */
818  protected static Class<?> getServiceInterface(final List<BlockingServiceAndInterface> services,
819    final String serviceName) {
820    BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName);
821    return bsasi == null ? null : bsasi.getServiceInterface();
822  }
823
824  /**
825   * @param serviceName Some arbitrary string that represents a 'service'.
826   * @param services    Available services and their service interfaces.
827   * @return BlockingService that goes with the passed <code>serviceName</code>
828   */
829  protected static BlockingService getService(final List<BlockingServiceAndInterface> services,
830    final String serviceName) {
831    BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName);
832    return bsasi == null ? null : bsasi.getBlockingService();
833  }
834
835  protected static MonitoredRPCHandler getStatus() {
836    // It is ugly the way we park status up in RpcServer. Let it be for now. TODO.
837    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
838    if (status != null) {
839      return status;
840    }
841    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
842    status.pause("Waiting for a call");
843    RpcServer.MONITORED_RPC.set(status);
844    return status;
845  }
846
847  /**
848   * Returns the remote side ip address when invoked inside an RPC Returns null incase of an error.
849   */
850  public static InetAddress getRemoteIp() {
851    RpcCall call = CurCall.get();
852    if (call != null) {
853      return call.getRemoteAddress();
854    }
855    return null;
856  }
857
858  @Override
859  public RpcScheduler getScheduler() {
860    return scheduler;
861  }
862
863  @Override
864  public ByteBuffAllocator getByteBuffAllocator() {
865    return this.bbAllocator;
866  }
867
868  @Override
869  public void setRsRpcServices(RSRpcServices rsRpcServices) {
870    this.rsRpcServices = rsRpcServices;
871  }
872
873  @Override
874  public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
875    this.namedQueueRecorder = namedQueueRecorder;
876  }
877
878  protected boolean needAuthorization() {
879    return authorize;
880  }
881}