001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; 021 022import com.google.errorprone.annotations.RestrictedApi; 023import java.io.IOException; 024import java.net.InetAddress; 025import java.net.InetSocketAddress; 026import java.nio.ByteBuffer; 027import java.nio.channels.ReadableByteChannel; 028import java.nio.channels.WritableByteChannel; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.List; 032import java.util.Locale; 033import java.util.Map; 034import java.util.Optional; 035import java.util.concurrent.atomic.LongAdder; 036import org.apache.commons.lang3.StringUtils; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.CallQueueTooBigException; 039import org.apache.hadoop.hbase.CellScanner; 040import org.apache.hadoop.hbase.DoNotRetryIOException; 041import org.apache.hadoop.hbase.ExtendedCellScanner; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.Server; 044import org.apache.hadoop.hbase.conf.ConfigurationObserver; 045import 
org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 046import org.apache.hadoop.hbase.io.ByteBuffAllocator; 047import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 048import org.apache.hadoop.hbase.monitoring.TaskMonitor; 049import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics; 050import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 051import org.apache.hadoop.hbase.namequeues.RpcLogDetails; 052import org.apache.hadoop.hbase.regionserver.RSRpcServices; 053import org.apache.hadoop.hbase.security.HBasePolicyProvider; 054import org.apache.hadoop.hbase.security.SaslUtil; 055import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; 056import org.apache.hadoop.hbase.security.User; 057import org.apache.hadoop.hbase.security.UserProvider; 058import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders; 059import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; 060import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil; 061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 062import org.apache.hadoop.hbase.util.GsonUtil; 063import org.apache.hadoop.hbase.util.Pair; 064import org.apache.hadoop.security.UserGroupInformation; 065import org.apache.hadoop.security.authorize.AuthorizationException; 066import org.apache.hadoop.security.authorize.PolicyProvider; 067import org.apache.hadoop.security.authorize.ProxyUsers; 068import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; 069import org.apache.hadoop.security.token.SecretManager; 070import org.apache.hadoop.security.token.TokenIdentifier; 071import org.apache.yetus.audience.InterfaceAudience; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 076import org.apache.hbase.thirdparty.com.google.gson.Gson; 077import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 078import 
org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 079import org.apache.hbase.thirdparty.com.google.protobuf.Message; 080import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 081import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 082 083import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 086 087/** 088 * An RPC server that hosts protobuf described Services. 089 */ 090@InterfaceAudience.Private 091public abstract class RpcServer implements RpcServerInterface, ConfigurationObserver { 092 // LOG is being used in CallRunner and the log level is being changed in tests 093 public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class); 094 protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION = 095 new CallQueueTooBigException(); 096 097 private static final String MULTI_GETS = "multi.gets"; 098 private static final String MULTI_MUTATIONS = "multi.mutations"; 099 private static final String MULTI_SERVICE_CALLS = "multi.service_calls"; 100 101 private final boolean authorize; 102 private volatile boolean isOnlineLogProviderEnabled; 103 protected boolean isSecurityEnabled; 104 protected final SaslServerAuthenticationProviders saslProviders; 105 106 public static final byte CURRENT_VERSION = 0; 107 108 /** 109 * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled. 110 */ 111 public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH = 112 "hbase.ipc.server.fallback-to-simple-auth-allowed"; 113 114 /** 115 * How many calls/handler are allowed in the queue. 
116 */ 117 protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; 118 119 protected final CellBlockBuilder cellBlockBuilder; 120 121 protected static final String AUTH_FAILED_FOR = "Auth failed for "; 122 protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for "; 123 protected static final Logger AUDITLOG = 124 LoggerFactory.getLogger("SecurityLogger." + Server.class.getName()); 125 protected SecretManager<TokenIdentifier> secretManager; 126 protected final Map<String, String> saslProps; 127 protected final String serverPrincipal; 128 129 protected ServiceAuthorizationManager authManager; 130 131 /** 132 * This is set to Call object before Handler invokes an RPC and reset after the call returns. 133 */ 134 protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>(); 135 136 /** Keeps MonitoredRPCHandler per handler thread. */ 137 protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>(); 138 139 protected final InetSocketAddress bindAddress; 140 141 protected MetricsHBaseServer metrics; 142 143 protected final Configuration conf; 144 145 /** 146 * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over 147 * this size, then we will reject the call (after parsing it though). It will go back to the 148 * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The 149 * call queue size gets incremented after we parse a call and before we add it to the queue of 150 * calls for the scheduler to use. It gets decremented after we have 'run' the Call. The current 151 * size is kept in {@link #callQueueSizeInBytes}. 
152 * @see #callQueueSizeInBytes 153 * @see #DEFAULT_MAX_CALLQUEUE_SIZE 154 */ 155 protected final long maxQueueSizeInBytes; 156 protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024; 157 158 /** 159 * This is a running count of the size in bytes of all outstanding calls whether currently 160 * executing or queued waiting to be run. 161 */ 162 protected final LongAdder callQueueSizeInBytes = new LongAdder(); 163 164 protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm 165 protected final boolean tcpKeepAlive; // if T then use keepalives 166 167 /** 168 * This flag is used to indicate to sub threads when they should go down. When we call 169 * {@link #start()}, all threads started will consult this flag on whether they should keep going. 170 * It is set to false when {@link #stop()} is called. 171 */ 172 volatile boolean running = true; 173 174 /** 175 * This flag is set to true after all threads are up and 'running' and the server is then opened 176 * for business by the call to {@link #start()}. 177 */ 178 volatile boolean started = false; 179 180 protected AuthenticationTokenSecretManager authTokenSecretMgr = null; 181 182 protected HBaseRPCErrorHandler errorHandler = null; 183 184 public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size"; 185 186 protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; 187 protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; 188 protected static final String WARN_SCAN_RESPONSE_TIME = "hbase.ipc.warn.response.time.scan"; 189 protected static final String WARN_SCAN_RESPONSE_SIZE = "hbase.ipc.warn.response.size.scan"; 190 191 /** 192 * Minimum allowable timeout (in milliseconds) in rpc request's header. This configuration exists 193 * to prevent the rpc service regarding this request as timeout immediately. 
194 */ 195 protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout"; 196 protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20; 197 198 /** Default value for above params */ 199 public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M 200 protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds 201 protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; 202 203 protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000; 204 protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length"; 205 protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>"; 206 207 protected static final Gson GSON = GsonUtil.createGsonWithDisableHtmlEscaping().create(); 208 209 protected final int maxRequestSize; 210 protected volatile int warnResponseTime; 211 protected volatile int warnResponseSize; 212 protected volatile int warnScanResponseTime; 213 protected volatile int warnScanResponseSize; 214 215 protected final int minClientRequestTimeout; 216 217 protected final Server server; 218 protected final List<BlockingServiceAndInterface> services; 219 220 protected final RpcScheduler scheduler; 221 222 protected final UserProvider userProvider; 223 224 protected final ByteBuffAllocator bbAllocator; 225 226 protected volatile boolean allowFallbackToSimpleAuth; 227 228 volatile RpcCoprocessorHost cpHost; 229 230 /** 231 * Used to get details for scan with a scanner_id<br/> 232 * TODO try to figure out a better way and remove reference from regionserver package later. 
233 */ 234 private RSRpcServices rsRpcServices; 235 236 /** 237 * Use to add online slowlog responses 238 */ 239 private NamedQueueRecorder namedQueueRecorder; 240 241 @FunctionalInterface 242 protected interface CallCleanup { 243 void run(); 244 } 245 246 /** 247 * Datastructure for passing a {@link BlockingService} and its associated class of protobuf 248 * service interface. For example, a server that fielded what is defined in the client protobuf 249 * service would pass in an implementation of the client blocking service and then its 250 * ClientService.BlockingInterface.class. Used checking connection setup. 251 */ 252 public static class BlockingServiceAndInterface { 253 private final BlockingService service; 254 private final Class<?> serviceInterface; 255 256 public BlockingServiceAndInterface(final BlockingService service, 257 final Class<?> serviceInterface) { 258 this.service = service; 259 this.serviceInterface = serviceInterface; 260 } 261 262 public Class<?> getServiceInterface() { 263 return this.serviceInterface; 264 } 265 266 public BlockingService getBlockingService() { 267 return this.service; 268 } 269 } 270 271 /** 272 * Constructs a server listening on the named port and address. 273 * @param server hosting instance of {@link Server}. We will do authentications if an 274 * instance else pass null for no authentication check. 275 * @param name Used keying this rpc servers' metrics and for naming the Listener 276 * thread. 277 * @param services A list of services. 278 * @param bindAddress Where to listen 279 * @param reservoirEnabled Enable ByteBufferPool or not. 
280 */ 281 public RpcServer(final Server server, final String name, 282 final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress, 283 Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { 284 this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled); 285 this.server = server; 286 this.services = services; 287 this.bindAddress = bindAddress; 288 this.conf = conf; 289 // See declaration above for documentation on what this size is. 290 this.maxQueueSizeInBytes = 291 this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE); 292 293 this.warnResponseTime = getWarnResponseTime(conf); 294 this.warnResponseSize = getWarnResponseSize(conf); 295 this.warnScanResponseTime = getWarnScanResponseTime(conf); 296 this.warnScanResponseSize = getWarnScanResponseSize(conf); 297 this.minClientRequestTimeout = 298 conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT, DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT); 299 this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE); 300 301 this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this)); 302 this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true); 303 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true); 304 305 this.cellBlockBuilder = new CellBlockBuilder(conf); 306 307 this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false); 308 this.userProvider = UserProvider.instantiate(conf); 309 this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled(); 310 if (isSecurityEnabled) { 311 saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection", 312 QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT))); 313 serverPrincipal = Preconditions.checkNotNull(userProvider.getCurrentUserName(), 314 "can not get current user name when security is enabled"); 315 } else { 316 saslProps = Collections.emptyMap(); 317 serverPrincipal = 
HConstants.EMPTY_STRING; 318 } 319 this.saslProviders = new SaslServerAuthenticationProviders(conf); 320 321 this.isOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(conf); 322 this.scheduler = scheduler; 323 324 initializeCoprocessorHost(getConf()); 325 } 326 327 @Override 328 public void onConfigurationChange(Configuration newConf) { 329 initReconfigurable(newConf); 330 if (scheduler instanceof ConfigurationObserver) { 331 ((ConfigurationObserver) scheduler).onConfigurationChange(newConf); 332 } 333 if (authorize) { 334 refreshAuthManager(newConf, new HBasePolicyProvider()); 335 } 336 refreshSlowLogConfiguration(newConf); 337 if ( 338 CoprocessorConfigurationUtil.checkConfigurationChange(this.cpHost, newConf, 339 CoprocessorHost.RPC_COPROCESSOR_CONF_KEY) 340 ) { 341 LOG.info("Update the RPC coprocessor(s) because the configuration has changed"); 342 initializeCoprocessorHost(newConf); 343 } 344 } 345 346 private void refreshSlowLogConfiguration(Configuration newConf) { 347 boolean newIsOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(newConf); 348 if (isOnlineLogProviderEnabled != newIsOnlineLogProviderEnabled) { 349 isOnlineLogProviderEnabled = newIsOnlineLogProviderEnabled; 350 } 351 int newWarnResponseTime = getWarnResponseTime(newConf); 352 if (warnResponseTime != newWarnResponseTime) { 353 warnResponseTime = newWarnResponseTime; 354 } 355 int newWarnResponseSize = getWarnResponseSize(newConf); 356 if (warnResponseSize != newWarnResponseSize) { 357 warnResponseSize = newWarnResponseSize; 358 } 359 int newWarnResponseTimeScan = getWarnScanResponseTime(newConf); 360 if (warnScanResponseTime != newWarnResponseTimeScan) { 361 warnScanResponseTime = newWarnResponseTimeScan; 362 } 363 int newWarnScanResponseSize = getWarnScanResponseSize(newConf); 364 if (warnScanResponseSize != newWarnScanResponseSize) { 365 warnScanResponseSize = newWarnScanResponseSize; 366 } 367 } 368 369 private static boolean getIsOnlineLogProviderEnabled(Configuration conf) { 
370 return conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, 371 HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); 372 } 373 374 private static int getWarnResponseTime(Configuration conf) { 375 return conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); 376 } 377 378 private static int getWarnResponseSize(Configuration conf) { 379 return conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE); 380 } 381 382 private static int getWarnScanResponseTime(Configuration conf) { 383 return conf.getInt(WARN_SCAN_RESPONSE_TIME, getWarnResponseTime(conf)); 384 } 385 386 private static int getWarnScanResponseSize(Configuration conf) { 387 return conf.getInt(WARN_SCAN_RESPONSE_SIZE, getWarnResponseSize(conf)); 388 } 389 390 protected void initReconfigurable(Configuration confToLoad) { 391 this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false); 392 if (isSecurityEnabled && allowFallbackToSimpleAuth) { 393 LOG.warn("********* WARNING! *********"); 394 LOG.warn("This server is configured to allow connections from INSECURE clients"); 395 LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true)."); 396 LOG.warn("While this option is enabled, client identities cannot be secured, and user"); 397 LOG.warn("impersonation is possible!"); 398 LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,"); 399 LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml"); 400 LOG.warn("****************************"); 401 } 402 } 403 404 Configuration getConf() { 405 return conf; 406 } 407 408 @Override 409 public boolean isStarted() { 410 return this.started; 411 } 412 413 @Override 414 public synchronized void refreshAuthManager(Configuration conf, PolicyProvider pp) { 415 // Ignore warnings that this should be accessed in a static way instead of via an instance; 416 // it'll break if you go via static route. 
417 System.setProperty("hadoop.policy.file", "hbase-policy.xml"); 418 this.authManager.refresh(conf, pp); 419 LOG.info("Refreshed hbase-policy.xml successfully"); 420 ProxyUsers.refreshSuperUserGroupsConfiguration(conf); 421 LOG.info("Refreshed super and proxy users successfully"); 422 } 423 424 protected AuthenticationTokenSecretManager createSecretManager() { 425 if (!isSecurityEnabled) return null; 426 if (server == null) return null; 427 Configuration conf = server.getConfiguration(); 428 long keyUpdateInterval = conf.getLong("hbase.auth.key.update.interval", 24 * 60 * 60 * 1000); 429 long maxAge = conf.getLong("hbase.auth.token.max.lifetime", 7 * 24 * 60 * 60 * 1000); 430 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(), 431 server.getServerName().toString(), keyUpdateInterval, maxAge); 432 } 433 434 public SecretManager<? extends TokenIdentifier> getSecretManager() { 435 return this.secretManager; 436 } 437 438 @SuppressWarnings("unchecked") 439 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) { 440 this.secretManager = (SecretManager<TokenIdentifier>) secretManager; 441 } 442 443 /** 444 * This is a server side method, which is invoked over RPC. On success the return response has 445 * protobuf response payload. On failure, the exception name and the stack trace are returned in 446 * the protobuf response. 447 */ 448 @Override 449 public Pair<Message, ExtendedCellScanner> call(RpcCall call, MonitoredRPCHandler status) 450 throws IOException { 451 try { 452 MethodDescriptor md = call.getMethod(); 453 Message param = call.getParam(); 454 status.setRPC(md.getName(), new Object[] { param }, call.getReceiveTime()); 455 // TODO: Review after we add in encoded data blocks. 
456 status.setRPCPacket(param); 457 status.resume("Servicing call"); 458 // get an instance of the method arg type 459 HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner()); 460 controller.setCallTimeout(call.getTimeout()); 461 Message result = call.getService().callBlockingMethod(md, controller, param); 462 long receiveTime = call.getReceiveTime(); 463 long startTime = call.getStartTime(); 464 long endTime = EnvironmentEdgeManager.currentTime(); 465 int processingTime = (int) (endTime - startTime); 466 int qTime = (int) (startTime - receiveTime); 467 int totalTime = (int) (endTime - receiveTime); 468 long fsReadTime = ThreadLocalServerSideScanMetrics.getFsReadTimeCounter().get(); 469 if (LOG.isTraceEnabled()) { 470 LOG.trace( 471 "{}, response: {}, receiveTime: {}, queueTime: {}, processingTime: {}, " 472 + "totalTime: {}, fsReadTime: {}", 473 CurCall.get().toString(), TextFormat.shortDebugString(result), 474 CurCall.get().getReceiveTime(), qTime, processingTime, totalTime, fsReadTime); 475 } 476 // Use the raw request call size for now. 
477 long requestSize = call.getSize(); 478 long responseSize = result.getSerializedSize(); 479 long responseBlockSize = call.getBlockBytesScanned(); 480 if (call.isClientCellBlockSupported()) { 481 // Include the payload size in HBaseRpcController 482 responseSize += call.getResponseCellSize(); 483 } 484 485 metrics.dequeuedCall(qTime); 486 metrics.processedCall(processingTime); 487 metrics.totalCall(totalTime); 488 metrics.receivedRequest(requestSize); 489 metrics.sentResponse(responseSize); 490 // log any RPC responses that are slower than the configured warn 491 // response time or larger than configured warning size 492 boolean tooSlow = isTooSlow(call, processingTime); 493 boolean tooLarge = isTooLarge(call, responseSize, responseBlockSize); 494 if (tooSlow || tooLarge) { 495 final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY); 496 // when tagging, we let TooLarge trump TooSmall to keep output simple 497 // note that large responses will often also be slow. 498 logResponse(param, md.getName(), md.getName() + "(" + param.getClass().getName() + ")", 499 tooLarge, tooSlow, status.getClient(), startTime, processingTime, qTime, responseSize, 500 responseBlockSize, fsReadTime, userName); 501 if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) { 502 // send logs to ring buffer owned by slowLogRecorder 503 final String className = 504 server == null ? StringUtils.EMPTY : server.getClass().getSimpleName(); 505 this.namedQueueRecorder.addRecord(new RpcLogDetails(call, param, status.getClient(), 506 responseSize, responseBlockSize, fsReadTime, className, tooSlow, tooLarge)); 507 } 508 } 509 return new Pair<>(result, controller.cellScanner()); 510 } catch (Throwable e) { 511 // The above callBlockingMethod will always return a SE. Strip the SE wrapper before 512 // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't 513 // need to pass it over the wire. 
514 if (e instanceof ServiceException) { 515 if (e.getCause() == null) { 516 LOG.debug("Caught a ServiceException with null cause", e); 517 } else { 518 e = e.getCause(); 519 } 520 } 521 522 // increment the number of requests that were exceptions. 523 metrics.exception(e); 524 525 if (e instanceof LinkageError) throw new DoNotRetryIOException(e); 526 if (e instanceof IOException) throw (IOException) e; 527 LOG.error("Unexpected throwable object ", e); 528 throw new IOException(e.getMessage(), e); 529 } 530 } 531 532 /** 533 * Logs an RPC response to the LOG file, producing valid JSON objects for client Operations. 534 * @param param The parameters received in the call. 535 * @param methodName The name of the method invoked 536 * @param call The string representation of the call 537 * @param tooLarge To indicate if the event is tooLarge 538 * @param tooSlow To indicate if the event is tooSlow 539 * @param clientAddress The address of the client who made this call. 540 * @param startTime The time that the call was initiated, in ms. 541 * @param processingTime The duration that the call took to run, in ms. 542 * @param qTime The duration that the call spent on the queue prior to being 543 * initiated, in ms. 544 * @param responseSize The size in bytes of the response buffer. 545 * @param blockBytesScanned The size of block bytes scanned to retrieve the response. 546 * @param userName UserName of the current RPC Call 547 */ 548 void logResponse(Message param, String methodName, String call, boolean tooLarge, boolean tooSlow, 549 String clientAddress, long startTime, int processingTime, int qTime, long responseSize, 550 long blockBytesScanned, long fsReadTime, String userName) { 551 final String className = server == null ? 
StringUtils.EMPTY : server.getClass().getSimpleName(); 552 // base information that is reported regardless of type of call 553 Map<String, Object> responseInfo = new HashMap<>(); 554 responseInfo.put("starttimems", startTime); 555 responseInfo.put("processingtimems", processingTime); 556 responseInfo.put("queuetimems", qTime); 557 responseInfo.put("responsesize", responseSize); 558 responseInfo.put("blockbytesscanned", blockBytesScanned); 559 responseInfo.put("fsreadtime", fsReadTime); 560 responseInfo.put("client", clientAddress); 561 responseInfo.put("class", className); 562 responseInfo.put("method", methodName); 563 responseInfo.put("call", call); 564 // The params could be really big, make sure they don't kill us at WARN 565 String stringifiedParam = ProtobufUtil.getShortTextFormat(param); 566 if (stringifiedParam.length() > 150) { 567 // Truncate to 1000 chars if TRACE is on, else to 150 chars 568 stringifiedParam = truncateTraceLog(stringifiedParam); 569 } 570 responseInfo.put("param", stringifiedParam); 571 if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) { 572 ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param); 573 String scanDetails; 574 if (request.hasScannerId()) { 575 long scannerId = request.getScannerId(); 576 scanDetails = rsRpcServices.getScanDetailsWithId(scannerId); 577 } else { 578 scanDetails = rsRpcServices.getScanDetailsWithRequest(request); 579 } 580 if (scanDetails != null) { 581 responseInfo.put("scandetails", scanDetails); 582 } 583 } 584 if (param instanceof ClientProtos.MultiRequest) { 585 int numGets = 0; 586 int numMutations = 0; 587 int numServiceCalls = 0; 588 ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param; 589 for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) { 590 for (ClientProtos.Action action : regionAction.getActionList()) { 591 if (action.hasMutation()) { 592 numMutations++; 593 } 594 if (action.hasGet()) { 595 numGets++; 596 } 597 if 
(action.hasServiceCall()) { 598 numServiceCalls++; 599 } 600 } 601 } 602 responseInfo.put(MULTI_GETS, numGets); 603 responseInfo.put(MULTI_MUTATIONS, numMutations); 604 responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls); 605 } 606 final String tag = 607 (tooLarge && tooSlow) ? "TooLarge & TooSlow" : (tooSlow ? "TooSlow" : "TooLarge"); 608 LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo)); 609 } 610 611 private boolean isTooSlow(RpcCall call, int processingTime) { 612 long warnResponseTime = call.getParam() instanceof ClientProtos.ScanRequest 613 ? warnScanResponseTime 614 : this.warnResponseTime; 615 return (processingTime > warnResponseTime && warnResponseTime > -1); 616 } 617 618 private boolean isTooLarge(RpcCall call, long responseSize, long responseBlockSize) { 619 long warnResponseSize = call.getParam() instanceof ClientProtos.ScanRequest 620 ? warnScanResponseSize 621 : this.warnResponseSize; 622 return (warnResponseSize > -1 623 && (responseSize > warnResponseSize || responseBlockSize > warnResponseSize)); 624 } 625 626 /** 627 * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length if TRACE is on else 628 * to 150 chars Refer to Jira HBASE-20826 and HBASE-20942 629 * @param strParam stringifiedParam to be truncated 630 * @return truncated trace log string 631 */ 632 String truncateTraceLog(String strParam) { 633 if (LOG.isTraceEnabled()) { 634 int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH); 635 int truncatedLength = 636 strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength; 637 String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED; 638 return strParam.subSequence(0, truncatedLength) + truncatedFlag; 639 } 640 return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED; 641 } 642 643 /** 644 * Set the handler for calling out of RPC for error conditions. 
645 * @param handler the handler implementation 646 */ 647 @Override 648 public void setErrorHandler(HBaseRPCErrorHandler handler) { 649 this.errorHandler = handler; 650 } 651 652 @Override 653 public HBaseRPCErrorHandler getErrorHandler() { 654 return this.errorHandler; 655 } 656 657 /** 658 * Returns the metrics instance for reporting RPC call statistics 659 */ 660 @Override 661 public MetricsHBaseServer getMetrics() { 662 return metrics; 663 } 664 665 @Override 666 public void addCallSize(final long diff) { 667 this.callQueueSizeInBytes.add(diff); 668 } 669 670 /** 671 * Authorize the incoming client connection. 672 * @param user client user 673 * @param connection incoming connection 674 * @param addr InetAddress of incoming connection 675 * @throws AuthorizationException when the client isn't authorized to talk the protocol 676 */ 677 public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection, 678 InetAddress addr) throws AuthorizationException { 679 if (authorize) { 680 Class<?> c = getServiceInterface(services, connection.getServiceName()); 681 authManager.authorize(user, c, getConf(), addr); 682 } 683 } 684 685 /** 686 * When the read or write buffer size is larger than this limit, i/o will be done in chunks of 687 * this size. Most RPC requests and responses would be be smaller. 688 */ 689 protected static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB. 690 691 /** 692 * This is a wrapper around 693 * {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}. If the amount of data 694 * is large, it writes to channel in smaller chunks. This is to avoid jdk from creating many 695 * direct buffers as the size of ByteBuffer increases. There should not be any performance 696 * degredation. 
697 * @param channel writable byte channel to write on 698 * @param buffer buffer to write 699 * @return number of bytes written 700 * @throws java.io.IOException e 701 * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer) 702 */ 703 protected int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException { 704 705 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) 706 ? channel.read(buffer) 707 : channelIO(channel, null, buffer); 708 if (count > 0) { 709 metrics.receivedBytes(count); 710 } 711 return count; 712 } 713 714 /** 715 * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}. 716 * Only one of readCh or writeCh should be non-null. 717 * @param readCh read channel 718 * @param writeCh write channel 719 * @param buf buffer to read or write into/out of 720 * @return bytes written 721 * @throws java.io.IOException e 722 * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer) 723 */ 724 private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh, 725 ByteBuffer buf) throws IOException { 726 727 int originalLimit = buf.limit(); 728 int initialRemaining = buf.remaining(); 729 int ret = 0; 730 731 while (buf.remaining() > 0) { 732 try { 733 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); 734 buf.limit(buf.position() + ioSize); 735 736 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); 737 738 if (ret < ioSize) { 739 break; 740 } 741 742 } finally { 743 buf.limit(originalLimit); 744 } 745 } 746 747 int nBytes = initialRemaining - buf.remaining(); 748 return (nBytes > 0) ? nBytes : ret; 749 } 750 751 /** 752 * Needed for features such as delayed calls. We need to be able to store the current call so that 753 * we can complete it later or ask questions of what is supported by the current ongoing call. 
754 * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local) 755 */ 756 public static Optional<RpcCall> getCurrentCall() { 757 return Optional.ofNullable(CurCall.get()); 758 } 759 760 /** 761 * Just return the current rpc call if it is a {@link ServerCall} and also has {@link CellScanner} 762 * attached. 763 * <p/> 764 * Mainly used for reference counting as {@link CellScanner} may reference non heap memory. 765 */ 766 public static Optional<ServerCall<?>> getCurrentServerCallWithCellScanner() { 767 return getCurrentCall().filter(c -> c instanceof ServerCall) 768 .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall<?>) c); 769 } 770 771 public static boolean isInRpcCallContext() { 772 return CurCall.get() != null; 773 } 774 775 /** 776 * Used by {@link org.apache.hadoop.hbase.master.region.MasterRegion}, to avoid hit row lock 777 * timeout when updating master region in a rpc call. See HBASE-23895, HBASE-29251 and HBASE-29294 778 * for more details. 779 * @return the currently ongoing rpc call 780 */ 781 public static Optional<RpcCall> unsetCurrentCall() { 782 Optional<RpcCall> rpcCall = getCurrentCall(); 783 CurCall.set(null); 784 return rpcCall; 785 } 786 787 /** 788 * Used by {@link org.apache.hadoop.hbase.master.region.MasterRegion}. Set the rpc call back after 789 * mutate region. 790 */ 791 public static void setCurrentCall(RpcCall rpcCall) { 792 CurCall.set(rpcCall); 793 } 794 795 /** 796 * Returns the user credentials associated with the current RPC request or not present if no 797 * credentials were provided. 798 * @return A User 799 */ 800 public static Optional<User> getRequestUser() { 801 Optional<RpcCall> ctx = getCurrentCall(); 802 return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty(); 803 } 804 805 /** 806 * Returns the RPC connection attributes for the current RPC request. These attributes are sent by 807 * the client when initiating a new connection to the HBase server. 
The attributes are sent in 808 * {@code ConnectionHeader.attribute} protobuf message. 809 * @return the attribute map. It will be empty if the method is called outside of an RPC context. 810 */ 811 public static Map<String, byte[]> getConnectionAttributes() { 812 return getCurrentCall().map(RpcCall::getConnectionAttributes).orElse(Map.of()); 813 } 814 815 /** 816 * The number of open RPC conections 817 * @return the number of open rpc connections 818 */ 819 abstract public int getNumOpenConnections(); 820 821 /** 822 * Returns the username for any user associated with the current RPC request or not present if no 823 * user is set. 824 */ 825 public static Optional<String> getRequestUserName() { 826 return getRequestUser().map(User::getShortName); 827 } 828 829 /** 830 * Returns the address of the remote client associated with the current RPC request or not present 831 * if no address is set. 832 */ 833 public static Optional<InetAddress> getRemoteAddress() { 834 return getCurrentCall().map(RpcCall::getRemoteAddress); 835 } 836 837 /** 838 * @param serviceName Some arbitrary string that represents a 'service'. 839 * @param services Available service instances 840 * @return Matching BlockingServiceAndInterface pair 841 */ 842 protected static BlockingServiceAndInterface getServiceAndInterface( 843 final List<BlockingServiceAndInterface> services, final String serviceName) { 844 for (BlockingServiceAndInterface bs : services) { 845 if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) { 846 return bs; 847 } 848 } 849 return null; 850 } 851 852 /** 853 * @param serviceName Some arbitrary string that represents a 'service'. 854 * @param services Available services and their service interfaces. 
855 * @return Service interface class for <code>serviceName</code> 856 */ 857 protected static Class<?> getServiceInterface(final List<BlockingServiceAndInterface> services, 858 final String serviceName) { 859 BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName); 860 return bsasi == null ? null : bsasi.getServiceInterface(); 861 } 862 863 /** 864 * @param serviceName Some arbitrary string that represents a 'service'. 865 * @param services Available services and their service interfaces. 866 * @return BlockingService that goes with the passed <code>serviceName</code> 867 */ 868 protected static BlockingService getService(final List<BlockingServiceAndInterface> services, 869 final String serviceName) { 870 BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName); 871 return bsasi == null ? null : bsasi.getBlockingService(); 872 } 873 874 protected static MonitoredRPCHandler getStatus() { 875 // It is ugly the way we park status up in RpcServer. Let it be for now. TODO. 876 MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get(); 877 if (status != null) { 878 return status; 879 } 880 status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName()); 881 status.pause("Waiting for a call"); 882 RpcServer.MONITORED_RPC.set(status); 883 return status; 884 } 885 886 /** 887 * Returns the remote side ip address when invoked inside an RPC Returns null incase of an error. 
888 */ 889 public static InetAddress getRemoteIp() { 890 RpcCall call = CurCall.get(); 891 if (call != null) { 892 return call.getRemoteAddress(); 893 } 894 return null; 895 } 896 897 @Override 898 public RpcScheduler getScheduler() { 899 return scheduler; 900 } 901 902 @Override 903 public ByteBuffAllocator getByteBuffAllocator() { 904 return this.bbAllocator; 905 } 906 907 @Override 908 public void setRsRpcServices(RSRpcServices rsRpcServices) { 909 this.rsRpcServices = rsRpcServices; 910 } 911 912 @Override 913 public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) { 914 this.namedQueueRecorder = namedQueueRecorder; 915 } 916 917 protected boolean needAuthorization() { 918 return authorize; 919 } 920 921 @RestrictedApi(explanation = "Should only be called in tests", link = "", 922 allowedOnPath = ".*/src/test/.*") 923 public List<BlockingServiceAndInterface> getServices() { 924 return services; 925 } 926 927 private void initializeCoprocessorHost(Configuration conf) { 928 this.cpHost = new RpcCoprocessorHost(conf); 929 } 930 931 @Override 932 public RpcCoprocessorHost getRpcCoprocessorHost() { 933 return cpHost; 934 } 935}