001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
021
022import com.google.errorprone.annotations.RestrictedApi;
023import java.io.IOException;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.nio.ByteBuffer;
027import java.nio.channels.ReadableByteChannel;
028import java.nio.channels.WritableByteChannel;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Locale;
033import java.util.Map;
034import java.util.Optional;
035import java.util.concurrent.atomic.LongAdder;
036import org.apache.commons.lang3.StringUtils;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.CallQueueTooBigException;
039import org.apache.hadoop.hbase.CellScanner;
040import org.apache.hadoop.hbase.DoNotRetryIOException;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.Server;
043import org.apache.hadoop.hbase.conf.ConfigurationObserver;
044import org.apache.hadoop.hbase.io.ByteBuffAllocator;
045import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
046import org.apache.hadoop.hbase.monitoring.TaskMonitor;
047import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
048import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
049import org.apache.hadoop.hbase.regionserver.RSRpcServices;
050import org.apache.hadoop.hbase.security.HBasePolicyProvider;
051import org.apache.hadoop.hbase.security.SaslUtil;
052import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
053import org.apache.hadoop.hbase.security.User;
054import org.apache.hadoop.hbase.security.UserProvider;
055import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.apache.hadoop.hbase.util.GsonUtil;
058import org.apache.hadoop.hbase.util.Pair;
059import org.apache.hadoop.security.UserGroupInformation;
060import org.apache.hadoop.security.authorize.AuthorizationException;
061import org.apache.hadoop.security.authorize.PolicyProvider;
062import org.apache.hadoop.security.authorize.ProxyUsers;
063import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
064import org.apache.hadoop.security.token.SecretManager;
065import org.apache.hadoop.security.token.TokenIdentifier;
066import org.apache.yetus.audience.InterfaceAudience;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
071import org.apache.hbase.thirdparty.com.google.gson.Gson;
072import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
073import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
074import org.apache.hbase.thirdparty.com.google.protobuf.Message;
075import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
076import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
077
078import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
081
082/**
083 * An RPC server that hosts protobuf described Services.
084 */
085@InterfaceAudience.Private
086public abstract class RpcServer implements RpcServerInterface, ConfigurationObserver {
087  // LOG is being used in CallRunner and the log level is being changed in tests
088  public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class);
089  protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION =
090    new CallQueueTooBigException();
091
092  private static final String MULTI_GETS = "multi.gets";
093  private static final String MULTI_MUTATIONS = "multi.mutations";
094  private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
095
096  private final boolean authorize;
097  private volatile boolean isOnlineLogProviderEnabled;
098  protected boolean isSecurityEnabled;
099
100  public static final byte CURRENT_VERSION = 0;
101
102  /**
103   * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
104   */
105  public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
106    "hbase.ipc.server.fallback-to-simple-auth-allowed";
107
108  /**
109   * How many calls/handler are allowed in the queue.
110   */
111  protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
112
113  protected final CellBlockBuilder cellBlockBuilder;
114
115  protected static final String AUTH_FAILED_FOR = "Auth failed for ";
116  protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
117  protected static final Logger AUDITLOG =
118    LoggerFactory.getLogger("SecurityLogger." + Server.class.getName());
119  protected SecretManager<TokenIdentifier> secretManager;
120  protected final Map<String, String> saslProps;
121  protected final String serverPrincipal;
122
123  protected ServiceAuthorizationManager authManager;
124
125  /**
126   * This is set to Call object before Handler invokes an RPC and reset after the call returns.
127   */
128  protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>();
129
130  /** Keeps MonitoredRPCHandler per handler thread. */
131  protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>();
132
133  protected final InetSocketAddress bindAddress;
134
135  protected MetricsHBaseServer metrics;
136
137  protected final Configuration conf;
138
139  /**
140   * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
141   * this size, then we will reject the call (after parsing it though). It will go back to the
142   * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
143   * call queue size gets incremented after we parse a call and before we add it to the queue of
144   * calls for the scheduler to use. It gets decremented after we have 'run' the Call. The current
145   * size is kept in {@link #callQueueSizeInBytes}.
146   * @see #callQueueSizeInBytes
147   * @see #DEFAULT_MAX_CALLQUEUE_SIZE
148   */
149  protected final long maxQueueSizeInBytes;
150  protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
151
152  /**
153   * This is a running count of the size in bytes of all outstanding calls whether currently
154   * executing or queued waiting to be run.
155   */
156  protected final LongAdder callQueueSizeInBytes = new LongAdder();
157
158  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
159  protected final boolean tcpKeepAlive; // if T then use keepalives
160
161  /**
162   * This flag is used to indicate to sub threads when they should go down. When we call
163   * {@link #start()}, all threads started will consult this flag on whether they should keep going.
164   * It is set to false when {@link #stop()} is called.
165   */
166  volatile boolean running = true;
167
168  /**
169   * This flag is set to true after all threads are up and 'running' and the server is then opened
170   * for business by the call to {@link #start()}.
171   */
172  volatile boolean started = false;
173
174  protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
175
176  protected HBaseRPCErrorHandler errorHandler = null;
177
178  public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
179
180  protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
181  protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
182  protected static final String WARN_SCAN_RESPONSE_TIME = "hbase.ipc.warn.response.time.scan";
183  protected static final String WARN_SCAN_RESPONSE_SIZE = "hbase.ipc.warn.response.size.scan";
184
185  /**
186   * Minimum allowable timeout (in milliseconds) in rpc request's header. This configuration exists
187   * to prevent the rpc service regarding this request as timeout immediately.
188   */
189  protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
190  protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
191
192  /** Default value for above params */
193  public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
194  protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
195  protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
196
197  protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000;
198  protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length";
199  protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>";
200
201  protected static final Gson GSON = GsonUtil.createGsonWithDisableHtmlEscaping().create();
202
203  protected final int maxRequestSize;
204  protected volatile int warnResponseTime;
205  protected volatile int warnResponseSize;
206  protected volatile int warnScanResponseTime;
207  protected volatile int warnScanResponseSize;
208
209  protected final int minClientRequestTimeout;
210
211  protected final Server server;
212  protected final List<BlockingServiceAndInterface> services;
213
214  protected final RpcScheduler scheduler;
215
216  protected final UserProvider userProvider;
217
218  protected final ByteBuffAllocator bbAllocator;
219
220  protected volatile boolean allowFallbackToSimpleAuth;
221
222  /**
223   * Used to get details for scan with a scanner_id<br/>
224   * TODO try to figure out a better way and remove reference from regionserver package later.
225   */
226  private RSRpcServices rsRpcServices;
227
228  /**
229   * Used to add online slowlog responses
230   */
231  private NamedQueueRecorder namedQueueRecorder;
232
233  @FunctionalInterface
234  protected interface CallCleanup {
235    void run();
236  }
237
238  /**
239   * Datastructure for passing a {@link BlockingService} and its associated class of protobuf
240   * service interface. For example, a server that fielded what is defined in the client protobuf
241   * service would pass in an implementation of the client blocking service and then its
242   * ClientService.BlockingInterface.class. Used checking connection setup.
243   */
244  public static class BlockingServiceAndInterface {
245    private final BlockingService service;
246    private final Class<?> serviceInterface;
247
248    public BlockingServiceAndInterface(final BlockingService service,
249      final Class<?> serviceInterface) {
250      this.service = service;
251      this.serviceInterface = serviceInterface;
252    }
253
254    public Class<?> getServiceInterface() {
255      return this.serviceInterface;
256    }
257
258    public BlockingService getBlockingService() {
259      return this.service;
260    }
261  }
262
263  /**
264   * Constructs a server listening on the named port and address.
265   * @param server           hosting instance of {@link Server}. We will do authentications if an
266   *                         instance else pass null for no authentication check.
267   * @param name             Used keying this rpc servers' metrics and for naming the Listener
268   *                         thread.
269   * @param services         A list of services.
270   * @param bindAddress      Where to listen
271   * @param reservoirEnabled Enable ByteBufferPool or not.
272   */
273  public RpcServer(final Server server, final String name,
274    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
275    Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
276    this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
277    this.server = server;
278    this.services = services;
279    this.bindAddress = bindAddress;
280    this.conf = conf;
281    // See declaration above for documentation on what this size is.
282    this.maxQueueSizeInBytes =
283      this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
284
285    this.warnResponseTime = getWarnResponseTime(conf);
286    this.warnResponseSize = getWarnResponseSize(conf);
287    this.warnScanResponseTime = getWarnScanResponseTime(conf);
288    this.warnScanResponseSize = getWarnScanResponseSize(conf);
289    this.minClientRequestTimeout =
290      conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT, DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
291    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
292
293    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
294    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
295    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
296
297    this.cellBlockBuilder = new CellBlockBuilder(conf);
298
299    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
300    this.userProvider = UserProvider.instantiate(conf);
301    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
302    if (isSecurityEnabled) {
303      saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
304        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
305      serverPrincipal = Preconditions.checkNotNull(userProvider.getCurrentUserName(),
306        "can not get current user name when security is enabled");
307    } else {
308      saslProps = Collections.emptyMap();
309      serverPrincipal = HConstants.EMPTY_STRING;
310    }
311
312    this.isOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(conf);
313    this.scheduler = scheduler;
314  }
315
316  @Override
317  public void onConfigurationChange(Configuration newConf) {
318    initReconfigurable(newConf);
319    if (scheduler instanceof ConfigurationObserver) {
320      ((ConfigurationObserver) scheduler).onConfigurationChange(newConf);
321    }
322    if (authorize) {
323      refreshAuthManager(newConf, new HBasePolicyProvider());
324    }
325    refreshSlowLogConfiguration(newConf);
326  }
327
328  private void refreshSlowLogConfiguration(Configuration newConf) {
329    boolean newIsOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(newConf);
330    if (isOnlineLogProviderEnabled != newIsOnlineLogProviderEnabled) {
331      isOnlineLogProviderEnabled = newIsOnlineLogProviderEnabled;
332    }
333    int newWarnResponseTime = getWarnResponseTime(newConf);
334    if (warnResponseTime != newWarnResponseTime) {
335      warnResponseTime = newWarnResponseTime;
336    }
337    int newWarnResponseSize = getWarnResponseSize(newConf);
338    if (warnResponseSize != newWarnResponseSize) {
339      warnResponseSize = newWarnResponseSize;
340    }
341    int newWarnResponseTimeScan = getWarnScanResponseTime(newConf);
342    if (warnScanResponseTime != newWarnResponseTimeScan) {
343      warnScanResponseTime = newWarnResponseTimeScan;
344    }
345    int newWarnScanResponseSize = getWarnScanResponseSize(newConf);
346    if (warnScanResponseSize != newWarnScanResponseSize) {
347      warnScanResponseSize = newWarnScanResponseSize;
348    }
349  }
350
351  private static boolean getIsOnlineLogProviderEnabled(Configuration conf) {
352    return conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
353      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
354  }
355
356  private static int getWarnResponseTime(Configuration conf) {
357    return conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
358  }
359
360  private static int getWarnResponseSize(Configuration conf) {
361    return conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
362  }
363
364  private static int getWarnScanResponseTime(Configuration conf) {
365    return conf.getInt(WARN_SCAN_RESPONSE_TIME, getWarnResponseTime(conf));
366  }
367
368  private static int getWarnScanResponseSize(Configuration conf) {
369    return conf.getInt(WARN_SCAN_RESPONSE_SIZE, getWarnResponseSize(conf));
370  }
371
372  protected void initReconfigurable(Configuration confToLoad) {
373    this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
374    if (isSecurityEnabled && allowFallbackToSimpleAuth) {
375      LOG.warn("********* WARNING! *********");
376      LOG.warn("This server is configured to allow connections from INSECURE clients");
377      LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
378      LOG.warn("While this option is enabled, client identities cannot be secured, and user");
379      LOG.warn("impersonation is possible!");
380      LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
381      LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
382      LOG.warn("****************************");
383    }
384  }
385
386  Configuration getConf() {
387    return conf;
388  }
389
390  @Override
391  public boolean isStarted() {
392    return this.started;
393  }
394
395  @Override
396  public synchronized void refreshAuthManager(Configuration conf, PolicyProvider pp) {
397    // Ignore warnings that this should be accessed in a static way instead of via an instance;
398    // it'll break if you go via static route.
399    System.setProperty("hadoop.policy.file", "hbase-policy.xml");
400    this.authManager.refresh(conf, pp);
401    LOG.info("Refreshed hbase-policy.xml successfully");
402    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
403    LOG.info("Refreshed super and proxy users successfully");
404  }
405
406  protected AuthenticationTokenSecretManager createSecretManager() {
407    if (!isSecurityEnabled) return null;
408    if (server == null) return null;
409    Configuration conf = server.getConfiguration();
410    long keyUpdateInterval = conf.getLong("hbase.auth.key.update.interval", 24 * 60 * 60 * 1000);
411    long maxAge = conf.getLong("hbase.auth.token.max.lifetime", 7 * 24 * 60 * 60 * 1000);
412    return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
413      server.getServerName().toString(), keyUpdateInterval, maxAge);
414  }
415
416  public SecretManager<? extends TokenIdentifier> getSecretManager() {
417    return this.secretManager;
418  }
419
420  @SuppressWarnings("unchecked")
421  public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
422    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
423  }
424
425  /**
426   * This is a server side method, which is invoked over RPC. On success the return response has
427   * protobuf response payload. On failure, the exception name and the stack trace are returned in
428   * the protobuf response.
429   */
430  @Override
431  public Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status)
432    throws IOException {
433    try {
434      MethodDescriptor md = call.getMethod();
435      Message param = call.getParam();
436      status.setRPC(md.getName(), new Object[] { param }, call.getReceiveTime());
437      // TODO: Review after we add in encoded data blocks.
438      status.setRPCPacket(param);
439      status.resume("Servicing call");
440      // get an instance of the method arg type
441      HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
442      controller.setCallTimeout(call.getTimeout());
443      Message result = call.getService().callBlockingMethod(md, controller, param);
444      long receiveTime = call.getReceiveTime();
445      long startTime = call.getStartTime();
446      long endTime = EnvironmentEdgeManager.currentTime();
447      int processingTime = (int) (endTime - startTime);
448      int qTime = (int) (startTime - receiveTime);
449      int totalTime = (int) (endTime - receiveTime);
450      if (LOG.isTraceEnabled()) {
451        LOG.trace(
452          "{}, response: {}, receiveTime: {}, queueTime: {}, processingTime: {}, "
453            + "totalTime: {}, fsReadTime: {}",
454          CurCall.get().toString(), TextFormat.shortDebugString(result),
455          CurCall.get().getReceiveTime(), qTime, processingTime, totalTime,
456          CurCall.get().getFsReadTime());
457      }
458      // Use the raw request call size for now.
459      long requestSize = call.getSize();
460      long responseSize = result.getSerializedSize();
461      long responseBlockSize = call.getBlockBytesScanned();
462      long fsReadTime = call.getFsReadTime();
463      if (call.isClientCellBlockSupported()) {
464        // Include the payload size in HBaseRpcController
465        responseSize += call.getResponseCellSize();
466      }
467
468      metrics.dequeuedCall(qTime);
469      metrics.processedCall(processingTime);
470      metrics.totalCall(totalTime);
471      metrics.receivedRequest(requestSize);
472      metrics.sentResponse(responseSize);
473      // log any RPC responses that are slower than the configured warn
474      // response time or larger than configured warning size
475      boolean tooSlow = isTooSlow(call, processingTime);
476      boolean tooLarge = isTooLarge(call, responseSize, responseBlockSize);
477      if (tooSlow || tooLarge) {
478        final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY);
479        // when tagging, we let TooLarge trump TooSmall to keep output simple
480        // note that large responses will often also be slow.
481        logResponse(param, md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
482          tooLarge, tooSlow, status.getClient(), startTime, processingTime, qTime, responseSize,
483          responseBlockSize, fsReadTime, userName);
484        if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
485          // send logs to ring buffer owned by slowLogRecorder
486          final String className =
487            server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
488          this.namedQueueRecorder.addRecord(new RpcLogDetails(call, param, status.getClient(),
489            responseSize, responseBlockSize, fsReadTime, className, tooSlow, tooLarge));
490        }
491      }
492      return new Pair<>(result, controller.cellScanner());
493    } catch (Throwable e) {
494      // The above callBlockingMethod will always return a SE. Strip the SE wrapper before
495      // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't
496      // need to pass it over the wire.
497      if (e instanceof ServiceException) {
498        if (e.getCause() == null) {
499          LOG.debug("Caught a ServiceException with null cause", e);
500        } else {
501          e = e.getCause();
502        }
503      }
504
505      // increment the number of requests that were exceptions.
506      metrics.exception(e);
507
508      if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
509      if (e instanceof IOException) throw (IOException) e;
510      LOG.error("Unexpected throwable object ", e);
511      throw new IOException(e.getMessage(), e);
512    }
513  }
514
515  /**
516   * Logs an RPC response to the LOG file, producing valid JSON objects for client Operations.
517   * @param param             The parameters received in the call.
518   * @param methodName        The name of the method invoked
519   * @param call              The string representation of the call
520   * @param tooLarge          To indicate if the event is tooLarge
521   * @param tooSlow           To indicate if the event is tooSlow
522   * @param clientAddress     The address of the client who made this call.
523   * @param startTime         The time that the call was initiated, in ms.
524   * @param processingTime    The duration that the call took to run, in ms.
525   * @param qTime             The duration that the call spent on the queue prior to being
526   *                          initiated, in ms.
527   * @param responseSize      The size in bytes of the response buffer.
528   * @param blockBytesScanned The size of block bytes scanned to retrieve the response.
529   * @param userName          UserName of the current RPC Call
530   */
531  void logResponse(Message param, String methodName, String call, boolean tooLarge, boolean tooSlow,
532    String clientAddress, long startTime, int processingTime, int qTime, long responseSize,
533    long blockBytesScanned, long fsReadTime, String userName) {
534    final String className = server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
535    // base information that is reported regardless of type of call
536    Map<String, Object> responseInfo = new HashMap<>();
537    responseInfo.put("starttimems", startTime);
538    responseInfo.put("processingtimems", processingTime);
539    responseInfo.put("queuetimems", qTime);
540    responseInfo.put("responsesize", responseSize);
541    responseInfo.put("blockbytesscanned", blockBytesScanned);
542    responseInfo.put("fsreadtime", fsReadTime);
543    responseInfo.put("client", clientAddress);
544    responseInfo.put("class", className);
545    responseInfo.put("method", methodName);
546    responseInfo.put("call", call);
547    // The params could be really big, make sure they don't kill us at WARN
548    String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
549    if (stringifiedParam.length() > 150) {
550      // Truncate to 1000 chars if TRACE is on, else to 150 chars
551      stringifiedParam = truncateTraceLog(stringifiedParam);
552    }
553    responseInfo.put("param", stringifiedParam);
554    if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
555      ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
556      String scanDetails;
557      if (request.hasScannerId()) {
558        long scannerId = request.getScannerId();
559        scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
560      } else {
561        scanDetails = rsRpcServices.getScanDetailsWithRequest(request);
562      }
563      if (scanDetails != null) {
564        responseInfo.put("scandetails", scanDetails);
565      }
566    }
567    if (param instanceof ClientProtos.MultiRequest) {
568      int numGets = 0;
569      int numMutations = 0;
570      int numServiceCalls = 0;
571      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
572      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
573        for (ClientProtos.Action action : regionAction.getActionList()) {
574          if (action.hasMutation()) {
575            numMutations++;
576          }
577          if (action.hasGet()) {
578            numGets++;
579          }
580          if (action.hasServiceCall()) {
581            numServiceCalls++;
582          }
583        }
584      }
585      responseInfo.put(MULTI_GETS, numGets);
586      responseInfo.put(MULTI_MUTATIONS, numMutations);
587      responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls);
588    }
589    final String tag =
590      (tooLarge && tooSlow) ? "TooLarge & TooSlow" : (tooSlow ? "TooSlow" : "TooLarge");
591    LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo));
592  }
593
594  private boolean isTooSlow(RpcCall call, int processingTime) {
595    long warnResponseTime = call.getParam() instanceof ClientProtos.ScanRequest
596      ? warnScanResponseTime
597      : this.warnResponseTime;
598    return (processingTime > warnResponseTime && warnResponseTime > -1);
599  }
600
601  private boolean isTooLarge(RpcCall call, long responseSize, long responseBlockSize) {
602    long warnResponseSize = call.getParam() instanceof ClientProtos.ScanRequest
603      ? warnScanResponseSize
604      : this.warnResponseSize;
605    return (warnResponseSize > -1
606      && (responseSize > warnResponseSize || responseBlockSize > warnResponseSize));
607  }
608
609  /**
610   * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length if TRACE is on else
611   * to 150 chars Refer to Jira HBASE-20826 and HBASE-20942
612   * @param strParam stringifiedParam to be truncated
613   * @return truncated trace log string
614   */
615  String truncateTraceLog(String strParam) {
616    if (LOG.isTraceEnabled()) {
617      int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH);
618      int truncatedLength =
619        strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength;
620      String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED;
621      return strParam.subSequence(0, truncatedLength) + truncatedFlag;
622    }
623    return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED;
624  }
625
626  /**
627   * Set the handler for calling out of RPC for error conditions.
628   * @param handler the handler implementation
629   */
630  @Override
631  public void setErrorHandler(HBaseRPCErrorHandler handler) {
632    this.errorHandler = handler;
633  }
634
635  @Override
636  public HBaseRPCErrorHandler getErrorHandler() {
637    return this.errorHandler;
638  }
639
640  /**
641   * Returns the metrics instance for reporting RPC call statistics
642   */
643  @Override
644  public MetricsHBaseServer getMetrics() {
645    return metrics;
646  }
647
648  @Override
649  public void addCallSize(final long diff) {
650    this.callQueueSizeInBytes.add(diff);
651  }
652
653  /**
654   * Authorize the incoming client connection.
655   * @param user       client user
656   * @param connection incoming connection
657   * @param addr       InetAddress of incoming connection
658   * @throws AuthorizationException when the client isn't authorized to talk the protocol
659   */
660  public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
661    InetAddress addr) throws AuthorizationException {
662    if (authorize) {
663      Class<?> c = getServiceInterface(services, connection.getServiceName());
664      authManager.authorize(user, c, getConf(), addr);
665    }
666  }
667
668  /**
669   * When the read or write buffer size is larger than this limit, i/o will be done in chunks of
670   * this size. Most RPC requests and responses would be smaller.
671   */
672  protected static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
673
674  /**
675   * This is a wrapper around
676   * {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}. If the amount of data
677   * is large, it writes to channel in smaller chunks. This is to avoid jdk from creating many
678   * direct buffers as the size of ByteBuffer increases. There should not be any performance
679   * degredation.
680   * @param channel writable byte channel to write on
681   * @param buffer  buffer to write
682   * @return number of bytes written
683   * @throws java.io.IOException e
684   * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
685   */
686  protected int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
687
688    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT)
689      ? channel.read(buffer)
690      : channelIO(channel, null, buffer);
691    if (count > 0) {
692      metrics.receivedBytes(count);
693    }
694    return count;
695  }
696
697  /**
698   * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
699   * Only one of readCh or writeCh should be non-null.
700   * @param readCh  read channel
701   * @param writeCh write channel
702   * @param buf     buffer to read or write into/out of
703   * @return bytes written
704   * @throws java.io.IOException e
705   * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
706   */
707  private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh,
708    ByteBuffer buf) throws IOException {
709
710    int originalLimit = buf.limit();
711    int initialRemaining = buf.remaining();
712    int ret = 0;
713
714    while (buf.remaining() > 0) {
715      try {
716        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
717        buf.limit(buf.position() + ioSize);
718
719        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
720
721        if (ret < ioSize) {
722          break;
723        }
724
725      } finally {
726        buf.limit(originalLimit);
727      }
728    }
729
730    int nBytes = initialRemaining - buf.remaining();
731    return (nBytes > 0) ? nBytes : ret;
732  }
733
734  /**
735   * Needed for features such as delayed calls. We need to be able to store the current call so that
736   * we can complete it later or ask questions of what is supported by the current ongoing call.
737   * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
738   */
739  public static Optional<RpcCall> getCurrentCall() {
740    return Optional.ofNullable(CurCall.get());
741  }
742
743  /**
744   * Just return the current rpc call if it is a {@link ServerCall} and also has {@link CellScanner}
745   * attached.
746   * <p/>
747   * Mainly used for reference counting as {@link CellScanner} may reference non heap memory.
748   */
749  public static Optional<ServerCall<?>> getCurrentServerCallWithCellScanner() {
750    return getCurrentCall().filter(c -> c instanceof ServerCall)
751      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall<?>) c);
752  }
753
754  public static boolean isInRpcCallContext() {
755    return CurCall.get() != null;
756  }
757
758  /**
759   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For
760   * master's rpc call, it may generate new procedure and mutate the region which store procedure.
761   * There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc
762   * call to avoid the rpc check.
763   * @return the currently ongoing rpc call
764   */
765  public static Optional<RpcCall> unsetCurrentCall() {
766    Optional<RpcCall> rpcCall = getCurrentCall();
767    CurCall.set(null);
768    return rpcCall;
769  }
770
771  /**
772   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
773   * rpc call back after mutate region.
774   */
775  public static void setCurrentCall(RpcCall rpcCall) {
776    CurCall.set(rpcCall);
777  }
778
779  /**
780   * Returns the user credentials associated with the current RPC request or not present if no
781   * credentials were provided.
782   * @return A User
783   */
784  public static Optional<User> getRequestUser() {
785    Optional<RpcCall> ctx = getCurrentCall();
786    return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty();
787  }
788
789  /**
790   * The number of open RPC conections
791   * @return the number of open rpc connections
792   */
793  abstract public int getNumOpenConnections();
794
795  /**
796   * Returns the username for any user associated with the current RPC request or not present if no
797   * user is set.
798   */
799  public static Optional<String> getRequestUserName() {
800    return getRequestUser().map(User::getShortName);
801  }
802
803  /**
804   * Returns the address of the remote client associated with the current RPC request or not present
805   * if no address is set.
806   */
807  public static Optional<InetAddress> getRemoteAddress() {
808    return getCurrentCall().map(RpcCall::getRemoteAddress);
809  }
810
811  /**
812   * @param serviceName Some arbitrary string that represents a 'service'.
813   * @param services    Available service instances
814   * @return Matching BlockingServiceAndInterface pair
815   */
816  protected static BlockingServiceAndInterface getServiceAndInterface(
817    final List<BlockingServiceAndInterface> services, final String serviceName) {
818    for (BlockingServiceAndInterface bs : services) {
819      if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
820        return bs;
821      }
822    }
823    return null;
824  }
825
826  /**
827   * @param serviceName Some arbitrary string that represents a 'service'.
828   * @param services    Available services and their service interfaces.
829   * @return Service interface class for <code>serviceName</code>
830   */
831  protected static Class<?> getServiceInterface(final List<BlockingServiceAndInterface> services,
832    final String serviceName) {
833    BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName);
834    return bsasi == null ? null : bsasi.getServiceInterface();
835  }
836
837  /**
838   * @param serviceName Some arbitrary string that represents a 'service'.
839   * @param services    Available services and their service interfaces.
840   * @return BlockingService that goes with the passed <code>serviceName</code>
841   */
842  protected static BlockingService getService(final List<BlockingServiceAndInterface> services,
843    final String serviceName) {
844    BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName);
845    return bsasi == null ? null : bsasi.getBlockingService();
846  }
847
848  protected static MonitoredRPCHandler getStatus() {
849    // It is ugly the way we park status up in RpcServer. Let it be for now. TODO.
850    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
851    if (status != null) {
852      return status;
853    }
854    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
855    status.pause("Waiting for a call");
856    RpcServer.MONITORED_RPC.set(status);
857    return status;
858  }
859
860  /**
861   * Returns the remote side ip address when invoked inside an RPC Returns null incase of an error.
862   */
863  public static InetAddress getRemoteIp() {
864    RpcCall call = CurCall.get();
865    if (call != null) {
866      return call.getRemoteAddress();
867    }
868    return null;
869  }
870
871  @Override
872  public RpcScheduler getScheduler() {
873    return scheduler;
874  }
875
876  @Override
877  public ByteBuffAllocator getByteBuffAllocator() {
878    return this.bbAllocator;
879  }
880
  /**
   * Stores the given {@link RSRpcServices} in this server's {@code rsRpcServices} field.
   * @param rsRpcServices the instance to keep a reference to
   */
  @Override
  public void setRsRpcServices(RSRpcServices rsRpcServices) {
    this.rsRpcServices = rsRpcServices;
  }
885
  /**
   * Stores the given {@link NamedQueueRecorder} in this server's {@code namedQueueRecorder} field.
   * @param namedQueueRecorder the recorder to keep a reference to
   */
  @Override
  public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
    this.namedQueueRecorder = namedQueueRecorder;
  }
890
891  protected boolean needAuthorization() {
892    return authorize;
893  }
894
895  @RestrictedApi(explanation = "Should only be called in tests", link = "",
896      allowedOnPath = ".*/src/test/.*")
897  public List<BlockingServiceAndInterface> getServices() {
898    return services;
899  }
900}