001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; 021 022import java.io.IOException; 023import java.net.InetAddress; 024import java.net.InetSocketAddress; 025import java.nio.ByteBuffer; 026import java.nio.channels.ReadableByteChannel; 027import java.nio.channels.WritableByteChannel; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Locale; 032import java.util.Map; 033import java.util.Optional; 034import java.util.concurrent.atomic.LongAdder; 035import org.apache.commons.lang3.StringUtils; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.CallQueueTooBigException; 038import org.apache.hadoop.hbase.CellScanner; 039import org.apache.hadoop.hbase.DoNotRetryIOException; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.Server; 042import org.apache.hadoop.hbase.conf.ConfigurationObserver; 043import org.apache.hadoop.hbase.io.ByteBuffAllocator; 044import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 045import org.apache.hadoop.hbase.monitoring.TaskMonitor; 046import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 047import org.apache.hadoop.hbase.namequeues.RpcLogDetails; 048import org.apache.hadoop.hbase.regionserver.RSRpcServices; 049import org.apache.hadoop.hbase.security.HBasePolicyProvider; 050import org.apache.hadoop.hbase.security.SaslUtil; 051import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; 052import org.apache.hadoop.hbase.security.User; 053import org.apache.hadoop.hbase.security.UserProvider; 054import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; 055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 056import org.apache.hadoop.hbase.util.GsonUtil; 057import org.apache.hadoop.hbase.util.Pair; 058import org.apache.hadoop.security.UserGroupInformation; 059import org.apache.hadoop.security.authorize.AuthorizationException; 060import org.apache.hadoop.security.authorize.PolicyProvider; 061import org.apache.hadoop.security.authorize.ProxyUsers; 062import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; 063import org.apache.hadoop.security.token.SecretManager; 064import org.apache.hadoop.security.token.TokenIdentifier; 065import org.apache.yetus.audience.InterfaceAudience; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069import org.apache.hbase.thirdparty.com.google.gson.Gson; 070import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 071import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 072import org.apache.hbase.thirdparty.com.google.protobuf.Message; 073import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 074import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 075 076import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 079 080/** 081 * An RPC server that hosts protobuf described Services. 082 */ 083@InterfaceAudience.Private 084public abstract class RpcServer implements RpcServerInterface, ConfigurationObserver { 085 // LOG is being used in CallRunner and the log level is being changed in tests 086 public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class); 087 protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION = 088 new CallQueueTooBigException(); 089 090 private static final String MULTI_GETS = "multi.gets"; 091 private static final String MULTI_MUTATIONS = "multi.mutations"; 092 private static final String MULTI_SERVICE_CALLS = "multi.service_calls"; 093 094 private final boolean authorize; 095 private volatile boolean isOnlineLogProviderEnabled; 096 protected boolean isSecurityEnabled; 097 098 public static final byte CURRENT_VERSION = 0; 099 100 /** 101 * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled. 102 */ 103 public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH = 104 "hbase.ipc.server.fallback-to-simple-auth-allowed"; 105 106 /** 107 * How many calls/handler are allowed in the queue. 108 */ 109 protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; 110 111 protected final CellBlockBuilder cellBlockBuilder; 112 113 protected static final String AUTH_FAILED_FOR = "Auth failed for "; 114 protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for "; 115 protected static final Logger AUDITLOG = 116 LoggerFactory.getLogger("SecurityLogger." + Server.class.getName()); 117 protected SecretManager<TokenIdentifier> secretManager; 118 protected final Map<String, String> saslProps; 119 120 protected ServiceAuthorizationManager authManager; 121 122 /** 123 * This is set to Call object before Handler invokes an RPC and ybdie after the call returns. 124 */ 125 protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>(); 126 127 /** Keeps MonitoredRPCHandler per handler thread. */ 128 protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>(); 129 130 protected final InetSocketAddress bindAddress; 131 132 protected MetricsHBaseServer metrics; 133 134 protected final Configuration conf; 135 136 /** 137 * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over 138 * this size, then we will reject the call (after parsing it though). It will go back to the 139 * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The 140 * call queue size gets incremented after we parse a call and before we add it to the queue of 141 * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current 142 * size is kept in {@link #callQueueSizeInBytes}. 143 * @see #callQueueSizeInBytes 144 * @see #DEFAULT_MAX_CALLQUEUE_SIZE 145 */ 146 protected final long maxQueueSizeInBytes; 147 protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024; 148 149 /** 150 * This is a running count of the size in bytes of all outstanding calls whether currently 151 * executing or queued waiting to be run. 152 */ 153 protected final LongAdder callQueueSizeInBytes = new LongAdder(); 154 155 protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm 156 protected final boolean tcpKeepAlive; // if T then use keepalives 157 158 /** 159 * This flag is used to indicate to sub threads when they should go down. When we call 160 * {@link #start()}, all threads started will consult this flag on whether they should keep going. 161 * It is set to false when {@link #stop()} is called. 162 */ 163 volatile boolean running = true; 164 165 /** 166 * This flag is set to true after all threads are up and 'running' and the server is then opened 167 * for business by the call to {@link #start()}. 168 */ 169 volatile boolean started = false; 170 171 protected AuthenticationTokenSecretManager authTokenSecretMgr = null; 172 173 protected HBaseRPCErrorHandler errorHandler = null; 174 175 public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size"; 176 177 protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; 178 protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; 179 protected static final String WARN_SCAN_RESPONSE_TIME = "hbase.ipc.warn.response.time.scan"; 180 protected static final String WARN_SCAN_RESPONSE_SIZE = "hbase.ipc.warn.response.size.scan"; 181 182 /** 183 * Minimum allowable timeout (in milliseconds) in rpc request's header. This configuration exists 184 * to prevent the rpc service regarding this request as timeout immediately. 185 */ 186 protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout"; 187 protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20; 188 189 /** Default value for above params */ 190 public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M 191 protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds 192 protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; 193 194 protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000; 195 protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length"; 196 protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>"; 197 198 protected static final Gson GSON = GsonUtil.createGsonWithDisableHtmlEscaping().create(); 199 200 protected final int maxRequestSize; 201 protected volatile int warnResponseTime; 202 protected volatile int warnResponseSize; 203 protected volatile int warnScanResponseTime; 204 protected volatile int warnScanResponseSize; 205 206 protected final int minClientRequestTimeout; 207 208 protected final Server server; 209 protected final List<BlockingServiceAndInterface> services; 210 211 protected final RpcScheduler scheduler; 212 213 protected UserProvider userProvider; 214 215 protected final ByteBuffAllocator bbAllocator; 216 217 protected volatile boolean allowFallbackToSimpleAuth; 218 219 /** 220 * Used to get details for scan with a scanner_id<br/> 221 * TODO try to figure out a better way and remove reference from regionserver package later. 222 */ 223 private RSRpcServices rsRpcServices; 224 225 /** 226 * Use to add online slowlog responses 227 */ 228 private NamedQueueRecorder namedQueueRecorder; 229 230 @FunctionalInterface 231 protected interface CallCleanup { 232 void run(); 233 } 234 235 /** 236 * Datastructure for passing a {@link BlockingService} and its associated class of protobuf 237 * service interface. For example, a server that fielded what is defined in the client protobuf 238 * service would pass in an implementation of the client blocking service and then its 239 * ClientService.BlockingInterface.class. Used checking connection setup. 240 */ 241 public static class BlockingServiceAndInterface { 242 private final BlockingService service; 243 private final Class<?> serviceInterface; 244 245 public BlockingServiceAndInterface(final BlockingService service, 246 final Class<?> serviceInterface) { 247 this.service = service; 248 this.serviceInterface = serviceInterface; 249 } 250 251 public Class<?> getServiceInterface() { 252 return this.serviceInterface; 253 } 254 255 public BlockingService getBlockingService() { 256 return this.service; 257 } 258 } 259 260 /** 261 * Constructs a server listening on the named port and address. 262 * @param server hosting instance of {@link Server}. We will do authentications if an 263 * instance else pass null for no authentication check. 264 * @param name Used keying this rpc servers' metrics and for naming the Listener 265 * thread. 266 * @param services A list of services. 267 * @param bindAddress Where to listen 268 * @param reservoirEnabled Enable ByteBufferPool or not. 269 */ 270 public RpcServer(final Server server, final String name, 271 final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress, 272 Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { 273 this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled); 274 this.server = server; 275 this.services = services; 276 this.bindAddress = bindAddress; 277 this.conf = conf; 278 // See declaration above for documentation on what this size is. 279 this.maxQueueSizeInBytes = 280 this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE); 281 282 this.warnResponseTime = getWarnResponseTime(conf); 283 this.warnResponseSize = getWarnResponseSize(conf); 284 this.warnScanResponseTime = getWarnScanResponseTime(conf); 285 this.warnScanResponseSize = getWarnScanResponseSize(conf); 286 this.minClientRequestTimeout = 287 conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT, DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT); 288 this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE); 289 290 this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this)); 291 this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true); 292 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true); 293 294 this.cellBlockBuilder = new CellBlockBuilder(conf); 295 296 this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false); 297 this.userProvider = UserProvider.instantiate(conf); 298 this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled(); 299 if (isSecurityEnabled) { 300 saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection", 301 QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT))); 302 } else { 303 saslProps = Collections.emptyMap(); 304 } 305 306 this.isOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(conf); 307 this.scheduler = scheduler; 308 } 309 310 @Override 311 public void onConfigurationChange(Configuration newConf) { 312 initReconfigurable(newConf); 313 if (scheduler instanceof ConfigurationObserver) { 314 ((ConfigurationObserver) scheduler).onConfigurationChange(newConf); 315 } 316 if (authorize) { 317 refreshAuthManager(newConf, new HBasePolicyProvider()); 318 } 319 refreshSlowLogConfiguration(newConf); 320 } 321 322 private void refreshSlowLogConfiguration(Configuration newConf) { 323 boolean newIsOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(newConf); 324 if (isOnlineLogProviderEnabled != newIsOnlineLogProviderEnabled) { 325 isOnlineLogProviderEnabled = newIsOnlineLogProviderEnabled; 326 } 327 int newWarnResponseTime = getWarnResponseTime(newConf); 328 if (warnResponseTime != newWarnResponseTime) { 329 warnResponseTime = newWarnResponseTime; 330 } 331 int newWarnResponseSize = getWarnResponseSize(newConf); 332 if (warnResponseSize != newWarnResponseSize) { 333 warnResponseSize = newWarnResponseSize; 334 } 335 int newWarnResponseTimeScan = getWarnScanResponseTime(newConf); 336 if (warnScanResponseTime != newWarnResponseTimeScan) { 337 warnScanResponseTime = newWarnResponseTimeScan; 338 } 339 int newWarnScanResponseSize = getWarnScanResponseSize(newConf); 340 if (warnScanResponseSize != newWarnScanResponseSize) { 341 warnScanResponseSize = newWarnScanResponseSize; 342 } 343 } 344 345 private static boolean getIsOnlineLogProviderEnabled(Configuration conf) { 346 return conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, 347 HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); 348 } 349 350 private static int getWarnResponseTime(Configuration conf) { 351 return conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); 352 } 353 354 private static int getWarnResponseSize(Configuration conf) { 355 return conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE); 356 } 357 358 private static int getWarnScanResponseTime(Configuration conf) { 359 return conf.getInt(WARN_SCAN_RESPONSE_TIME, getWarnResponseTime(conf)); 360 } 361 362 private static int getWarnScanResponseSize(Configuration conf) { 363 return conf.getInt(WARN_SCAN_RESPONSE_SIZE, getWarnResponseSize(conf)); 364 } 365 366 protected void initReconfigurable(Configuration confToLoad) { 367 this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false); 368 if (isSecurityEnabled && allowFallbackToSimpleAuth) { 369 LOG.warn("********* WARNING! *********"); 370 LOG.warn("This server is configured to allow connections from INSECURE clients"); 371 LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true)."); 372 LOG.warn("While this option is enabled, client identities cannot be secured, and user"); 373 LOG.warn("impersonation is possible!"); 374 LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,"); 375 LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml"); 376 LOG.warn("****************************"); 377 } 378 } 379 380 Configuration getConf() { 381 return conf; 382 } 383 384 @Override 385 public boolean isStarted() { 386 return this.started; 387 } 388 389 @Override 390 public synchronized void refreshAuthManager(Configuration conf, PolicyProvider pp) { 391 // Ignore warnings that this should be accessed in a static way instead of via an instance; 392 // it'll break if you go via static route. 393 System.setProperty("hadoop.policy.file", "hbase-policy.xml"); 394 this.authManager.refresh(conf, pp); 395 LOG.info("Refreshed hbase-policy.xml successfully"); 396 ProxyUsers.refreshSuperUserGroupsConfiguration(conf); 397 LOG.info("Refreshed super and proxy users successfully"); 398 } 399 400 protected AuthenticationTokenSecretManager createSecretManager() { 401 if (!isSecurityEnabled) return null; 402 if (server == null) return null; 403 Configuration conf = server.getConfiguration(); 404 long keyUpdateInterval = conf.getLong("hbase.auth.key.update.interval", 24 * 60 * 60 * 1000); 405 long maxAge = conf.getLong("hbase.auth.token.max.lifetime", 7 * 24 * 60 * 60 * 1000); 406 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(), 407 server.getServerName().toString(), keyUpdateInterval, maxAge); 408 } 409 410 public SecretManager<? extends TokenIdentifier> getSecretManager() { 411 return this.secretManager; 412 } 413 414 @SuppressWarnings("unchecked") 415 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) { 416 this.secretManager = (SecretManager<TokenIdentifier>) secretManager; 417 } 418 419 /** 420 * This is a server side method, which is invoked over RPC. On success the return response has 421 * protobuf response payload. On failure, the exception name and the stack trace are returned in 422 * the protobuf response. 423 */ 424 @Override 425 public Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status) 426 throws IOException { 427 try { 428 MethodDescriptor md = call.getMethod(); 429 Message param = call.getParam(); 430 status.setRPC(md.getName(), new Object[] { param }, call.getReceiveTime()); 431 // TODO: Review after we add in encoded data blocks. 432 status.setRPCPacket(param); 433 status.resume("Servicing call"); 434 // get an instance of the method arg type 435 HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner()); 436 controller.setCallTimeout(call.getTimeout()); 437 Message result = call.getService().callBlockingMethod(md, controller, param); 438 long receiveTime = call.getReceiveTime(); 439 long startTime = call.getStartTime(); 440 long endTime = EnvironmentEdgeManager.currentTime(); 441 int processingTime = (int) (endTime - startTime); 442 int qTime = (int) (startTime - receiveTime); 443 int totalTime = (int) (endTime - receiveTime); 444 if (LOG.isTraceEnabled()) { 445 LOG.trace( 446 "{}, response: {}, receiveTime: {}, queueTime: {}, processingTime: {}, totalTime: {}", 447 CurCall.get().toString(), TextFormat.shortDebugString(result), 448 CurCall.get().getReceiveTime(), qTime, processingTime, totalTime); 449 } 450 // Use the raw request call size for now. 451 long requestSize = call.getSize(); 452 long responseSize = result.getSerializedSize(); 453 long responseBlockSize = call.getBlockBytesScanned(); 454 if (call.isClientCellBlockSupported()) { 455 // Include the payload size in HBaseRpcController 456 responseSize += call.getResponseCellSize(); 457 } 458 459 metrics.dequeuedCall(qTime); 460 metrics.processedCall(processingTime); 461 metrics.totalCall(totalTime); 462 metrics.receivedRequest(requestSize); 463 metrics.sentResponse(responseSize); 464 // log any RPC responses that are slower than the configured warn 465 // response time or larger than configured warning size 466 boolean tooSlow = isTooSlow(call, processingTime); 467 boolean tooLarge = isTooLarge(call, responseSize, responseBlockSize); 468 if (tooSlow || tooLarge) { 469 final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY); 470 // when tagging, we let TooLarge trump TooSmall to keep output simple 471 // note that large responses will often also be slow. 472 logResponse(param, md.getName(), md.getName() + "(" + param.getClass().getName() + ")", 473 tooLarge, tooSlow, status.getClient(), startTime, processingTime, qTime, responseSize, 474 responseBlockSize, userName); 475 if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) { 476 // send logs to ring buffer owned by slowLogRecorder 477 final String className = 478 server == null ? StringUtils.EMPTY : server.getClass().getSimpleName(); 479 this.namedQueueRecorder.addRecord(new RpcLogDetails(call, param, status.getClient(), 480 responseSize, responseBlockSize, className, tooSlow, tooLarge)); 481 } 482 } 483 return new Pair<>(result, controller.cellScanner()); 484 } catch (Throwable e) { 485 // The above callBlockingMethod will always return a SE. Strip the SE wrapper before 486 // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't 487 // need to pass it over the wire. 488 if (e instanceof ServiceException) { 489 if (e.getCause() == null) { 490 LOG.debug("Caught a ServiceException with null cause", e); 491 } else { 492 e = e.getCause(); 493 } 494 } 495 496 // increment the number of requests that were exceptions. 497 metrics.exception(e); 498 499 if (e instanceof LinkageError) throw new DoNotRetryIOException(e); 500 if (e instanceof IOException) throw (IOException) e; 501 LOG.error("Unexpected throwable object ", e); 502 throw new IOException(e.getMessage(), e); 503 } 504 } 505 506 /** 507 * Logs an RPC response to the LOG file, producing valid JSON objects for client Operations. 508 * @param param The parameters received in the call. 509 * @param methodName The name of the method invoked 510 * @param call The string representation of the call 511 * @param tooLarge To indicate if the event is tooLarge 512 * @param tooSlow To indicate if the event is tooSlow 513 * @param clientAddress The address of the client who made this call. 514 * @param startTime The time that the call was initiated, in ms. 515 * @param processingTime The duration that the call took to run, in ms. 516 * @param qTime The duration that the call spent on the queue prior to being 517 * initiated, in ms. 518 * @param responseSize The size in bytes of the response buffer. 519 * @param blockBytesScanned The size of block bytes scanned to retrieve the response. 520 * @param userName UserName of the current RPC Call 521 */ 522 void logResponse(Message param, String methodName, String call, boolean tooLarge, boolean tooSlow, 523 String clientAddress, long startTime, int processingTime, int qTime, long responseSize, 524 long blockBytesScanned, String userName) { 525 final String className = server == null ? StringUtils.EMPTY : server.getClass().getSimpleName(); 526 // base information that is reported regardless of type of call 527 Map<String, Object> responseInfo = new HashMap<>(); 528 responseInfo.put("starttimems", startTime); 529 responseInfo.put("processingtimems", processingTime); 530 responseInfo.put("queuetimems", qTime); 531 responseInfo.put("responsesize", responseSize); 532 responseInfo.put("blockbytesscanned", blockBytesScanned); 533 responseInfo.put("client", clientAddress); 534 responseInfo.put("class", className); 535 responseInfo.put("method", methodName); 536 responseInfo.put("call", call); 537 // The params could be really big, make sure they don't kill us at WARN 538 String stringifiedParam = ProtobufUtil.getShortTextFormat(param); 539 if (stringifiedParam.length() > 150) { 540 // Truncate to 1000 chars if TRACE is on, else to 150 chars 541 stringifiedParam = truncateTraceLog(stringifiedParam); 542 } 543 responseInfo.put("param", stringifiedParam); 544 if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) { 545 ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param); 546 String scanDetails; 547 if (request.hasScannerId()) { 548 long scannerId = request.getScannerId(); 549 scanDetails = rsRpcServices.getScanDetailsWithId(scannerId); 550 } else { 551 scanDetails = rsRpcServices.getScanDetailsWithRequest(request); 552 } 553 if (scanDetails != null) { 554 responseInfo.put("scandetails", scanDetails); 555 } 556 } 557 if (param instanceof ClientProtos.MultiRequest) { 558 int numGets = 0; 559 int numMutations = 0; 560 int numServiceCalls = 0; 561 ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param; 562 for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) { 563 for (ClientProtos.Action action : regionAction.getActionList()) { 564 if (action.hasMutation()) { 565 numMutations++; 566 } 567 if (action.hasGet()) { 568 numGets++; 569 } 570 if (action.hasServiceCall()) { 571 numServiceCalls++; 572 } 573 } 574 } 575 responseInfo.put(MULTI_GETS, numGets); 576 responseInfo.put(MULTI_MUTATIONS, numMutations); 577 responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls); 578 } 579 final String tag = 580 (tooLarge && tooSlow) ? "TooLarge & TooSlow" : (tooSlow ? "TooSlow" : "TooLarge"); 581 LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo)); 582 } 583 584 private boolean isTooSlow(RpcCall call, int processingTime) { 585 long warnResponseTime = call.getParam() instanceof ClientProtos.ScanRequest 586 ? warnScanResponseTime 587 : this.warnResponseTime; 588 return (processingTime > warnResponseTime && warnResponseTime > -1); 589 } 590 591 private boolean isTooLarge(RpcCall call, long responseSize, long responseBlockSize) { 592 long warnResponseSize = call.getParam() instanceof ClientProtos.ScanRequest 593 ? warnScanResponseSize 594 : this.warnResponseSize; 595 return (warnResponseSize > -1 596 && (responseSize > warnResponseSize || responseBlockSize > warnResponseSize)); 597 } 598 599 /** 600 * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length if TRACE is on else 601 * to 150 chars Refer to Jira HBASE-20826 and HBASE-20942 602 * @param strParam stringifiedParam to be truncated 603 * @return truncated trace log string 604 */ 605 String truncateTraceLog(String strParam) { 606 if (LOG.isTraceEnabled()) { 607 int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH); 608 int truncatedLength = 609 strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength; 610 String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED; 611 return strParam.subSequence(0, truncatedLength) + truncatedFlag; 612 } 613 return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED; 614 } 615 616 /** 617 * Set the handler for calling out of RPC for error conditions. 618 * @param handler the handler implementation 619 */ 620 @Override 621 public void setErrorHandler(HBaseRPCErrorHandler handler) { 622 this.errorHandler = handler; 623 } 624 625 @Override 626 public HBaseRPCErrorHandler getErrorHandler() { 627 return this.errorHandler; 628 } 629 630 /** 631 * Returns the metrics instance for reporting RPC call statistics 632 */ 633 @Override 634 public MetricsHBaseServer getMetrics() { 635 return metrics; 636 } 637 638 @Override 639 public void addCallSize(final long diff) { 640 this.callQueueSizeInBytes.add(diff); 641 } 642 643 /** 644 * Authorize the incoming client connection. 645 * @param user client user 646 * @param connection incoming connection 647 * @param addr InetAddress of incoming connection 648 * @throws AuthorizationException when the client isn't authorized to talk the protocol 649 */ 650 public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection, 651 InetAddress addr) throws AuthorizationException { 652 if (authorize) { 653 Class<?> c = getServiceInterface(services, connection.getServiceName()); 654 authManager.authorize(user, c, getConf(), addr); 655 } 656 } 657 658 /** 659 * When the read or write buffer size is larger than this limit, i/o will be done in chunks of 660 * this size. Most RPC requests and responses would be be smaller. 661 */ 662 protected static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB. 663 664 /** 665 * This is a wrapper around 666 * {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}. If the amount of data 667 * is large, it writes to channel in smaller chunks. This is to avoid jdk from creating many 668 * direct buffers as the size of ByteBuffer increases. There should not be any performance 669 * degredation. 670 * @param channel writable byte channel to write on 671 * @param buffer buffer to write 672 * @return number of bytes written 673 * @throws java.io.IOException e 674 * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer) 675 */ 676 protected int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException { 677 678 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) 679 ? channel.read(buffer) 680 : channelIO(channel, null, buffer); 681 if (count > 0) { 682 metrics.receivedBytes(count); 683 } 684 return count; 685 } 686 687 /** 688 * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}. 689 * Only one of readCh or writeCh should be non-null. 690 * @param readCh read channel 691 * @param writeCh write channel 692 * @param buf buffer to read or write into/out of 693 * @return bytes written 694 * @throws java.io.IOException e 695 * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer) 696 */ 697 private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh, 698 ByteBuffer buf) throws IOException { 699 700 int originalLimit = buf.limit(); 701 int initialRemaining = buf.remaining(); 702 int ret = 0; 703 704 while (buf.remaining() > 0) { 705 try { 706 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); 707 buf.limit(buf.position() + ioSize); 708 709 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); 710 711 if (ret < ioSize) { 712 break; 713 } 714 715 } finally { 716 buf.limit(originalLimit); 717 } 718 } 719 720 int nBytes = initialRemaining - buf.remaining(); 721 return (nBytes > 0) ? nBytes : ret; 722 } 723 724 /** 725 * Needed for features such as delayed calls. We need to be able to store the current call so that 726 * we can complete it later or ask questions of what is supported by the current ongoing call. 727 * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local) 728 */ 729 public static Optional<RpcCall> getCurrentCall() { 730 return Optional.ofNullable(CurCall.get()); 731 } 732 733 /** 734 * Just return the current rpc call if it is a {@link ServerCall} and also has {@link CellScanner} 735 * attached. 736 * <p/> 737 * Mainly used for reference counting as {@link CellScanner} may reference non heap memory. 738 */ 739 public static Optional<ServerCall<?>> getCurrentServerCallWithCellScanner() { 740 return getCurrentCall().filter(c -> c instanceof ServerCall) 741 .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall<?>) c); 742 } 743 744 public static boolean isInRpcCallContext() { 745 return CurCall.get() != null; 746 } 747 748 /** 749 * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For 750 * master's rpc call, it may generate new procedure and mutate the region which store procedure. 751 * There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc 752 * call to avoid the rpc check. 753 * @return the currently ongoing rpc call 754 */ 755 public static Optional<RpcCall> unsetCurrentCall() { 756 Optional<RpcCall> rpcCall = getCurrentCall(); 757 CurCall.set(null); 758 return rpcCall; 759 } 760 761 /** 762 * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the 763 * rpc call back after mutate region. 764 */ 765 public static void setCurrentCall(RpcCall rpcCall) { 766 CurCall.set(rpcCall); 767 } 768 769 /** 770 * Returns the user credentials associated with the current RPC request or not present if no 771 * credentials were provided. 772 * @return A User 773 */ 774 public static Optional<User> getRequestUser() { 775 Optional<RpcCall> ctx = getCurrentCall(); 776 return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty(); 777 } 778 779 /** 780 * The number of open RPC conections 781 * @return the number of open rpc connections 782 */ 783 abstract public int getNumOpenConnections(); 784 785 /** 786 * Returns the username for any user associated with the current RPC request or not present if no 787 * user is set. 788 */ 789 public static Optional<String> getRequestUserName() { 790 return getRequestUser().map(User::getShortName); 791 } 792 793 /** Returns Address of remote client if a request is ongoing, else null */ 794 public static Optional<InetAddress> getRemoteAddress() { 795 return getCurrentCall().map(RpcCall::getRemoteAddress); 796 } 797 798 /** 799 * @param serviceName Some arbitrary string that represents a 'service'. 800 * @param services Available service instances 801 * @return Matching BlockingServiceAndInterface pair 802 */ 803 protected static BlockingServiceAndInterface getServiceAndInterface( 804 final List<BlockingServiceAndInterface> services, final String serviceName) { 805 for (BlockingServiceAndInterface bs : services) { 806 if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) { 807 return bs; 808 } 809 } 810 return null; 811 } 812 813 /** 814 * @param serviceName Some arbitrary string that represents a 'service'. 815 * @param services Available services and their service interfaces. 816 * @return Service interface class for <code>serviceName</code> 817 */ 818 protected static Class<?> getServiceInterface(final List<BlockingServiceAndInterface> services, 819 final String serviceName) { 820 BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName); 821 return bsasi == null ? null : bsasi.getServiceInterface(); 822 } 823 824 /** 825 * @param serviceName Some arbitrary string that represents a 'service'. 826 * @param services Available services and their service interfaces. 827 * @return BlockingService that goes with the passed <code>serviceName</code> 828 */ 829 protected static BlockingService getService(final List<BlockingServiceAndInterface> services, 830 final String serviceName) { 831 BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName); 832 return bsasi == null ? null : bsasi.getBlockingService(); 833 } 834 835 protected static MonitoredRPCHandler getStatus() { 836 // It is ugly the way we park status up in RpcServer. Let it be for now. TODO. 837 MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get(); 838 if (status != null) { 839 return status; 840 } 841 status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName()); 842 status.pause("Waiting for a call"); 843 RpcServer.MONITORED_RPC.set(status); 844 return status; 845 } 846 847 /** 848 * Returns the remote side ip address when invoked inside an RPC Returns null incase of an error. 849 */ 850 public static InetAddress getRemoteIp() { 851 RpcCall call = CurCall.get(); 852 if (call != null) { 853 return call.getRemoteAddress(); 854 } 855 return null; 856 } 857 858 @Override 859 public RpcScheduler getScheduler() { 860 return scheduler; 861 } 862 863 @Override 864 public ByteBuffAllocator getByteBuffAllocator() { 865 return this.bbAllocator; 866 } 867 868 @Override 869 public void setRsRpcServices(RSRpcServices rsRpcServices) { 870 this.rsRpcServices = rsRpcServices; 871 } 872 873 @Override 874 public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) { 875 this.namedQueueRecorder = namedQueueRecorder; 876 } 877 878 protected boolean needAuthorization() { 879 return authorize; 880 } 881}