001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.ipc;
020
021import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
022
023import java.io.IOException;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.nio.ByteBuffer;
027import java.nio.channels.ReadableByteChannel;
028import java.nio.channels.WritableByteChannel;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Locale;
033import java.util.Map;
034import java.util.Optional;
035import java.util.concurrent.atomic.LongAdder;
036
037import org.apache.commons.lang3.StringUtils;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.CallQueueTooBigException;
040import org.apache.hadoop.hbase.CellScanner;
041import org.apache.hadoop.hbase.DoNotRetryIOException;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.Server;
044import org.apache.hadoop.hbase.conf.ConfigurationObserver;
045import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
046import org.apache.hadoop.hbase.io.ByteBuffAllocator;
047import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
048import org.apache.hadoop.hbase.monitoring.TaskMonitor;
049import org.apache.hadoop.hbase.regionserver.RSRpcServices;
050import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
051import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
052import org.apache.hadoop.hbase.security.HBasePolicyProvider;
053import org.apache.hadoop.hbase.security.SaslUtil;
054import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
055import org.apache.hadoop.hbase.security.User;
056import org.apache.hadoop.hbase.security.UserProvider;
057import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
058import org.apache.hadoop.hbase.util.GsonUtil;
059import org.apache.hadoop.hbase.util.Pair;
060import org.apache.hadoop.security.UserGroupInformation;
061import org.apache.hadoop.security.authorize.AuthorizationException;
062import org.apache.hadoop.security.authorize.PolicyProvider;
063import org.apache.hadoop.security.authorize.ProxyUsers;
064import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
065import org.apache.hadoop.security.token.SecretManager;
066import org.apache.hadoop.security.token.TokenIdentifier;
067import org.apache.yetus.audience.InterfaceAudience;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
072import org.apache.hbase.thirdparty.com.google.gson.Gson;
073import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
074import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
075import org.apache.hbase.thirdparty.com.google.protobuf.Message;
076import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
077import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
078
079import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
082
083/**
084 * An RPC server that hosts protobuf described Services.
085 *
086 */
087@InterfaceAudience.Private
088public abstract class RpcServer implements RpcServerInterface,
089    ConfigurationObserver {
090  // LOG is being used in CallRunner and the log level is being changed in tests
091  public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class);
092  protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
093      = new CallQueueTooBigException();
094
095  private static final String MULTI_GETS = "multi.gets";
096  private static final String MULTI_MUTATIONS = "multi.mutations";
097  private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
098
099  private final boolean authorize;
100  private final boolean isOnlineLogProviderEnabled;
101  protected boolean isSecurityEnabled;
102
103  public static final byte CURRENT_VERSION = 0;
104
105  /**
106   * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
107   */
108  public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
109          "hbase.ipc.server.fallback-to-simple-auth-allowed";
110
111  /**
112   * How many calls/handler are allowed in the queue.
113   */
114  protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
115
116  protected final CellBlockBuilder cellBlockBuilder;
117
118  protected static final String AUTH_FAILED_FOR = "Auth failed for ";
119  protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
120  protected static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger."
121      + Server.class.getName());
122  protected SecretManager<TokenIdentifier> secretManager;
123  protected final Map<String, String> saslProps;
124
125  protected ServiceAuthorizationManager authManager;
126
127  /** This is set to Call object before Handler invokes an RPC and reset
128   * after the call returns.
129   */
130  protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>();
131
132  /** Keeps MonitoredRPCHandler per handler thread. */
133  protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>();
134
135  protected final InetSocketAddress bindAddress;
136
137  protected MetricsHBaseServer metrics;
138
139  protected final Configuration conf;
140
141  /**
142   * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
143   * this size, then we will reject the call (after parsing it though). It will go back to the
144   * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
145   * call queue size gets incremented after we parse a call and before we add it to the queue of
146   * calls for the scheduler to use. It gets decremented after we have 'run' the Call. The current
147   * size is kept in {@link #callQueueSizeInBytes}.
148   * @see #callQueueSizeInBytes
149   * @see #DEFAULT_MAX_CALLQUEUE_SIZE
150   */
151  protected final long maxQueueSizeInBytes;
152  protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
153
154  /**
155   * This is a running count of the size in bytes of all outstanding calls whether currently
156   * executing or queued waiting to be run.
157   */
158  protected final LongAdder callQueueSizeInBytes = new LongAdder();
159
160  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
161  protected final boolean tcpKeepAlive; // if T then use keepalives
162
163  /**
164   * This flag is used to indicate to sub threads when they should go down.  When we call
165   * {@link #start()}, all threads started will consult this flag on whether they should
166   * keep going.  It is set to false when {@link #stop()} is called.
167   */
168  volatile boolean running = true;
169
170  /**
171   * This flag is set to true after all threads are up and 'running' and the server is then opened
172   * for business by the call to {@link #start()}.
173   */
174  volatile boolean started = false;
175
176  protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
177
178  protected HBaseRPCErrorHandler errorHandler = null;
179
180  public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
181  protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION =
182      new RequestTooBigException();
183
184  protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
185  protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
186
187  /**
188   * Minimum allowable timeout (in milliseconds) in rpc request's header. This
189   * configuration exists to prevent the rpc service regarding this request as timeout immediately.
190   */
191  protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
192  protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
193
194  /** Default value for above params */
195  public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
196  protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
197  protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
198
199  protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000;
200  protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length";
201  protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>";
202
203  protected static final Gson GSON = GsonUtil.createGson().create();
204
205  protected final int maxRequestSize;
206  protected final int warnResponseTime;
207  protected final int warnResponseSize;
208
209  protected final int minClientRequestTimeout;
210
211  protected final Server server;
212  protected final List<BlockingServiceAndInterface> services;
213
214  protected final RpcScheduler scheduler;
215
216  protected UserProvider userProvider;
217
218  protected final ByteBuffAllocator bbAllocator;
219
220  protected volatile boolean allowFallbackToSimpleAuth;
221
222  /**
223   * Used to get details for scan with a scanner_id<br/>
224   * TODO try to figure out a better way and remove reference from regionserver package later.
225   */
226  private RSRpcServices rsRpcServices;
227
228
229  /**
230   * Used to add online slowlog responses
231   */
232  private NamedQueueRecorder namedQueueRecorder;
233
234  @FunctionalInterface
235  protected interface CallCleanup {
236    void run();
237  }
238
239  /**
240   * Datastructure for passing a {@link BlockingService} and its associated class of
241   * protobuf service interface.  For example, a server that fielded what is defined
242   * in the client protobuf service would pass in an implementation of the client blocking service
243   * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
244   */
245  public static class BlockingServiceAndInterface {
246    private final BlockingService service;
247    private final Class<?> serviceInterface;
248    public BlockingServiceAndInterface(final BlockingService service,
249        final Class<?> serviceInterface) {
250      this.service = service;
251      this.serviceInterface = serviceInterface;
252    }
253    public Class<?> getServiceInterface() {
254      return this.serviceInterface;
255    }
256    public BlockingService getBlockingService() {
257      return this.service;
258    }
259  }
260
261  /**
262   * Constructs a server listening on the named port and address.
263   * @param server hosting instance of {@link Server}. We will do authentications if an
264   * instance else pass null for no authentication check.
265   * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
266   * @param services A list of services.
267   * @param bindAddress Where to listen
268   * @param conf
269   * @param scheduler
270   * @param reservoirEnabled Enable ByteBufferPool or not.
271   */
272  public RpcServer(final Server server, final String name,
273      final List<BlockingServiceAndInterface> services,
274      final InetSocketAddress bindAddress, Configuration conf,
275      RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
276    this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
277    this.server = server;
278    this.services = services;
279    this.bindAddress = bindAddress;
280    this.conf = conf;
281    // See declaration above for documentation on what this size is.
282    this.maxQueueSizeInBytes =
283      this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
284
285    this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
286    this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
287    this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
288        DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
289    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
290
291    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
292    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
293    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
294
295    this.cellBlockBuilder = new CellBlockBuilder(conf);
296
297    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
298    this.userProvider = UserProvider.instantiate(conf);
299    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
300    if (isSecurityEnabled) {
301      saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
302        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
303    } else {
304      saslProps = Collections.emptyMap();
305    }
306
307    this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
308      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
309    this.scheduler = scheduler;
310  }
311
312  @Override
313  public void onConfigurationChange(Configuration newConf) {
314    initReconfigurable(newConf);
315    if (scheduler instanceof ConfigurationObserver) {
316      ((ConfigurationObserver) scheduler).onConfigurationChange(newConf);
317    }
318    if (authorize) {
319      refreshAuthManager(newConf, new HBasePolicyProvider());
320    }
321  }
322
323  protected void initReconfigurable(Configuration confToLoad) {
324    this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
325    if (isSecurityEnabled && allowFallbackToSimpleAuth) {
326      LOG.warn("********* WARNING! *********");
327      LOG.warn("This server is configured to allow connections from INSECURE clients");
328      LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
329      LOG.warn("While this option is enabled, client identities cannot be secured, and user");
330      LOG.warn("impersonation is possible!");
331      LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
332      LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
333      LOG.warn("****************************");
334    }
335  }
336
337  Configuration getConf() {
338    return conf;
339  }
340
341  @Override
342  public boolean isStarted() {
343    return this.started;
344  }
345
346  @Override
347  public synchronized void refreshAuthManager(Configuration conf, PolicyProvider pp) {
348    // Ignore warnings that this should be accessed in a static way instead of via an instance;
349    // it'll break if you go via static route.
350    System.setProperty("hadoop.policy.file", "hbase-policy.xml");
351    this.authManager.refresh(conf, pp);
352    LOG.info("Refreshed hbase-policy.xml successfully");
353    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
354    LOG.info("Refreshed super and proxy users successfully");
355  }
356
357  protected AuthenticationTokenSecretManager createSecretManager() {
358    if (!isSecurityEnabled) return null;
359    if (server == null) return null;
360    Configuration conf = server.getConfiguration();
361    long keyUpdateInterval =
362        conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
363    long maxAge =
364        conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
365    return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
366        server.getServerName().toString(), keyUpdateInterval, maxAge);
367  }
368
369  public SecretManager<? extends TokenIdentifier> getSecretManager() {
370    return this.secretManager;
371  }
372
373  @SuppressWarnings("unchecked")
374  public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
375    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
376  }
377
378  /**
379   * This is a server side method, which is invoked over RPC. On success
380   * the return response has protobuf response payload. On failure, the
381   * exception name and the stack trace are returned in the protobuf response.
382   */
383  @Override
384  public Pair<Message, CellScanner> call(RpcCall call,
385      MonitoredRPCHandler status) throws IOException {
386    try {
387      MethodDescriptor md = call.getMethod();
388      Message param = call.getParam();
389      status.setRPC(md.getName(), new Object[]{param},
390        call.getReceiveTime());
391      // TODO: Review after we add in encoded data blocks.
392      status.setRPCPacket(param);
393      status.resume("Servicing call");
394      //get an instance of the method arg type
395      HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
396      controller.setCallTimeout(call.getTimeout());
397      Message result = call.getService().callBlockingMethod(md, controller, param);
398      long receiveTime = call.getReceiveTime();
399      long startTime = call.getStartTime();
400      long endTime = System.currentTimeMillis();
401      int processingTime = (int) (endTime - startTime);
402      int qTime = (int) (startTime - receiveTime);
403      int totalTime = (int) (endTime - receiveTime);
404      if (LOG.isTraceEnabled()) {
405        LOG.trace(CurCall.get().toString() +
406            ", response " + TextFormat.shortDebugString(result) +
407            " queueTime: " + qTime +
408            " processingTime: " + processingTime +
409            " totalTime: " + totalTime);
410      }
411      // Use the raw request call size for now.
412      long requestSize = call.getSize();
413      long responseSize = result.getSerializedSize();
414      if (call.isClientCellBlockSupported()) {
415        // Include the payload size in HBaseRpcController
416        responseSize += call.getResponseCellSize();
417      }
418
419      metrics.dequeuedCall(qTime);
420      metrics.processedCall(processingTime);
421      metrics.totalCall(totalTime);
422      metrics.receivedRequest(requestSize);
423      metrics.sentResponse(responseSize);
424      // log any RPC responses that are slower than the configured warn
425      // response time or larger than configured warning size
426      boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
427      boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
428      if (tooSlow || tooLarge) {
429        final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY);
430        // when tagging, we let TooLarge trump TooSmall to keep output simple
431        // note that large responses will often also be slow.
432        logResponse(param,
433          md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
434          tooLarge, tooSlow,
435          status.getClient(), startTime, processingTime, qTime,
436          responseSize, userName);
437        if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
438          // send logs to ring buffer owned by slowLogRecorder
439          final String className =
440            server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
441          this.namedQueueRecorder.addRecord(
442            new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow,
443              tooLarge));
444        }
445      }
446      return new Pair<>(result, controller.cellScanner());
447    } catch (Throwable e) {
448      // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
449      // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
450      // need to pass it over the wire.
451      if (e instanceof ServiceException) {
452        if (e.getCause() == null) {
453          LOG.debug("Caught a ServiceException with null cause", e);
454        } else {
455          e = e.getCause();
456        }
457      }
458
459      // increment the number of requests that were exceptions.
460      metrics.exception(e);
461
462      if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
463      if (e instanceof IOException) throw (IOException)e;
464      LOG.error("Unexpected throwable object ", e);
465      throw new IOException(e.getMessage(), e);
466    }
467  }
468
469  /**
470   * Logs an RPC response to the LOG file, producing valid JSON objects for
471   * client Operations.
472   * @param param The parameters received in the call.
473   * @param methodName The name of the method invoked
474   * @param call The string representation of the call
475   * @param tooLarge To indicate if the event is tooLarge
476   * @param tooSlow To indicate if the event is tooSlow
477   * @param clientAddress The address of the client who made this call.
478   * @param startTime The time that the call was initiated, in ms.
479   * @param processingTime The duration that the call took to run, in ms.
480   * @param qTime The duration that the call spent on the queue
481   *   prior to being initiated, in ms.
482   * @param responseSize The size in bytes of the response buffer.
483   * @param userName UserName of the current RPC Call
484   */
485  void logResponse(Message param, String methodName, String call, boolean tooLarge,
486      boolean tooSlow, String clientAddress, long startTime, int processingTime, int qTime,
487      long responseSize, String userName) {
488    final String className = server == null ? StringUtils.EMPTY :
489      server.getClass().getSimpleName();
490    // base information that is reported regardless of type of call
491    Map<String, Object> responseInfo = new HashMap<>();
492    responseInfo.put("starttimems", startTime);
493    responseInfo.put("processingtimems", processingTime);
494    responseInfo.put("queuetimems", qTime);
495    responseInfo.put("responsesize", responseSize);
496    responseInfo.put("client", clientAddress);
497    responseInfo.put("class", className);
498    responseInfo.put("method", methodName);
499    responseInfo.put("call", call);
500    // The params could be really big, make sure they don't kill us at WARN
501    String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
502    if (stringifiedParam.length() > 150) {
503      // Truncate to 1000 chars if TRACE is on, else to 150 chars
504      stringifiedParam = truncateTraceLog(stringifiedParam);
505    }
506    responseInfo.put("param", stringifiedParam);
507    if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
508      ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
509      String scanDetails;
510      if (request.hasScannerId()) {
511        long scannerId = request.getScannerId();
512        scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
513      } else {
514        scanDetails = rsRpcServices.getScanDetailsWithRequest(request);
515      }
516      if (scanDetails != null) {
517        responseInfo.put("scandetails", scanDetails);
518      }
519    }
520    if (param instanceof ClientProtos.MultiRequest) {
521      int numGets = 0;
522      int numMutations = 0;
523      int numServiceCalls = 0;
524      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest)param;
525      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
526        for (ClientProtos.Action action: regionAction.getActionList()) {
527          if (action.hasMutation()) {
528            numMutations++;
529          }
530          if (action.hasGet()) {
531            numGets++;
532          }
533          if (action.hasServiceCall()) {
534            numServiceCalls++;
535          }
536        }
537      }
538      responseInfo.put(MULTI_GETS, numGets);
539      responseInfo.put(MULTI_MUTATIONS, numMutations);
540      responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls);
541    }
542    final String tag = (tooLarge && tooSlow) ? "TooLarge & TooSlow"
543      : (tooSlow ? "TooSlow" : "TooLarge");
544    LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo));
545  }
546
547
548  /**
549   * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length
550   * if TRACE is on else to 150 chars Refer to Jira HBASE-20826 and HBASE-20942
551   * @param strParam stringifiedParam to be truncated
552   * @return truncated trace log string
553   */
554  @VisibleForTesting
555  String truncateTraceLog(String strParam) {
556    if (LOG.isTraceEnabled()) {
557      int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH);
558      int truncatedLength =
559          strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength;
560      String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED;
561      return strParam.subSequence(0, truncatedLength) + truncatedFlag;
562    }
563    return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED;
564  }
565
566  /**
567   * Set the handler for calling out of RPC for error conditions.
568   * @param handler the handler implementation
569   */
570  @Override
571  public void setErrorHandler(HBaseRPCErrorHandler handler) {
572    this.errorHandler = handler;
573  }
574
575  @Override
576  public HBaseRPCErrorHandler getErrorHandler() {
577    return this.errorHandler;
578  }
579
580  /**
581   * Returns the metrics instance for reporting RPC call statistics
582   */
583  @Override
584  public MetricsHBaseServer getMetrics() {
585    return metrics;
586  }
587
588  @Override
589  public void addCallSize(final long diff) {
590    this.callQueueSizeInBytes.add(diff);
591  }
592
593  /**
594   * Authorize the incoming client connection.
595   * @param user client user
596   * @param connection incoming connection
597   * @param addr InetAddress of incoming connection
598   * @throws AuthorizationException when the client isn't authorized to talk the protocol
599   */
600  public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
601      InetAddress addr) throws AuthorizationException {
602    if (authorize) {
603      Class<?> c = getServiceInterface(services, connection.getServiceName());
604      authManager.authorize(user, c, getConf(), addr);
605    }
606  }
607
608  /**
609   * When the read or write buffer size is larger than this limit, i/o will be
610   * done in chunks of this size. Most RPC requests and responses would be
611   * smaller.
612   */
613  protected static final int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
614
615  /**
616   * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
617   * If the amount of data is large, it writes to channel in smaller chunks.
618   * This is to avoid jdk from creating many direct buffers as the size of
619   * ByteBuffer increases. There should not be any performance degredation.
620   *
621   * @param channel writable byte channel to write on
622   * @param buffer buffer to write
623   * @return number of bytes written
624   * @throws java.io.IOException e
625   * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
626   */
627  protected int channelRead(ReadableByteChannel channel,
628                                   ByteBuffer buffer) throws IOException {
629
630    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
631           channel.read(buffer) : channelIO(channel, null, buffer);
632    if (count > 0) {
633      metrics.receivedBytes(count);
634    }
635    return count;
636  }
637
638  /**
639   * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
640   * Only one of readCh or writeCh should be non-null.
641   *
642   * @param readCh read channel
643   * @param writeCh write channel
644   * @param buf buffer to read or write into/out of
645   * @return bytes written
646   * @throws java.io.IOException e
647   * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
648   */
649  private static int channelIO(ReadableByteChannel readCh,
650                               WritableByteChannel writeCh,
651                               ByteBuffer buf) throws IOException {
652
653    int originalLimit = buf.limit();
654    int initialRemaining = buf.remaining();
655    int ret = 0;
656
657    while (buf.remaining() > 0) {
658      try {
659        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
660        buf.limit(buf.position() + ioSize);
661
662        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
663
664        if (ret < ioSize) {
665          break;
666        }
667
668      } finally {
669        buf.limit(originalLimit);
670      }
671    }
672
673    int nBytes = initialRemaining - buf.remaining();
674    return (nBytes > 0) ? nBytes : ret;
675  }
676
677  /**
678   * Needed for features such as delayed calls.  We need to be able to store the current call
679   * so that we can complete it later or ask questions of what is supported by the current ongoing
680   * call.
681   * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
682   */
683  public static Optional<RpcCall> getCurrentCall() {
684    return Optional.ofNullable(CurCall.get());
685  }
686
687  public static boolean isInRpcCallContext() {
688    return CurCall.get() != null;
689  }
690
691  /**
692   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For
693   * master's rpc call, it may generate new procedure and mutate the region which store procedure.
694   * There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc
695   * call to avoid the rpc check.
696   * @return the currently ongoing rpc call
697   */
698  public static Optional<RpcCall> unsetCurrentCall() {
699    Optional<RpcCall> rpcCall = getCurrentCall();
700    CurCall.set(null);
701    return rpcCall;
702  }
703
704  /**
705   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
706   * rpc call back after mutate region.
707   */
708  public static void setCurrentCall(RpcCall rpcCall) {
709    CurCall.set(rpcCall);
710  }
711
712  /**
713   * Returns the user credentials associated with the current RPC request or not present if no
714   * credentials were provided.
715   * @return A User
716   */
717  public static Optional<User> getRequestUser() {
718    Optional<RpcCall> ctx = getCurrentCall();
719    return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty();
720  }
721
722  /**
723   * The number of open RPC conections
724   * @return the number of open rpc connections
725   */
726  abstract public int getNumOpenConnections();
727
728  /**
729   * Returns the username for any user associated with the current RPC
730   * request or not present if no user is set.
731   */
732  public static Optional<String> getRequestUserName() {
733    return getRequestUser().map(User::getShortName);
734  }
735
736  /**
737   * @return Address of remote client if a request is ongoing, else null
738   */
739  public static Optional<InetAddress> getRemoteAddress() {
740    return getCurrentCall().map(RpcCall::getRemoteAddress);
741  }
742
743  /**
744   * @param serviceName Some arbitrary string that represents a 'service'.
745   * @param services Available service instances
746   * @return Matching BlockingServiceAndInterface pair
747   */
748  protected static BlockingServiceAndInterface getServiceAndInterface(
749      final List<BlockingServiceAndInterface> services, final String serviceName) {
750    for (BlockingServiceAndInterface bs : services) {
751      if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
752        return bs;
753      }
754    }
755    return null;
756  }
757
758  /**
759   * @param serviceName Some arbitrary string that represents a 'service'.
760   * @param services Available services and their service interfaces.
761   * @return Service interface class for <code>serviceName</code>
762   */
763  protected static Class<?> getServiceInterface(
764      final List<BlockingServiceAndInterface> services,
765      final String serviceName) {
766    BlockingServiceAndInterface bsasi =
767        getServiceAndInterface(services, serviceName);
768    return bsasi == null? null: bsasi.getServiceInterface();
769  }
770
771  /**
772   * @param serviceName Some arbitrary string that represents a 'service'.
773   * @param services Available services and their service interfaces.
774   * @return BlockingService that goes with the passed <code>serviceName</code>
775   */
776  protected static BlockingService getService(
777      final List<BlockingServiceAndInterface> services,
778      final String serviceName) {
779    BlockingServiceAndInterface bsasi =
780        getServiceAndInterface(services, serviceName);
781    return bsasi == null? null: bsasi.getBlockingService();
782  }
783
784  protected static MonitoredRPCHandler getStatus() {
785    // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
786    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
787    if (status != null) {
788      return status;
789    }
790    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
791    status.pause("Waiting for a call");
792    RpcServer.MONITORED_RPC.set(status);
793    return status;
794  }
795
796  /** Returns the remote side ip address when invoked inside an RPC
797   *  Returns null incase of an error.
798   *  @return InetAddress
799   */
800  public static InetAddress getRemoteIp() {
801    RpcCall call = CurCall.get();
802    if (call != null) {
803      return call.getRemoteAddress();
804    }
805    return null;
806  }
807
808  @Override
809  public RpcScheduler getScheduler() {
810    return scheduler;
811  }
812
813  @Override
814  public ByteBuffAllocator getByteBuffAllocator() {
815    return this.bbAllocator;
816  }
817
818  @Override
819  public void setRsRpcServices(RSRpcServices rsRpcServices) {
820    this.rsRpcServices = rsRpcServices;
821  }
822
823  @Override
824  public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
825    this.namedQueueRecorder = namedQueueRecorder;
826  }
827
828}