001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.ipc; 020 021import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; 022 023import java.io.IOException; 024import java.net.InetAddress; 025import java.net.InetSocketAddress; 026import java.nio.ByteBuffer; 027import java.nio.channels.ReadableByteChannel; 028import java.nio.channels.WritableByteChannel; 029import java.util.ArrayList; 030import java.util.Collections; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Locale; 034import java.util.Map; 035import java.util.Optional; 036import java.util.concurrent.atomic.LongAdder; 037 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.hbase.CallQueueTooBigException; 040import org.apache.hadoop.hbase.CellScanner; 041import org.apache.hadoop.hbase.DoNotRetryIOException; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.Server; 044import org.apache.hadoop.hbase.conf.ConfigurationObserver; 045import org.apache.hadoop.hbase.exceptions.RequestTooBigException; 046import org.apache.hadoop.hbase.io.ByteBufferPool; 047import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 048import org.apache.hadoop.hbase.monitoring.TaskMonitor; 049import org.apache.hadoop.hbase.nio.ByteBuff; 050import org.apache.hadoop.hbase.nio.MultiByteBuff; 051import org.apache.hadoop.hbase.nio.SingleByteBuff; 052import org.apache.hadoop.hbase.regionserver.RSRpcServices; 053import org.apache.hadoop.hbase.security.SaslUtil; 054import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; 055import org.apache.hadoop.hbase.security.User; 056import org.apache.hadoop.hbase.security.UserProvider; 057import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; 058import org.apache.hadoop.hbase.util.Pair; 059import org.apache.hadoop.security.UserGroupInformation; 060import org.apache.hadoop.security.authorize.AuthorizationException; 061import org.apache.hadoop.security.authorize.PolicyProvider; 062import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; 063import org.apache.hadoop.security.token.SecretManager; 064import org.apache.hadoop.security.token.TokenIdentifier; 065import org.apache.yetus.audience.InterfaceAudience; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 070import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 071import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 072import org.apache.hbase.thirdparty.com.google.protobuf.Message; 073import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 074import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 075import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 078 079import com.fasterxml.jackson.databind.ObjectMapper; 080 081/** 082 * An RPC server that hosts protobuf described Services. 083 * 084 */ 085@InterfaceAudience.Private 086public abstract class RpcServer implements RpcServerInterface, 087 ConfigurationObserver { 088 // LOG is being used in CallRunner and the log level is being changed in tests 089 public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class); 090 protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION 091 = new CallQueueTooBigException(); 092 093 private final boolean authorize; 094 protected boolean isSecurityEnabled; 095 096 public static final byte CURRENT_VERSION = 0; 097 098 /** 099 * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled. 100 */ 101 public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH = 102 "hbase.ipc.server.fallback-to-simple-auth-allowed"; 103 104 /** 105 * How many calls/handler are allowed in the queue. 106 */ 107 protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; 108 109 protected final CellBlockBuilder cellBlockBuilder; 110 111 protected static final String AUTH_FAILED_FOR = "Auth failed for "; 112 protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for "; 113 protected static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger." 114 + Server.class.getName()); 115 protected SecretManager<TokenIdentifier> secretManager; 116 protected final Map<String, String> saslProps; 117 118 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC", 119 justification="Start is synchronized so authManager creation is single-threaded") 120 protected ServiceAuthorizationManager authManager; 121 122 /** This is set to Call object before Handler invokes an RPC and ybdie 123 * after the call returns. 124 */ 125 protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>(); 126 127 /** Keeps MonitoredRPCHandler per handler thread. */ 128 protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>(); 129 130 protected final InetSocketAddress bindAddress; 131 132 protected MetricsHBaseServer metrics; 133 134 protected final Configuration conf; 135 136 /** 137 * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over 138 * this size, then we will reject the call (after parsing it though). It will go back to the 139 * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The 140 * call queue size gets incremented after we parse a call and before we add it to the queue of 141 * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current 142 * size is kept in {@link #callQueueSizeInBytes}. 143 * @see #callQueueSizeInBytes 144 * @see #DEFAULT_MAX_CALLQUEUE_SIZE 145 */ 146 protected final long maxQueueSizeInBytes; 147 protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024; 148 149 /** 150 * This is a running count of the size in bytes of all outstanding calls whether currently 151 * executing or queued waiting to be run. 152 */ 153 protected final LongAdder callQueueSizeInBytes = new LongAdder(); 154 155 protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm 156 protected final boolean tcpKeepAlive; // if T then use keepalives 157 158 /** 159 * This flag is used to indicate to sub threads when they should go down. When we call 160 * {@link #start()}, all threads started will consult this flag on whether they should 161 * keep going. It is set to false when {@link #stop()} is called. 162 */ 163 volatile boolean running = true; 164 165 /** 166 * This flag is set to true after all threads are up and 'running' and the server is then opened 167 * for business by the call to {@link #start()}. 168 */ 169 volatile boolean started = false; 170 171 protected AuthenticationTokenSecretManager authTokenSecretMgr = null; 172 173 protected HBaseRPCErrorHandler errorHandler = null; 174 175 public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size"; 176 protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION = 177 new RequestTooBigException(); 178 179 protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; 180 protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; 181 182 /** 183 * Minimum allowable timeout (in milliseconds) in rpc request's header. This 184 * configuration exists to prevent the rpc service regarding this request as timeout immediately. 185 */ 186 protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout"; 187 protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20; 188 189 /** Default value for above params */ 190 public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M 191 protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds 192 protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; 193 194 protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000; 195 protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length"; 196 protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>"; 197 198 protected static final ObjectMapper MAPPER = new ObjectMapper(); 199 200 protected final int maxRequestSize; 201 protected final int warnResponseTime; 202 protected final int warnResponseSize; 203 204 protected final int minClientRequestTimeout; 205 206 protected final Server server; 207 protected final List<BlockingServiceAndInterface> services; 208 209 protected final RpcScheduler scheduler; 210 211 protected UserProvider userProvider; 212 213 protected final ByteBufferPool reservoir; 214 // The requests and response will use buffers from ByteBufferPool, when the size of the 215 // request/response is at least this size. 216 // We make this to be 1/6th of the pool buffer size. 217 protected final int minSizeForReservoirUse; 218 219 protected volatile boolean allowFallbackToSimpleAuth; 220 221 /** 222 * Used to get details for scan with a scanner_id<br/> 223 * TODO try to figure out a better way and remove reference from regionserver package later. 224 */ 225 private RSRpcServices rsRpcServices; 226 227 @FunctionalInterface 228 protected static interface CallCleanup { 229 void run(); 230 } 231 232 /** 233 * Datastructure for passing a {@link BlockingService} and its associated class of 234 * protobuf service interface. For example, a server that fielded what is defined 235 * in the client protobuf service would pass in an implementation of the client blocking service 236 * and then its ClientService.BlockingInterface.class. Used checking connection setup. 237 */ 238 public static class BlockingServiceAndInterface { 239 private final BlockingService service; 240 private final Class<?> serviceInterface; 241 public BlockingServiceAndInterface(final BlockingService service, 242 final Class<?> serviceInterface) { 243 this.service = service; 244 this.serviceInterface = serviceInterface; 245 } 246 public Class<?> getServiceInterface() { 247 return this.serviceInterface; 248 } 249 public BlockingService getBlockingService() { 250 return this.service; 251 } 252 } 253 254 /** 255 * Constructs a server listening on the named port and address. 256 * @param server hosting instance of {@link Server}. We will do authentications if an 257 * instance else pass null for no authentication check. 258 * @param name Used keying this rpc servers' metrics and for naming the Listener thread. 259 * @param services A list of services. 260 * @param bindAddress Where to listen 261 * @param conf 262 * @param scheduler 263 * @param reservoirEnabled Enable ByteBufferPool or not. 264 */ 265 public RpcServer(final Server server, final String name, 266 final List<BlockingServiceAndInterface> services, 267 final InetSocketAddress bindAddress, Configuration conf, 268 RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { 269 if (reservoirEnabled) { 270 int poolBufSize = conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY, 271 ByteBufferPool.DEFAULT_BUFFER_SIZE); 272 // The max number of buffers to be pooled in the ByteBufferPool. The default value been 273 // selected based on the #handlers configured. When it is read request, 2 MB is the max size 274 // at which we will send back one RPC request. Means max we need 2 MB for creating the 275 // response cell block. (Well it might be much lesser than this because in 2 MB size calc, we 276 // include the heap size overhead of each cells also.) Considering 2 MB, we will need 277 // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size 278 // is by default 64 KB. 279 // In case of read request, at the end of the handler process, we will make the response 280 // cellblock and add the Call to connection's response Q and a single Responder thread takes 281 // connections and responses from that one by one and do the socket write. So there is chances 282 // that by the time a handler originated response is actually done writing to socket and so 283 // released the BBs it used, the handler might have processed one more read req. On an avg 2x 284 // we consider and consider that also for the max buffers to pool 285 int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize; 286 int maxPoolSize = conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 287 conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 288 HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2); 289 this.reservoir = new ByteBufferPool(poolBufSize, maxPoolSize); 290 this.minSizeForReservoirUse = getMinSizeForReservoirUse(this.reservoir); 291 } else { 292 reservoir = null; 293 this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place. 294 } 295 this.server = server; 296 this.services = services; 297 this.bindAddress = bindAddress; 298 this.conf = conf; 299 // See declaration above for documentation on what this size is. 300 this.maxQueueSizeInBytes = 301 this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE); 302 303 this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); 304 this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE); 305 this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT, 306 DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT); 307 this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE); 308 309 this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this)); 310 this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true); 311 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true); 312 313 this.cellBlockBuilder = new CellBlockBuilder(conf); 314 315 this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false); 316 this.userProvider = UserProvider.instantiate(conf); 317 this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled(); 318 if (isSecurityEnabled) { 319 saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection", 320 QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT))); 321 } else { 322 saslProps = Collections.emptyMap(); 323 } 324 325 this.scheduler = scheduler; 326 } 327 328 @VisibleForTesting 329 static int getMinSizeForReservoirUse(ByteBufferPool pool) { 330 return pool.getBufferSize() / 6; 331 } 332 333 @Override 334 public void onConfigurationChange(Configuration newConf) { 335 initReconfigurable(newConf); 336 if (scheduler instanceof ConfigurationObserver) { 337 ((ConfigurationObserver) scheduler).onConfigurationChange(newConf); 338 } 339 } 340 341 protected void initReconfigurable(Configuration confToLoad) { 342 this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false); 343 if (isSecurityEnabled && allowFallbackToSimpleAuth) { 344 LOG.warn("********* WARNING! *********"); 345 LOG.warn("This server is configured to allow connections from INSECURE clients"); 346 LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true)."); 347 LOG.warn("While this option is enabled, client identities cannot be secured, and user"); 348 LOG.warn("impersonation is possible!"); 349 LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,"); 350 LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml"); 351 LOG.warn("****************************"); 352 } 353 } 354 355 Configuration getConf() { 356 return conf; 357 } 358 359 @Override 360 public boolean isStarted() { 361 return this.started; 362 } 363 364 @Override 365 public void refreshAuthManager(PolicyProvider pp) { 366 // Ignore warnings that this should be accessed in a static way instead of via an instance; 367 // it'll break if you go via static route. 368 synchronized (authManager) { 369 authManager.refresh(this.conf, pp); 370 } 371 } 372 373 protected AuthenticationTokenSecretManager createSecretManager() { 374 if (!isSecurityEnabled) return null; 375 if (server == null) return null; 376 Configuration conf = server.getConfiguration(); 377 long keyUpdateInterval = 378 conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000); 379 long maxAge = 380 conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000); 381 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(), 382 server.getServerName().toString(), keyUpdateInterval, maxAge); 383 } 384 385 public SecretManager<? extends TokenIdentifier> getSecretManager() { 386 return this.secretManager; 387 } 388 389 @SuppressWarnings("unchecked") 390 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) { 391 this.secretManager = (SecretManager<TokenIdentifier>) secretManager; 392 } 393 394 /** 395 * This is a server side method, which is invoked over RPC. On success 396 * the return response has protobuf response payload. On failure, the 397 * exception name and the stack trace are returned in the protobuf response. 398 */ 399 @Override 400 public Pair<Message, CellScanner> call(RpcCall call, 401 MonitoredRPCHandler status) throws IOException { 402 try { 403 MethodDescriptor md = call.getMethod(); 404 Message param = call.getParam(); 405 status.setRPC(md.getName(), new Object[]{param}, 406 call.getReceiveTime()); 407 // TODO: Review after we add in encoded data blocks. 408 status.setRPCPacket(param); 409 status.resume("Servicing call"); 410 //get an instance of the method arg type 411 HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner()); 412 controller.setCallTimeout(call.getTimeout()); 413 Message result = call.getService().callBlockingMethod(md, controller, param); 414 long receiveTime = call.getReceiveTime(); 415 long startTime = call.getStartTime(); 416 long endTime = System.currentTimeMillis(); 417 int processingTime = (int) (endTime - startTime); 418 int qTime = (int) (startTime - receiveTime); 419 int totalTime = (int) (endTime - receiveTime); 420 if (LOG.isTraceEnabled()) { 421 LOG.trace(CurCall.get().toString() + 422 ", response " + TextFormat.shortDebugString(result) + 423 " queueTime: " + qTime + 424 " processingTime: " + processingTime + 425 " totalTime: " + totalTime); 426 } 427 // Use the raw request call size for now. 428 long requestSize = call.getSize(); 429 long responseSize = result.getSerializedSize(); 430 if (call.isClientCellBlockSupported()) { 431 // Include the payload size in HBaseRpcController 432 responseSize += call.getResponseCellSize(); 433 } 434 435 metrics.dequeuedCall(qTime); 436 metrics.processedCall(processingTime); 437 metrics.totalCall(totalTime); 438 metrics.receivedRequest(requestSize); 439 metrics.sentResponse(responseSize); 440 // log any RPC responses that are slower than the configured warn 441 // response time or larger than configured warning size 442 boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1); 443 boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1); 444 if (tooSlow || tooLarge) { 445 // when tagging, we let TooLarge trump TooSmall to keep output simple 446 // note that large responses will often also be slow. 447 logResponse(param, 448 md.getName(), md.getName() + "(" + param.getClass().getName() + ")", 449 (tooLarge ? "TooLarge" : "TooSlow"), 450 status.getClient(), startTime, processingTime, qTime, 451 responseSize); 452 } 453 return new Pair<>(result, controller.cellScanner()); 454 } catch (Throwable e) { 455 // The above callBlockingMethod will always return a SE. Strip the SE wrapper before 456 // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't 457 // need to pass it over the wire. 458 if (e instanceof ServiceException) { 459 if (e.getCause() == null) { 460 LOG.debug("Caught a ServiceException with null cause", e); 461 } else { 462 e = e.getCause(); 463 } 464 } 465 466 // increment the number of requests that were exceptions. 467 metrics.exception(e); 468 469 if (e instanceof LinkageError) throw new DoNotRetryIOException(e); 470 if (e instanceof IOException) throw (IOException)e; 471 LOG.error("Unexpected throwable object ", e); 472 throw new IOException(e.getMessage(), e); 473 } 474 } 475 476 /** 477 * Logs an RPC response to the LOG file, producing valid JSON objects for 478 * client Operations. 479 * @param param The parameters received in the call. 480 * @param methodName The name of the method invoked 481 * @param call The string representation of the call 482 * @param tag The tag that will be used to indicate this event in the log. 483 * @param clientAddress The address of the client who made this call. 484 * @param startTime The time that the call was initiated, in ms. 485 * @param processingTime The duration that the call took to run, in ms. 486 * @param qTime The duration that the call spent on the queue 487 * prior to being initiated, in ms. 488 * @param responseSize The size in bytes of the response buffer. 489 */ 490 void logResponse(Message param, String methodName, String call, String tag, 491 String clientAddress, long startTime, int processingTime, int qTime, 492 long responseSize) throws IOException { 493 // base information that is reported regardless of type of call 494 Map<String, Object> responseInfo = new HashMap<>(); 495 responseInfo.put("starttimems", startTime); 496 responseInfo.put("processingtimems", processingTime); 497 responseInfo.put("queuetimems", qTime); 498 responseInfo.put("responsesize", responseSize); 499 responseInfo.put("client", clientAddress); 500 responseInfo.put("class", server == null? "": server.getClass().getSimpleName()); 501 responseInfo.put("method", methodName); 502 responseInfo.put("call", call); 503 // The params could be really big, make sure they don't kill us at WARN 504 String stringifiedParam = ProtobufUtil.getShortTextFormat(param); 505 if (stringifiedParam.length() > 150) { 506 // Truncate to 1000 chars if TRACE is on, else to 150 chars 507 stringifiedParam = truncateTraceLog(stringifiedParam); 508 } 509 responseInfo.put("param", stringifiedParam); 510 if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) { 511 ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param); 512 if (request.hasScannerId()) { 513 long scannerId = request.getScannerId(); 514 String scanDetails = rsRpcServices.getScanDetailsWithId(scannerId); 515 if (scanDetails != null) { 516 responseInfo.put("scandetails", scanDetails); 517 } 518 } 519 } 520 LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo)); 521 } 522 523 /** 524 * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length 525 * if TRACE is on else to 150 chars Refer to Jira HBASE-20826 and HBASE-20942 526 * @param strParam stringifiedParam to be truncated 527 * @return truncated trace log string 528 */ 529 @VisibleForTesting 530 String truncateTraceLog(String strParam) { 531 if (LOG.isTraceEnabled()) { 532 int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH); 533 int truncatedLength = 534 strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength; 535 String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED; 536 return strParam.subSequence(0, truncatedLength) + truncatedFlag; 537 } 538 return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED; 539 } 540 541 /** 542 * Set the handler for calling out of RPC for error conditions. 543 * @param handler the handler implementation 544 */ 545 @Override 546 public void setErrorHandler(HBaseRPCErrorHandler handler) { 547 this.errorHandler = handler; 548 } 549 550 @Override 551 public HBaseRPCErrorHandler getErrorHandler() { 552 return this.errorHandler; 553 } 554 555 /** 556 * Returns the metrics instance for reporting RPC call statistics 557 */ 558 @Override 559 public MetricsHBaseServer getMetrics() { 560 return metrics; 561 } 562 563 @Override 564 public void addCallSize(final long diff) { 565 this.callQueueSizeInBytes.add(diff); 566 } 567 568 /** 569 * Authorize the incoming client connection. 570 * @param user client user 571 * @param connection incoming connection 572 * @param addr InetAddress of incoming connection 573 * @throws AuthorizationException when the client isn't authorized to talk the protocol 574 */ 575 public void authorize(UserGroupInformation user, ConnectionHeader connection, 576 InetAddress addr) throws AuthorizationException { 577 if (authorize) { 578 Class<?> c = getServiceInterface(services, connection.getServiceName()); 579 synchronized (authManager) { 580 authManager.authorize(user, c, getConf(), addr); 581 } 582 } 583 } 584 585 /** 586 * When the read or write buffer size is larger than this limit, i/o will be 587 * done in chunks of this size. Most RPC requests and responses would be 588 * be smaller. 589 */ 590 protected static final int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB. 591 592 /** 593 * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}. 594 * If the amount of data is large, it writes to channel in smaller chunks. 595 * This is to avoid jdk from creating many direct buffers as the size of 596 * ByteBuffer increases. There should not be any performance degredation. 597 * 598 * @param channel writable byte channel to write on 599 * @param buffer buffer to write 600 * @return number of bytes written 601 * @throws java.io.IOException e 602 * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer) 603 */ 604 protected int channelRead(ReadableByteChannel channel, 605 ByteBuffer buffer) throws IOException { 606 607 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ? 608 channel.read(buffer) : channelIO(channel, null, buffer); 609 if (count > 0) { 610 metrics.receivedBytes(count); 611 } 612 return count; 613 } 614 615 /** 616 * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer). 617 * Only one of readCh or writeCh should be non-null. 618 * 619 * @param readCh read channel 620 * @param writeCh write channel 621 * @param buf buffer to read or write into/out of 622 * @return bytes written 623 * @throws java.io.IOException e 624 * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer) 625 */ 626 private static int channelIO(ReadableByteChannel readCh, 627 WritableByteChannel writeCh, 628 ByteBuffer buf) throws IOException { 629 630 int originalLimit = buf.limit(); 631 int initialRemaining = buf.remaining(); 632 int ret = 0; 633 634 while (buf.remaining() > 0) { 635 try { 636 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); 637 buf.limit(buf.position() + ioSize); 638 639 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); 640 641 if (ret < ioSize) { 642 break; 643 } 644 645 } finally { 646 buf.limit(originalLimit); 647 } 648 } 649 650 int nBytes = initialRemaining - buf.remaining(); 651 return (nBytes > 0) ? nBytes : ret; 652 } 653 654 /** 655 * This is extracted to a static method for better unit testing. We try to get buffer(s) from pool 656 * as much as possible. 657 * 658 * @param pool The ByteBufferPool to use 659 * @param minSizeForPoolUse Only for buffer size above this, we will try to use pool. Any buffer 660 * need of size below this, create on heap ByteBuffer. 661 * @param reqLen Bytes count in request 662 */ 663 @VisibleForTesting 664 static Pair<ByteBuff, CallCleanup> allocateByteBuffToReadInto(ByteBufferPool pool, 665 int minSizeForPoolUse, int reqLen) { 666 ByteBuff resultBuf; 667 List<ByteBuffer> bbs = new ArrayList<>((reqLen / pool.getBufferSize()) + 1); 668 int remain = reqLen; 669 ByteBuffer buf = null; 670 while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) { 671 bbs.add(buf); 672 remain -= pool.getBufferSize(); 673 } 674 ByteBuffer[] bufsFromPool = null; 675 if (bbs.size() > 0) { 676 bufsFromPool = new ByteBuffer[bbs.size()]; 677 bbs.toArray(bufsFromPool); 678 } 679 if (remain > 0) { 680 bbs.add(ByteBuffer.allocate(remain)); 681 } 682 if (bbs.size() > 1) { 683 ByteBuffer[] items = new ByteBuffer[bbs.size()]; 684 bbs.toArray(items); 685 resultBuf = new MultiByteBuff(items); 686 } else { 687 // We are backed by single BB 688 resultBuf = new SingleByteBuff(bbs.get(0)); 689 } 690 resultBuf.limit(reqLen); 691 if (bufsFromPool != null) { 692 final ByteBuffer[] bufsFromPoolFinal = bufsFromPool; 693 return new Pair<>(resultBuf, () -> { 694 // Return back all the BBs to pool 695 for (int i = 0; i < bufsFromPoolFinal.length; i++) { 696 pool.putbackBuffer(bufsFromPoolFinal[i]); 697 } 698 }); 699 } 700 return new Pair<>(resultBuf, null); 701 } 702 703 /** 704 * Needed for features such as delayed calls. We need to be able to store the current call 705 * so that we can complete it later or ask questions of what is supported by the current ongoing 706 * call. 707 * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local) 708 */ 709 public static Optional<RpcCall> getCurrentCall() { 710 return Optional.ofNullable(CurCall.get()); 711 } 712 713 public static boolean isInRpcCallContext() { 714 return CurCall.get() != null; 715 } 716 717 /** 718 * Returns the user credentials associated with the current RPC request or not present if no 719 * credentials were provided. 720 * @return A User 721 */ 722 public static Optional<User> getRequestUser() { 723 Optional<RpcCall> ctx = getCurrentCall(); 724 return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty(); 725 } 726 727 /** 728 * The number of open RPC conections 729 * @return the number of open rpc connections 730 */ 731 abstract public int getNumOpenConnections(); 732 733 /** 734 * Returns the username for any user associated with the current RPC 735 * request or not present if no user is set. 736 */ 737 public static Optional<String> getRequestUserName() { 738 return getRequestUser().map(User::getShortName); 739 } 740 741 /** 742 * @return Address of remote client if a request is ongoing, else null 743 */ 744 public static Optional<InetAddress> getRemoteAddress() { 745 return getCurrentCall().map(RpcCall::getRemoteAddress); 746 } 747 748 /** 749 * @param serviceName Some arbitrary string that represents a 'service'. 750 * @param services Available service instances 751 * @return Matching BlockingServiceAndInterface pair 752 */ 753 protected static BlockingServiceAndInterface getServiceAndInterface( 754 final List<BlockingServiceAndInterface> services, final String serviceName) { 755 for (BlockingServiceAndInterface bs : services) { 756 if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) { 757 return bs; 758 } 759 } 760 return null; 761 } 762 763 /** 764 * @param serviceName Some arbitrary string that represents a 'service'. 765 * @param services Available services and their service interfaces. 766 * @return Service interface class for <code>serviceName</code> 767 */ 768 protected static Class<?> getServiceInterface( 769 final List<BlockingServiceAndInterface> services, 770 final String serviceName) { 771 BlockingServiceAndInterface bsasi = 772 getServiceAndInterface(services, serviceName); 773 return bsasi == null? null: bsasi.getServiceInterface(); 774 } 775 776 /** 777 * @param serviceName Some arbitrary string that represents a 'service'. 778 * @param services Available services and their service interfaces. 779 * @return BlockingService that goes with the passed <code>serviceName</code> 780 */ 781 protected static BlockingService getService( 782 final List<BlockingServiceAndInterface> services, 783 final String serviceName) { 784 BlockingServiceAndInterface bsasi = 785 getServiceAndInterface(services, serviceName); 786 return bsasi == null? null: bsasi.getBlockingService(); 787 } 788 789 protected static MonitoredRPCHandler getStatus() { 790 // It is ugly the way we park status up in RpcServer. Let it be for now. TODO. 791 MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get(); 792 if (status != null) { 793 return status; 794 } 795 status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName()); 796 status.pause("Waiting for a call"); 797 RpcServer.MONITORED_RPC.set(status); 798 return status; 799 } 800 801 /** Returns the remote side ip address when invoked inside an RPC 802 * Returns null incase of an error. 803 * @return InetAddress 804 */ 805 public static InetAddress getRemoteIp() { 806 RpcCall call = CurCall.get(); 807 if (call != null) { 808 return call.getRemoteAddress(); 809 } 810 return null; 811 } 812 813 @Override 814 public RpcScheduler getScheduler() { 815 return scheduler; 816 } 817 818 @Override 819 public void setRsRpcServices(RSRpcServices rsRpcServices) { 820 this.rsRpcServices = rsRpcServices; 821 } 822}