001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.ipc;
020
021import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
022
023import java.io.IOException;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.nio.ByteBuffer;
027import java.nio.channels.ReadableByteChannel;
028import java.nio.channels.WritableByteChannel;
029import java.util.ArrayList;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Locale;
034import java.util.Map;
035import java.util.Optional;
036import java.util.concurrent.atomic.LongAdder;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.CallQueueTooBigException;
039import org.apache.hadoop.hbase.CellScanner;
040import org.apache.hadoop.hbase.DoNotRetryIOException;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.Server;
043import org.apache.hadoop.hbase.conf.ConfigurationObserver;
044import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
045import org.apache.hadoop.hbase.io.ByteBufferPool;
046import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
047import org.apache.hadoop.hbase.monitoring.TaskMonitor;
048import org.apache.hadoop.hbase.nio.ByteBuff;
049import org.apache.hadoop.hbase.nio.MultiByteBuff;
050import org.apache.hadoop.hbase.nio.SingleByteBuff;
051import org.apache.hadoop.hbase.regionserver.RSRpcServices;
052import org.apache.hadoop.hbase.security.SaslUtil;
053import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
054import org.apache.hadoop.hbase.security.User;
055import org.apache.hadoop.hbase.security.UserProvider;
056import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
057import org.apache.hadoop.hbase.util.GsonUtil;
058import org.apache.hadoop.hbase.util.Pair;
059import org.apache.hadoop.security.UserGroupInformation;
060import org.apache.hadoop.security.authorize.AuthorizationException;
061import org.apache.hadoop.security.authorize.PolicyProvider;
062import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
063import org.apache.hadoop.security.token.SecretManager;
064import org.apache.hadoop.security.token.TokenIdentifier;
065import org.apache.yetus.audience.InterfaceAudience;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
070import org.apache.hbase.thirdparty.com.google.gson.Gson;
071import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
072import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
073import org.apache.hbase.thirdparty.com.google.protobuf.Message;
074import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
075import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
076
077import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
080
081/**
082 * An RPC server that hosts protobuf described Services.
083 *
084 */
085@InterfaceAudience.Private
086public abstract class RpcServer implements RpcServerInterface,
087    ConfigurationObserver {
088  // LOG is being used in CallRunner and the log level is being changed in tests
089  public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class);
090  protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
091      = new CallQueueTooBigException();
092
093  private final boolean authorize;
094  protected boolean isSecurityEnabled;
095
096  public static final byte CURRENT_VERSION = 0;
097
098  /**
099   * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
100   */
101  public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
102          "hbase.ipc.server.fallback-to-simple-auth-allowed";
103
104  /**
105   * How many calls/handler are allowed in the queue.
106   */
107  protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
108
109  protected final CellBlockBuilder cellBlockBuilder;
110
111  protected static final String AUTH_FAILED_FOR = "Auth failed for ";
112  protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
113  protected static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger."
114      + Server.class.getName());
115  protected SecretManager<TokenIdentifier> secretManager;
116  protected final Map<String, String> saslProps;
117
118  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
119      justification="Start is synchronized so authManager creation is single-threaded")
120  protected ServiceAuthorizationManager authManager;
121
122  /** This is set to Call object before Handler invokes an RPC and ybdie
123   * after the call returns.
124   */
125  protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>();
126
127  /** Keeps MonitoredRPCHandler per handler thread. */
128  protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>();
129
130  protected final InetSocketAddress bindAddress;
131
132  protected MetricsHBaseServer metrics;
133
134  protected final Configuration conf;
135
136  /**
137   * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
138   * this size, then we will reject the call (after parsing it though). It will go back to the
139   * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
140   * call queue size gets incremented after we parse a call and before we add it to the queue of
141   * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
142   * size is kept in {@link #callQueueSizeInBytes}.
143   * @see #callQueueSizeInBytes
144   * @see #DEFAULT_MAX_CALLQUEUE_SIZE
145   */
146  protected final long maxQueueSizeInBytes;
147  protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
148
149  /**
150   * This is a running count of the size in bytes of all outstanding calls whether currently
151   * executing or queued waiting to be run.
152   */
153  protected final LongAdder callQueueSizeInBytes = new LongAdder();
154
155  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
156  protected final boolean tcpKeepAlive; // if T then use keepalives
157
158  /**
159   * This flag is used to indicate to sub threads when they should go down.  When we call
160   * {@link #start()}, all threads started will consult this flag on whether they should
161   * keep going.  It is set to false when {@link #stop()} is called.
162   */
163  volatile boolean running = true;
164
165  /**
166   * This flag is set to true after all threads are up and 'running' and the server is then opened
167   * for business by the call to {@link #start()}.
168   */
169  volatile boolean started = false;
170
171  protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
172
173  protected HBaseRPCErrorHandler errorHandler = null;
174
175  public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
176  protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION =
177      new RequestTooBigException();
178
179  protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
180  protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
181
182  /**
183   * Minimum allowable timeout (in milliseconds) in rpc request's header. This
184   * configuration exists to prevent the rpc service regarding this request as timeout immediately.
185   */
186  protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
187  protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
188
189  /** Default value for above params */
190  public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
191  protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
192  protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
193
194  protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000;
195  protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length";
196  protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>";
197
198  protected static final Gson GSON = GsonUtil.createGson().create();
199
200  protected final int maxRequestSize;
201  protected final int warnResponseTime;
202  protected final int warnResponseSize;
203
204  protected final int minClientRequestTimeout;
205
206  protected final Server server;
207  protected final List<BlockingServiceAndInterface> services;
208
209  protected final RpcScheduler scheduler;
210
211  protected UserProvider userProvider;
212
213  protected final ByteBufferPool reservoir;
214  // The requests and response will use buffers from ByteBufferPool, when the size of the
215  // request/response is at least this size.
216  // We make this to be 1/6th of the pool buffer size.
217  protected final int minSizeForReservoirUse;
218
219  protected volatile boolean allowFallbackToSimpleAuth;
220
221  /**
222   * Used to get details for scan with a scanner_id<br/>
223   * TODO try to figure out a better way and remove reference from regionserver package later.
224   */
225  private RSRpcServices rsRpcServices;
226
227  @FunctionalInterface
228  protected static interface CallCleanup {
229    void run();
230  }
231
232  /**
233   * Datastructure for passing a {@link BlockingService} and its associated class of
234   * protobuf service interface.  For example, a server that fielded what is defined
235   * in the client protobuf service would pass in an implementation of the client blocking service
236   * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
237   */
238  public static class BlockingServiceAndInterface {
239    private final BlockingService service;
240    private final Class<?> serviceInterface;
241    public BlockingServiceAndInterface(final BlockingService service,
242        final Class<?> serviceInterface) {
243      this.service = service;
244      this.serviceInterface = serviceInterface;
245    }
246    public Class<?> getServiceInterface() {
247      return this.serviceInterface;
248    }
249    public BlockingService getBlockingService() {
250      return this.service;
251    }
252  }
253
254  /**
255   * Constructs a server listening on the named port and address.
256   * @param server hosting instance of {@link Server}. We will do authentications if an
257   * instance else pass null for no authentication check.
258   * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
259   * @param services A list of services.
260   * @param bindAddress Where to listen
261   * @param conf
262   * @param scheduler
263   * @param reservoirEnabled Enable ByteBufferPool or not.
264   */
265  public RpcServer(final Server server, final String name,
266      final List<BlockingServiceAndInterface> services,
267      final InetSocketAddress bindAddress, Configuration conf,
268      RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
269    if (reservoirEnabled) {
270      int poolBufSize = conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY,
271          ByteBufferPool.DEFAULT_BUFFER_SIZE);
272      // The max number of buffers to be pooled in the ByteBufferPool. The default value been
273      // selected based on the #handlers configured. When it is read request, 2 MB is the max size
274      // at which we will send back one RPC request. Means max we need 2 MB for creating the
275      // response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
276      // include the heap size overhead of each cells also.) Considering 2 MB, we will need
277      // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
278      // is by default 64 KB.
279      // In case of read request, at the end of the handler process, we will make the response
280      // cellblock and add the Call to connection's response Q and a single Responder thread takes
281      // connections and responses from that one by one and do the socket write. So there is chances
282      // that by the time a handler originated response is actually done writing to socket and so
283      // released the BBs it used, the handler might have processed one more read req. On an avg 2x
284      // we consider and consider that also for the max buffers to pool
285      int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
286      int maxPoolSize = conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
287          conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
288              HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
289      this.reservoir = new ByteBufferPool(poolBufSize, maxPoolSize);
290      this.minSizeForReservoirUse = getMinSizeForReservoirUse(this.reservoir);
291    } else {
292      reservoir = null;
293      this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place.
294    }
295    this.server = server;
296    this.services = services;
297    this.bindAddress = bindAddress;
298    this.conf = conf;
299    // See declaration above for documentation on what this size is.
300    this.maxQueueSizeInBytes =
301      this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
302
303    this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
304    this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
305    this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
306        DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
307    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
308
309    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
310    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
311    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
312
313    this.cellBlockBuilder = new CellBlockBuilder(conf);
314
315    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
316    this.userProvider = UserProvider.instantiate(conf);
317    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
318    if (isSecurityEnabled) {
319      saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
320        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
321    } else {
322      saslProps = Collections.emptyMap();
323    }
324
325    this.scheduler = scheduler;
326  }
327
328  @VisibleForTesting
329  static int getMinSizeForReservoirUse(ByteBufferPool pool) {
330    return pool.getBufferSize() / 6;
331  }
332
333  @Override
334  public void onConfigurationChange(Configuration newConf) {
335    initReconfigurable(newConf);
336    if (scheduler instanceof ConfigurationObserver) {
337      ((ConfigurationObserver) scheduler).onConfigurationChange(newConf);
338    }
339  }
340
341  protected void initReconfigurable(Configuration confToLoad) {
342    this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
343    if (isSecurityEnabled && allowFallbackToSimpleAuth) {
344      LOG.warn("********* WARNING! *********");
345      LOG.warn("This server is configured to allow connections from INSECURE clients");
346      LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
347      LOG.warn("While this option is enabled, client identities cannot be secured, and user");
348      LOG.warn("impersonation is possible!");
349      LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
350      LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
351      LOG.warn("****************************");
352    }
353  }
354
355  Configuration getConf() {
356    return conf;
357  }
358
359  @Override
360  public boolean isStarted() {
361    return this.started;
362  }
363
364  @Override
365  public void refreshAuthManager(PolicyProvider pp) {
366    // Ignore warnings that this should be accessed in a static way instead of via an instance;
367    // it'll break if you go via static route.
368    synchronized (authManager) {
369      authManager.refresh(this.conf, pp);
370    }
371  }
372
373  protected AuthenticationTokenSecretManager createSecretManager() {
374    if (!isSecurityEnabled) return null;
375    if (server == null) return null;
376    Configuration conf = server.getConfiguration();
377    long keyUpdateInterval =
378        conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
379    long maxAge =
380        conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
381    return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
382        server.getServerName().toString(), keyUpdateInterval, maxAge);
383  }
384
385  public SecretManager<? extends TokenIdentifier> getSecretManager() {
386    return this.secretManager;
387  }
388
389  @SuppressWarnings("unchecked")
390  public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
391    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
392  }
393
394  /**
395   * This is a server side method, which is invoked over RPC. On success
396   * the return response has protobuf response payload. On failure, the
397   * exception name and the stack trace are returned in the protobuf response.
398   */
399  @Override
400  public Pair<Message, CellScanner> call(RpcCall call,
401      MonitoredRPCHandler status) throws IOException {
402    try {
403      MethodDescriptor md = call.getMethod();
404      Message param = call.getParam();
405      status.setRPC(md.getName(), new Object[]{param},
406        call.getReceiveTime());
407      // TODO: Review after we add in encoded data blocks.
408      status.setRPCPacket(param);
409      status.resume("Servicing call");
410      //get an instance of the method arg type
411      HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
412      controller.setCallTimeout(call.getTimeout());
413      Message result = call.getService().callBlockingMethod(md, controller, param);
414      long receiveTime = call.getReceiveTime();
415      long startTime = call.getStartTime();
416      long endTime = System.currentTimeMillis();
417      int processingTime = (int) (endTime - startTime);
418      int qTime = (int) (startTime - receiveTime);
419      int totalTime = (int) (endTime - receiveTime);
420      if (LOG.isTraceEnabled()) {
421        LOG.trace(CurCall.get().toString() +
422            ", response " + TextFormat.shortDebugString(result) +
423            " queueTime: " + qTime +
424            " processingTime: " + processingTime +
425            " totalTime: " + totalTime);
426      }
427      // Use the raw request call size for now.
428      long requestSize = call.getSize();
429      long responseSize = result.getSerializedSize();
430      if (call.isClientCellBlockSupported()) {
431        // Include the payload size in HBaseRpcController
432        responseSize += call.getResponseCellSize();
433      }
434
435      metrics.dequeuedCall(qTime);
436      metrics.processedCall(processingTime);
437      metrics.totalCall(totalTime);
438      metrics.receivedRequest(requestSize);
439      metrics.sentResponse(responseSize);
440      // log any RPC responses that are slower than the configured warn
441      // response time or larger than configured warning size
442      boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
443      boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
444      if (tooSlow || tooLarge) {
445        // when tagging, we let TooLarge trump TooSmall to keep output simple
446        // note that large responses will often also be slow.
447        logResponse(param,
448            md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
449            (tooLarge ? "TooLarge" : "TooSlow"),
450            status.getClient(), startTime, processingTime, qTime,
451            responseSize);
452      }
453      return new Pair<>(result, controller.cellScanner());
454    } catch (Throwable e) {
455      // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
456      // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
457      // need to pass it over the wire.
458      if (e instanceof ServiceException) {
459        if (e.getCause() == null) {
460          LOG.debug("Caught a ServiceException with null cause", e);
461        } else {
462          e = e.getCause();
463        }
464      }
465
466      // increment the number of requests that were exceptions.
467      metrics.exception(e);
468
469      if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
470      if (e instanceof IOException) throw (IOException)e;
471      LOG.error("Unexpected throwable object ", e);
472      throw new IOException(e.getMessage(), e);
473    }
474  }
475
476  /**
477   * Logs an RPC response to the LOG file, producing valid JSON objects for
478   * client Operations.
479   * @param param The parameters received in the call.
480   * @param methodName The name of the method invoked
481   * @param call The string representation of the call
482   * @param tag  The tag that will be used to indicate this event in the log.
483   * @param clientAddress   The address of the client who made this call.
484   * @param startTime       The time that the call was initiated, in ms.
485   * @param processingTime  The duration that the call took to run, in ms.
486   * @param qTime           The duration that the call spent on the queue
487   *                        prior to being initiated, in ms.
488   * @param responseSize    The size in bytes of the response buffer.
489   */
490  void logResponse(Message param, String methodName, String call, String tag,
491      String clientAddress, long startTime, int processingTime, int qTime,
492      long responseSize) throws IOException {
493    // base information that is reported regardless of type of call
494    Map<String, Object> responseInfo = new HashMap<>();
495    responseInfo.put("starttimems", startTime);
496    responseInfo.put("processingtimems", processingTime);
497    responseInfo.put("queuetimems", qTime);
498    responseInfo.put("responsesize", responseSize);
499    responseInfo.put("client", clientAddress);
500    responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
501    responseInfo.put("method", methodName);
502    responseInfo.put("call", call);
503    // The params could be really big, make sure they don't kill us at WARN
504    String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
505    if (stringifiedParam.length() > 150) {
506      // Truncate to 1000 chars if TRACE is on, else to 150 chars
507      stringifiedParam = truncateTraceLog(stringifiedParam);
508    }
509    responseInfo.put("param", stringifiedParam);
510    if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
511      ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
512      if (request.hasScannerId()) {
513        long scannerId = request.getScannerId();
514        String scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
515        if (scanDetails != null) {
516          responseInfo.put("scandetails", scanDetails);
517        }
518      }
519    }
520    if (param instanceof ClientProtos.MultiRequest) {
521      int numGets = 0;
522      int numMutations = 0;
523      int numServiceCalls = 0;
524      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest)param;
525      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
526        for (ClientProtos.Action action: regionAction.getActionList()) {
527          if (action.hasMutation()) {
528            numMutations++;
529          }
530          if (action.hasGet()) {
531            numGets++;
532          }
533          if (action.hasServiceCall()) {
534            numServiceCalls++;
535          }
536        }
537      }
538      responseInfo.put("multi.gets", numGets);
539      responseInfo.put("multi.mutations", numMutations);
540      responseInfo.put("multi.servicecalls", numServiceCalls);
541    }
542    LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo));
543  }
544
545  /**
546   * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length
547   * if TRACE is on else to 150 chars Refer to Jira HBASE-20826 and HBASE-20942
548   * @param strParam stringifiedParam to be truncated
549   * @return truncated trace log string
550   */
551  @VisibleForTesting
552  String truncateTraceLog(String strParam) {
553    if (LOG.isTraceEnabled()) {
554      int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH);
555      int truncatedLength =
556          strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength;
557      String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED;
558      return strParam.subSequence(0, truncatedLength) + truncatedFlag;
559    }
560    return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED;
561  }
562
563  /**
564   * Set the handler for calling out of RPC for error conditions.
565   * @param handler the handler implementation
566   */
567  @Override
568  public void setErrorHandler(HBaseRPCErrorHandler handler) {
569    this.errorHandler = handler;
570  }
571
572  @Override
573  public HBaseRPCErrorHandler getErrorHandler() {
574    return this.errorHandler;
575  }
576
577  /**
578   * Returns the metrics instance for reporting RPC call statistics
579   */
580  @Override
581  public MetricsHBaseServer getMetrics() {
582    return metrics;
583  }
584
585  @Override
586  public void addCallSize(final long diff) {
587    this.callQueueSizeInBytes.add(diff);
588  }
589
590  /**
591   * Authorize the incoming client connection.
592   * @param user client user
593   * @param connection incoming connection
594   * @param addr InetAddress of incoming connection
595   * @throws AuthorizationException when the client isn't authorized to talk the protocol
596   */
597  public void authorize(UserGroupInformation user, ConnectionHeader connection,
598      InetAddress addr) throws AuthorizationException {
599    if (authorize) {
600      Class<?> c = getServiceInterface(services, connection.getServiceName());
601      synchronized (authManager) {
602        authManager.authorize(user, c, getConf(), addr);
603      }
604    }
605  }
606
607  /**
608   * When the read or write buffer size is larger than this limit, i/o will be
609   * done in chunks of this size. Most RPC requests and responses would be
610   * be smaller.
611   */
612  protected static final int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
613
614  /**
615   * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
616   * If the amount of data is large, it writes to channel in smaller chunks.
617   * This is to avoid jdk from creating many direct buffers as the size of
618   * ByteBuffer increases. There should not be any performance degredation.
619   *
620   * @param channel writable byte channel to write on
621   * @param buffer buffer to write
622   * @return number of bytes written
623   * @throws java.io.IOException e
624   * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
625   */
626  protected int channelRead(ReadableByteChannel channel,
627                                   ByteBuffer buffer) throws IOException {
628
629    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
630           channel.read(buffer) : channelIO(channel, null, buffer);
631    if (count > 0) {
632      metrics.receivedBytes(count);
633    }
634    return count;
635  }
636
637  /**
638   * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
639   * Only one of readCh or writeCh should be non-null.
640   *
641   * @param readCh read channel
642   * @param writeCh write channel
643   * @param buf buffer to read or write into/out of
644   * @return bytes written
645   * @throws java.io.IOException e
646   * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
647   */
648  private static int channelIO(ReadableByteChannel readCh,
649                               WritableByteChannel writeCh,
650                               ByteBuffer buf) throws IOException {
651
652    int originalLimit = buf.limit();
653    int initialRemaining = buf.remaining();
654    int ret = 0;
655
656    while (buf.remaining() > 0) {
657      try {
658        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
659        buf.limit(buf.position() + ioSize);
660
661        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
662
663        if (ret < ioSize) {
664          break;
665        }
666
667      } finally {
668        buf.limit(originalLimit);
669      }
670    }
671
672    int nBytes = initialRemaining - buf.remaining();
673    return (nBytes > 0) ? nBytes : ret;
674  }
675
676  /**
677   * This is extracted to a static method for better unit testing. We try to get buffer(s) from pool
678   * as much as possible.
679   *
680   * @param pool The ByteBufferPool to use
681   * @param minSizeForPoolUse Only for buffer size above this, we will try to use pool. Any buffer
682   *           need of size below this, create on heap ByteBuffer.
683   * @param reqLen Bytes count in request
684   */
685  @VisibleForTesting
686  static Pair<ByteBuff, CallCleanup> allocateByteBuffToReadInto(ByteBufferPool pool,
687      int minSizeForPoolUse, int reqLen) {
688    ByteBuff resultBuf;
689    List<ByteBuffer> bbs = new ArrayList<>((reqLen / pool.getBufferSize()) + 1);
690    int remain = reqLen;
691    ByteBuffer buf = null;
692    while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) {
693      bbs.add(buf);
694      remain -= pool.getBufferSize();
695    }
696    ByteBuffer[] bufsFromPool = null;
697    if (bbs.size() > 0) {
698      bufsFromPool = new ByteBuffer[bbs.size()];
699      bbs.toArray(bufsFromPool);
700    }
701    if (remain > 0) {
702      bbs.add(ByteBuffer.allocate(remain));
703    }
704    if (bbs.size() > 1) {
705      ByteBuffer[] items = new ByteBuffer[bbs.size()];
706      bbs.toArray(items);
707      resultBuf = new MultiByteBuff(items);
708    } else {
709      // We are backed by single BB
710      resultBuf = new SingleByteBuff(bbs.get(0));
711    }
712    resultBuf.limit(reqLen);
713    if (bufsFromPool != null) {
714      final ByteBuffer[] bufsFromPoolFinal = bufsFromPool;
715      return new Pair<>(resultBuf, () -> {
716        // Return back all the BBs to pool
717        for (int i = 0; i < bufsFromPoolFinal.length; i++) {
718          pool.putbackBuffer(bufsFromPoolFinal[i]);
719        }
720      });
721    }
722    return new Pair<>(resultBuf, null);
723  }
724
725  /**
726   * Needed for features such as delayed calls.  We need to be able to store the current call
727   * so that we can complete it later or ask questions of what is supported by the current ongoing
728   * call.
729   * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
730   */
731  public static Optional<RpcCall> getCurrentCall() {
732    return Optional.ofNullable(CurCall.get());
733  }
734
735  public static boolean isInRpcCallContext() {
736    return CurCall.get() != null;
737  }
738
739  /**
740   * Returns the user credentials associated with the current RPC request or not present if no
741   * credentials were provided.
742   * @return A User
743   */
744  public static Optional<User> getRequestUser() {
745    Optional<RpcCall> ctx = getCurrentCall();
746    return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty();
747  }
748
749  /**
750   * The number of open RPC conections
751   * @return the number of open rpc connections
752   */
753  abstract public int getNumOpenConnections();
754
755  /**
756   * Returns the username for any user associated with the current RPC
757   * request or not present if no user is set.
758   */
759  public static Optional<String> getRequestUserName() {
760    return getRequestUser().map(User::getShortName);
761  }
762
763  /**
764   * @return Address of remote client if a request is ongoing, else null
765   */
766  public static Optional<InetAddress> getRemoteAddress() {
767    return getCurrentCall().map(RpcCall::getRemoteAddress);
768  }
769
770  /**
771   * @param serviceName Some arbitrary string that represents a 'service'.
772   * @param services Available service instances
773   * @return Matching BlockingServiceAndInterface pair
774   */
775  protected static BlockingServiceAndInterface getServiceAndInterface(
776      final List<BlockingServiceAndInterface> services, final String serviceName) {
777    for (BlockingServiceAndInterface bs : services) {
778      if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
779        return bs;
780      }
781    }
782    return null;
783  }
784
785  /**
786   * @param serviceName Some arbitrary string that represents a 'service'.
787   * @param services Available services and their service interfaces.
788   * @return Service interface class for <code>serviceName</code>
789   */
790  protected static Class<?> getServiceInterface(
791      final List<BlockingServiceAndInterface> services,
792      final String serviceName) {
793    BlockingServiceAndInterface bsasi =
794        getServiceAndInterface(services, serviceName);
795    return bsasi == null? null: bsasi.getServiceInterface();
796  }
797
798  /**
799   * @param serviceName Some arbitrary string that represents a 'service'.
800   * @param services Available services and their service interfaces.
801   * @return BlockingService that goes with the passed <code>serviceName</code>
802   */
803  protected static BlockingService getService(
804      final List<BlockingServiceAndInterface> services,
805      final String serviceName) {
806    BlockingServiceAndInterface bsasi =
807        getServiceAndInterface(services, serviceName);
808    return bsasi == null? null: bsasi.getBlockingService();
809  }
810
811  protected static MonitoredRPCHandler getStatus() {
812    // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
813    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
814    if (status != null) {
815      return status;
816    }
817    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
818    status.pause("Waiting for a call");
819    RpcServer.MONITORED_RPC.set(status);
820    return status;
821  }
822
823  /** Returns the remote side ip address when invoked inside an RPC
824   *  Returns null incase of an error.
825   *  @return InetAddress
826   */
827  public static InetAddress getRemoteIp() {
828    RpcCall call = CurCall.get();
829    if (call != null) {
830      return call.getRemoteAddress();
831    }
832    return null;
833  }
834
835  @Override
836  public RpcScheduler getScheduler() {
837    return scheduler;
838  }
839
840  @Override
841  public void setRsRpcServices(RSRpcServices rsRpcServices) {
842    this.rsRpcServices = rsRpcServices;
843  }
844}