001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.ipc;
020
021import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
022
023import java.io.IOException;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.nio.ByteBuffer;
027import java.nio.channels.ReadableByteChannel;
028import java.nio.channels.WritableByteChannel;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Locale;
033import java.util.Map;
034import java.util.Optional;
035import java.util.concurrent.atomic.LongAdder;
036import org.apache.commons.lang3.StringUtils;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.CallQueueTooBigException;
039import org.apache.hadoop.hbase.CellScanner;
040import org.apache.hadoop.hbase.DoNotRetryIOException;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.Server;
043import org.apache.hadoop.hbase.conf.ConfigurationObserver;
044import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
045import org.apache.hadoop.hbase.io.ByteBuffAllocator;
046import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
047import org.apache.hadoop.hbase.monitoring.TaskMonitor;
048import org.apache.hadoop.hbase.regionserver.RSRpcServices;
049import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
050import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
051import org.apache.hadoop.hbase.security.HBasePolicyProvider;
052import org.apache.hadoop.hbase.security.SaslUtil;
053import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
054import org.apache.hadoop.hbase.security.User;
055import org.apache.hadoop.hbase.security.UserProvider;
056import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
057import org.apache.hadoop.hbase.util.GsonUtil;
058import org.apache.hadoop.hbase.util.Pair;
059import org.apache.hadoop.security.UserGroupInformation;
060import org.apache.hadoop.security.authorize.AuthorizationException;
061import org.apache.hadoop.security.authorize.PolicyProvider;
062import org.apache.hadoop.security.authorize.ProxyUsers;
063import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
064import org.apache.hadoop.security.token.SecretManager;
065import org.apache.hadoop.security.token.TokenIdentifier;
066import org.apache.yetus.audience.InterfaceAudience;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import org.apache.hbase.thirdparty.com.google.gson.Gson;
071import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
072import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
073import org.apache.hbase.thirdparty.com.google.protobuf.Message;
074import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
075import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
076
077import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
080
081/**
082 * An RPC server that hosts protobuf described Services.
083 *
084 */
085@InterfaceAudience.Private
086public abstract class RpcServer implements RpcServerInterface,
087    ConfigurationObserver {
088  // LOG is being used in CallRunner and the log level is being changed in tests
089  public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class);
090  protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
091      = new CallQueueTooBigException();
092
093  private static final String MULTI_GETS = "multi.gets";
094  private static final String MULTI_MUTATIONS = "multi.mutations";
095  private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
096
097  private final boolean authorize;
098  private final boolean isOnlineLogProviderEnabled;
099  protected boolean isSecurityEnabled;
100
101  public static final byte CURRENT_VERSION = 0;
102
103  /**
104   * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
105   */
106  public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
107          "hbase.ipc.server.fallback-to-simple-auth-allowed";
108
109  /**
110   * How many calls/handler are allowed in the queue.
111   */
112  protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
113
114  protected final CellBlockBuilder cellBlockBuilder;
115
116  protected static final String AUTH_FAILED_FOR = "Auth failed for ";
117  protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
118  protected static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger."
119      + Server.class.getName());
120  protected SecretManager<TokenIdentifier> secretManager;
121  protected final Map<String, String> saslProps;
122
123  protected ServiceAuthorizationManager authManager;
124
125  /** This is set to Call object before Handler invokes an RPC and reset
126   * after the call returns.
127   */
128  protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>();
129
130  /** Keeps MonitoredRPCHandler per handler thread. */
131  protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>();
132
133  protected final InetSocketAddress bindAddress;
134
135  protected MetricsHBaseServer metrics;
136
137  protected final Configuration conf;
138
139  /**
140   * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
141   * this size, then we will reject the call (after parsing it though). It will go back to the
142   * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
143   * call queue size gets incremented after we parse a call and before we add it to the queue of
144   * calls for the scheduler to use. It gets decremented after we have 'run' the Call. The current
145   * size is kept in {@link #callQueueSizeInBytes}.
146   * @see #callQueueSizeInBytes
147   * @see #DEFAULT_MAX_CALLQUEUE_SIZE
148   */
149  protected final long maxQueueSizeInBytes;
150  protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
151
152  /**
153   * This is a running count of the size in bytes of all outstanding calls whether currently
154   * executing or queued waiting to be run.
155   */
156  protected final LongAdder callQueueSizeInBytes = new LongAdder();
157
158  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
159  protected final boolean tcpKeepAlive; // if T then use keepalives
160
161  /**
162   * This flag is used to indicate to sub threads when they should go down.  When we call
163   * {@link #start()}, all threads started will consult this flag on whether they should
164   * keep going.  It is set to false when {@link #stop()} is called.
165   */
166  volatile boolean running = true;
167
168  /**
169   * This flag is set to true after all threads are up and 'running' and the server is then opened
170   * for business by the call to {@link #start()}.
171   */
172  volatile boolean started = false;
173
174  protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
175
176  protected HBaseRPCErrorHandler errorHandler = null;
177
178  public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
179  protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION =
180      new RequestTooBigException();
181
182  protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
183  protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
184
185  /**
186   * Minimum allowable timeout (in milliseconds) in rpc request's header. This
187   * configuration exists to prevent the rpc service regarding this request as timeout immediately.
188   */
189  protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
190  protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
191
192  /** Default value for above params */
193  public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
194  protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
195  protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
196
197  protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000;
198  protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length";
199  protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>";
200
201  protected static final Gson GSON = GsonUtil.createGsonWithDisableHtmlEscaping().create();
202
203  protected final int maxRequestSize;
204  protected final int warnResponseTime;
205  protected final int warnResponseSize;
206
207  protected final int minClientRequestTimeout;
208
209  protected final Server server;
210  protected final List<BlockingServiceAndInterface> services;
211
212  protected final RpcScheduler scheduler;
213
214  protected UserProvider userProvider;
215
216  protected final ByteBuffAllocator bbAllocator;
217
218  protected volatile boolean allowFallbackToSimpleAuth;
219
220  /**
221   * Used to get details for scan with a scanner_id<br/>
222   * TODO try to figure out a better way and remove reference from regionserver package later.
223   */
224  private RSRpcServices rsRpcServices;
225
226
227  /**
228   * Used to add online slowlog responses
229   */
230  private NamedQueueRecorder namedQueueRecorder;
231
232  @FunctionalInterface
233  protected interface CallCleanup {
234    void run();
235  }
236
237  /**
238   * Datastructure for passing a {@link BlockingService} and its associated class of
239   * protobuf service interface.  For example, a server that fielded what is defined
240   * in the client protobuf service would pass in an implementation of the client blocking service
241   * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
242   */
243  public static class BlockingServiceAndInterface {
244    private final BlockingService service;
245    private final Class<?> serviceInterface;
246    public BlockingServiceAndInterface(final BlockingService service,
247        final Class<?> serviceInterface) {
248      this.service = service;
249      this.serviceInterface = serviceInterface;
250    }
251    public Class<?> getServiceInterface() {
252      return this.serviceInterface;
253    }
254    public BlockingService getBlockingService() {
255      return this.service;
256    }
257  }
258
259  /**
260   * Constructs a server listening on the named port and address.
261   * @param server hosting instance of {@link Server}. We will do authentications if an
262   * instance else pass null for no authentication check.
263   * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
264   * @param services A list of services.
265   * @param bindAddress Where to listen
266   * @param conf
267   * @param scheduler
268   * @param reservoirEnabled Enable ByteBufferPool or not.
269   */
270  public RpcServer(final Server server, final String name,
271      final List<BlockingServiceAndInterface> services,
272      final InetSocketAddress bindAddress, Configuration conf,
273      RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
274    this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
275    this.server = server;
276    this.services = services;
277    this.bindAddress = bindAddress;
278    this.conf = conf;
279    // See declaration above for documentation on what this size is.
280    this.maxQueueSizeInBytes =
281      this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
282
283    this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
284    this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
285    this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
286        DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
287    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
288
289    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
290    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
291    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
292
293    this.cellBlockBuilder = new CellBlockBuilder(conf);
294
295    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
296    this.userProvider = UserProvider.instantiate(conf);
297    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
298    if (isSecurityEnabled) {
299      saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
300        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
301    } else {
302      saslProps = Collections.emptyMap();
303    }
304
305    this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
306      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
307    this.scheduler = scheduler;
308  }
309
310  @Override
311  public void onConfigurationChange(Configuration newConf) {
312    initReconfigurable(newConf);
313    if (scheduler instanceof ConfigurationObserver) {
314      ((ConfigurationObserver) scheduler).onConfigurationChange(newConf);
315    }
316    if (authorize) {
317      refreshAuthManager(newConf, new HBasePolicyProvider());
318    }
319  }
320
321  protected void initReconfigurable(Configuration confToLoad) {
322    this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
323    if (isSecurityEnabled && allowFallbackToSimpleAuth) {
324      LOG.warn("********* WARNING! *********");
325      LOG.warn("This server is configured to allow connections from INSECURE clients");
326      LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
327      LOG.warn("While this option is enabled, client identities cannot be secured, and user");
328      LOG.warn("impersonation is possible!");
329      LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
330      LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
331      LOG.warn("****************************");
332    }
333  }
334
335  Configuration getConf() {
336    return conf;
337  }
338
339  @Override
340  public boolean isStarted() {
341    return this.started;
342  }
343
344  @Override
345  public synchronized void refreshAuthManager(Configuration conf, PolicyProvider pp) {
346    // Ignore warnings that this should be accessed in a static way instead of via an instance;
347    // it'll break if you go via static route.
348    System.setProperty("hadoop.policy.file", "hbase-policy.xml");
349    this.authManager.refresh(conf, pp);
350    LOG.info("Refreshed hbase-policy.xml successfully");
351    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
352    LOG.info("Refreshed super and proxy users successfully");
353  }
354
355  protected AuthenticationTokenSecretManager createSecretManager() {
356    if (!isSecurityEnabled) return null;
357    if (server == null) return null;
358    Configuration conf = server.getConfiguration();
359    long keyUpdateInterval =
360        conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
361    long maxAge =
362        conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
363    return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
364        server.getServerName().toString(), keyUpdateInterval, maxAge);
365  }
366
367  public SecretManager<? extends TokenIdentifier> getSecretManager() {
368    return this.secretManager;
369  }
370
371  @SuppressWarnings("unchecked")
372  public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
373    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
374  }
375
376  /**
377   * This is a server side method, which is invoked over RPC. On success
378   * the return response has protobuf response payload. On failure, the
379   * exception name and the stack trace are returned in the protobuf response.
380   */
381  @Override
382  public Pair<Message, CellScanner> call(RpcCall call,
383      MonitoredRPCHandler status) throws IOException {
384    try {
385      MethodDescriptor md = call.getMethod();
386      Message param = call.getParam();
387      status.setRPC(md.getName(), new Object[]{param},
388        call.getReceiveTime());
389      // TODO: Review after we add in encoded data blocks.
390      status.setRPCPacket(param);
391      status.resume("Servicing call");
392      //get an instance of the method arg type
393      HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
394      controller.setCallTimeout(call.getTimeout());
395      Message result = call.getService().callBlockingMethod(md, controller, param);
396      long receiveTime = call.getReceiveTime();
397      long startTime = call.getStartTime();
398      long endTime = System.currentTimeMillis();
399      int processingTime = (int) (endTime - startTime);
400      int qTime = (int) (startTime - receiveTime);
401      int totalTime = (int) (endTime - receiveTime);
402      if (LOG.isTraceEnabled()) {
403        LOG.trace(CurCall.get().toString() +
404            ", response " + TextFormat.shortDebugString(result) +
405            " queueTime: " + qTime +
406            " processingTime: " + processingTime +
407            " totalTime: " + totalTime);
408      }
409      // Use the raw request call size for now.
410      long requestSize = call.getSize();
411      long responseSize = result.getSerializedSize();
412      if (call.isClientCellBlockSupported()) {
413        // Include the payload size in HBaseRpcController
414        responseSize += call.getResponseCellSize();
415      }
416
417      metrics.dequeuedCall(qTime);
418      metrics.processedCall(processingTime);
419      metrics.totalCall(totalTime);
420      metrics.receivedRequest(requestSize);
421      metrics.sentResponse(responseSize);
422      // log any RPC responses that are slower than the configured warn
423      // response time or larger than configured warning size
424      boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
425      boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
426      if (tooSlow || tooLarge) {
427        final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY);
428        // when tagging, we let TooLarge trump TooSmall to keep output simple
429        // note that large responses will often also be slow.
430        logResponse(param,
431          md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
432          tooLarge, tooSlow,
433          status.getClient(), startTime, processingTime, qTime,
434          responseSize, userName);
435        if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
436          // send logs to ring buffer owned by slowLogRecorder
437          final String className =
438            server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
439          this.namedQueueRecorder.addRecord(
440            new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow,
441              tooLarge));
442        }
443      }
444      return new Pair<>(result, controller.cellScanner());
445    } catch (Throwable e) {
446      // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
447      // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
448      // need to pass it over the wire.
449      if (e instanceof ServiceException) {
450        if (e.getCause() == null) {
451          LOG.debug("Caught a ServiceException with null cause", e);
452        } else {
453          e = e.getCause();
454        }
455      }
456
457      // increment the number of requests that were exceptions.
458      metrics.exception(e);
459
460      if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
461      if (e instanceof IOException) throw (IOException)e;
462      LOG.error("Unexpected throwable object ", e);
463      throw new IOException(e.getMessage(), e);
464    }
465  }
466
467  /**
468   * Logs an RPC response to the LOG file, producing valid JSON objects for
469   * client Operations.
470   * @param param The parameters received in the call.
471   * @param methodName The name of the method invoked
472   * @param call The string representation of the call
473   * @param tooLarge To indicate if the event is tooLarge
474   * @param tooSlow To indicate if the event is tooSlow
475   * @param clientAddress The address of the client who made this call.
476   * @param startTime The time that the call was initiated, in ms.
477   * @param processingTime The duration that the call took to run, in ms.
478   * @param qTime The duration that the call spent on the queue
479   *   prior to being initiated, in ms.
480   * @param responseSize The size in bytes of the response buffer.
481   * @param userName UserName of the current RPC Call
482   */
483  void logResponse(Message param, String methodName, String call, boolean tooLarge,
484      boolean tooSlow, String clientAddress, long startTime, int processingTime, int qTime,
485      long responseSize, String userName) {
486    final String className = server == null ? StringUtils.EMPTY :
487      server.getClass().getSimpleName();
488    // base information that is reported regardless of type of call
489    Map<String, Object> responseInfo = new HashMap<>();
490    responseInfo.put("starttimems", startTime);
491    responseInfo.put("processingtimems", processingTime);
492    responseInfo.put("queuetimems", qTime);
493    responseInfo.put("responsesize", responseSize);
494    responseInfo.put("client", clientAddress);
495    responseInfo.put("class", className);
496    responseInfo.put("method", methodName);
497    responseInfo.put("call", call);
498    // The params could be really big, make sure they don't kill us at WARN
499    String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
500    if (stringifiedParam.length() > 150) {
501      // Truncate to 1000 chars if TRACE is on, else to 150 chars
502      stringifiedParam = truncateTraceLog(stringifiedParam);
503    }
504    responseInfo.put("param", stringifiedParam);
505    if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
506      ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
507      String scanDetails;
508      if (request.hasScannerId()) {
509        long scannerId = request.getScannerId();
510        scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
511      } else {
512        scanDetails = rsRpcServices.getScanDetailsWithRequest(request);
513      }
514      if (scanDetails != null) {
515        responseInfo.put("scandetails", scanDetails);
516      }
517    }
518    if (param instanceof ClientProtos.MultiRequest) {
519      int numGets = 0;
520      int numMutations = 0;
521      int numServiceCalls = 0;
522      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest)param;
523      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
524        for (ClientProtos.Action action: regionAction.getActionList()) {
525          if (action.hasMutation()) {
526            numMutations++;
527          }
528          if (action.hasGet()) {
529            numGets++;
530          }
531          if (action.hasServiceCall()) {
532            numServiceCalls++;
533          }
534        }
535      }
536      responseInfo.put(MULTI_GETS, numGets);
537      responseInfo.put(MULTI_MUTATIONS, numMutations);
538      responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls);
539    }
540    final String tag = (tooLarge && tooSlow) ? "TooLarge & TooSlow"
541      : (tooSlow ? "TooSlow" : "TooLarge");
542    LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo));
543  }
544
545
546  /**
547   * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length
548   * if TRACE is on else to 150 chars Refer to Jira HBASE-20826 and HBASE-20942
549   * @param strParam stringifiedParam to be truncated
550   * @return truncated trace log string
551   */
552  String truncateTraceLog(String strParam) {
553    if (LOG.isTraceEnabled()) {
554      int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH);
555      int truncatedLength =
556          strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength;
557      String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED;
558      return strParam.subSequence(0, truncatedLength) + truncatedFlag;
559    }
560    return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED;
561  }
562
563  /**
564   * Set the handler for calling out of RPC for error conditions.
565   * @param handler the handler implementation
566   */
567  @Override
568  public void setErrorHandler(HBaseRPCErrorHandler handler) {
569    this.errorHandler = handler;
570  }
571
572  @Override
573  public HBaseRPCErrorHandler getErrorHandler() {
574    return this.errorHandler;
575  }
576
577  /**
578   * Returns the metrics instance for reporting RPC call statistics
579   */
580  @Override
581  public MetricsHBaseServer getMetrics() {
582    return metrics;
583  }
584
585  @Override
586  public void addCallSize(final long diff) {
587    this.callQueueSizeInBytes.add(diff);
588  }
589
590  /**
591   * Authorize the incoming client connection.
592   * @param user client user
593   * @param connection incoming connection
594   * @param addr InetAddress of incoming connection
595   * @throws AuthorizationException when the client isn't authorized to talk the protocol
596   */
597  public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
598      InetAddress addr) throws AuthorizationException {
599    if (authorize) {
600      Class<?> c = getServiceInterface(services, connection.getServiceName());
601      authManager.authorize(user, c, getConf(), addr);
602    }
603  }
604
605  /**
606   * When the read or write buffer size is larger than this limit, i/o will be
607   * done in chunks of this size. Most RPC requests and responses would
608   * be smaller.
609   */
610  protected static final int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
611
612  /**
613   * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
614   * If the amount of data is large, it writes to channel in smaller chunks.
615   * This is to avoid jdk from creating many direct buffers as the size of
616   * ByteBuffer increases. There should not be any performance degredation.
617   *
618   * @param channel writable byte channel to write on
619   * @param buffer buffer to write
620   * @return number of bytes written
621   * @throws java.io.IOException e
622   * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
623   */
624  protected int channelRead(ReadableByteChannel channel,
625                                   ByteBuffer buffer) throws IOException {
626
627    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
628           channel.read(buffer) : channelIO(channel, null, buffer);
629    if (count > 0) {
630      metrics.receivedBytes(count);
631    }
632    return count;
633  }
634
635  /**
636   * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
637   * Only one of readCh or writeCh should be non-null.
638   *
639   * @param readCh read channel
640   * @param writeCh write channel
641   * @param buf buffer to read or write into/out of
642   * @return bytes written
643   * @throws java.io.IOException e
644   * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
645   */
646  private static int channelIO(ReadableByteChannel readCh,
647                               WritableByteChannel writeCh,
648                               ByteBuffer buf) throws IOException {
649
650    int originalLimit = buf.limit();
651    int initialRemaining = buf.remaining();
652    int ret = 0;
653
654    while (buf.remaining() > 0) {
655      try {
656        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
657        buf.limit(buf.position() + ioSize);
658
659        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
660
661        if (ret < ioSize) {
662          break;
663        }
664
665      } finally {
666        buf.limit(originalLimit);
667      }
668    }
669
670    int nBytes = initialRemaining - buf.remaining();
671    return (nBytes > 0) ? nBytes : ret;
672  }
673
674  /**
675   * Needed for features such as delayed calls.  We need to be able to store the current call
676   * so that we can complete it later or ask questions of what is supported by the current ongoing
677   * call.
678   * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
679   */
680  public static Optional<RpcCall> getCurrentCall() {
681    return Optional.ofNullable(CurCall.get());
682  }
683
684  public static boolean isInRpcCallContext() {
685    return CurCall.get() != null;
686  }
687
688  /**
689   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For
690   * master's rpc call, it may generate new procedure and mutate the region which store procedure.
691   * There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc
692   * call to avoid the rpc check.
693   * @return the currently ongoing rpc call
694   */
695  public static Optional<RpcCall> unsetCurrentCall() {
696    Optional<RpcCall> rpcCall = getCurrentCall();
697    CurCall.set(null);
698    return rpcCall;
699  }
700
701  /**
702   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
703   * rpc call back after mutate region.
704   */
705  public static void setCurrentCall(RpcCall rpcCall) {
706    CurCall.set(rpcCall);
707  }
708
709  /**
710   * Returns the user credentials associated with the current RPC request or not present if no
711   * credentials were provided.
712   * @return A User
713   */
714  public static Optional<User> getRequestUser() {
715    Optional<RpcCall> ctx = getCurrentCall();
716    return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty();
717  }
718
719  /**
720   * The number of open RPC conections
721   * @return the number of open rpc connections
722   */
723  abstract public int getNumOpenConnections();
724
725  /**
726   * Returns the username for any user associated with the current RPC
727   * request or not present if no user is set.
728   */
729  public static Optional<String> getRequestUserName() {
730    return getRequestUser().map(User::getShortName);
731  }
732
733  /**
734   * @return Address of remote client if a request is ongoing, else null
735   */
736  public static Optional<InetAddress> getRemoteAddress() {
737    return getCurrentCall().map(RpcCall::getRemoteAddress);
738  }
739
740  /**
741   * @param serviceName Some arbitrary string that represents a 'service'.
742   * @param services Available service instances
743   * @return Matching BlockingServiceAndInterface pair
744   */
745  protected static BlockingServiceAndInterface getServiceAndInterface(
746      final List<BlockingServiceAndInterface> services, final String serviceName) {
747    for (BlockingServiceAndInterface bs : services) {
748      if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
749        return bs;
750      }
751    }
752    return null;
753  }
754
755  /**
756   * @param serviceName Some arbitrary string that represents a 'service'.
757   * @param services Available services and their service interfaces.
758   * @return Service interface class for <code>serviceName</code>
759   */
760  protected static Class<?> getServiceInterface(
761      final List<BlockingServiceAndInterface> services,
762      final String serviceName) {
763    BlockingServiceAndInterface bsasi =
764        getServiceAndInterface(services, serviceName);
765    return bsasi == null? null: bsasi.getServiceInterface();
766  }
767
768  /**
769   * @param serviceName Some arbitrary string that represents a 'service'.
770   * @param services Available services and their service interfaces.
771   * @return BlockingService that goes with the passed <code>serviceName</code>
772   */
773  protected static BlockingService getService(
774      final List<BlockingServiceAndInterface> services,
775      final String serviceName) {
776    BlockingServiceAndInterface bsasi =
777        getServiceAndInterface(services, serviceName);
778    return bsasi == null? null: bsasi.getBlockingService();
779  }
780
781  protected static MonitoredRPCHandler getStatus() {
782    // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
783    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
784    if (status != null) {
785      return status;
786    }
787    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
788    status.pause("Waiting for a call");
789    RpcServer.MONITORED_RPC.set(status);
790    return status;
791  }
792
793  /** Returns the remote side ip address when invoked inside an RPC
794   *  Returns null incase of an error.
795   *  @return InetAddress
796   */
797  public static InetAddress getRemoteIp() {
798    RpcCall call = CurCall.get();
799    if (call != null) {
800      return call.getRemoteAddress();
801    }
802    return null;
803  }
804
805  @Override
806  public RpcScheduler getScheduler() {
807    return scheduler;
808  }
809
810  @Override
811  public ByteBuffAllocator getByteBuffAllocator() {
812    return this.bbAllocator;
813  }
814
815  @Override
816  public void setRsRpcServices(RSRpcServices rsRpcServices) {
817    this.rsRpcServices = rsRpcServices;
818  }
819
820  @Override
821  public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
822    this.namedQueueRecorder = namedQueueRecorder;
823  }
824
825}