001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
021
022import com.google.errorprone.annotations.RestrictedApi;
023import java.io.IOException;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.nio.ByteBuffer;
027import java.nio.channels.ReadableByteChannel;
028import java.nio.channels.WritableByteChannel;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Locale;
033import java.util.Map;
034import java.util.Optional;
035import java.util.concurrent.atomic.LongAdder;
036import org.apache.commons.lang3.StringUtils;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.CallQueueTooBigException;
039import org.apache.hadoop.hbase.CellScanner;
040import org.apache.hadoop.hbase.DoNotRetryIOException;
041import org.apache.hadoop.hbase.ExtendedCellScanner;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.Server;
044import org.apache.hadoop.hbase.conf.ConfigurationObserver;
045import org.apache.hadoop.hbase.io.ByteBuffAllocator;
046import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
047import org.apache.hadoop.hbase.monitoring.TaskMonitor;
048import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
049import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
050import org.apache.hadoop.hbase.regionserver.RSRpcServices;
051import org.apache.hadoop.hbase.security.HBasePolicyProvider;
052import org.apache.hadoop.hbase.security.SaslUtil;
053import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
054import org.apache.hadoop.hbase.security.User;
055import org.apache.hadoop.hbase.security.UserProvider;
056import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.util.GsonUtil;
059import org.apache.hadoop.hbase.util.Pair;
060import org.apache.hadoop.security.UserGroupInformation;
061import org.apache.hadoop.security.authorize.AuthorizationException;
062import org.apache.hadoop.security.authorize.PolicyProvider;
063import org.apache.hadoop.security.authorize.ProxyUsers;
064import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
065import org.apache.hadoop.security.token.SecretManager;
066import org.apache.hadoop.security.token.TokenIdentifier;
067import org.apache.yetus.audience.InterfaceAudience;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
072import org.apache.hbase.thirdparty.com.google.gson.Gson;
073import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
074import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
075import org.apache.hbase.thirdparty.com.google.protobuf.Message;
076import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
077import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
078
079import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
082
083/**
084 * An RPC server that hosts protobuf described Services.
085 */
086@InterfaceAudience.Private
087public abstract class RpcServer implements RpcServerInterface, ConfigurationObserver {
  // LOG is being used in CallRunner and the log level is being changed in tests
  public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class);
  // Single pre-allocated exception instance shared by this class and its subclasses.
  protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION =
    new CallQueueTooBigException();

  // Keys under which logResponse() reports the per-type action counts of a MultiRequest.
  private static final String MULTI_GETS = "multi.gets";
  private static final String MULTI_MUTATIONS = "multi.mutations";
  private static final String MULTI_SERVICE_CALLS = "multi.service_calls";

  // Value of HADOOP_SECURITY_AUTHORIZATION read once in the constructor. Gates authorize() and
  // the auth manager refresh done in onConfigurationChange().
  private final boolean authorize;
  // Whether slow/large responses are also handed to the namedQueueRecorder. Initialized in the
  // constructor and re-read on every configuration change.
  private volatile boolean isOnlineLogProviderEnabled;
  // Result of UserProvider#isHBaseSecurityEnabled(), captured in the constructor.
  protected boolean isSecurityEnabled;

  // NOTE(review): presumably the RPC wire/protocol version; it is not used in the code visible
  // here, so confirm against the connection setup code.
  public static final byte CURRENT_VERSION = 0;

  /**
   * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
   */
  public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
    "hbase.ipc.server.fallback-to-simple-auth-allowed";

  /**
   * How many calls/handler are allowed in the queue.
   */
  protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;

  // Built from the server configuration in the constructor.
  protected final CellBlockBuilder cellBlockBuilder;

  // Message prefixes for authentication outcomes. NOTE(review): presumably written to AUDITLOG
  // by the connection handling code; they are not used in the code visible here.
  protected static final String AUTH_FAILED_FOR = "Auth failed for ";
  protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
  // Logger named "SecurityLogger." followed by the fully qualified name of Server.
  protected static final Logger AUDITLOG =
    LoggerFactory.getLogger("SecurityLogger." + Server.class.getName());
  // Read and replaced through getSecretManager() / setSecretManager().
  protected SecretManager<TokenIdentifier> secretManager;
  // SASL properties derived from "hbase.rpc.protection" when security is enabled; an empty map
  // otherwise.
  protected final Map<String, String> saslProps;
  // Current user name when security is enabled; the empty string otherwise.
  protected final String serverPrincipal;

  // Used by authorize() and refreshAuthManager(). It is not assigned in the constructor shown
  // in this class, so it must be set elsewhere before either of those is reached.
  protected ServiceAuthorizationManager authManager;
125
  /**
   * This is set to Call object before Handler invokes an RPC and reset after the call returns.
   */
  protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>();

  /** Keeps MonitoredRPCHandler per handler thread. */
  protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>();

  /** Address passed to the constructor as the place to listen on. */
  protected final InetSocketAddress bindAddress;

  /** Metrics sink for this server; created in the constructor and exposed by getMetrics(). */
  protected MetricsHBaseServer metrics;

  /** Configuration this server was constructed with. */
  protected final Configuration conf;

  /**
   * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
   * this size, then we will reject the call (after parsing it though). It will go back to the
   * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
   * call queue size gets incremented after we parse a call and before we add it to the queue of
   * calls for the scheduler to use. It gets decremented after we have 'run' the Call. The current
   * size is kept in {@link #callQueueSizeInBytes}.
   * @see #callQueueSizeInBytes
   * @see #DEFAULT_MAX_CALLQUEUE_SIZE
   */
  protected final long maxQueueSizeInBytes;
  /** Default for {@link #maxQueueSizeInBytes}: 1024 * 1024 * 1024 bytes (1 GiB). */
  protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;

  /**
   * This is a running count of the size in bytes of all outstanding calls whether currently
   * executing or queued waiting to be run.
   */
  protected final LongAdder callQueueSizeInBytes = new LongAdder();

  // Read from "hbase.ipc.server.tcpnodelay" and "hbase.ipc.server.tcpkeepalive"; both default
  // to true.
  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
  protected final boolean tcpKeepAlive; // if T then use keepalives

  /**
   * This flag is used to indicate to sub threads when they should go down. When we call
   * {@link #start()}, all threads started will consult this flag on whether they should keep going.
   * It is set to false when {@link #stop()} is called.
   */
  volatile boolean running = true;

  /**
   * This flag is set to true after all threads are up and 'running' and the server is then opened
   * for business by the call to {@link #start()}.
   */
  volatile boolean started = false;
174
  // Not assigned anywhere in the code visible here. NOTE(review): presumably populated from
  // createSecretManager() by a subclass or at start-up; confirm.
  protected AuthenticationTokenSecretManager authTokenSecretMgr = null;

  // Installed through setErrorHandler(); stays null until then.
  protected HBaseRPCErrorHandler errorHandler = null;

  // Configuration key for maxRequestSize.
  public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";

  // Configuration keys for the slow/large response warning thresholds. The ".scan" variants
  // apply to ScanRequest calls and default to the value of the corresponding non-scan setting.
  protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
  protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
  protected static final String WARN_SCAN_RESPONSE_TIME = "hbase.ipc.warn.response.time.scan";
  protected static final String WARN_SCAN_RESPONSE_SIZE = "hbase.ipc.warn.response.size.scan";

  /**
   * Minimum allowable timeout (in milliseconds) in rpc request's header. This configuration exists
   * to prevent the rpc service regarding this request as timeout immediately.
   */
  protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
  protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;

  /** Default value for above params */
  public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
  protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
  protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;

  // Maximum length of the stringified request parameter in logResponse() output while TRACE
  // logging is enabled; see truncateTraceLog().
  protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000;
  protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length";
  // Suffix appended to a stringified parameter that has been cut short.
  protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>";

  // Shared serializer used by logResponse() to render the response details as JSON.
  protected static final Gson GSON = GsonUtil.createGsonWithDisableHtmlEscaping().create();

  // Read from MAX_REQUEST_SIZE in the constructor.
  protected final int maxRequestSize;
  // Warning thresholds; volatile because refreshSlowLogConfiguration() rewrites them when the
  // configuration changes.
  protected volatile int warnResponseTime;
  protected volatile int warnResponseSize;
  protected volatile int warnScanResponseTime;
  protected volatile int warnScanResponseSize;

  // Read from MIN_CLIENT_REQUEST_TIMEOUT in the constructor.
  protected final int minClientRequestTimeout;

  // Hosting server instance; may be null (several methods in this class check for that).
  protected final Server server;
  // Services hosted by this server, as passed to the constructor.
  protected final List<BlockingServiceAndInterface> services;

  // Scheduler passed to the constructor; it is forwarded configuration changes when it is
  // itself a ConfigurationObserver.
  protected final RpcScheduler scheduler;

  // Instantiated from the configuration in the constructor.
  protected final UserProvider userProvider;

  // Created in the constructor from the configuration and the reservoirEnabled flag.
  protected final ByteBuffAllocator bbAllocator;

  // Value of FALLBACK_TO_INSECURE_CLIENT_AUTH, (re)loaded by initReconfigurable().
  protected volatile boolean allowFallbackToSimpleAuth;

  /**
   * Used to get details for scan with a scanner_id<br/>
   * TODO try to figure out a better way and remove reference from regionserver package later.
   */
  private RSRpcServices rsRpcServices;

  /**
   * Use to add online slowlog responses
   */
  private NamedQueueRecorder namedQueueRecorder;
233
234  @FunctionalInterface
235  protected interface CallCleanup {
236    void run();
237  }
238
239  /**
240   * Datastructure for passing a {@link BlockingService} and its associated class of protobuf
241   * service interface. For example, a server that fielded what is defined in the client protobuf
242   * service would pass in an implementation of the client blocking service and then its
243   * ClientService.BlockingInterface.class. Used checking connection setup.
244   */
245  public static class BlockingServiceAndInterface {
246    private final BlockingService service;
247    private final Class<?> serviceInterface;
248
249    public BlockingServiceAndInterface(final BlockingService service,
250      final Class<?> serviceInterface) {
251      this.service = service;
252      this.serviceInterface = serviceInterface;
253    }
254
255    public Class<?> getServiceInterface() {
256      return this.serviceInterface;
257    }
258
259    public BlockingService getBlockingService() {
260      return this.service;
261    }
262  }
263
  /**
   * Constructs a server listening on the named port and address. Only reads configuration and
   * initializes fields; the reconfigurable settings handled by
   * {@link #initReconfigurable(Configuration)} are not loaded here.
   * @param server           hosting instance of {@link Server}. We will do authentications if an
   *                         instance else pass null for no authentication check.
   * @param name             Used keying this rpc servers' metrics and for naming the Listener
   *                         thread.
   * @param services         A list of services.
   * @param bindAddress      Where to listen
   * @param conf             Configuration all settings of this server are read from.
   * @param scheduler        Scheduler stored for use by this server.
   * @param reservoirEnabled Enable ByteBufferPool or not.
   * @throws IOException declared by the signature; NOTE(review): confirm which of the calls
   *                     below can actually raise it.
   */
  public RpcServer(final Server server, final String name,
    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
    Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
    this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
    this.server = server;
    this.services = services;
    this.bindAddress = bindAddress;
    this.conf = conf;
    // See declaration above for documentation on what this size is.
    this.maxQueueSizeInBytes =
      this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);

    // Slow/large response thresholds; these are re-read on configuration change.
    this.warnResponseTime = getWarnResponseTime(conf);
    this.warnResponseSize = getWarnResponseSize(conf);
    this.warnScanResponseTime = getWarnScanResponseTime(conf);
    this.warnScanResponseSize = getWarnScanResponseSize(conf);
    this.minClientRequestTimeout =
      conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT, DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);

    // Note: 'this' is handed to the metrics wrapper before construction has completed.
    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);

    this.cellBlockBuilder = new CellBlockBuilder(conf);

    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
    this.userProvider = UserProvider.instantiate(conf);
    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
    if (isSecurityEnabled) {
      // "hbase.rpc.protection" defaults to the lower-cased name of the AUTHENTICATION level.
      saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
      // Fail construction right away when no current user name is available.
      serverPrincipal = Preconditions.checkNotNull(userProvider.getCurrentUserName(),
        "can not get current user name when security is enabled");
    } else {
      saslProps = Collections.emptyMap();
      serverPrincipal = HConstants.EMPTY_STRING;
    }

    this.isOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(conf);
    this.scheduler = scheduler;
  }
316
317  @Override
318  public void onConfigurationChange(Configuration newConf) {
319    initReconfigurable(newConf);
320    if (scheduler instanceof ConfigurationObserver) {
321      ((ConfigurationObserver) scheduler).onConfigurationChange(newConf);
322    }
323    if (authorize) {
324      refreshAuthManager(newConf, new HBasePolicyProvider());
325    }
326    refreshSlowLogConfiguration(newConf);
327  }
328
329  private void refreshSlowLogConfiguration(Configuration newConf) {
330    boolean newIsOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(newConf);
331    if (isOnlineLogProviderEnabled != newIsOnlineLogProviderEnabled) {
332      isOnlineLogProviderEnabled = newIsOnlineLogProviderEnabled;
333    }
334    int newWarnResponseTime = getWarnResponseTime(newConf);
335    if (warnResponseTime != newWarnResponseTime) {
336      warnResponseTime = newWarnResponseTime;
337    }
338    int newWarnResponseSize = getWarnResponseSize(newConf);
339    if (warnResponseSize != newWarnResponseSize) {
340      warnResponseSize = newWarnResponseSize;
341    }
342    int newWarnResponseTimeScan = getWarnScanResponseTime(newConf);
343    if (warnScanResponseTime != newWarnResponseTimeScan) {
344      warnScanResponseTime = newWarnResponseTimeScan;
345    }
346    int newWarnScanResponseSize = getWarnScanResponseSize(newConf);
347    if (warnScanResponseSize != newWarnScanResponseSize) {
348      warnScanResponseSize = newWarnScanResponseSize;
349    }
350  }
351
352  private static boolean getIsOnlineLogProviderEnabled(Configuration conf) {
353    return conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
354      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
355  }
356
357  private static int getWarnResponseTime(Configuration conf) {
358    return conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
359  }
360
361  private static int getWarnResponseSize(Configuration conf) {
362    return conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
363  }
364
365  private static int getWarnScanResponseTime(Configuration conf) {
366    return conf.getInt(WARN_SCAN_RESPONSE_TIME, getWarnResponseTime(conf));
367  }
368
369  private static int getWarnScanResponseSize(Configuration conf) {
370    return conf.getInt(WARN_SCAN_RESPONSE_SIZE, getWarnResponseSize(conf));
371  }
372
373  protected void initReconfigurable(Configuration confToLoad) {
374    this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
375    if (isSecurityEnabled && allowFallbackToSimpleAuth) {
376      LOG.warn("********* WARNING! *********");
377      LOG.warn("This server is configured to allow connections from INSECURE clients");
378      LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
379      LOG.warn("While this option is enabled, client identities cannot be secured, and user");
380      LOG.warn("impersonation is possible!");
381      LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
382      LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
383      LOG.warn("****************************");
384    }
385  }
386
387  Configuration getConf() {
388    return conf;
389  }
390
391  @Override
392  public boolean isStarted() {
393    return this.started;
394  }
395
396  @Override
397  public synchronized void refreshAuthManager(Configuration conf, PolicyProvider pp) {
398    // Ignore warnings that this should be accessed in a static way instead of via an instance;
399    // it'll break if you go via static route.
400    System.setProperty("hadoop.policy.file", "hbase-policy.xml");
401    this.authManager.refresh(conf, pp);
402    LOG.info("Refreshed hbase-policy.xml successfully");
403    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
404    LOG.info("Refreshed super and proxy users successfully");
405  }
406
407  protected AuthenticationTokenSecretManager createSecretManager() {
408    if (!isSecurityEnabled) return null;
409    if (server == null) return null;
410    Configuration conf = server.getConfiguration();
411    long keyUpdateInterval = conf.getLong("hbase.auth.key.update.interval", 24 * 60 * 60 * 1000);
412    long maxAge = conf.getLong("hbase.auth.token.max.lifetime", 7 * 24 * 60 * 60 * 1000);
413    return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
414      server.getServerName().toString(), keyUpdateInterval, maxAge);
415  }
416
417  public SecretManager<? extends TokenIdentifier> getSecretManager() {
418    return this.secretManager;
419  }
420
421  @SuppressWarnings("unchecked")
422  public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
423    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
424  }
425
426  /**
427   * This is a server side method, which is invoked over RPC. On success the return response has
428   * protobuf response payload. On failure, the exception name and the stack trace are returned in
429   * the protobuf response.
430   */
431  @Override
432  public Pair<Message, ExtendedCellScanner> call(RpcCall call, MonitoredRPCHandler status)
433    throws IOException {
434    try {
435      MethodDescriptor md = call.getMethod();
436      Message param = call.getParam();
437      status.setRPC(md.getName(), new Object[] { param }, call.getReceiveTime());
438      // TODO: Review after we add in encoded data blocks.
439      status.setRPCPacket(param);
440      status.resume("Servicing call");
441      // get an instance of the method arg type
442      HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
443      controller.setCallTimeout(call.getTimeout());
444      Message result = call.getService().callBlockingMethod(md, controller, param);
445      long receiveTime = call.getReceiveTime();
446      long startTime = call.getStartTime();
447      long endTime = EnvironmentEdgeManager.currentTime();
448      int processingTime = (int) (endTime - startTime);
449      int qTime = (int) (startTime - receiveTime);
450      int totalTime = (int) (endTime - receiveTime);
451      if (LOG.isTraceEnabled()) {
452        LOG.trace(
453          "{}, response: {}, receiveTime: {}, queueTime: {}, processingTime: {}, "
454            + "totalTime: {}, fsReadTime: {}",
455          CurCall.get().toString(), TextFormat.shortDebugString(result),
456          CurCall.get().getReceiveTime(), qTime, processingTime, totalTime,
457          CurCall.get().getFsReadTime());
458      }
459      // Use the raw request call size for now.
460      long requestSize = call.getSize();
461      long responseSize = result.getSerializedSize();
462      long responseBlockSize = call.getBlockBytesScanned();
463      long fsReadTime = call.getFsReadTime();
464      if (call.isClientCellBlockSupported()) {
465        // Include the payload size in HBaseRpcController
466        responseSize += call.getResponseCellSize();
467      }
468
469      metrics.dequeuedCall(qTime);
470      metrics.processedCall(processingTime);
471      metrics.totalCall(totalTime);
472      metrics.receivedRequest(requestSize);
473      metrics.sentResponse(responseSize);
474      // log any RPC responses that are slower than the configured warn
475      // response time or larger than configured warning size
476      boolean tooSlow = isTooSlow(call, processingTime);
477      boolean tooLarge = isTooLarge(call, responseSize, responseBlockSize);
478      if (tooSlow || tooLarge) {
479        final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY);
480        // when tagging, we let TooLarge trump TooSmall to keep output simple
481        // note that large responses will often also be slow.
482        logResponse(param, md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
483          tooLarge, tooSlow, status.getClient(), startTime, processingTime, qTime, responseSize,
484          responseBlockSize, fsReadTime, userName);
485        if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
486          // send logs to ring buffer owned by slowLogRecorder
487          final String className =
488            server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
489          this.namedQueueRecorder.addRecord(new RpcLogDetails(call, param, status.getClient(),
490            responseSize, responseBlockSize, fsReadTime, className, tooSlow, tooLarge));
491        }
492      }
493      return new Pair<>(result, controller.cellScanner());
494    } catch (Throwable e) {
495      // The above callBlockingMethod will always return a SE. Strip the SE wrapper before
496      // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't
497      // need to pass it over the wire.
498      if (e instanceof ServiceException) {
499        if (e.getCause() == null) {
500          LOG.debug("Caught a ServiceException with null cause", e);
501        } else {
502          e = e.getCause();
503        }
504      }
505
506      // increment the number of requests that were exceptions.
507      metrics.exception(e);
508
509      if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
510      if (e instanceof IOException) throw (IOException) e;
511      LOG.error("Unexpected throwable object ", e);
512      throw new IOException(e.getMessage(), e);
513    }
514  }
515
516  /**
517   * Logs an RPC response to the LOG file, producing valid JSON objects for client Operations.
518   * @param param             The parameters received in the call.
519   * @param methodName        The name of the method invoked
520   * @param call              The string representation of the call
521   * @param tooLarge          To indicate if the event is tooLarge
522   * @param tooSlow           To indicate if the event is tooSlow
523   * @param clientAddress     The address of the client who made this call.
524   * @param startTime         The time that the call was initiated, in ms.
525   * @param processingTime    The duration that the call took to run, in ms.
526   * @param qTime             The duration that the call spent on the queue prior to being
527   *                          initiated, in ms.
528   * @param responseSize      The size in bytes of the response buffer.
529   * @param blockBytesScanned The size of block bytes scanned to retrieve the response.
530   * @param userName          UserName of the current RPC Call
531   */
532  void logResponse(Message param, String methodName, String call, boolean tooLarge, boolean tooSlow,
533    String clientAddress, long startTime, int processingTime, int qTime, long responseSize,
534    long blockBytesScanned, long fsReadTime, String userName) {
535    final String className = server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
536    // base information that is reported regardless of type of call
537    Map<String, Object> responseInfo = new HashMap<>();
538    responseInfo.put("starttimems", startTime);
539    responseInfo.put("processingtimems", processingTime);
540    responseInfo.put("queuetimems", qTime);
541    responseInfo.put("responsesize", responseSize);
542    responseInfo.put("blockbytesscanned", blockBytesScanned);
543    responseInfo.put("fsreadtime", fsReadTime);
544    responseInfo.put("client", clientAddress);
545    responseInfo.put("class", className);
546    responseInfo.put("method", methodName);
547    responseInfo.put("call", call);
548    // The params could be really big, make sure they don't kill us at WARN
549    String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
550    if (stringifiedParam.length() > 150) {
551      // Truncate to 1000 chars if TRACE is on, else to 150 chars
552      stringifiedParam = truncateTraceLog(stringifiedParam);
553    }
554    responseInfo.put("param", stringifiedParam);
555    if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
556      ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
557      String scanDetails;
558      if (request.hasScannerId()) {
559        long scannerId = request.getScannerId();
560        scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
561      } else {
562        scanDetails = rsRpcServices.getScanDetailsWithRequest(request);
563      }
564      if (scanDetails != null) {
565        responseInfo.put("scandetails", scanDetails);
566      }
567    }
568    if (param instanceof ClientProtos.MultiRequest) {
569      int numGets = 0;
570      int numMutations = 0;
571      int numServiceCalls = 0;
572      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
573      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
574        for (ClientProtos.Action action : regionAction.getActionList()) {
575          if (action.hasMutation()) {
576            numMutations++;
577          }
578          if (action.hasGet()) {
579            numGets++;
580          }
581          if (action.hasServiceCall()) {
582            numServiceCalls++;
583          }
584        }
585      }
586      responseInfo.put(MULTI_GETS, numGets);
587      responseInfo.put(MULTI_MUTATIONS, numMutations);
588      responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls);
589    }
590    final String tag =
591      (tooLarge && tooSlow) ? "TooLarge & TooSlow" : (tooSlow ? "TooSlow" : "TooLarge");
592    LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo));
593  }
594
595  private boolean isTooSlow(RpcCall call, int processingTime) {
596    long warnResponseTime = call.getParam() instanceof ClientProtos.ScanRequest
597      ? warnScanResponseTime
598      : this.warnResponseTime;
599    return (processingTime > warnResponseTime && warnResponseTime > -1);
600  }
601
602  private boolean isTooLarge(RpcCall call, long responseSize, long responseBlockSize) {
603    long warnResponseSize = call.getParam() instanceof ClientProtos.ScanRequest
604      ? warnScanResponseSize
605      : this.warnResponseSize;
606    return (warnResponseSize > -1
607      && (responseSize > warnResponseSize || responseBlockSize > warnResponseSize));
608  }
609
610  /**
611   * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length if TRACE is on else
612   * to 150 chars Refer to Jira HBASE-20826 and HBASE-20942
613   * @param strParam stringifiedParam to be truncated
614   * @return truncated trace log string
615   */
616  String truncateTraceLog(String strParam) {
617    if (LOG.isTraceEnabled()) {
618      int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH);
619      int truncatedLength =
620        strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength;
621      String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED;
622      return strParam.subSequence(0, truncatedLength) + truncatedFlag;
623    }
624    return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED;
625  }
626
627  /**
628   * Set the handler for calling out of RPC for error conditions.
629   * @param handler the handler implementation
630   */
631  @Override
632  public void setErrorHandler(HBaseRPCErrorHandler handler) {
633    this.errorHandler = handler;
634  }
635
636  @Override
637  public HBaseRPCErrorHandler getErrorHandler() {
638    return this.errorHandler;
639  }
640
641  /**
642   * Returns the metrics instance for reporting RPC call statistics
643   */
644  @Override
645  public MetricsHBaseServer getMetrics() {
646    return metrics;
647  }
648
649  @Override
650  public void addCallSize(final long diff) {
651    this.callQueueSizeInBytes.add(diff);
652  }
653
654  /**
655   * Authorize the incoming client connection.
656   * @param user       client user
657   * @param connection incoming connection
658   * @param addr       InetAddress of incoming connection
659   * @throws AuthorizationException when the client isn't authorized to talk the protocol
660   */
661  public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
662    InetAddress addr) throws AuthorizationException {
663    if (authorize) {
664      Class<?> c = getServiceInterface(services, connection.getServiceName());
665      authManager.authorize(user, c, getConf(), addr);
666    }
667  }
668
  /**
   * When the read or write buffer size is larger than this limit, i/o will be done in chunks of
   * this size. Most RPC requests and responses would be smaller.
   */
  protected static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
674
675  /**
676   * This is a wrapper around
677   * {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}. If the amount of data
678   * is large, it writes to channel in smaller chunks. This is to avoid jdk from creating many
679   * direct buffers as the size of ByteBuffer increases. There should not be any performance
680   * degredation.
681   * @param channel writable byte channel to write on
682   * @param buffer  buffer to write
683   * @return number of bytes written
684   * @throws java.io.IOException e
685   * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
686   */
687  protected int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
688
689    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT)
690      ? channel.read(buffer)
691      : channelIO(channel, null, buffer);
692    if (count > 0) {
693      metrics.receivedBytes(count);
694    }
695    return count;
696  }
697
698  /**
699   * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
700   * Only one of readCh or writeCh should be non-null.
701   * @param readCh  read channel
702   * @param writeCh write channel
703   * @param buf     buffer to read or write into/out of
704   * @return bytes written
705   * @throws java.io.IOException e
706   * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
707   */
708  private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh,
709    ByteBuffer buf) throws IOException {
710
711    int originalLimit = buf.limit();
712    int initialRemaining = buf.remaining();
713    int ret = 0;
714
715    while (buf.remaining() > 0) {
716      try {
717        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
718        buf.limit(buf.position() + ioSize);
719
720        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
721
722        if (ret < ioSize) {
723          break;
724        }
725
726      } finally {
727        buf.limit(originalLimit);
728      }
729    }
730
731    int nBytes = initialRemaining - buf.remaining();
732    return (nBytes > 0) ? nBytes : ret;
733  }
734
735  /**
736   * Needed for features such as delayed calls. We need to be able to store the current call so that
737   * we can complete it later or ask questions of what is supported by the current ongoing call.
738   * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
739   */
740  public static Optional<RpcCall> getCurrentCall() {
741    return Optional.ofNullable(CurCall.get());
742  }
743
744  /**
745   * Just return the current rpc call if it is a {@link ServerCall} and also has {@link CellScanner}
746   * attached.
747   * <p/>
748   * Mainly used for reference counting as {@link CellScanner} may reference non heap memory.
749   */
750  public static Optional<ServerCall<?>> getCurrentServerCallWithCellScanner() {
751    return getCurrentCall().filter(c -> c instanceof ServerCall)
752      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall<?>) c);
753  }
754
755  public static boolean isInRpcCallContext() {
756    return CurCall.get() != null;
757  }
758
759  /**
760   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For
761   * master's rpc call, it may generate new procedure and mutate the region which store procedure.
762   * There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc
763   * call to avoid the rpc check.
764   * @return the currently ongoing rpc call
765   */
766  public static Optional<RpcCall> unsetCurrentCall() {
767    Optional<RpcCall> rpcCall = getCurrentCall();
768    CurCall.set(null);
769    return rpcCall;
770  }
771
772  /**
773   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
774   * rpc call back after mutate region.
775   */
776  public static void setCurrentCall(RpcCall rpcCall) {
777    CurCall.set(rpcCall);
778  }
779
780  /**
781   * Returns the user credentials associated with the current RPC request or not present if no
782   * credentials were provided.
783   * @return A User
784   */
785  public static Optional<User> getRequestUser() {
786    Optional<RpcCall> ctx = getCurrentCall();
787    return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty();
788  }
789
790  /**
791   * The number of open RPC conections
792   * @return the number of open rpc connections
793   */
794  abstract public int getNumOpenConnections();
795
796  /**
797   * Returns the username for any user associated with the current RPC request or not present if no
798   * user is set.
799   */
800  public static Optional<String> getRequestUserName() {
801    return getRequestUser().map(User::getShortName);
802  }
803
804  /**
805   * Returns the address of the remote client associated with the current RPC request or not present
806   * if no address is set.
807   */
808  public static Optional<InetAddress> getRemoteAddress() {
809    return getCurrentCall().map(RpcCall::getRemoteAddress);
810  }
811
812  /**
813   * @param serviceName Some arbitrary string that represents a 'service'.
814   * @param services    Available service instances
815   * @return Matching BlockingServiceAndInterface pair
816   */
817  protected static BlockingServiceAndInterface getServiceAndInterface(
818    final List<BlockingServiceAndInterface> services, final String serviceName) {
819    for (BlockingServiceAndInterface bs : services) {
820      if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
821        return bs;
822      }
823    }
824    return null;
825  }
826
827  /**
828   * @param serviceName Some arbitrary string that represents a 'service'.
829   * @param services    Available services and their service interfaces.
830   * @return Service interface class for <code>serviceName</code>
831   */
832  protected static Class<?> getServiceInterface(final List<BlockingServiceAndInterface> services,
833    final String serviceName) {
834    BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName);
835    return bsasi == null ? null : bsasi.getServiceInterface();
836  }
837
838  /**
839   * @param serviceName Some arbitrary string that represents a 'service'.
840   * @param services    Available services and their service interfaces.
841   * @return BlockingService that goes with the passed <code>serviceName</code>
842   */
843  protected static BlockingService getService(final List<BlockingServiceAndInterface> services,
844    final String serviceName) {
845    BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName);
846    return bsasi == null ? null : bsasi.getBlockingService();
847  }
848
849  protected static MonitoredRPCHandler getStatus() {
850    // It is ugly the way we park status up in RpcServer. Let it be for now. TODO.
851    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
852    if (status != null) {
853      return status;
854    }
855    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
856    status.pause("Waiting for a call");
857    RpcServer.MONITORED_RPC.set(status);
858    return status;
859  }
860
861  /**
862   * Returns the remote side ip address when invoked inside an RPC Returns null incase of an error.
863   */
864  public static InetAddress getRemoteIp() {
865    RpcCall call = CurCall.get();
866    if (call != null) {
867      return call.getRemoteAddress();
868    }
869    return null;
870  }
871
872  @Override
873  public RpcScheduler getScheduler() {
874    return scheduler;
875  }
876
877  @Override
878  public ByteBuffAllocator getByteBuffAllocator() {
879    return this.bbAllocator;
880  }
881
882  @Override
883  public void setRsRpcServices(RSRpcServices rsRpcServices) {
884    this.rsRpcServices = rsRpcServices;
885  }
886
887  @Override
888  public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
889    this.namedQueueRecorder = namedQueueRecorder;
890  }
891
892  protected boolean needAuthorization() {
893    return authorize;
894  }
895
896  @RestrictedApi(explanation = "Should only be called in tests", link = "",
897      allowedOnPath = ".*/src/test/.*")
898  public List<BlockingServiceAndInterface> getServices() {
899    return services;
900  }
901}