001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.ipc;
020
021import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
022
023import java.io.IOException;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.nio.ByteBuffer;
027import java.nio.channels.ReadableByteChannel;
028import java.nio.channels.WritableByteChannel;
029import java.util.ArrayList;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Locale;
034import java.util.Map;
035import java.util.Optional;
036import java.util.concurrent.atomic.LongAdder;
037
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.CallQueueTooBigException;
040import org.apache.hadoop.hbase.CellScanner;
041import org.apache.hadoop.hbase.DoNotRetryIOException;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.Server;
044import org.apache.hadoop.hbase.conf.ConfigurationObserver;
045import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
046import org.apache.hadoop.hbase.io.ByteBufferPool;
047import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
048import org.apache.hadoop.hbase.monitoring.TaskMonitor;
049import org.apache.hadoop.hbase.nio.ByteBuff;
050import org.apache.hadoop.hbase.nio.MultiByteBuff;
051import org.apache.hadoop.hbase.nio.SingleByteBuff;
052import org.apache.hadoop.hbase.regionserver.RSRpcServices;
053import org.apache.hadoop.hbase.security.SaslUtil;
054import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
055import org.apache.hadoop.hbase.security.User;
056import org.apache.hadoop.hbase.security.UserProvider;
057import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
058import org.apache.hadoop.hbase.util.Pair;
059import org.apache.hadoop.security.UserGroupInformation;
060import org.apache.hadoop.security.authorize.AuthorizationException;
061import org.apache.hadoop.security.authorize.PolicyProvider;
062import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
063import org.apache.hadoop.security.token.SecretManager;
064import org.apache.hadoop.security.token.TokenIdentifier;
065import org.apache.yetus.audience.InterfaceAudience;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
070import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
071import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
072import org.apache.hbase.thirdparty.com.google.protobuf.Message;
073import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
074import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
075import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
078
079import com.fasterxml.jackson.databind.ObjectMapper;
080
081/**
082 * An RPC server that hosts protobuf described Services.
083 *
084 */
085@InterfaceAudience.Private
086public abstract class RpcServer implements RpcServerInterface,
087    ConfigurationObserver {
088  // LOG is being used in CallRunner and the log level is being changed in tests
089  public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class);
090  protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
091      = new CallQueueTooBigException();
092
093  private final boolean authorize;
094  protected boolean isSecurityEnabled;
095
096  public static final byte CURRENT_VERSION = 0;
097
098  /**
099   * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
100   */
101  public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
102          "hbase.ipc.server.fallback-to-simple-auth-allowed";
103
104  /**
105   * How many calls/handler are allowed in the queue.
106   */
107  protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
108
109  protected final CellBlockBuilder cellBlockBuilder;
110
111  protected static final String AUTH_FAILED_FOR = "Auth failed for ";
112  protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
113  protected static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger."
114      + Server.class.getName());
115  protected SecretManager<TokenIdentifier> secretManager;
116  protected final Map<String, String> saslProps;
117
118  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
119      justification="Start is synchronized so authManager creation is single-threaded")
120  protected ServiceAuthorizationManager authManager;
121
122  /** This is set to Call object before Handler invokes an RPC and ybdie
123   * after the call returns.
124   */
125  protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>();
126
127  /** Keeps MonitoredRPCHandler per handler thread. */
128  protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>();
129
130  protected final InetSocketAddress bindAddress;
131
132  protected MetricsHBaseServer metrics;
133
134  protected final Configuration conf;
135
136  /**
137   * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
138   * this size, then we will reject the call (after parsing it though). It will go back to the
139   * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
140   * call queue size gets incremented after we parse a call and before we add it to the queue of
141   * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
142   * size is kept in {@link #callQueueSizeInBytes}.
143   * @see #callQueueSizeInBytes
144   * @see #DEFAULT_MAX_CALLQUEUE_SIZE
145   */
146  protected final long maxQueueSizeInBytes;
147  protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
148
149  /**
150   * This is a running count of the size in bytes of all outstanding calls whether currently
151   * executing or queued waiting to be run.
152   */
153  protected final LongAdder callQueueSizeInBytes = new LongAdder();
154
155  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
156  protected final boolean tcpKeepAlive; // if T then use keepalives
157
158  /**
159   * This flag is used to indicate to sub threads when they should go down.  When we call
160   * {@link #start()}, all threads started will consult this flag on whether they should
161   * keep going.  It is set to false when {@link #stop()} is called.
162   */
163  volatile boolean running = true;
164
165  /**
166   * This flag is set to true after all threads are up and 'running' and the server is then opened
167   * for business by the call to {@link #start()}.
168   */
169  volatile boolean started = false;
170
171  protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
172
173  protected HBaseRPCErrorHandler errorHandler = null;
174
175  public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
176  protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION =
177      new RequestTooBigException();
178
179  protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
180  protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
181
182  /**
183   * Minimum allowable timeout (in milliseconds) in rpc request's header. This
184   * configuration exists to prevent the rpc service regarding this request as timeout immediately.
185   */
186  protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
187  protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
188
189  /** Default value for above params */
190  public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
191  protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
192  protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
193
194  protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000;
195  protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length";
196  protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>";
197
198  protected static final ObjectMapper MAPPER = new ObjectMapper();
199
200  protected final int maxRequestSize;
201  protected final int warnResponseTime;
202  protected final int warnResponseSize;
203
204  protected final int minClientRequestTimeout;
205
206  protected final Server server;
207  protected final List<BlockingServiceAndInterface> services;
208
209  protected final RpcScheduler scheduler;
210
211  protected UserProvider userProvider;
212
213  protected final ByteBufferPool reservoir;
214  // The requests and response will use buffers from ByteBufferPool, when the size of the
215  // request/response is at least this size.
216  // We make this to be 1/6th of the pool buffer size.
217  protected final int minSizeForReservoirUse;
218
219  protected volatile boolean allowFallbackToSimpleAuth;
220
221  /**
222   * Used to get details for scan with a scanner_id<br/>
223   * TODO try to figure out a better way and remove reference from regionserver package later.
224   */
225  private RSRpcServices rsRpcServices;
226
227  @FunctionalInterface
228  protected static interface CallCleanup {
229    void run();
230  }
231
232  /**
233   * Datastructure for passing a {@link BlockingService} and its associated class of
234   * protobuf service interface.  For example, a server that fielded what is defined
235   * in the client protobuf service would pass in an implementation of the client blocking service
236   * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
237   */
238  public static class BlockingServiceAndInterface {
239    private final BlockingService service;
240    private final Class<?> serviceInterface;
241    public BlockingServiceAndInterface(final BlockingService service,
242        final Class<?> serviceInterface) {
243      this.service = service;
244      this.serviceInterface = serviceInterface;
245    }
246    public Class<?> getServiceInterface() {
247      return this.serviceInterface;
248    }
249    public BlockingService getBlockingService() {
250      return this.service;
251    }
252  }
253
254  /**
255   * Constructs a server listening on the named port and address.
256   * @param server hosting instance of {@link Server}. We will do authentications if an
257   * instance else pass null for no authentication check.
258   * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
259   * @param services A list of services.
260   * @param bindAddress Where to listen
261   * @param conf
262   * @param scheduler
263   * @param reservoirEnabled Enable ByteBufferPool or not.
264   */
265  public RpcServer(final Server server, final String name,
266      final List<BlockingServiceAndInterface> services,
267      final InetSocketAddress bindAddress, Configuration conf,
268      RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
269    if (reservoirEnabled) {
270      int poolBufSize = conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY,
271          ByteBufferPool.DEFAULT_BUFFER_SIZE);
272      // The max number of buffers to be pooled in the ByteBufferPool. The default value been
273      // selected based on the #handlers configured. When it is read request, 2 MB is the max size
274      // at which we will send back one RPC request. Means max we need 2 MB for creating the
275      // response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
276      // include the heap size overhead of each cells also.) Considering 2 MB, we will need
277      // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
278      // is by default 64 KB.
279      // In case of read request, at the end of the handler process, we will make the response
280      // cellblock and add the Call to connection's response Q and a single Responder thread takes
281      // connections and responses from that one by one and do the socket write. So there is chances
282      // that by the time a handler originated response is actually done writing to socket and so
283      // released the BBs it used, the handler might have processed one more read req. On an avg 2x
284      // we consider and consider that also for the max buffers to pool
285      int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
286      int maxPoolSize = conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
287          conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
288              HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
289      this.reservoir = new ByteBufferPool(poolBufSize, maxPoolSize);
290      this.minSizeForReservoirUse = getMinSizeForReservoirUse(this.reservoir);
291    } else {
292      reservoir = null;
293      this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place.
294    }
295    this.server = server;
296    this.services = services;
297    this.bindAddress = bindAddress;
298    this.conf = conf;
299    // See declaration above for documentation on what this size is.
300    this.maxQueueSizeInBytes =
301      this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
302
303    this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
304    this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
305    this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
306        DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
307    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
308
309    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
310    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
311    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
312
313    this.cellBlockBuilder = new CellBlockBuilder(conf);
314
315    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
316    this.userProvider = UserProvider.instantiate(conf);
317    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
318    if (isSecurityEnabled) {
319      saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
320        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
321    } else {
322      saslProps = Collections.emptyMap();
323    }
324
325    this.scheduler = scheduler;
326  }
327
328  @VisibleForTesting
329  static int getMinSizeForReservoirUse(ByteBufferPool pool) {
330    return pool.getBufferSize() / 6;
331  }
332
333  @Override
334  public void onConfigurationChange(Configuration newConf) {
335    initReconfigurable(newConf);
336    if (scheduler instanceof ConfigurationObserver) {
337      ((ConfigurationObserver) scheduler).onConfigurationChange(newConf);
338    }
339  }
340
341  protected void initReconfigurable(Configuration confToLoad) {
342    this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
343    if (isSecurityEnabled && allowFallbackToSimpleAuth) {
344      LOG.warn("********* WARNING! *********");
345      LOG.warn("This server is configured to allow connections from INSECURE clients");
346      LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
347      LOG.warn("While this option is enabled, client identities cannot be secured, and user");
348      LOG.warn("impersonation is possible!");
349      LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
350      LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
351      LOG.warn("****************************");
352    }
353  }
354
355  Configuration getConf() {
356    return conf;
357  }
358
359  @Override
360  public boolean isStarted() {
361    return this.started;
362  }
363
364  @Override
365  public void refreshAuthManager(PolicyProvider pp) {
366    // Ignore warnings that this should be accessed in a static way instead of via an instance;
367    // it'll break if you go via static route.
368    synchronized (authManager) {
369      authManager.refresh(this.conf, pp);
370    }
371  }
372
373  protected AuthenticationTokenSecretManager createSecretManager() {
374    if (!isSecurityEnabled) return null;
375    if (server == null) return null;
376    Configuration conf = server.getConfiguration();
377    long keyUpdateInterval =
378        conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
379    long maxAge =
380        conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
381    return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
382        server.getServerName().toString(), keyUpdateInterval, maxAge);
383  }
384
385  public SecretManager<? extends TokenIdentifier> getSecretManager() {
386    return this.secretManager;
387  }
388
389  @SuppressWarnings("unchecked")
390  public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
391    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
392  }
393
394  /**
395   * This is a server side method, which is invoked over RPC. On success
396   * the return response has protobuf response payload. On failure, the
397   * exception name and the stack trace are returned in the protobuf response.
398   */
399  @Override
400  public Pair<Message, CellScanner> call(RpcCall call,
401      MonitoredRPCHandler status) throws IOException {
402    try {
403      MethodDescriptor md = call.getMethod();
404      Message param = call.getParam();
405      status.setRPC(md.getName(), new Object[]{param},
406        call.getReceiveTime());
407      // TODO: Review after we add in encoded data blocks.
408      status.setRPCPacket(param);
409      status.resume("Servicing call");
410      //get an instance of the method arg type
411      HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
412      controller.setCallTimeout(call.getTimeout());
413      Message result = call.getService().callBlockingMethod(md, controller, param);
414      long receiveTime = call.getReceiveTime();
415      long startTime = call.getStartTime();
416      long endTime = System.currentTimeMillis();
417      int processingTime = (int) (endTime - startTime);
418      int qTime = (int) (startTime - receiveTime);
419      int totalTime = (int) (endTime - receiveTime);
420      if (LOG.isTraceEnabled()) {
421        LOG.trace(CurCall.get().toString() +
422            ", response " + TextFormat.shortDebugString(result) +
423            " queueTime: " + qTime +
424            " processingTime: " + processingTime +
425            " totalTime: " + totalTime);
426      }
427      // Use the raw request call size for now.
428      long requestSize = call.getSize();
429      long responseSize = result.getSerializedSize();
430      if (call.isClientCellBlockSupported()) {
431        // Include the payload size in HBaseRpcController
432        responseSize += call.getResponseCellSize();
433      }
434
435      metrics.dequeuedCall(qTime);
436      metrics.processedCall(processingTime);
437      metrics.totalCall(totalTime);
438      metrics.receivedRequest(requestSize);
439      metrics.sentResponse(responseSize);
440      // log any RPC responses that are slower than the configured warn
441      // response time or larger than configured warning size
442      boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
443      boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
444      if (tooSlow || tooLarge) {
445        // when tagging, we let TooLarge trump TooSmall to keep output simple
446        // note that large responses will often also be slow.
447        logResponse(param,
448            md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
449            (tooLarge ? "TooLarge" : "TooSlow"),
450            status.getClient(), startTime, processingTime, qTime,
451            responseSize);
452      }
453      return new Pair<>(result, controller.cellScanner());
454    } catch (Throwable e) {
455      // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
456      // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
457      // need to pass it over the wire.
458      if (e instanceof ServiceException) {
459        if (e.getCause() == null) {
460          LOG.debug("Caught a ServiceException with null cause", e);
461        } else {
462          e = e.getCause();
463        }
464      }
465
466      // increment the number of requests that were exceptions.
467      metrics.exception(e);
468
469      if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
470      if (e instanceof IOException) throw (IOException)e;
471      LOG.error("Unexpected throwable object ", e);
472      throw new IOException(e.getMessage(), e);
473    }
474  }
475
476  /**
477   * Logs an RPC response to the LOG file, producing valid JSON objects for
478   * client Operations.
479   * @param param The parameters received in the call.
480   * @param methodName The name of the method invoked
481   * @param call The string representation of the call
482   * @param tag  The tag that will be used to indicate this event in the log.
483   * @param clientAddress   The address of the client who made this call.
484   * @param startTime       The time that the call was initiated, in ms.
485   * @param processingTime  The duration that the call took to run, in ms.
486   * @param qTime           The duration that the call spent on the queue
487   *                        prior to being initiated, in ms.
488   * @param responseSize    The size in bytes of the response buffer.
489   */
490  void logResponse(Message param, String methodName, String call, String tag,
491      String clientAddress, long startTime, int processingTime, int qTime,
492      long responseSize) throws IOException {
493    // base information that is reported regardless of type of call
494    Map<String, Object> responseInfo = new HashMap<>();
495    responseInfo.put("starttimems", startTime);
496    responseInfo.put("processingtimems", processingTime);
497    responseInfo.put("queuetimems", qTime);
498    responseInfo.put("responsesize", responseSize);
499    responseInfo.put("client", clientAddress);
500    responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
501    responseInfo.put("method", methodName);
502    responseInfo.put("call", call);
503    // The params could be really big, make sure they don't kill us at WARN
504    String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
505    if (stringifiedParam.length() > 150) {
506      // Truncate to 1000 chars if TRACE is on, else to 150 chars
507      stringifiedParam = truncateTraceLog(stringifiedParam);
508    }
509    responseInfo.put("param", stringifiedParam);
510    if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
511      ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
512      if (request.hasScannerId()) {
513        long scannerId = request.getScannerId();
514        String scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
515        if (scanDetails != null) {
516          responseInfo.put("scandetails", scanDetails);
517        }
518      }
519    }
520    LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
521  }
522
523  /**
524   * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length
525   * if TRACE is on else to 150 chars Refer to Jira HBASE-20826 and HBASE-20942
526   * @param strParam stringifiedParam to be truncated
527   * @return truncated trace log string
528   */
529  @VisibleForTesting
530  String truncateTraceLog(String strParam) {
531    if (LOG.isTraceEnabled()) {
532      int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH);
533      int truncatedLength =
534          strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength;
535      String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED;
536      return strParam.subSequence(0, truncatedLength) + truncatedFlag;
537    }
538    return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED;
539  }
540
541  /**
542   * Set the handler for calling out of RPC for error conditions.
543   * @param handler the handler implementation
544   */
545  @Override
546  public void setErrorHandler(HBaseRPCErrorHandler handler) {
547    this.errorHandler = handler;
548  }
549
550  @Override
551  public HBaseRPCErrorHandler getErrorHandler() {
552    return this.errorHandler;
553  }
554
555  /**
556   * Returns the metrics instance for reporting RPC call statistics
557   */
558  @Override
559  public MetricsHBaseServer getMetrics() {
560    return metrics;
561  }
562
563  @Override
564  public void addCallSize(final long diff) {
565    this.callQueueSizeInBytes.add(diff);
566  }
567
568  /**
569   * Authorize the incoming client connection.
570   * @param user client user
571   * @param connection incoming connection
572   * @param addr InetAddress of incoming connection
573   * @throws AuthorizationException when the client isn't authorized to talk the protocol
574   */
575  public void authorize(UserGroupInformation user, ConnectionHeader connection,
576      InetAddress addr) throws AuthorizationException {
577    if (authorize) {
578      Class<?> c = getServiceInterface(services, connection.getServiceName());
579      synchronized (authManager) {
580        authManager.authorize(user, c, getConf(), addr);
581      }
582    }
583  }
584
585  /**
586   * When the read or write buffer size is larger than this limit, i/o will be
587   * done in chunks of this size. Most RPC requests and responses would be
588   * be smaller.
589   */
590  protected static final int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
591
592  /**
593   * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
594   * If the amount of data is large, it writes to channel in smaller chunks.
595   * This is to avoid jdk from creating many direct buffers as the size of
596   * ByteBuffer increases. There should not be any performance degredation.
597   *
598   * @param channel writable byte channel to write on
599   * @param buffer buffer to write
600   * @return number of bytes written
601   * @throws java.io.IOException e
602   * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
603   */
604  protected int channelRead(ReadableByteChannel channel,
605                                   ByteBuffer buffer) throws IOException {
606
607    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
608           channel.read(buffer) : channelIO(channel, null, buffer);
609    if (count > 0) {
610      metrics.receivedBytes(count);
611    }
612    return count;
613  }
614
615  /**
616   * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
617   * Only one of readCh or writeCh should be non-null.
618   *
619   * @param readCh read channel
620   * @param writeCh write channel
621   * @param buf buffer to read or write into/out of
622   * @return bytes written
623   * @throws java.io.IOException e
624   * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
625   */
626  private static int channelIO(ReadableByteChannel readCh,
627                               WritableByteChannel writeCh,
628                               ByteBuffer buf) throws IOException {
629
630    int originalLimit = buf.limit();
631    int initialRemaining = buf.remaining();
632    int ret = 0;
633
634    while (buf.remaining() > 0) {
635      try {
636        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
637        buf.limit(buf.position() + ioSize);
638
639        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
640
641        if (ret < ioSize) {
642          break;
643        }
644
645      } finally {
646        buf.limit(originalLimit);
647      }
648    }
649
650    int nBytes = initialRemaining - buf.remaining();
651    return (nBytes > 0) ? nBytes : ret;
652  }
653
654  /**
655   * This is extracted to a static method for better unit testing. We try to get buffer(s) from pool
656   * as much as possible.
657   *
658   * @param pool The ByteBufferPool to use
659   * @param minSizeForPoolUse Only for buffer size above this, we will try to use pool. Any buffer
660   *           need of size below this, create on heap ByteBuffer.
661   * @param reqLen Bytes count in request
662   */
663  @VisibleForTesting
664  static Pair<ByteBuff, CallCleanup> allocateByteBuffToReadInto(ByteBufferPool pool,
665      int minSizeForPoolUse, int reqLen) {
666    ByteBuff resultBuf;
667    List<ByteBuffer> bbs = new ArrayList<>((reqLen / pool.getBufferSize()) + 1);
668    int remain = reqLen;
669    ByteBuffer buf = null;
670    while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) {
671      bbs.add(buf);
672      remain -= pool.getBufferSize();
673    }
674    ByteBuffer[] bufsFromPool = null;
675    if (bbs.size() > 0) {
676      bufsFromPool = new ByteBuffer[bbs.size()];
677      bbs.toArray(bufsFromPool);
678    }
679    if (remain > 0) {
680      bbs.add(ByteBuffer.allocate(remain));
681    }
682    if (bbs.size() > 1) {
683      ByteBuffer[] items = new ByteBuffer[bbs.size()];
684      bbs.toArray(items);
685      resultBuf = new MultiByteBuff(items);
686    } else {
687      // We are backed by single BB
688      resultBuf = new SingleByteBuff(bbs.get(0));
689    }
690    resultBuf.limit(reqLen);
691    if (bufsFromPool != null) {
692      final ByteBuffer[] bufsFromPoolFinal = bufsFromPool;
693      return new Pair<>(resultBuf, () -> {
694        // Return back all the BBs to pool
695        for (int i = 0; i < bufsFromPoolFinal.length; i++) {
696          pool.putbackBuffer(bufsFromPoolFinal[i]);
697        }
698      });
699    }
700    return new Pair<>(resultBuf, null);
701  }
702
703  /**
704   * Needed for features such as delayed calls.  We need to be able to store the current call
705   * so that we can complete it later or ask questions of what is supported by the current ongoing
706   * call.
707   * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
708   */
709  public static Optional<RpcCall> getCurrentCall() {
710    return Optional.ofNullable(CurCall.get());
711  }
712
713  public static boolean isInRpcCallContext() {
714    return CurCall.get() != null;
715  }
716
717  /**
718   * Returns the user credentials associated with the current RPC request or not present if no
719   * credentials were provided.
720   * @return A User
721   */
722  public static Optional<User> getRequestUser() {
723    Optional<RpcCall> ctx = getCurrentCall();
724    return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty();
725  }
726
727  /**
728   * The number of open RPC conections
729   * @return the number of open rpc connections
730   */
731  abstract public int getNumOpenConnections();
732
733  /**
734   * Returns the username for any user associated with the current RPC
735   * request or not present if no user is set.
736   */
737  public static Optional<String> getRequestUserName() {
738    return getRequestUser().map(User::getShortName);
739  }
740
741  /**
742   * @return Address of remote client if a request is ongoing, else null
743   */
744  public static Optional<InetAddress> getRemoteAddress() {
745    return getCurrentCall().map(RpcCall::getRemoteAddress);
746  }
747
748  /**
749   * @param serviceName Some arbitrary string that represents a 'service'.
750   * @param services Available service instances
751   * @return Matching BlockingServiceAndInterface pair
752   */
753  protected static BlockingServiceAndInterface getServiceAndInterface(
754      final List<BlockingServiceAndInterface> services, final String serviceName) {
755    for (BlockingServiceAndInterface bs : services) {
756      if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
757        return bs;
758      }
759    }
760    return null;
761  }
762
763  /**
764   * @param serviceName Some arbitrary string that represents a 'service'.
765   * @param services Available services and their service interfaces.
766   * @return Service interface class for <code>serviceName</code>
767   */
768  protected static Class<?> getServiceInterface(
769      final List<BlockingServiceAndInterface> services,
770      final String serviceName) {
771    BlockingServiceAndInterface bsasi =
772        getServiceAndInterface(services, serviceName);
773    return bsasi == null? null: bsasi.getServiceInterface();
774  }
775
776  /**
777   * @param serviceName Some arbitrary string that represents a 'service'.
778   * @param services Available services and their service interfaces.
779   * @return BlockingService that goes with the passed <code>serviceName</code>
780   */
781  protected static BlockingService getService(
782      final List<BlockingServiceAndInterface> services,
783      final String serviceName) {
784    BlockingServiceAndInterface bsasi =
785        getServiceAndInterface(services, serviceName);
786    return bsasi == null? null: bsasi.getBlockingService();
787  }
788
789  protected static MonitoredRPCHandler getStatus() {
790    // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
791    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
792    if (status != null) {
793      return status;
794    }
795    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
796    status.pause("Waiting for a call");
797    RpcServer.MONITORED_RPC.set(status);
798    return status;
799  }
800
801  /** Returns the remote side ip address when invoked inside an RPC
802   *  Returns null incase of an error.
803   *  @return InetAddress
804   */
805  public static InetAddress getRemoteIp() {
806    RpcCall call = CurCall.get();
807    if (call != null) {
808      return call.getRemoteAddress();
809    }
810    return null;
811  }
812
813  @Override
814  public RpcScheduler getScheduler() {
815    return scheduler;
816  }
817
818  @Override
819  public void setRsRpcServices(RSRpcServices rsRpcServices) {
820    this.rsRpcServices = rsRpcServices;
821  }
822}