001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
021
022import com.google.errorprone.annotations.RestrictedApi;
023import java.io.IOException;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.nio.ByteBuffer;
027import java.nio.channels.ReadableByteChannel;
028import java.nio.channels.WritableByteChannel;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Locale;
033import java.util.Map;
034import java.util.Optional;
035import java.util.concurrent.atomic.LongAdder;
036import org.apache.commons.lang3.StringUtils;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.CallQueueTooBigException;
039import org.apache.hadoop.hbase.CellScanner;
040import org.apache.hadoop.hbase.DoNotRetryIOException;
041import org.apache.hadoop.hbase.ExtendedCellScanner;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.Server;
044import org.apache.hadoop.hbase.conf.ConfigurationObserver;
045import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
046import org.apache.hadoop.hbase.io.ByteBuffAllocator;
047import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
048import org.apache.hadoop.hbase.monitoring.TaskMonitor;
049import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
050import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
051import org.apache.hadoop.hbase.regionserver.RSRpcServices;
052import org.apache.hadoop.hbase.security.HBasePolicyProvider;
053import org.apache.hadoop.hbase.security.SaslUtil;
054import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
055import org.apache.hadoop.hbase.security.User;
056import org.apache.hadoop.hbase.security.UserProvider;
057import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
058import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.GsonUtil;
061import org.apache.hadoop.hbase.util.Pair;
062import org.apache.hadoop.security.UserGroupInformation;
063import org.apache.hadoop.security.authorize.AuthorizationException;
064import org.apache.hadoop.security.authorize.PolicyProvider;
065import org.apache.hadoop.security.authorize.ProxyUsers;
066import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
067import org.apache.hadoop.security.token.SecretManager;
068import org.apache.hadoop.security.token.TokenIdentifier;
069import org.apache.yetus.audience.InterfaceAudience;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
074import org.apache.hbase.thirdparty.com.google.gson.Gson;
075import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
076import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
077import org.apache.hbase.thirdparty.com.google.protobuf.Message;
078import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
079import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
080
081import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
084
085/**
086 * An RPC server that hosts protobuf described Services.
087 */
088@InterfaceAudience.Private
089public abstract class RpcServer implements RpcServerInterface, ConfigurationObserver {
  // LOG is being used in CallRunner and the log level is being changed in tests
  public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class);
  // A single exception instance created once at class-load time and shared by all users of it
  // (presumably thrown to reject calls when the call queue is over its limit -- the usage is
  // outside this section; confirm).
  protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION =
    new CallQueueTooBigException();

  // JSON keys under which logResponse reports the per-action counts of a MultiRequest.
  private static final String MULTI_GETS = "multi.gets";
  private static final String MULTI_MUTATIONS = "multi.mutations";
  private static final String MULTI_SERVICE_CALLS = "multi.service_calls";

  // Read once from HADOOP_SECURITY_AUTHORIZATION in the constructor; gates authorize(...) and the
  // auth-manager refresh performed by onConfigurationChange.
  private final boolean authorize;
  // Whether slow/large responses are also sent to the namedQueueRecorder. Volatile because it is
  // reloaded by refreshSlowLogConfiguration on configuration change.
  private volatile boolean isOnlineLogProviderEnabled;
  // Set from UserProvider#isHBaseSecurityEnabled() in the constructor.
  protected boolean isSecurityEnabled;

  public static final byte CURRENT_VERSION = 0;

  /**
   * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
   */
  public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
    "hbase.ipc.server.fallback-to-simple-auth-allowed";

  /**
   * How many calls per handler are allowed in the queue.
   */
  protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;

  protected final CellBlockBuilder cellBlockBuilder;

  protected static final String AUTH_FAILED_FOR = "Auth failed for ";
  protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
  protected static final Logger AUDITLOG =
    LoggerFactory.getLogger("SecurityLogger." + Server.class.getName());
  protected SecretManager<TokenIdentifier> secretManager;
  // SASL properties and server principal, set in the constructor; both are empty when security is
  // disabled.
  protected final Map<String, String> saslProps;
  protected final String serverPrincipal;

  // Not assigned anywhere in this section, yet dereferenced by refreshAuthManager and authorize.
  // It must be initialized (presumably by a subclass -- confirm) before either runs while
  // authorization is enabled.
  protected ServiceAuthorizationManager authManager;

  /**
   * Holds the {@link RpcCall} being serviced by the current handler thread. It is set before a
   * Handler invokes an RPC and reset after the call returns (the set/reset sites are outside this
   * section).
   */
  protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>();

  /** Keeps MonitoredRPCHandler per handler thread. */
  protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>();

  protected final InetSocketAddress bindAddress;

  protected MetricsHBaseServer metrics;

  protected final Configuration conf;

  /**
   * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
   * this size, then we will reject the call (after parsing it though). It will go back to the
   * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
   * call queue size gets incremented after we parse a call and before we add it to the queue of
   * calls for the scheduler to use. It gets decremented after we have 'run' the Call. The current
   * size is kept in {@link #callQueueSizeInBytes}.
   * @see #callQueueSizeInBytes
   * @see #DEFAULT_MAX_CALLQUEUE_SIZE
   */
  protected final long maxQueueSizeInBytes;
  protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;

  /**
   * This is a running count of the size in bytes of all outstanding calls whether currently
   * executing or queued waiting to be run.
   */
  protected final LongAdder callQueueSizeInBytes = new LongAdder();

  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
  protected final boolean tcpKeepAlive; // if T then use keepalives

  /**
   * This flag is used to indicate to sub threads when they should go down. When we call
   * {@link #start()}, all threads started will consult this flag on whether they should keep going.
   * It is set to false when {@link #stop()} is called.
   */
  volatile boolean running = true;

  /**
   * This flag is set to true after all threads are up and 'running' and the server is then opened
   * for business by the call to {@link #start()}.
   */
  volatile boolean started = false;

  protected AuthenticationTokenSecretManager authTokenSecretMgr = null;

  protected HBaseRPCErrorHandler errorHandler = null;

  public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";

  protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
  protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
  protected static final String WARN_SCAN_RESPONSE_TIME = "hbase.ipc.warn.response.time.scan";
  protected static final String WARN_SCAN_RESPONSE_SIZE = "hbase.ipc.warn.response.size.scan";

  /**
   * Minimum allowable timeout (in milliseconds) in an RPC request's header. This configuration
   * exists to prevent the RPC service from treating the request as timed out immediately.
   */
  protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
  protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;

  /** Default values for the params above. */
  public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
  protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
  protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; // bytes (100 MiB)

  protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000;
  protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length";
  protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>";

  protected static final Gson GSON = GsonUtil.createGsonWithDisableHtmlEscaping().create();

  protected final int maxRequestSize;
  // Warn thresholds. Volatile because refreshSlowLogConfiguration reloads them at runtime; a
  // negative value disables the corresponding check in isTooSlow / isTooLarge.
  protected volatile int warnResponseTime;
  protected volatile int warnResponseSize;
  protected volatile int warnScanResponseTime;
  protected volatile int warnScanResponseSize;

  protected final int minClientRequestTimeout;

  protected final Server server;
  protected final List<BlockingServiceAndInterface> services;

  protected final RpcScheduler scheduler;

  protected final UserProvider userProvider;

  protected final ByteBuffAllocator bbAllocator;

  protected volatile boolean allowFallbackToSimpleAuth;

  volatile RpcCoprocessorHost cpHost;

  /**
   * Used to get details for scan with a scanner_id<br/>
   * TODO try to figure out a better way and remove reference from regionserver package later.
   * Not assigned in this section; logResponse null-checks it before use.
   */
  private RSRpcServices rsRpcServices;

  /**
   * Used to add online slowlog responses. Not assigned in this section; call(...) null-checks it
   * before use.
   */
  private NamedQueueRecorder namedQueueRecorder;

  /** Cleanup hook with a single no-arg {@link #run()} method; it is invoked outside this section. */
  @FunctionalInterface
  protected interface CallCleanup {
    void run();
  }
242
243  /**
244   * Datastructure for passing a {@link BlockingService} and its associated class of protobuf
245   * service interface. For example, a server that fielded what is defined in the client protobuf
246   * service would pass in an implementation of the client blocking service and then its
247   * ClientService.BlockingInterface.class. Used checking connection setup.
248   */
249  public static class BlockingServiceAndInterface {
250    private final BlockingService service;
251    private final Class<?> serviceInterface;
252
253    public BlockingServiceAndInterface(final BlockingService service,
254      final Class<?> serviceInterface) {
255      this.service = service;
256      this.serviceInterface = serviceInterface;
257    }
258
259    public Class<?> getServiceInterface() {
260      return this.serviceInterface;
261    }
262
263    public BlockingService getBlockingService() {
264      return this.service;
265    }
266  }
267
  /**
   * Constructs a server listening on the named port and address.
   * @param server           hosting instance of {@link Server}. We will do authentications if an
   *                         instance else pass null for no authentication check.
   * @param name             Used for keying this RPC server's metrics and for naming the Listener
   *                         thread.
   * @param services         A list of services.
   * @param bindAddress      Where to listen
   * @param conf             configuration read for call-queue and request size limits, warn
   *                         thresholds, TCP options, security settings and RPC coprocessors
   * @param scheduler        the {@link RpcScheduler} for this server; stored as-is
   * @param reservoirEnabled Enable ByteBufferPool or not.
   * @throws IOException declared by this constructor (presumably propagated from one of the helpers
   *                     invoked below -- confirm which)
   */
  public RpcServer(final Server server, final String name,
    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
    Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
    this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
    this.server = server;
    this.services = services;
    this.bindAddress = bindAddress;
    this.conf = conf;
    // See declaration above for documentation on what this size is.
    this.maxQueueSizeInBytes =
      this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);

    // Initial slow/large-response thresholds; refreshSlowLogConfiguration reloads them later.
    this.warnResponseTime = getWarnResponseTime(conf);
    this.warnResponseSize = getWarnResponseSize(conf);
    this.warnScanResponseTime = getWarnScanResponseTime(conf);
    this.warnScanResponseSize = getWarnScanResponseSize(conf);
    this.minClientRequestTimeout =
      conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT, DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);

    // NOTE: 'this' escapes to the metrics wrapper before construction has finished.
    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);

    this.cellBlockBuilder = new CellBlockBuilder(conf);

    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
    this.userProvider = UserProvider.instantiate(conf);
    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
    if (isSecurityEnabled) {
      // The SASL quality of protection defaults to "authentication" when hbase.rpc.protection is
      // unset; a principal is mandatory once security is on.
      saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
      serverPrincipal = Preconditions.checkNotNull(userProvider.getCurrentUserName(),
        "can not get current user name when security is enabled");
    } else {
      saslProps = Collections.emptyMap();
      serverPrincipal = HConstants.EMPTY_STRING;
    }

    this.isOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(conf);
    this.scheduler = scheduler;

    // NOTE(review): initReconfigurable(conf) is not invoked here, so allowFallbackToSimpleAuth
    // keeps its default (false) until something calls it -- presumably subclass constructors.
    initializeCoprocessorHost(getConf());
  }
322
323  @Override
324  public void onConfigurationChange(Configuration newConf) {
325    initReconfigurable(newConf);
326    if (scheduler instanceof ConfigurationObserver) {
327      ((ConfigurationObserver) scheduler).onConfigurationChange(newConf);
328    }
329    if (authorize) {
330      refreshAuthManager(newConf, new HBasePolicyProvider());
331    }
332    refreshSlowLogConfiguration(newConf);
333    if (
334      CoprocessorConfigurationUtil.checkConfigurationChange(getConf(), newConf,
335        CoprocessorHost.RPC_COPROCESSOR_CONF_KEY)
336    ) {
337      LOG.info("Update the RPC coprocessor(s) because the configuration has changed");
338      initializeCoprocessorHost(newConf);
339    }
340  }
341
342  private void refreshSlowLogConfiguration(Configuration newConf) {
343    boolean newIsOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(newConf);
344    if (isOnlineLogProviderEnabled != newIsOnlineLogProviderEnabled) {
345      isOnlineLogProviderEnabled = newIsOnlineLogProviderEnabled;
346    }
347    int newWarnResponseTime = getWarnResponseTime(newConf);
348    if (warnResponseTime != newWarnResponseTime) {
349      warnResponseTime = newWarnResponseTime;
350    }
351    int newWarnResponseSize = getWarnResponseSize(newConf);
352    if (warnResponseSize != newWarnResponseSize) {
353      warnResponseSize = newWarnResponseSize;
354    }
355    int newWarnResponseTimeScan = getWarnScanResponseTime(newConf);
356    if (warnScanResponseTime != newWarnResponseTimeScan) {
357      warnScanResponseTime = newWarnResponseTimeScan;
358    }
359    int newWarnScanResponseSize = getWarnScanResponseSize(newConf);
360    if (warnScanResponseSize != newWarnScanResponseSize) {
361      warnScanResponseSize = newWarnScanResponseSize;
362    }
363  }
364
365  private static boolean getIsOnlineLogProviderEnabled(Configuration conf) {
366    return conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
367      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
368  }
369
370  private static int getWarnResponseTime(Configuration conf) {
371    return conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
372  }
373
374  private static int getWarnResponseSize(Configuration conf) {
375    return conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
376  }
377
378  private static int getWarnScanResponseTime(Configuration conf) {
379    return conf.getInt(WARN_SCAN_RESPONSE_TIME, getWarnResponseTime(conf));
380  }
381
382  private static int getWarnScanResponseSize(Configuration conf) {
383    return conf.getInt(WARN_SCAN_RESPONSE_SIZE, getWarnResponseSize(conf));
384  }
385
386  protected void initReconfigurable(Configuration confToLoad) {
387    this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
388    if (isSecurityEnabled && allowFallbackToSimpleAuth) {
389      LOG.warn("********* WARNING! *********");
390      LOG.warn("This server is configured to allow connections from INSECURE clients");
391      LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
392      LOG.warn("While this option is enabled, client identities cannot be secured, and user");
393      LOG.warn("impersonation is possible!");
394      LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
395      LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
396      LOG.warn("****************************");
397    }
398  }
399
400  Configuration getConf() {
401    return conf;
402  }
403
404  @Override
405  public boolean isStarted() {
406    return this.started;
407  }
408
409  @Override
410  public synchronized void refreshAuthManager(Configuration conf, PolicyProvider pp) {
411    // Ignore warnings that this should be accessed in a static way instead of via an instance;
412    // it'll break if you go via static route.
413    System.setProperty("hadoop.policy.file", "hbase-policy.xml");
414    this.authManager.refresh(conf, pp);
415    LOG.info("Refreshed hbase-policy.xml successfully");
416    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
417    LOG.info("Refreshed super and proxy users successfully");
418  }
419
420  protected AuthenticationTokenSecretManager createSecretManager() {
421    if (!isSecurityEnabled) return null;
422    if (server == null) return null;
423    Configuration conf = server.getConfiguration();
424    long keyUpdateInterval = conf.getLong("hbase.auth.key.update.interval", 24 * 60 * 60 * 1000);
425    long maxAge = conf.getLong("hbase.auth.token.max.lifetime", 7 * 24 * 60 * 60 * 1000);
426    return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
427      server.getServerName().toString(), keyUpdateInterval, maxAge);
428  }
429
430  public SecretManager<? extends TokenIdentifier> getSecretManager() {
431    return this.secretManager;
432  }
433
434  @SuppressWarnings("unchecked")
435  public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
436    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
437  }
438
439  /**
440   * This is a server side method, which is invoked over RPC. On success the return response has
441   * protobuf response payload. On failure, the exception name and the stack trace are returned in
442   * the protobuf response.
443   */
444  @Override
445  public Pair<Message, ExtendedCellScanner> call(RpcCall call, MonitoredRPCHandler status)
446    throws IOException {
447    try {
448      MethodDescriptor md = call.getMethod();
449      Message param = call.getParam();
450      status.setRPC(md.getName(), new Object[] { param }, call.getReceiveTime());
451      // TODO: Review after we add in encoded data blocks.
452      status.setRPCPacket(param);
453      status.resume("Servicing call");
454      // get an instance of the method arg type
455      HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
456      controller.setCallTimeout(call.getTimeout());
457      Message result = call.getService().callBlockingMethod(md, controller, param);
458      long receiveTime = call.getReceiveTime();
459      long startTime = call.getStartTime();
460      long endTime = EnvironmentEdgeManager.currentTime();
461      int processingTime = (int) (endTime - startTime);
462      int qTime = (int) (startTime - receiveTime);
463      int totalTime = (int) (endTime - receiveTime);
464      if (LOG.isTraceEnabled()) {
465        LOG.trace(
466          "{}, response: {}, receiveTime: {}, queueTime: {}, processingTime: {}, "
467            + "totalTime: {}, fsReadTime: {}",
468          CurCall.get().toString(), TextFormat.shortDebugString(result),
469          CurCall.get().getReceiveTime(), qTime, processingTime, totalTime,
470          CurCall.get().getFsReadTime());
471      }
472      // Use the raw request call size for now.
473      long requestSize = call.getSize();
474      long responseSize = result.getSerializedSize();
475      long responseBlockSize = call.getBlockBytesScanned();
476      long fsReadTime = call.getFsReadTime();
477      if (call.isClientCellBlockSupported()) {
478        // Include the payload size in HBaseRpcController
479        responseSize += call.getResponseCellSize();
480      }
481
482      metrics.dequeuedCall(qTime);
483      metrics.processedCall(processingTime);
484      metrics.totalCall(totalTime);
485      metrics.receivedRequest(requestSize);
486      metrics.sentResponse(responseSize);
487      // log any RPC responses that are slower than the configured warn
488      // response time or larger than configured warning size
489      boolean tooSlow = isTooSlow(call, processingTime);
490      boolean tooLarge = isTooLarge(call, responseSize, responseBlockSize);
491      if (tooSlow || tooLarge) {
492        final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY);
493        // when tagging, we let TooLarge trump TooSmall to keep output simple
494        // note that large responses will often also be slow.
495        logResponse(param, md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
496          tooLarge, tooSlow, status.getClient(), startTime, processingTime, qTime, responseSize,
497          responseBlockSize, fsReadTime, userName);
498        if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
499          // send logs to ring buffer owned by slowLogRecorder
500          final String className =
501            server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
502          this.namedQueueRecorder.addRecord(new RpcLogDetails(call, param, status.getClient(),
503            responseSize, responseBlockSize, fsReadTime, className, tooSlow, tooLarge));
504        }
505      }
506      return new Pair<>(result, controller.cellScanner());
507    } catch (Throwable e) {
508      // The above callBlockingMethod will always return a SE. Strip the SE wrapper before
509      // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't
510      // need to pass it over the wire.
511      if (e instanceof ServiceException) {
512        if (e.getCause() == null) {
513          LOG.debug("Caught a ServiceException with null cause", e);
514        } else {
515          e = e.getCause();
516        }
517      }
518
519      // increment the number of requests that were exceptions.
520      metrics.exception(e);
521
522      if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
523      if (e instanceof IOException) throw (IOException) e;
524      LOG.error("Unexpected throwable object ", e);
525      throw new IOException(e.getMessage(), e);
526    }
527  }
528
529  /**
530   * Logs an RPC response to the LOG file, producing valid JSON objects for client Operations.
531   * @param param             The parameters received in the call.
532   * @param methodName        The name of the method invoked
533   * @param call              The string representation of the call
534   * @param tooLarge          To indicate if the event is tooLarge
535   * @param tooSlow           To indicate if the event is tooSlow
536   * @param clientAddress     The address of the client who made this call.
537   * @param startTime         The time that the call was initiated, in ms.
538   * @param processingTime    The duration that the call took to run, in ms.
539   * @param qTime             The duration that the call spent on the queue prior to being
540   *                          initiated, in ms.
541   * @param responseSize      The size in bytes of the response buffer.
542   * @param blockBytesScanned The size of block bytes scanned to retrieve the response.
543   * @param userName          UserName of the current RPC Call
544   */
545  void logResponse(Message param, String methodName, String call, boolean tooLarge, boolean tooSlow,
546    String clientAddress, long startTime, int processingTime, int qTime, long responseSize,
547    long blockBytesScanned, long fsReadTime, String userName) {
548    final String className = server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
549    // base information that is reported regardless of type of call
550    Map<String, Object> responseInfo = new HashMap<>();
551    responseInfo.put("starttimems", startTime);
552    responseInfo.put("processingtimems", processingTime);
553    responseInfo.put("queuetimems", qTime);
554    responseInfo.put("responsesize", responseSize);
555    responseInfo.put("blockbytesscanned", blockBytesScanned);
556    responseInfo.put("fsreadtime", fsReadTime);
557    responseInfo.put("client", clientAddress);
558    responseInfo.put("class", className);
559    responseInfo.put("method", methodName);
560    responseInfo.put("call", call);
561    // The params could be really big, make sure they don't kill us at WARN
562    String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
563    if (stringifiedParam.length() > 150) {
564      // Truncate to 1000 chars if TRACE is on, else to 150 chars
565      stringifiedParam = truncateTraceLog(stringifiedParam);
566    }
567    responseInfo.put("param", stringifiedParam);
568    if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
569      ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
570      String scanDetails;
571      if (request.hasScannerId()) {
572        long scannerId = request.getScannerId();
573        scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
574      } else {
575        scanDetails = rsRpcServices.getScanDetailsWithRequest(request);
576      }
577      if (scanDetails != null) {
578        responseInfo.put("scandetails", scanDetails);
579      }
580    }
581    if (param instanceof ClientProtos.MultiRequest) {
582      int numGets = 0;
583      int numMutations = 0;
584      int numServiceCalls = 0;
585      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
586      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
587        for (ClientProtos.Action action : regionAction.getActionList()) {
588          if (action.hasMutation()) {
589            numMutations++;
590          }
591          if (action.hasGet()) {
592            numGets++;
593          }
594          if (action.hasServiceCall()) {
595            numServiceCalls++;
596          }
597        }
598      }
599      responseInfo.put(MULTI_GETS, numGets);
600      responseInfo.put(MULTI_MUTATIONS, numMutations);
601      responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls);
602    }
603    final String tag =
604      (tooLarge && tooSlow) ? "TooLarge & TooSlow" : (tooSlow ? "TooSlow" : "TooLarge");
605    LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo));
606  }
607
608  private boolean isTooSlow(RpcCall call, int processingTime) {
609    long warnResponseTime = call.getParam() instanceof ClientProtos.ScanRequest
610      ? warnScanResponseTime
611      : this.warnResponseTime;
612    return (processingTime > warnResponseTime && warnResponseTime > -1);
613  }
614
615  private boolean isTooLarge(RpcCall call, long responseSize, long responseBlockSize) {
616    long warnResponseSize = call.getParam() instanceof ClientProtos.ScanRequest
617      ? warnScanResponseSize
618      : this.warnResponseSize;
619    return (warnResponseSize > -1
620      && (responseSize > warnResponseSize || responseBlockSize > warnResponseSize));
621  }
622
623  /**
624   * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length if TRACE is on else
625   * to 150 chars Refer to Jira HBASE-20826 and HBASE-20942
626   * @param strParam stringifiedParam to be truncated
627   * @return truncated trace log string
628   */
629  String truncateTraceLog(String strParam) {
630    if (LOG.isTraceEnabled()) {
631      int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH);
632      int truncatedLength =
633        strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength;
634      String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED;
635      return strParam.subSequence(0, truncatedLength) + truncatedFlag;
636    }
637    return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED;
638  }
639
640  /**
641   * Set the handler for calling out of RPC for error conditions.
642   * @param handler the handler implementation
643   */
644  @Override
645  public void setErrorHandler(HBaseRPCErrorHandler handler) {
646    this.errorHandler = handler;
647  }
648
649  @Override
650  public HBaseRPCErrorHandler getErrorHandler() {
651    return this.errorHandler;
652  }
653
654  /**
655   * Returns the metrics instance for reporting RPC call statistics
656   */
657  @Override
658  public MetricsHBaseServer getMetrics() {
659    return metrics;
660  }
661
662  @Override
663  public void addCallSize(final long diff) {
664    this.callQueueSizeInBytes.add(diff);
665  }
666
667  /**
668   * Authorize the incoming client connection.
669   * @param user       client user
670   * @param connection incoming connection
671   * @param addr       InetAddress of incoming connection
672   * @throws AuthorizationException when the client isn't authorized to talk the protocol
673   */
674  public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
675    InetAddress addr) throws AuthorizationException {
676    if (authorize) {
677      Class<?> c = getServiceInterface(services, connection.getServiceName());
678      authManager.authorize(user, c, getConf(), addr);
679    }
680  }
681
682  /**
683   * When the read or write buffer size is larger than this limit, i/o will be done in chunks of
684   * this size. Most RPC requests and responses would be be smaller.
685   */
686  protected static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
687
688  /**
689   * This is a wrapper around
690   * {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}. If the amount of data
691   * is large, it writes to channel in smaller chunks. This is to avoid jdk from creating many
692   * direct buffers as the size of ByteBuffer increases. There should not be any performance
693   * degredation.
694   * @param channel writable byte channel to write on
695   * @param buffer  buffer to write
696   * @return number of bytes written
697   * @throws java.io.IOException e
698   * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
699   */
700  protected int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
701
702    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT)
703      ? channel.read(buffer)
704      : channelIO(channel, null, buffer);
705    if (count > 0) {
706      metrics.receivedBytes(count);
707    }
708    return count;
709  }
710
711  /**
712   * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
713   * Only one of readCh or writeCh should be non-null.
714   * @param readCh  read channel
715   * @param writeCh write channel
716   * @param buf     buffer to read or write into/out of
717   * @return bytes written
718   * @throws java.io.IOException e
719   * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
720   */
721  private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh,
722    ByteBuffer buf) throws IOException {
723
724    int originalLimit = buf.limit();
725    int initialRemaining = buf.remaining();
726    int ret = 0;
727
728    while (buf.remaining() > 0) {
729      try {
730        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
731        buf.limit(buf.position() + ioSize);
732
733        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
734
735        if (ret < ioSize) {
736          break;
737        }
738
739      } finally {
740        buf.limit(originalLimit);
741      }
742    }
743
744    int nBytes = initialRemaining - buf.remaining();
745    return (nBytes > 0) ? nBytes : ret;
746  }
747
748  /**
749   * Needed for features such as delayed calls. We need to be able to store the current call so that
750   * we can complete it later or ask questions of what is supported by the current ongoing call.
751   * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
752   */
753  public static Optional<RpcCall> getCurrentCall() {
754    return Optional.ofNullable(CurCall.get());
755  }
756
757  /**
758   * Just return the current rpc call if it is a {@link ServerCall} and also has {@link CellScanner}
759   * attached.
760   * <p/>
761   * Mainly used for reference counting as {@link CellScanner} may reference non heap memory.
762   */
763  public static Optional<ServerCall<?>> getCurrentServerCallWithCellScanner() {
764    return getCurrentCall().filter(c -> c instanceof ServerCall)
765      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall<?>) c);
766  }
767
768  public static boolean isInRpcCallContext() {
769    return CurCall.get() != null;
770  }
771
772  /**
773   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For
774   * master's rpc call, it may generate new procedure and mutate the region which store procedure.
775   * There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc
776   * call to avoid the rpc check.
777   * @return the currently ongoing rpc call
778   */
779  public static Optional<RpcCall> unsetCurrentCall() {
780    Optional<RpcCall> rpcCall = getCurrentCall();
781    CurCall.set(null);
782    return rpcCall;
783  }
784
785  /**
786   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
787   * rpc call back after mutate region.
788   */
789  public static void setCurrentCall(RpcCall rpcCall) {
790    CurCall.set(rpcCall);
791  }
792
793  /**
794   * Returns the user credentials associated with the current RPC request or not present if no
795   * credentials were provided.
796   * @return A User
797   */
798  public static Optional<User> getRequestUser() {
799    Optional<RpcCall> ctx = getCurrentCall();
800    return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty();
801  }
802
803  /**
804   * The number of open RPC conections
805   * @return the number of open rpc connections
806   */
807  abstract public int getNumOpenConnections();
808
809  /**
810   * Returns the username for any user associated with the current RPC request or not present if no
811   * user is set.
812   */
813  public static Optional<String> getRequestUserName() {
814    return getRequestUser().map(User::getShortName);
815  }
816
817  /**
818   * Returns the address of the remote client associated with the current RPC request or not present
819   * if no address is set.
820   */
821  public static Optional<InetAddress> getRemoteAddress() {
822    return getCurrentCall().map(RpcCall::getRemoteAddress);
823  }
824
825  /**
826   * @param serviceName Some arbitrary string that represents a 'service'.
827   * @param services    Available service instances
828   * @return Matching BlockingServiceAndInterface pair
829   */
830  protected static BlockingServiceAndInterface getServiceAndInterface(
831    final List<BlockingServiceAndInterface> services, final String serviceName) {
832    for (BlockingServiceAndInterface bs : services) {
833      if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
834        return bs;
835      }
836    }
837    return null;
838  }
839
840  /**
841   * @param serviceName Some arbitrary string that represents a 'service'.
842   * @param services    Available services and their service interfaces.
843   * @return Service interface class for <code>serviceName</code>
844   */
845  protected static Class<?> getServiceInterface(final List<BlockingServiceAndInterface> services,
846    final String serviceName) {
847    BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName);
848    return bsasi == null ? null : bsasi.getServiceInterface();
849  }
850
851  /**
852   * @param serviceName Some arbitrary string that represents a 'service'.
853   * @param services    Available services and their service interfaces.
854   * @return BlockingService that goes with the passed <code>serviceName</code>
855   */
856  protected static BlockingService getService(final List<BlockingServiceAndInterface> services,
857    final String serviceName) {
858    BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName);
859    return bsasi == null ? null : bsasi.getBlockingService();
860  }
861
862  protected static MonitoredRPCHandler getStatus() {
863    // It is ugly the way we park status up in RpcServer. Let it be for now. TODO.
864    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
865    if (status != null) {
866      return status;
867    }
868    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
869    status.pause("Waiting for a call");
870    RpcServer.MONITORED_RPC.set(status);
871    return status;
872  }
873
874  /**
875   * Returns the remote side ip address when invoked inside an RPC Returns null incase of an error.
876   */
877  public static InetAddress getRemoteIp() {
878    RpcCall call = CurCall.get();
879    if (call != null) {
880      return call.getRemoteAddress();
881    }
882    return null;
883  }
884
885  @Override
886  public RpcScheduler getScheduler() {
887    return scheduler;
888  }
889
890  @Override
891  public ByteBuffAllocator getByteBuffAllocator() {
892    return this.bbAllocator;
893  }
894
895  @Override
896  public void setRsRpcServices(RSRpcServices rsRpcServices) {
897    this.rsRpcServices = rsRpcServices;
898  }
899
900  @Override
901  public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
902    this.namedQueueRecorder = namedQueueRecorder;
903  }
904
905  protected boolean needAuthorization() {
906    return authorize;
907  }
908
909  @RestrictedApi(explanation = "Should only be called in tests", link = "",
910      allowedOnPath = ".*/src/test/.*")
911  public List<BlockingServiceAndInterface> getServices() {
912    return services;
913  }
914
915  private void initializeCoprocessorHost(Configuration conf) {
916    this.cpHost = new RpcCoprocessorHost(conf);
917  }
918
919  @Override
920  public RpcCoprocessorHost getRpcCoprocessorHost() {
921    return cpHost;
922  }
923}