001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; 021 022import java.io.IOException; 023import java.net.InetAddress; 024import java.net.InetSocketAddress; 025import java.nio.ByteBuffer; 026import java.nio.channels.ReadableByteChannel; 027import java.nio.channels.WritableByteChannel; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Locale; 032import java.util.Map; 033import java.util.Optional; 034import java.util.concurrent.atomic.LongAdder; 035import org.apache.commons.lang3.StringUtils; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.CallQueueTooBigException; 038import org.apache.hadoop.hbase.CellScanner; 039import org.apache.hadoop.hbase.DoNotRetryIOException; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.Server; 042import org.apache.hadoop.hbase.conf.ConfigurationObserver; 043import org.apache.hadoop.hbase.io.ByteBuffAllocator; 044import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 045import org.apache.hadoop.hbase.monitoring.TaskMonitor; 046import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 047import org.apache.hadoop.hbase.namequeues.RpcLogDetails; 048import org.apache.hadoop.hbase.regionserver.RSRpcServices; 049import org.apache.hadoop.hbase.security.HBasePolicyProvider; 050import org.apache.hadoop.hbase.security.SaslUtil; 051import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; 052import org.apache.hadoop.hbase.security.User; 053import org.apache.hadoop.hbase.security.UserProvider; 054import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; 055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 056import org.apache.hadoop.hbase.util.GsonUtil; 057import org.apache.hadoop.hbase.util.Pair; 058import org.apache.hadoop.security.UserGroupInformation; 059import org.apache.hadoop.security.authorize.AuthorizationException; 060import org.apache.hadoop.security.authorize.PolicyProvider; 061import org.apache.hadoop.security.authorize.ProxyUsers; 062import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; 063import org.apache.hadoop.security.token.SecretManager; 064import org.apache.hadoop.security.token.TokenIdentifier; 065import org.apache.yetus.audience.InterfaceAudience; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069import org.apache.hbase.thirdparty.com.google.gson.Gson; 070import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 071import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 072import org.apache.hbase.thirdparty.com.google.protobuf.Message; 073import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 074import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 075 076import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 079 080/** 081 * An RPC server that hosts protobuf described Services. 082 */ 083@InterfaceAudience.Private 084public abstract class RpcServer implements RpcServerInterface, ConfigurationObserver { 085 // LOG is being used in CallRunner and the log level is being changed in tests 086 public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class); 087 protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION = 088 new CallQueueTooBigException(); 089 090 private static final String MULTI_GETS = "multi.gets"; 091 private static final String MULTI_MUTATIONS = "multi.mutations"; 092 private static final String MULTI_SERVICE_CALLS = "multi.service_calls"; 093 094 private final boolean authorize; 095 private final boolean isOnlineLogProviderEnabled; 096 protected boolean isSecurityEnabled; 097 098 public static final byte CURRENT_VERSION = 0; 099 100 /** 101 * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled. 102 */ 103 public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH = 104 "hbase.ipc.server.fallback-to-simple-auth-allowed"; 105 106 /** 107 * How many calls/handler are allowed in the queue. 108 */ 109 protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; 110 111 protected final CellBlockBuilder cellBlockBuilder; 112 113 protected static final String AUTH_FAILED_FOR = "Auth failed for "; 114 protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for "; 115 protected static final Logger AUDITLOG = 116 LoggerFactory.getLogger("SecurityLogger." + Server.class.getName()); 117 protected SecretManager<TokenIdentifier> secretManager; 118 protected final Map<String, String> saslProps; 119 120 protected ServiceAuthorizationManager authManager; 121 122 /** 123 * This is set to Call object before Handler invokes an RPC and ybdie after the call returns. 124 */ 125 protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>(); 126 127 /** Keeps MonitoredRPCHandler per handler thread. */ 128 protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>(); 129 130 protected final InetSocketAddress bindAddress; 131 132 protected MetricsHBaseServer metrics; 133 134 protected final Configuration conf; 135 136 /** 137 * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over 138 * this size, then we will reject the call (after parsing it though). It will go back to the 139 * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The 140 * call queue size gets incremented after we parse a call and before we add it to the queue of 141 * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current 142 * size is kept in {@link #callQueueSizeInBytes}. 143 * @see #callQueueSizeInBytes 144 * @see #DEFAULT_MAX_CALLQUEUE_SIZE 145 */ 146 protected final long maxQueueSizeInBytes; 147 protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024; 148 149 /** 150 * This is a running count of the size in bytes of all outstanding calls whether currently 151 * executing or queued waiting to be run. 152 */ 153 protected final LongAdder callQueueSizeInBytes = new LongAdder(); 154 155 protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm 156 protected final boolean tcpKeepAlive; // if T then use keepalives 157 158 /** 159 * This flag is used to indicate to sub threads when they should go down. When we call 160 * {@link #start()}, all threads started will consult this flag on whether they should keep going. 161 * It is set to false when {@link #stop()} is called. 162 */ 163 volatile boolean running = true; 164 165 /** 166 * This flag is set to true after all threads are up and 'running' and the server is then opened 167 * for business by the call to {@link #start()}. 168 */ 169 volatile boolean started = false; 170 171 protected AuthenticationTokenSecretManager authTokenSecretMgr = null; 172 173 protected HBaseRPCErrorHandler errorHandler = null; 174 175 public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size"; 176 177 protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; 178 protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; 179 180 /** 181 * Minimum allowable timeout (in milliseconds) in rpc request's header. This configuration exists 182 * to prevent the rpc service regarding this request as timeout immediately. 183 */ 184 protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout"; 185 protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20; 186 187 /** Default value for above params */ 188 public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M 189 protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds 190 protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; 191 192 protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000; 193 protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length"; 194 protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>"; 195 196 protected static final Gson GSON = GsonUtil.createGsonWithDisableHtmlEscaping().create(); 197 198 protected final int maxRequestSize; 199 protected final int warnResponseTime; 200 protected final int warnResponseSize; 201 202 protected final int minClientRequestTimeout; 203 204 protected final Server server; 205 protected final List<BlockingServiceAndInterface> services; 206 207 protected final RpcScheduler scheduler; 208 209 protected UserProvider userProvider; 210 211 protected final ByteBuffAllocator bbAllocator; 212 213 protected volatile boolean allowFallbackToSimpleAuth; 214 215 /** 216 * Used to get details for scan with a scanner_id<br/> 217 * TODO try to figure out a better way and remove reference from regionserver package later. 218 */ 219 private RSRpcServices rsRpcServices; 220 221 /** 222 * Use to add online slowlog responses 223 */ 224 private NamedQueueRecorder namedQueueRecorder; 225 226 @FunctionalInterface 227 protected interface CallCleanup { 228 void run(); 229 } 230 231 /** 232 * Datastructure for passing a {@link BlockingService} and its associated class of protobuf 233 * service interface. For example, a server that fielded what is defined in the client protobuf 234 * service would pass in an implementation of the client blocking service and then its 235 * ClientService.BlockingInterface.class. Used checking connection setup. 236 */ 237 public static class BlockingServiceAndInterface { 238 private final BlockingService service; 239 private final Class<?> serviceInterface; 240 241 public BlockingServiceAndInterface(final BlockingService service, 242 final Class<?> serviceInterface) { 243 this.service = service; 244 this.serviceInterface = serviceInterface; 245 } 246 247 public Class<?> getServiceInterface() { 248 return this.serviceInterface; 249 } 250 251 public BlockingService getBlockingService() { 252 return this.service; 253 } 254 } 255 256 /** 257 * Constructs a server listening on the named port and address. 258 * @param server hosting instance of {@link Server}. We will do authentications if an 259 * instance else pass null for no authentication check. 260 * @param name Used keying this rpc servers' metrics and for naming the Listener thread. 261 * @param services A list of services. 262 * @param bindAddress Where to listen nn * @param reservoirEnabled Enable ByteBufferPool or not. 263 */ 264 public RpcServer(final Server server, final String name, 265 final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress, 266 Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { 267 this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled); 268 this.server = server; 269 this.services = services; 270 this.bindAddress = bindAddress; 271 this.conf = conf; 272 // See declaration above for documentation on what this size is. 273 this.maxQueueSizeInBytes = 274 this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE); 275 276 this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); 277 this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE); 278 this.minClientRequestTimeout = 279 conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT, DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT); 280 this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE); 281 282 this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this)); 283 this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true); 284 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true); 285 286 this.cellBlockBuilder = new CellBlockBuilder(conf); 287 288 this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false); 289 this.userProvider = UserProvider.instantiate(conf); 290 this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled(); 291 if (isSecurityEnabled) { 292 saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection", 293 QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT))); 294 } else { 295 saslProps = Collections.emptyMap(); 296 } 297 298 this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, 299 HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); 300 this.scheduler = scheduler; 301 } 302 303 @Override 304 public void onConfigurationChange(Configuration newConf) { 305 initReconfigurable(newConf); 306 if (scheduler instanceof ConfigurationObserver) { 307 ((ConfigurationObserver) scheduler).onConfigurationChange(newConf); 308 } 309 if (authorize) { 310 refreshAuthManager(newConf, new HBasePolicyProvider()); 311 } 312 } 313 314 protected void initReconfigurable(Configuration confToLoad) { 315 this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false); 316 if (isSecurityEnabled && allowFallbackToSimpleAuth) { 317 LOG.warn("********* WARNING! *********"); 318 LOG.warn("This server is configured to allow connections from INSECURE clients"); 319 LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true)."); 320 LOG.warn("While this option is enabled, client identities cannot be secured, and user"); 321 LOG.warn("impersonation is possible!"); 322 LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,"); 323 LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml"); 324 LOG.warn("****************************"); 325 } 326 } 327 328 Configuration getConf() { 329 return conf; 330 } 331 332 @Override 333 public boolean isStarted() { 334 return this.started; 335 } 336 337 @Override 338 public synchronized void refreshAuthManager(Configuration conf, PolicyProvider pp) { 339 // Ignore warnings that this should be accessed in a static way instead of via an instance; 340 // it'll break if you go via static route. 341 System.setProperty("hadoop.policy.file", "hbase-policy.xml"); 342 this.authManager.refresh(conf, pp); 343 LOG.info("Refreshed hbase-policy.xml successfully"); 344 ProxyUsers.refreshSuperUserGroupsConfiguration(conf); 345 LOG.info("Refreshed super and proxy users successfully"); 346 } 347 348 protected AuthenticationTokenSecretManager createSecretManager() { 349 if (!isSecurityEnabled) return null; 350 if (server == null) return null; 351 Configuration conf = server.getConfiguration(); 352 long keyUpdateInterval = conf.getLong("hbase.auth.key.update.interval", 24 * 60 * 60 * 1000); 353 long maxAge = conf.getLong("hbase.auth.token.max.lifetime", 7 * 24 * 60 * 60 * 1000); 354 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(), 355 server.getServerName().toString(), keyUpdateInterval, maxAge); 356 } 357 358 public SecretManager<? extends TokenIdentifier> getSecretManager() { 359 return this.secretManager; 360 } 361 362 @SuppressWarnings("unchecked") 363 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) { 364 this.secretManager = (SecretManager<TokenIdentifier>) secretManager; 365 } 366 367 /** 368 * This is a server side method, which is invoked over RPC. On success the return response has 369 * protobuf response payload. On failure, the exception name and the stack trace are returned in 370 * the protobuf response. 371 */ 372 @Override 373 public Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status) 374 throws IOException { 375 try { 376 MethodDescriptor md = call.getMethod(); 377 Message param = call.getParam(); 378 status.setRPC(md.getName(), new Object[] { param }, call.getReceiveTime()); 379 // TODO: Review after we add in encoded data blocks. 380 status.setRPCPacket(param); 381 status.resume("Servicing call"); 382 // get an instance of the method arg type 383 HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner()); 384 controller.setCallTimeout(call.getTimeout()); 385 Message result = call.getService().callBlockingMethod(md, controller, param); 386 long receiveTime = call.getReceiveTime(); 387 long startTime = call.getStartTime(); 388 long endTime = EnvironmentEdgeManager.currentTime(); 389 int processingTime = (int) (endTime - startTime); 390 int qTime = (int) (startTime - receiveTime); 391 int totalTime = (int) (endTime - receiveTime); 392 if (LOG.isTraceEnabled()) { 393 LOG.trace( 394 "{}, response: {}, receiveTime: {}, queueTime: {}, processingTime: {}, totalTime: {}", 395 CurCall.get().toString(), TextFormat.shortDebugString(result), 396 CurCall.get().getReceiveTime(), qTime, processingTime, totalTime); 397 } 398 // Use the raw request call size for now. 399 long requestSize = call.getSize(); 400 long responseSize = result.getSerializedSize(); 401 if (call.isClientCellBlockSupported()) { 402 // Include the payload size in HBaseRpcController 403 responseSize += call.getResponseCellSize(); 404 } 405 406 metrics.dequeuedCall(qTime); 407 metrics.processedCall(processingTime); 408 metrics.totalCall(totalTime); 409 metrics.receivedRequest(requestSize); 410 metrics.sentResponse(responseSize); 411 // log any RPC responses that are slower than the configured warn 412 // response time or larger than configured warning size 413 boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1); 414 boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1); 415 if (tooSlow || tooLarge) { 416 final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY); 417 // when tagging, we let TooLarge trump TooSmall to keep output simple 418 // note that large responses will often also be slow. 419 logResponse(param, md.getName(), md.getName() + "(" + param.getClass().getName() + ")", 420 tooLarge, tooSlow, status.getClient(), startTime, processingTime, qTime, responseSize, 421 userName); 422 if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) { 423 // send logs to ring buffer owned by slowLogRecorder 424 final String className = 425 server == null ? StringUtils.EMPTY : server.getClass().getSimpleName(); 426 this.namedQueueRecorder.addRecord(new RpcLogDetails(call, param, status.getClient(), 427 responseSize, className, tooSlow, tooLarge)); 428 } 429 } 430 return new Pair<>(result, controller.cellScanner()); 431 } catch (Throwable e) { 432 // The above callBlockingMethod will always return a SE. Strip the SE wrapper before 433 // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't 434 // need to pass it over the wire. 435 if (e instanceof ServiceException) { 436 if (e.getCause() == null) { 437 LOG.debug("Caught a ServiceException with null cause", e); 438 } else { 439 e = e.getCause(); 440 } 441 } 442 443 // increment the number of requests that were exceptions. 444 metrics.exception(e); 445 446 if (e instanceof LinkageError) throw new DoNotRetryIOException(e); 447 if (e instanceof IOException) throw (IOException) e; 448 LOG.error("Unexpected throwable object ", e); 449 throw new IOException(e.getMessage(), e); 450 } 451 } 452 453 /** 454 * Logs an RPC response to the LOG file, producing valid JSON objects for client Operations. 455 * @param param The parameters received in the call. 456 * @param methodName The name of the method invoked 457 * @param call The string representation of the call 458 * @param tooLarge To indicate if the event is tooLarge 459 * @param tooSlow To indicate if the event is tooSlow 460 * @param clientAddress The address of the client who made this call. 461 * @param startTime The time that the call was initiated, in ms. 462 * @param processingTime The duration that the call took to run, in ms. 463 * @param qTime The duration that the call spent on the queue prior to being initiated, 464 * in ms. 465 * @param responseSize The size in bytes of the response buffer. 466 * @param userName UserName of the current RPC Call 467 */ 468 void logResponse(Message param, String methodName, String call, boolean tooLarge, boolean tooSlow, 469 String clientAddress, long startTime, int processingTime, int qTime, long responseSize, 470 String userName) { 471 final String className = server == null ? StringUtils.EMPTY : server.getClass().getSimpleName(); 472 // base information that is reported regardless of type of call 473 Map<String, Object> responseInfo = new HashMap<>(); 474 responseInfo.put("starttimems", startTime); 475 responseInfo.put("processingtimems", processingTime); 476 responseInfo.put("queuetimems", qTime); 477 responseInfo.put("responsesize", responseSize); 478 responseInfo.put("client", clientAddress); 479 responseInfo.put("class", className); 480 responseInfo.put("method", methodName); 481 responseInfo.put("call", call); 482 // The params could be really big, make sure they don't kill us at WARN 483 String stringifiedParam = ProtobufUtil.getShortTextFormat(param); 484 if (stringifiedParam.length() > 150) { 485 // Truncate to 1000 chars if TRACE is on, else to 150 chars 486 stringifiedParam = truncateTraceLog(stringifiedParam); 487 } 488 responseInfo.put("param", stringifiedParam); 489 if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) { 490 ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param); 491 String scanDetails; 492 if (request.hasScannerId()) { 493 long scannerId = request.getScannerId(); 494 scanDetails = rsRpcServices.getScanDetailsWithId(scannerId); 495 } else { 496 scanDetails = rsRpcServices.getScanDetailsWithRequest(request); 497 } 498 if (scanDetails != null) { 499 responseInfo.put("scandetails", scanDetails); 500 } 501 } 502 if (param instanceof ClientProtos.MultiRequest) { 503 int numGets = 0; 504 int numMutations = 0; 505 int numServiceCalls = 0; 506 ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param; 507 for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) { 508 for (ClientProtos.Action action : regionAction.getActionList()) { 509 if (action.hasMutation()) { 510 numMutations++; 511 } 512 if (action.hasGet()) { 513 numGets++; 514 } 515 if (action.hasServiceCall()) { 516 numServiceCalls++; 517 } 518 } 519 } 520 responseInfo.put(MULTI_GETS, numGets); 521 responseInfo.put(MULTI_MUTATIONS, numMutations); 522 responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls); 523 } 524 final String tag = 525 (tooLarge && tooSlow) ? "TooLarge & TooSlow" : (tooSlow ? "TooSlow" : "TooLarge"); 526 LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo)); 527 } 528 529 /** 530 * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length if TRACE is on else 531 * to 150 chars Refer to Jira HBASE-20826 and HBASE-20942 532 * @param strParam stringifiedParam to be truncated 533 * @return truncated trace log string 534 */ 535 String truncateTraceLog(String strParam) { 536 if (LOG.isTraceEnabled()) { 537 int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH); 538 int truncatedLength = 539 strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength; 540 String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED; 541 return strParam.subSequence(0, truncatedLength) + truncatedFlag; 542 } 543 return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED; 544 } 545 546 /** 547 * Set the handler for calling out of RPC for error conditions. 548 * @param handler the handler implementation 549 */ 550 @Override 551 public void setErrorHandler(HBaseRPCErrorHandler handler) { 552 this.errorHandler = handler; 553 } 554 555 @Override 556 public HBaseRPCErrorHandler getErrorHandler() { 557 return this.errorHandler; 558 } 559 560 /** 561 * Returns the metrics instance for reporting RPC call statistics 562 */ 563 @Override 564 public MetricsHBaseServer getMetrics() { 565 return metrics; 566 } 567 568 @Override 569 public void addCallSize(final long diff) { 570 this.callQueueSizeInBytes.add(diff); 571 } 572 573 /** 574 * Authorize the incoming client connection. 575 * @param user client user 576 * @param connection incoming connection 577 * @param addr InetAddress of incoming connection 578 * @throws AuthorizationException when the client isn't authorized to talk the protocol 579 */ 580 public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection, 581 InetAddress addr) throws AuthorizationException { 582 if (authorize) { 583 Class<?> c = getServiceInterface(services, connection.getServiceName()); 584 authManager.authorize(user, c, getConf(), addr); 585 } 586 } 587 588 /** 589 * When the read or write buffer size is larger than this limit, i/o will be done in chunks of 590 * this size. Most RPC requests and responses would be be smaller. 591 */ 592 protected static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB. 593 594 /** 595 * This is a wrapper around 596 * {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}. If the amount of data 597 * is large, it writes to channel in smaller chunks. This is to avoid jdk from creating many 598 * direct buffers as the size of ByteBuffer increases. There should not be any performance 599 * degredation. 600 * @param channel writable byte channel to write on 601 * @param buffer buffer to write 602 * @return number of bytes written 603 * @throws java.io.IOException e 604 * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer) 605 */ 606 protected int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException { 607 608 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) 609 ? channel.read(buffer) 610 : channelIO(channel, null, buffer); 611 if (count > 0) { 612 metrics.receivedBytes(count); 613 } 614 return count; 615 } 616 617 /** 618 * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}. 619 * Only one of readCh or writeCh should be non-null. 620 * @param readCh read channel 621 * @param writeCh write channel 622 * @param buf buffer to read or write into/out of 623 * @return bytes written 624 * @throws java.io.IOException e 625 * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer) 626 */ 627 private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh, 628 ByteBuffer buf) throws IOException { 629 630 int originalLimit = buf.limit(); 631 int initialRemaining = buf.remaining(); 632 int ret = 0; 633 634 while (buf.remaining() > 0) { 635 try { 636 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); 637 buf.limit(buf.position() + ioSize); 638 639 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); 640 641 if (ret < ioSize) { 642 break; 643 } 644 645 } finally { 646 buf.limit(originalLimit); 647 } 648 } 649 650 int nBytes = initialRemaining - buf.remaining(); 651 return (nBytes > 0) ? nBytes : ret; 652 } 653 654 /** 655 * Needed for features such as delayed calls. We need to be able to store the current call so that 656 * we can complete it later or ask questions of what is supported by the current ongoing call. 657 * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local) 658 */ 659 public static Optional<RpcCall> getCurrentCall() { 660 return Optional.ofNullable(CurCall.get()); 661 } 662 663 public static boolean isInRpcCallContext() { 664 return CurCall.get() != null; 665 } 666 667 /** 668 * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For 669 * master's rpc call, it may generate new procedure and mutate the region which store procedure. 670 * There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc 671 * call to avoid the rpc check. 672 * @return the currently ongoing rpc call 673 */ 674 public static Optional<RpcCall> unsetCurrentCall() { 675 Optional<RpcCall> rpcCall = getCurrentCall(); 676 CurCall.set(null); 677 return rpcCall; 678 } 679 680 /** 681 * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the 682 * rpc call back after mutate region. 683 */ 684 public static void setCurrentCall(RpcCall rpcCall) { 685 CurCall.set(rpcCall); 686 } 687 688 /** 689 * Returns the user credentials associated with the current RPC request or not present if no 690 * credentials were provided. 691 * @return A User 692 */ 693 public static Optional<User> getRequestUser() { 694 Optional<RpcCall> ctx = getCurrentCall(); 695 return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty(); 696 } 697 698 /** 699 * The number of open RPC conections 700 * @return the number of open rpc connections 701 */ 702 abstract public int getNumOpenConnections(); 703 704 /** 705 * Returns the username for any user associated with the current RPC request or not present if no 706 * user is set. 707 */ 708 public static Optional<String> getRequestUserName() { 709 return getRequestUser().map(User::getShortName); 710 } 711 712 /** Returns Address of remote client if a request is ongoing, else null */ 713 public static Optional<InetAddress> getRemoteAddress() { 714 return getCurrentCall().map(RpcCall::getRemoteAddress); 715 } 716 717 /** 718 * @param serviceName Some arbitrary string that represents a 'service'. 719 * @param services Available service instances 720 * @return Matching BlockingServiceAndInterface pair 721 */ 722 protected static BlockingServiceAndInterface getServiceAndInterface( 723 final List<BlockingServiceAndInterface> services, final String serviceName) { 724 for (BlockingServiceAndInterface bs : services) { 725 if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) { 726 return bs; 727 } 728 } 729 return null; 730 } 731 732 /** 733 * @param serviceName Some arbitrary string that represents a 'service'. 734 * @param services Available services and their service interfaces. 735 * @return Service interface class for <code>serviceName</code> 736 */ 737 protected static Class<?> getServiceInterface(final List<BlockingServiceAndInterface> services, 738 final String serviceName) { 739 BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName); 740 return bsasi == null ? null : bsasi.getServiceInterface(); 741 } 742 743 /** 744 * @param serviceName Some arbitrary string that represents a 'service'. 745 * @param services Available services and their service interfaces. 746 * @return BlockingService that goes with the passed <code>serviceName</code> 747 */ 748 protected static BlockingService getService(final List<BlockingServiceAndInterface> services, 749 final String serviceName) { 750 BlockingServiceAndInterface bsasi = getServiceAndInterface(services, serviceName); 751 return bsasi == null ? null : bsasi.getBlockingService(); 752 } 753 754 protected static MonitoredRPCHandler getStatus() { 755 // It is ugly the way we park status up in RpcServer. Let it be for now. TODO. 756 MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get(); 757 if (status != null) { 758 return status; 759 } 760 status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName()); 761 status.pause("Waiting for a call"); 762 RpcServer.MONITORED_RPC.set(status); 763 return status; 764 } 765 766 /** 767 * Returns the remote side ip address when invoked inside an RPC Returns null incase of an error. 768 * n 769 */ 770 public static InetAddress getRemoteIp() { 771 RpcCall call = CurCall.get(); 772 if (call != null) { 773 return call.getRemoteAddress(); 774 } 775 return null; 776 } 777 778 @Override 779 public RpcScheduler getScheduler() { 780 return scheduler; 781 } 782 783 @Override 784 public ByteBuffAllocator getByteBuffAllocator() { 785 return this.bbAllocator; 786 } 787 788 @Override 789 public void setRsRpcServices(RSRpcServices rsRpcServices) { 790 this.rsRpcServices = rsRpcServices; 791 } 792 793 @Override 794 public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) { 795 this.namedQueueRecorder = namedQueueRecorder; 796 } 797 798 protected boolean needAuthorization() { 799 return authorize; 800 } 801}