/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.gson.Gson;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;

/**
 * An RPC server that hosts protobuf described Services.
 *
 */
@InterfaceAudience.Private
public abstract class RpcServer implements RpcServerInterface,
    ConfigurationObserver {
  // LOG is being used in CallRunner and the log level is being changed in tests
  public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class);
  protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
      = new CallQueueTooBigException();

  private final boolean authorize;
  protected boolean isSecurityEnabled;

  public static final byte CURRENT_VERSION = 0;

  /**
   * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
   */
  public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
          "hbase.ipc.server.fallback-to-simple-auth-allowed";

  /**
   * How many calls/handler are allowed in the queue.
   */
  protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;

  protected final CellBlockBuilder cellBlockBuilder;

  protected static final String AUTH_FAILED_FOR = "Auth failed for ";
  protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
  protected static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger."
      + Server.class.getName());
  protected SecretManager<TokenIdentifier> secretManager;
  protected final Map<String, String> saslProps;

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
      justification="Start is synchronized so authManager creation is single-threaded")
  protected ServiceAuthorizationManager authManager;

  /** This is set to Call object before Handler invokes an RPC and reset
   * after the call returns.
   */
  protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>();

  /** Keeps MonitoredRPCHandler per handler thread. */
  protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>();

  protected final InetSocketAddress bindAddress;

  protected MetricsHBaseServer metrics;

  protected final Configuration conf;

  /**
   * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
   * this size, then we will reject the call (after parsing it though). It will go back to the
   * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
   * call queue size gets incremented after we parse a call and before we add it to the queue of
   * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
   * size is kept in {@link #callQueueSizeInBytes}.
   * @see #callQueueSizeInBytes
   * @see #DEFAULT_MAX_CALLQUEUE_SIZE
   */
  protected final long maxQueueSizeInBytes;
  protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;

  /**
   * This is a running count of the size in bytes of all outstanding calls whether currently
   * executing or queued waiting to be run.
   */
  protected final LongAdder callQueueSizeInBytes = new LongAdder();

  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
  protected final boolean tcpKeepAlive; // if T then use keepalives

  /**
   * This flag is used to indicate to sub threads when they should go down.  When we call
   * {@link #start()}, all threads started will consult this flag on whether they should
   * keep going.  It is set to false when {@link #stop()} is called.
   */
  volatile boolean running = true;

  /**
   * This flag is set to true after all threads are up and 'running' and the server is then opened
   * for business by the call to {@link #start()}.
   */
  volatile boolean started = false;

  protected AuthenticationTokenSecretManager authTokenSecretMgr = null;

  protected HBaseRPCErrorHandler errorHandler = null;

  public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
  protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION =
      new RequestTooBigException();

  protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
  protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";

  /**
   * Minimum allowable timeout (in milliseconds) in rpc request's header. This
   * configuration exists to prevent the rpc service regarding this request as timeout immediately.
   */
  protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
  protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;

  /** Default value for above params */
  public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
  protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
  protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;

  protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000;
  protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length";
  protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>";

  protected static final Gson GSON = GsonUtil.createGson().create();

  protected final int maxRequestSize;
  protected final int warnResponseTime;
  protected final int warnResponseSize;

  protected final int minClientRequestTimeout;

  protected final Server server;
  protected final List<BlockingServiceAndInterface> services;

  protected final RpcScheduler scheduler;

  protected UserProvider userProvider;

  protected final ByteBufferPool reservoir;
  // The requests and response will use buffers from ByteBufferPool, when the size of the
  // request/response is at least this size.
  // We make this to be 1/6th of the pool buffer size.
  protected final int minSizeForReservoirUse;

  protected volatile boolean allowFallbackToSimpleAuth;

  /**
   * Used to get details for scan with a scanner_id<br/>
   * TODO try to figure out a better way and remove reference from regionserver package later.
   */
  private RSRpcServices rsRpcServices;

  /** Callback handed out together with a pooled buffer; running it gives the buffers back. */
  @FunctionalInterface
  protected static interface CallCleanup {
    void run();
  }

  /**
   * Datastructure for passing a {@link BlockingService} and its associated class of
   * protobuf service interface.  For example, a server that fielded what is defined
   * in the client protobuf service would pass in an implementation of the client blocking service
   * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
   */
  public static class BlockingServiceAndInterface {
    private final BlockingService service;
    private final Class<?> serviceInterface;
    public BlockingServiceAndInterface(final BlockingService service,
        final Class<?> serviceInterface) {
      this.service = service;
      this.serviceInterface = serviceInterface;
    }
    public Class<?> getServiceInterface() {
      return this.serviceInterface;
    }
    public BlockingService getBlockingService() {
      return this.service;
    }
  }

  /**
   * Constructs a server listening on the named port and address.
   * @param server hosting instance of {@link Server}. We will do authentications if an
   * instance else pass null for no authentication check.
   * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
   * @param services A list of services.
   * @param bindAddress Where to listen
   * @param conf configuration the buffer pool sizing, queue/request limits, warning thresholds,
   *   TCP options and security settings are read from
   * @param scheduler scheduler kept by this server and returned from {@link #getScheduler()}
   * @param reservoirEnabled Enable ByteBufferPool or not.
   */
  public RpcServer(final Server server, final String name,
      final List<BlockingServiceAndInterface> services,
      final InetSocketAddress bindAddress, Configuration conf,
      RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
    if (reservoirEnabled) {
      int poolBufSize = conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY,
          ByteBufferPool.DEFAULT_BUFFER_SIZE);
      // The max number of buffers to be pooled in the ByteBufferPool. The default value been
      // selected based on the #handlers configured. When it is read request, 2 MB is the max size
      // at which we will send back one RPC request. Means max we need 2 MB for creating the
      // response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
      // include the heap size overhead of each cells also.) Considering 2 MB, we will need
      // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
      // is by default 64 KB.
      // In case of read request, at the end of the handler process, we will make the response
      // cellblock and add the Call to connection's response Q and a single Responder thread takes
      // connections and responses from that one by one and do the socket write. So there is chances
      // that by the time a handler originated response is actually done writing to socket and so
      // released the BBs it used, the handler might have processed one more read req. On an avg 2x
      // we consider and consider that also for the max buffers to pool
      int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
      int maxPoolSize = conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
          conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
              HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
      this.reservoir = new ByteBufferPool(poolBufSize, maxPoolSize);
      this.minSizeForReservoirUse = getMinSizeForReservoirUse(this.reservoir);
    } else {
      reservoir = null;
      this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place.
    }
    this.server = server;
    this.services = services;
    this.bindAddress = bindAddress;
    this.conf = conf;
    // See declaration above for documentation on what this size is.
    this.maxQueueSizeInBytes =
      this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);

    this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
    this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
    this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
        DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);

    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);

    this.cellBlockBuilder = new CellBlockBuilder(conf);

    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
    this.userProvider = UserProvider.instantiate(conf);
    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
    if (isSecurityEnabled) {
      saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
    } else {
      saslProps = Collections.emptyMap();
    }

    this.scheduler = scheduler;
  }

  /**
   * @param pool the pool whose buffer size is consulted
   * @return one sixth of the pool's buffer size; requests at least this big use pooled buffers
   */
  @VisibleForTesting
  static int getMinSizeForReservoirUse(ByteBufferPool pool) {
    return pool.getBufferSize() / 6;
  }

  /**
   * Re-reads the reconfigurable settings and forwards the new configuration to the scheduler
   * when the scheduler is itself a {@link ConfigurationObserver}.
   */
  @Override
  public void onConfigurationChange(Configuration newConf) {
    initReconfigurable(newConf);
    if (scheduler instanceof ConfigurationObserver) {
      ((ConfigurationObserver) scheduler).onConfigurationChange(newConf);
    }
  }

  /**
   * Loads the settings that may change at runtime; currently only
   * {@link #FALLBACK_TO_INSECURE_CLIENT_AUTH}. Logs a prominent warning when the fallback is
   * enabled on a secured server.
   * @param confToLoad configuration to read the settings from
   */
  protected void initReconfigurable(Configuration confToLoad) {
    this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
    if (isSecurityEnabled && allowFallbackToSimpleAuth) {
      LOG.warn("********* WARNING! *********");
      LOG.warn("This server is configured to allow connections from INSECURE clients");
      LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
      LOG.warn("While this option is enabled, client identities cannot be secured, and user");
      LOG.warn("impersonation is possible!");
      LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
      LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
      LOG.warn("****************************");
    }
  }

  Configuration getConf() {
    return conf;
  }

  @Override
  public boolean isStarted() {
    return this.started;
  }

  @Override
  public void refreshAuthManager(PolicyProvider pp) {
    // Ignore warnings that this should be accessed in a static way instead of via an instance;
    // it'll break if you go via static route.
    synchronized (authManager) {
      authManager.refresh(this.conf, pp);
    }
  }

  /**
   * Creates the token secret manager from the hosting server's configuration.
   * @return a new secret manager, or null when security is disabled or there is no hosting server
   */
  protected AuthenticationTokenSecretManager createSecretManager() {
    if (!isSecurityEnabled) return null;
    if (server == null) return null;
    Configuration conf = server.getConfiguration();
    long keyUpdateInterval =
        conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
    long maxAge =
        conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
    return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
        server.getServerName().toString(), keyUpdateInterval, maxAge);
  }

  public SecretManager<? extends TokenIdentifier> getSecretManager() {
    return this.secretManager;
  }

  @SuppressWarnings("unchecked")
  public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
  }

  /**
   * This is a server side method, which is invoked over RPC. On success
   * the return response has protobuf response payload. On failure, the
   * exception name and the stack trace are returned in the protobuf response.
   * @param call the call to execute
   * @param status monitoring handle updated with the method, parameter and progress of the call
   * @return the response message paired with the controller's cell scanner
   * @throws IOException the failure of the service method; a {@link ServiceException} wrapper is
   *   stripped, a {@link LinkageError} becomes a {@link DoNotRetryIOException} and any other
   *   non-IOException throwable is wrapped in an IOException
   */
  @Override
  public Pair<Message, CellScanner> call(RpcCall call,
      MonitoredRPCHandler status) throws IOException {
    try {
      MethodDescriptor md = call.getMethod();
      Message param = call.getParam();
      status.setRPC(md.getName(), new Object[]{param},
        call.getReceiveTime());
      // TODO: Review after we add in encoded data blocks.
      status.setRPCPacket(param);
      status.resume("Servicing call");
      //get an instance of the method arg type
      HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
      controller.setCallTimeout(call.getTimeout());
      Message result = call.getService().callBlockingMethod(md, controller, param);
      long receiveTime = call.getReceiveTime();
      long startTime = call.getStartTime();
      long endTime = System.currentTimeMillis();
      int processingTime = (int) (endTime - startTime);
      int qTime = (int) (startTime - receiveTime);
      int totalTime = (int) (endTime - receiveTime);
      if (LOG.isTraceEnabled()) {
        // Prefer the call published in the thread-local, but fall back to the argument so that
        // enabling TRACE cannot turn a successful call into a NullPointerException when the
        // thread-local has not been set on this thread.
        RpcCall current = CurCall.get();
        LOG.trace((current != null ? current : call).toString() +
            ", response " + TextFormat.shortDebugString(result) +
            " queueTime: " + qTime +
            " processingTime: " + processingTime +
            " totalTime: " + totalTime);
      }
      // Use the raw request call size for now.
      long requestSize = call.getSize();
      long responseSize = result.getSerializedSize();
      if (call.isClientCellBlockSupported()) {
        // Include the payload size in HBaseRpcController
        responseSize += call.getResponseCellSize();
      }

      metrics.dequeuedCall(qTime);
      metrics.processedCall(processingTime);
      metrics.totalCall(totalTime);
      metrics.receivedRequest(requestSize);
      metrics.sentResponse(responseSize);
      // log any RPC responses that are slower than the configured warn
      // response time or larger than configured warning size
      boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
      boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
      if (tooSlow || tooLarge) {
        // when tagging, we let TooLarge trump TooSlow to keep output simple
        // note that large responses will often also be slow.
        logResponse(param,
            md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
            (tooLarge ? "TooLarge" : "TooSlow"),
            status.getClient(), startTime, processingTime, qTime,
            responseSize);
      }
      return new Pair<>(result, controller.cellScanner());
    } catch (Throwable e) {
      // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
      // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
      // need to pass it over the wire.
      if (e instanceof ServiceException) {
        if (e.getCause() == null) {
          LOG.debug("Caught a ServiceException with null cause", e);
        } else {
          e = e.getCause();
        }
      }

      // increment the number of requests that were exceptions.
      metrics.exception(e);

      if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
      if (e instanceof IOException) throw (IOException)e;
      LOG.error("Unexpected throwable object ", e);
      throw new IOException(e.getMessage(), e);
    }
  }

  /**
   * Logs an RPC response to the LOG file, producing valid JSON objects for
   * client Operations.
   * @param param The parameters received in the call.
   * @param methodName The name of the method invoked
   * @param call The string representation of the call
   * @param tag  The tag that will be used to indicate this event in the log.
   * @param clientAddress   The address of the client who made this call.
   * @param startTime       The time that the call was initiated, in ms.
   * @param processingTime  The duration that the call took to run, in ms.
   * @param qTime           The duration that the call spent on the queue
   *                        prior to being initiated, in ms.
   * @param responseSize    The size in bytes of the response buffer.
   */
  void logResponse(Message param, String methodName, String call, String tag,
      String clientAddress, long startTime, int processingTime, int qTime,
      long responseSize) throws IOException {
    // base information that is reported regardless of type of call
    Map<String, Object> responseInfo = new HashMap<>();
    responseInfo.put("starttimems", startTime);
    responseInfo.put("processingtimems", processingTime);
    responseInfo.put("queuetimems", qTime);
    responseInfo.put("responsesize", responseSize);
    responseInfo.put("client", clientAddress);
    responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
    responseInfo.put("method", methodName);
    responseInfo.put("call", call);
    // The params could be really big, make sure they don't kill us at WARN
    String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
    if (stringifiedParam.length() > 150) {
      // Truncate to 1000 chars if TRACE is on, else to 150 chars
      stringifiedParam = truncateTraceLog(stringifiedParam);
    }
    responseInfo.put("param", stringifiedParam);
    if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
      ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
      if (request.hasScannerId()) {
        long scannerId = request.getScannerId();
        String scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
        if (scanDetails != null) {
          responseInfo.put("scandetails", scanDetails);
        }
      }
    }
    if (param instanceof ClientProtos.MultiRequest) {
      int numGets = 0;
      int numMutations = 0;
      int numServiceCalls = 0;
      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest)param;
      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
        for (ClientProtos.Action action: regionAction.getActionList()) {
          if (action.hasMutation()) {
            numMutations++;
          }
          if (action.hasGet()) {
            numGets++;
          }
          if (action.hasServiceCall()) {
            numServiceCalls++;
          }
        }
      }
      responseInfo.put("multi.gets", numGets);
      responseInfo.put("multi.mutations", numMutations);
      responseInfo.put("multi.servicecalls", numServiceCalls);
    }
    LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo));
  }

  /**
   * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length
   * if TRACE is on else to 150 chars Refer to Jira HBASE-20826 and HBASE-20942.
   * <p>
   * The {@link #KEY_WORD_TRUNCATED} marker is appended only when characters were actually cut,
   * and a string shorter than the limit is returned unchanged instead of failing with a
   * {@link StringIndexOutOfBoundsException}.
   * @param strParam stringifiedParam to be truncated
   * @return truncated trace log string
   */
  @VisibleForTesting
  String truncateTraceLog(String strParam) {
    int maxLength = LOG.isTraceEnabled()
        ? getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH)
        : 150;
    // Never ask for more characters than the string holds.
    int truncatedLength = Math.min(strParam.length(), maxLength);
    String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED;
    return strParam.subSequence(0, truncatedLength) + truncatedFlag;
  }

  /**
   * Set the handler for calling out of RPC for error conditions.
   * @param handler the handler implementation
   */
  @Override
  public void setErrorHandler(HBaseRPCErrorHandler handler) {
    this.errorHandler = handler;
  }

  @Override
  public HBaseRPCErrorHandler getErrorHandler() {
    return this.errorHandler;
  }

  /**
   * Returns the metrics instance for reporting RPC call statistics
   */
  @Override
  public MetricsHBaseServer getMetrics() {
    return metrics;
  }

  /**
   * Adjusts the running total of outstanding call bytes.
   * @param diff number of bytes to add to {@link #callQueueSizeInBytes}
   */
  @Override
  public void addCallSize(final long diff) {
    this.callQueueSizeInBytes.add(diff);
  }

  /**
   * Authorize the incoming client connection.
   * @param user client user
   * @param connection incoming connection
   * @param addr InetAddress of incoming connection
   * @throws AuthorizationException when the client isn't authorized to talk the protocol
   */
  public void authorize(UserGroupInformation user, ConnectionHeader connection,
      InetAddress addr) throws AuthorizationException {
    if (authorize) {
      Class<?> c = getServiceInterface(services, connection.getServiceName());
      synchronized (authManager) {
        authManager.authorize(user, c, getConf(), addr);
      }
    }
  }

  /**
   * When the read or write buffer size is larger than this limit, i/o will be
   * done in chunks of this size. Most RPC requests and responses would be
   * be smaller.
   */
  protected static final int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.

  /**
   * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
   * If the amount of data is large, it reads from the channel in smaller chunks.
   * This is to avoid jdk from creating many direct buffers as the size of
   * ByteBuffer increases. There should not be any performance degradation.
   *
   * @param channel readable byte channel to read from
   * @param buffer buffer to read into
   * @return number of bytes read
   * @throws java.io.IOException e
   * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
   */
  protected int channelRead(ReadableByteChannel channel,
                                   ByteBuffer buffer) throws IOException {

    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
           channel.read(buffer) : channelIO(channel, null, buffer);
    if (count > 0) {
      metrics.receivedBytes(count);
    }
    return count;
  }

  /**
   * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
   * Only one of readCh or writeCh should be non-null.
   *
   * @param readCh read channel
   * @param writeCh write channel
   * @param buf buffer to read or write into/out of
   * @return bytes read or written; when nothing was transferred, the result of the last
   *   channel operation
   * @throws java.io.IOException e
   * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
   */
  private static int channelIO(ReadableByteChannel readCh,
                               WritableByteChannel writeCh,
                               ByteBuffer buf) throws IOException {

    int originalLimit = buf.limit();
    int initialRemaining = buf.remaining();
    int ret = 0;

    while (buf.remaining() > 0) {
      try {
        // Narrow the window to at most NIO_BUFFER_LIMIT bytes for this round.
        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
        buf.limit(buf.position() + ioSize);

        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);

        // A short transfer ends the loop.
        if (ret < ioSize) {
          break;
        }

      } finally {
        buf.limit(originalLimit);
      }
    }

    int nBytes = initialRemaining - buf.remaining();
    return (nBytes > 0) ? nBytes : ret;
  }

  /**
   * This is extracted to a static method for better unit testing. We try to get buffer(s) from pool
   * as much as possible.
   *
   * @param pool The ByteBufferPool to use
   * @param minSizeForPoolUse Only for buffer size above this, we will try to use pool. Any buffer
   *           need of size below this, create on heap ByteBuffer.
   * @param reqLen Bytes count in request
   * @return the buffer to read into, limited to <code>reqLen</code>, paired with a cleanup that
   *   puts the pooled buffers back, or with null when no pooled buffer was used
   */
  @VisibleForTesting
  static Pair<ByteBuff, CallCleanup> allocateByteBuffToReadInto(ByteBufferPool pool,
      int minSizeForPoolUse, int reqLen) {
    ByteBuff resultBuf;
    List<ByteBuffer> bbs = new ArrayList<>((reqLen / pool.getBufferSize()) + 1);
    int remain = reqLen;
    ByteBuffer buf = null;
    while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) {
      bbs.add(buf);
      remain -= pool.getBufferSize();
    }
    ByteBuffer[] bufsFromPool = null;
    if (bbs.size() > 0) {
      bufsFromPool = new ByteBuffer[bbs.size()];
      bbs.toArray(bufsFromPool);
    }
    if (remain > 0 || bbs.isEmpty()) {
      // On-heap buffer for whatever the pool did not cover. The isEmpty() arm handles a
      // zero-length request, which would otherwise leave the list empty and make the
      // bbs.get(0) below throw.
      bbs.add(ByteBuffer.allocate(Math.max(remain, 0)));
    }
    if (bbs.size() > 1) {
      ByteBuffer[] items = new ByteBuffer[bbs.size()];
      bbs.toArray(items);
      resultBuf = new MultiByteBuff(items);
    } else {
      // We are backed by single BB
      resultBuf = new SingleByteBuff(bbs.get(0));
    }
    resultBuf.limit(reqLen);
    if (bufsFromPool != null) {
      final ByteBuffer[] bufsFromPoolFinal = bufsFromPool;
      return new Pair<>(resultBuf, () -> {
        // Return back all the BBs to pool
        for (int i = 0; i < bufsFromPoolFinal.length; i++) {
          pool.putbackBuffer(bufsFromPoolFinal[i]);
        }
      });
    }
    return new Pair<>(resultBuf, null);
  }

  /**
   * Needed for features such as delayed calls.  We need to be able to store the current call
   * so that we can complete it later or ask questions of what is supported by the current ongoing
   * call.
   * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
   */
  public static Optional<RpcCall> getCurrentCall() {
    return Optional.ofNullable(CurCall.get());
  }

  /**
   * @return true when a call is set in the thread local of the calling thread
   */
  public static boolean isInRpcCallContext() {
    return CurCall.get() != null;
  }

  /**
   * Returns the user credentials associated with the current RPC request or not present if no
   * credentials were provided.
   * @return A User
   */
  public static Optional<User> getRequestUser() {
    Optional<RpcCall> ctx = getCurrentCall();
    return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty();
  }

  /**
   * The number of open RPC connections
   * @return the number of open rpc connections
   */
  abstract public int getNumOpenConnections();

  /**
   * Returns the username for any user associated with the current RPC
   * request or not present if no user is set.
   */
  public static Optional<String> getRequestUserName() {
    return getRequestUser().map(User::getShortName);
  }

  /**
   * @return Address of remote client if a request is ongoing, else an empty Optional
   */
  public static Optional<InetAddress> getRemoteAddress() {
    return getCurrentCall().map(RpcCall::getRemoteAddress);
  }

  /**
   * @param serviceName Some arbitrary string that represents a 'service'.
   * @param services Available service instances
   * @return Matching BlockingServiceAndInterface pair, or null when no service has that name
   */
  protected static BlockingServiceAndInterface getServiceAndInterface(
      final List<BlockingServiceAndInterface> services, final String serviceName) {
    for (BlockingServiceAndInterface bs : services) {
      if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
        return bs;
      }
    }
    return null;
  }

  /**
   * @param serviceName Some arbitrary string that represents a 'service'.
   * @param services Available services and their service interfaces.
   * @return Service interface class for <code>serviceName</code>
   */
  protected static Class<?> getServiceInterface(
      final List<BlockingServiceAndInterface> services,
      final String serviceName) {
    BlockingServiceAndInterface bsasi =
        getServiceAndInterface(services, serviceName);
    return bsasi == null? null: bsasi.getServiceInterface();
  }

  /**
   * @param serviceName Some arbitrary string that represents a 'service'.
   * @param services Available services and their service interfaces.
   * @return BlockingService that goes with the passed <code>serviceName</code>
   */
  protected static BlockingService getService(
      final List<BlockingServiceAndInterface> services,
      final String serviceName) {
    BlockingServiceAndInterface bsasi =
        getServiceAndInterface(services, serviceName);
    return bsasi == null? null: bsasi.getBlockingService();
  }

  /**
   * @return the monitoring handle of the calling thread, created (paused, "Waiting for a call")
   *   and remembered in {@link #MONITORED_RPC} on first use
   */
  protected static MonitoredRPCHandler getStatus() {
    // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
    if (status != null) {
      return status;
    }
    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
    status.pause("Waiting for a call");
    RpcServer.MONITORED_RPC.set(status);
    return status;
  }

  /** Returns the remote side ip address when invoked inside an RPC
   *  Returns null incase of an error.
   *  @return InetAddress
   */
  public static InetAddress getRemoteIp() {
    RpcCall call = CurCall.get();
    if (call != null) {
      return call.getRemoteAddress();
    }
    return null;
  }

  @Override
  public RpcScheduler getScheduler() {
    return scheduler;
  }

  @Override
  public void setRsRpcServices(RSRpcServices rsRpcServices) {
    this.rsRpcServices = rsRpcServices;
  }
}