/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.gson.Gson;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;

/**
 * An RPC server that hosts protobuf described Services.
 *
 */
@InterfaceAudience.Private
public abstract class RpcServer implements RpcServerInterface,
    ConfigurationObserver {
  // LOG is being used in CallRunner and the log level is being changed in tests
  public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class);
  protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
      = new CallQueueTooBigException();

  private static final String MULTI_GETS = "multi.gets";
  private static final String MULTI_MUTATIONS = "multi.mutations";
  private static final String MULTI_SERVICE_CALLS = "multi.service_calls";

  /**
   * Maximum number of characters of a stringified request parameter that is logged when TRACE
   * logging is disabled.
   */
  private static final int NON_TRACE_LOG_MAX_LENGTH = 150;

  private final boolean authorize;
  private final boolean isOnlineLogProviderEnabled;
  protected boolean isSecurityEnabled;

  public static final byte CURRENT_VERSION = 0;

  /**
   * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
   */
  public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
      "hbase.ipc.server.fallback-to-simple-auth-allowed";

  /**
   * How many calls/handler are allowed in the queue.
   */
  protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;

  protected final CellBlockBuilder cellBlockBuilder;

  protected static final String AUTH_FAILED_FOR = "Auth failed for ";
  protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
  protected static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger."
      + Server.class.getName());
  protected SecretManager<TokenIdentifier> secretManager;
  protected final Map<String, String> saslProps;

  protected ServiceAuthorizationManager authManager;

  /** This is set to Call object before Handler invokes an RPC and reset
   * after the call returns.
   */
  protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>();

  /** Keeps MonitoredRPCHandler per handler thread. */
  protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>();

  protected final InetSocketAddress bindAddress;

  protected MetricsHBaseServer metrics;

  protected final Configuration conf;

  /**
   * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
   * this size, then we will reject the call (after parsing it though). It will go back to the
   * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
   * call queue size gets incremented after we parse a call and before we add it to the queue of
   * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
   * size is kept in {@link #callQueueSizeInBytes}.
   * @see #callQueueSizeInBytes
   * @see #DEFAULT_MAX_CALLQUEUE_SIZE
   */
  protected final long maxQueueSizeInBytes;
  protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;

  /**
   * This is a running count of the size in bytes of all outstanding calls whether currently
   * executing or queued waiting to be run.
   */
  protected final LongAdder callQueueSizeInBytes = new LongAdder();

  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
  protected final boolean tcpKeepAlive; // if T then use keepalives

  /**
   * This flag is used to indicate to sub threads when they should go down. When we call
   * {@link #start()}, all threads started will consult this flag on whether they should
   * keep going. It is set to false when {@link #stop()} is called.
   */
  volatile boolean running = true;

  /**
   * This flag is set to true after all threads are up and 'running' and the server is then opened
   * for business by the call to {@link #start()}.
   */
  volatile boolean started = false;

  protected AuthenticationTokenSecretManager authTokenSecretMgr = null;

  protected HBaseRPCErrorHandler errorHandler = null;

  public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
  protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION =
      new RequestTooBigException();

  protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
  protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";

  /**
   * Minimum allowable timeout (in milliseconds) in rpc request's header. This
   * configuration exists to prevent the rpc service regarding this request as timeout immediately.
   */
  protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
  protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;

  /** Default value for above params */
  public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
  protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
  protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;

  protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000;
  protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length";
  protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>";

  protected static final Gson GSON = GsonUtil.createGsonWithDisableHtmlEscaping().create();

  protected final int maxRequestSize;
  protected final int warnResponseTime;
  protected final int warnResponseSize;

  protected final int minClientRequestTimeout;

  protected final Server server;
  protected final List<BlockingServiceAndInterface> services;

  protected final RpcScheduler scheduler;

  protected UserProvider userProvider;

  protected final ByteBuffAllocator bbAllocator;

  protected volatile boolean allowFallbackToSimpleAuth;

  /**
   * Used to get details for scan with a scanner_id<br/>
   * TODO try to figure out a better way and remove reference from regionserver package later.
   */
  private RSRpcServices rsRpcServices;


  /**
   * Use to add online slowlog responses
   */
  private NamedQueueRecorder namedQueueRecorder;

  @FunctionalInterface
  protected interface CallCleanup {
    void run();
  }

  /**
   * Datastructure for passing a {@link BlockingService} and its associated class of
   * protobuf service interface. For example, a server that fielded what is defined
   * in the client protobuf service would pass in an implementation of the client blocking service
   * and then its ClientService.BlockingInterface.class. Used checking connection setup.
   */
  public static class BlockingServiceAndInterface {
    private final BlockingService service;
    private final Class<?> serviceInterface;
    public BlockingServiceAndInterface(final BlockingService service,
        final Class<?> serviceInterface) {
      this.service = service;
      this.serviceInterface = serviceInterface;
    }
    public Class<?> getServiceInterface() {
      return this.serviceInterface;
    }
    public BlockingService getBlockingService() {
      return this.service;
    }
  }

  /**
   * Constructs a server listening on the named port and address.
   * @param server hosting instance of {@link Server}. We will do authentications if an
   *   instance else pass null for no authentication check.
   * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
   * @param services A list of services.
   * @param bindAddress Where to listen
   * @param conf configuration the server settings (queue size, request size limits, warn
   *   thresholds, tcp options, security) are read from
   * @param scheduler the scheduler handed back by {@link #getScheduler()}
   * @param reservoirEnabled Enable ByteBufferPool or not.
   */
  public RpcServer(final Server server, final String name,
      final List<BlockingServiceAndInterface> services,
      final InetSocketAddress bindAddress, Configuration conf,
      RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
    this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
    this.server = server;
    this.services = services;
    this.bindAddress = bindAddress;
    this.conf = conf;
    // See declaration above for documentation on what this size is.
    this.maxQueueSizeInBytes =
        this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);

    this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
    this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
    this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
        DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);

    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);

    this.cellBlockBuilder = new CellBlockBuilder(conf);

    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
    this.userProvider = UserProvider.instantiate(conf);
    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
    if (isSecurityEnabled) {
      saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
          QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
    } else {
      saslProps = Collections.emptyMap();
    }

    this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
        HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
    this.scheduler = scheduler;
  }

  /**
   * Reloads the reconfigurable settings from {@code newConf}, forwards the change to the scheduler
   * when it is itself a {@link ConfigurationObserver}, and refreshes the authorization manager
   * when service authorization is enabled.
   */
  @Override
  public void onConfigurationChange(Configuration newConf) {
    initReconfigurable(newConf);
    if (scheduler instanceof ConfigurationObserver) {
      ((ConfigurationObserver) scheduler).onConfigurationChange(newConf);
    }
    if (authorize) {
      refreshAuthManager(newConf, new HBasePolicyProvider());
    }
  }

  /**
   * Loads the settings that may change at runtime; currently only
   * {@link #FALLBACK_TO_INSECURE_CLIENT_AUTH}. Logs a prominent warning when security is enabled
   * but fallback to SIMPLE auth is allowed.
   * @param confToLoad configuration to read the settings from
   */
  protected void initReconfigurable(Configuration confToLoad) {
    this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
    if (isSecurityEnabled && allowFallbackToSimpleAuth) {
      LOG.warn("********* WARNING! *********");
      LOG.warn("This server is configured to allow connections from INSECURE clients");
      LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
      LOG.warn("While this option is enabled, client identities cannot be secured, and user");
      LOG.warn("impersonation is possible!");
      LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
      LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
      LOG.warn("****************************");
    }
  }

  Configuration getConf() {
    return conf;
  }

  @Override
  public boolean isStarted() {
    return this.started;
  }

  @Override
  public synchronized void refreshAuthManager(Configuration conf, PolicyProvider pp) {
    // Ignore warnings that this should be accessed in a static way instead of via an instance;
    // it'll break if you go via static route.
    System.setProperty("hadoop.policy.file", "hbase-policy.xml");
    this.authManager.refresh(conf, pp);
    LOG.info("Refreshed hbase-policy.xml successfully");
    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
    LOG.info("Refreshed super and proxy users successfully");
  }

  /**
   * Creates the secret manager used for authentication tokens.
   * @return a new {@link AuthenticationTokenSecretManager}, or null when security is disabled or
   *   there is no hosting {@link Server}
   */
  protected AuthenticationTokenSecretManager createSecretManager() {
    if (!isSecurityEnabled) {
      return null;
    }
    if (server == null) {
      return null;
    }
    Configuration conf = server.getConfiguration();
    // Defaults: keys are rolled once a day, tokens live for at most a week (both in ms).
    long keyUpdateInterval =
        conf.getLong("hbase.auth.key.update.interval", 24L * 60 * 60 * 1000);
    long maxAge =
        conf.getLong("hbase.auth.token.max.lifetime", 7L * 24 * 60 * 60 * 1000);
    return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
        server.getServerName().toString(), keyUpdateInterval, maxAge);
  }

  public SecretManager<? extends TokenIdentifier> getSecretManager() {
    return this.secretManager;
  }

  @SuppressWarnings("unchecked")
  public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
  }

  /**
   * This is a server side method, which is invoked over RPC. On success
   * the return response has protobuf response payload. On failure, the
   * exception name and the stack trace are returned in the protobuf response.
   */
  @Override
  public Pair<Message, CellScanner> call(RpcCall call,
      MonitoredRPCHandler status) throws IOException {
    try {
      MethodDescriptor md = call.getMethod();
      Message param = call.getParam();
      status.setRPC(md.getName(), new Object[]{param},
        call.getReceiveTime());
      // TODO: Review after we add in encoded data blocks.
      status.setRPCPacket(param);
      status.resume("Servicing call");
      //get an instance of the method arg type
      HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
      controller.setCallTimeout(call.getTimeout());
      Message result = call.getService().callBlockingMethod(md, controller, param);
      long receiveTime = call.getReceiveTime();
      long startTime = call.getStartTime();
      long endTime = System.currentTimeMillis();
      int processingTime = (int) (endTime - startTime);
      int qTime = (int) (startTime - receiveTime);
      int totalTime = (int) (endTime - receiveTime);
      if (LOG.isTraceEnabled()) {
        // Use the call we were handed rather than the thread-local, which is null when this
        // method is invoked on a thread where no current call has been set.
        LOG.trace(call.toString() +
            ", response " + TextFormat.shortDebugString(result) +
            " queueTime: " + qTime +
            " processingTime: " + processingTime +
            " totalTime: " + totalTime);
      }
      // Use the raw request call size for now.
      long requestSize = call.getSize();
      long responseSize = result.getSerializedSize();
      if (call.isClientCellBlockSupported()) {
        // Include the payload size in HBaseRpcController
        responseSize += call.getResponseCellSize();
      }

      metrics.dequeuedCall(qTime);
      metrics.processedCall(processingTime);
      metrics.totalCall(totalTime);
      metrics.receivedRequest(requestSize);
      metrics.sentResponse(responseSize);
      // log any RPC responses that are slower than the configured warn
      // response time or larger than configured warning size
      boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
      boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
      if (tooSlow || tooLarge) {
        final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY);
        // when tagging, we let TooLarge trump TooSlow to keep output simple
        // note that large responses will often also be slow.
        logResponse(param,
            md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
            tooLarge, tooSlow,
            status.getClient(), startTime, processingTime, qTime,
            responseSize, userName);
        if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
          // send logs to ring buffer owned by slowLogRecorder
          final String className =
            server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
          this.namedQueueRecorder.addRecord(
            new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow,
              tooLarge));
        }
      }
      return new Pair<>(result, controller.cellScanner());
    } catch (Throwable e) {
      // The above callBlockingMethod will always return a SE. Strip the SE wrapper before
      // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't
      // need to pass it over the wire.
      if (e instanceof ServiceException) {
        if (e.getCause() == null) {
          LOG.debug("Caught a ServiceException with null cause", e);
        } else {
          e = e.getCause();
        }
      }

      // increment the number of requests that were exceptions.
      metrics.exception(e);

      if (e instanceof LinkageError) {
        throw new DoNotRetryIOException(e);
      }
      if (e instanceof IOException) {
        throw (IOException) e;
      }
      LOG.error("Unexpected throwable object ", e);
      throw new IOException(e.getMessage(), e);
    }
  }

  /**
   * Logs an RPC response to the LOG file, producing valid JSON objects for
   * client Operations.
   * @param param The parameters received in the call.
   * @param methodName The name of the method invoked
   * @param call The string representation of the call
   * @param tooLarge To indicate if the event is tooLarge
   * @param tooSlow To indicate if the event is tooSlow
   * @param clientAddress The address of the client who made this call.
   * @param startTime The time that the call was initiated, in ms.
   * @param processingTime The duration that the call took to run, in ms.
   * @param qTime The duration that the call spent on the queue
   *   prior to being initiated, in ms.
   * @param responseSize The size in bytes of the response buffer.
   * @param userName UserName of the current RPC Call
   */
  void logResponse(Message param, String methodName, String call, boolean tooLarge,
      boolean tooSlow, String clientAddress, long startTime, int processingTime, int qTime,
      long responseSize, String userName) {
    final String className = server == null ? StringUtils.EMPTY :
      server.getClass().getSimpleName();
    // base information that is reported regardless of type of call
    Map<String, Object> responseInfo = new HashMap<>();
    responseInfo.put("starttimems", startTime);
    responseInfo.put("processingtimems", processingTime);
    responseInfo.put("queuetimems", qTime);
    responseInfo.put("responsesize", responseSize);
    responseInfo.put("client", clientAddress);
    responseInfo.put("class", className);
    responseInfo.put("method", methodName);
    responseInfo.put("call", call);
    // The params could be really big, make sure they don't kill us at WARN
    String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
    if (stringifiedParam.length() > NON_TRACE_LOG_MAX_LENGTH) {
      // Truncate to 1000 chars if TRACE is on, else to 150 chars
      stringifiedParam = truncateTraceLog(stringifiedParam);
    }
    responseInfo.put("param", stringifiedParam);
    if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
      ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
      String scanDetails;
      if (request.hasScannerId()) {
        long scannerId = request.getScannerId();
        scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
      } else {
        scanDetails = rsRpcServices.getScanDetailsWithRequest(request);
      }
      if (scanDetails != null) {
        responseInfo.put("scandetails", scanDetails);
      }
    }
    if (param instanceof ClientProtos.MultiRequest) {
      int numGets = 0;
      int numMutations = 0;
      int numServiceCalls = 0;
      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
        for (ClientProtos.Action action : regionAction.getActionList()) {
          if (action.hasMutation()) {
            numMutations++;
          }
          if (action.hasGet()) {
            numGets++;
          }
          if (action.hasServiceCall()) {
            numServiceCalls++;
          }
        }
      }
      responseInfo.put(MULTI_GETS, numGets);
      responseInfo.put(MULTI_MUTATIONS, numMutations);
      responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls);
    }
    final String tag = (tooLarge && tooSlow) ? "TooLarge & TooSlow"
      : (tooSlow ? "TooSlow" : "TooLarge");
    LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo));
  }


  /**
   * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length
   * if TRACE is on else to 150 chars Refer to Jira HBASE-20826 and HBASE-20942.
   * A string that already fits within the limit is returned unchanged, without the truncation
   * marker; a negative configured limit is treated as zero.
   * @param strParam stringifiedParam to be truncated
   * @return truncated trace log string
   */
  String truncateTraceLog(String strParam) {
    final int configuredLimit = LOG.isTraceEnabled()
      ? getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH)
      : NON_TRACE_LOG_MAX_LENGTH;
    // Clamp so that a misconfigured negative limit cannot produce an invalid substring range.
    final int limit = Math.max(0, configuredLimit);
    if (strParam.length() <= limit) {
      // Nothing is cut off, so do not claim truncation.
      return strParam;
    }
    return strParam.substring(0, limit) + KEY_WORD_TRUNCATED;
  }

  /**
   * Set the handler for calling out of RPC for error conditions.
   * @param handler the handler implementation
   */
  @Override
  public void setErrorHandler(HBaseRPCErrorHandler handler) {
    this.errorHandler = handler;
  }

  @Override
  public HBaseRPCErrorHandler getErrorHandler() {
    return this.errorHandler;
  }

  /**
   * Returns the metrics instance for reporting RPC call statistics
   */
  @Override
  public MetricsHBaseServer getMetrics() {
    return metrics;
  }

  @Override
  public void addCallSize(final long diff) {
    this.callQueueSizeInBytes.add(diff);
  }

  /**
   * Authorize the incoming client connection. Does nothing unless service authorization
   * ({@code hadoop.security.authorization}) was enabled when this server was constructed.
   * @param user client user
   * @param connection incoming connection
   * @param addr InetAddress of incoming connection
   * @throws AuthorizationException when the client isn't authorized to talk the protocol
   */
  public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
      InetAddress addr) throws AuthorizationException {
    if (authorize) {
      Class<?> c = getServiceInterface(services, connection.getServiceName());
      authManager.authorize(user, c, getConf(), addr);
    }
  }

  /**
   * When the read or write buffer size is larger than this limit, i/o will be
   * done in chunks of this size. Most RPC requests and responses would be
   * smaller.
   */
  protected static final int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.

  /**
   * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
   * If the amount of data is large, it reads from the channel in smaller chunks.
   * This is to avoid jdk from creating many direct buffers as the size of
   * ByteBuffer increases. There should not be any performance degradation.
   * A positive byte count is also reported to the metrics.
   *
   * @param channel readable byte channel to read from
   * @param buffer buffer to read into
   * @return number of bytes read
   * @throws java.io.IOException e
   * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
   */
  protected int channelRead(ReadableByteChannel channel,
      ByteBuffer buffer) throws IOException {

    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
      channel.read(buffer) : channelIO(channel, null, buffer);
    if (count > 0) {
      metrics.receivedBytes(count);
    }
    return count;
  }

  /**
   * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
   * Only one of readCh or writeCh should be non-null. Transfers the buffer in chunks of at most
   * {@link #NIO_BUFFER_LIMIT} bytes and stops as soon as a chunk is only partially transferred.
   *
   * @param readCh read channel
   * @param writeCh write channel
   * @param buf buffer to read or write into/out of
   * @return bytes read or written; when nothing was transferred, the result of the last channel
   *   operation
   * @throws java.io.IOException e
   * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
   */
  private static int channelIO(ReadableByteChannel readCh,
      WritableByteChannel writeCh,
      ByteBuffer buf) throws IOException {

    int originalLimit = buf.limit();
    int initialRemaining = buf.remaining();
    int ret = 0;

    while (buf.remaining() > 0) {
      try {
        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
        // Temporarily shrink the limit so that at most ioSize bytes are transferred.
        buf.limit(buf.position() + ioSize);

        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);

        if (ret < ioSize) {
          break;
        }

      } finally {
        buf.limit(originalLimit);
      }
    }

    int nBytes = initialRemaining - buf.remaining();
    return (nBytes > 0) ? nBytes : ret;
  }

  /**
   * Needed for features such as delayed calls. We need to be able to store the current call
   * so that we can complete it later or ask questions of what is supported by the current ongoing
   * call.
   * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
   */
  public static Optional<RpcCall> getCurrentCall() {
    return Optional.ofNullable(CurCall.get());
  }

  public static boolean isInRpcCallContext() {
    return CurCall.get() != null;
  }

  /**
   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For
   * master's rpc call, it may generate new procedure and mutate the region which store procedure.
   * There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc
   * call to avoid the rpc check.
   * @return the currently ongoing rpc call
   */
  public static Optional<RpcCall> unsetCurrentCall() {
    Optional<RpcCall> rpcCall = getCurrentCall();
    CurCall.set(null);
    return rpcCall;
  }

  /**
   * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
   * rpc call back after mutate region.
   */
  public static void setCurrentCall(RpcCall rpcCall) {
    CurCall.set(rpcCall);
  }

  /**
   * Returns the user credentials associated with the current RPC request or not present if no
   * credentials were provided.
   * @return A User
   */
  public static Optional<User> getRequestUser() {
    return getCurrentCall().flatMap(RpcCall::getRequestUser);
  }

  /**
   * The number of open RPC connections
   * @return the number of open rpc connections
   */
  public abstract int getNumOpenConnections();

  /**
   * Returns the username for any user associated with the current RPC
   * request or not present if no user is set.
   */
  public static Optional<String> getRequestUserName() {
    return getRequestUser().map(User::getShortName);
  }

  /**
   * @return Address of remote client if a request is ongoing, else an empty Optional
   */
  public static Optional<InetAddress> getRemoteAddress() {
    return getCurrentCall().map(RpcCall::getRemoteAddress);
  }

  /**
   * @param serviceName Some arbitrary string that represents a 'service'.
   * @param services Available service instances
   * @return Matching BlockingServiceAndInterface pair, or null if no service has that name
   */
  protected static BlockingServiceAndInterface getServiceAndInterface(
      final List<BlockingServiceAndInterface> services, final String serviceName) {
    for (BlockingServiceAndInterface bs : services) {
      if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
        return bs;
      }
    }
    return null;
  }

  /**
   * @param serviceName Some arbitrary string that represents a 'service'.
   * @param services Available services and their service interfaces.
   * @return Service interface class for <code>serviceName</code>, or null if there is no match
   */
  protected static Class<?> getServiceInterface(
      final List<BlockingServiceAndInterface> services,
      final String serviceName) {
    BlockingServiceAndInterface bsasi =
        getServiceAndInterface(services, serviceName);
    return bsasi == null ? null : bsasi.getServiceInterface();
  }

  /**
   * @param serviceName Some arbitrary string that represents a 'service'.
   * @param services Available services and their service interfaces.
   * @return BlockingService that goes with the passed <code>serviceName</code>, or null if there
   *   is no match
   */
  protected static BlockingService getService(
      final List<BlockingServiceAndInterface> services,
      final String serviceName) {
    BlockingServiceAndInterface bsasi =
        getServiceAndInterface(services, serviceName);
    return bsasi == null ? null : bsasi.getBlockingService();
  }

  /**
   * Returns the {@link MonitoredRPCHandler} kept for the calling thread, creating it (in paused
   * state) and storing it in the thread local on first use.
   */
  protected static MonitoredRPCHandler getStatus() {
    // It is ugly the way we park status up in RpcServer. Let it be for now. TODO.
    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
    if (status != null) {
      return status;
    }
    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
    status.pause("Waiting for a call");
    RpcServer.MONITORED_RPC.set(status);
    return status;
  }

  /** Returns the remote side ip address when invoked inside an RPC
   *  Returns null when there is no ongoing call on this thread.
   * @return InetAddress
   */
  public static InetAddress getRemoteIp() {
    return getRemoteAddress().orElse(null);
  }

  @Override
  public RpcScheduler getScheduler() {
    return scheduler;
  }

  @Override
  public ByteBuffAllocator getByteBuffAllocator() {
    return this.bbAllocator;
  }

  @Override
  public void setRsRpcServices(RSRpcServices rsRpcServices) {
    this.rsRpcServices = rsRpcServices;
  }

  @Override
  public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
    this.namedQueueRecorder = namedQueueRecorder;
  }

}