/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;

import com.google.errorprone.annotations.RestrictedApi;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.trace.IpcClientSpanBuilder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Provides the basics for an RpcClient implementation, like configuration and logging.
 * <p>
 * Locking schema of the current IPC implementation:
 * <ul>
 * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating of a
 * connection.</li>
 * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li>
 * <li>The same applies to {@link HBaseRpcController} as to {@link Call}. And see the comment of
 * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)}
 * on how to deal with cancellation.</li>
 * <li>For a connection implementation, the construction of a connection should be as fast as
 * possible because the creation is protected under a lock. Connect to the remote side only when
 * needed. There is no forced locking schema for a connection implementation.</li>
 * <li>For the locking order, the locks in {@link Call} and {@link HBaseRpcController} should be
 * acquired last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be
 * executed outside the lock in {@link Call} and {@link HBaseRpcController}, which means the
 * implementations of the callbacks are free to hold any lock.</li>
 * </ul>
 * @since 2.0.0
 */
@InterfaceAudience.Private
public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
  // Log level is being changed in tests
  public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);

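  // A minimal usage sketch (illustrative only, not part of this class): callers normally obtain an
  // RpcClient through RpcClientFactory and talk to the server through the generated protobuf stubs
  // rather than using this class directly. Variable names below (conf, clusterId, serverName,
  // rpcTimeout) are placeholders.
  //   RpcClient client = RpcClientFactory.createClient(conf, clusterId);
  //   try {
  //     BlockingRpcChannel channel = client.createBlockingRpcChannel(serverName,
  //       UserProvider.instantiate(conf).getCurrent(), rpcTimeout);
  //     ClientProtos.ClientService.BlockingInterface stub =
  //       ClientProtos.ClientService.newBlockingStub(channel);
  //     // ... issue RPCs through the stub ...
  //   } finally {
  //     client.close();
  //   }
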
  protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
    new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").setDaemon(true)
      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
    10, TimeUnit.MILLISECONDS);

  private static final ScheduledExecutorService IDLE_CONN_SWEEPER =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

  private boolean running = true; // if client runs

  protected final Configuration conf;
  protected final Map<String, byte[]> connectionAttributes;
  protected final String clusterId;
  protected final SocketAddress localAddr;
  protected final MetricsConnection metrics;

  protected final UserProvider userProvider;
  protected final CellBlockBuilder cellBlockBuilder;

  protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
                                              // time (in ms), it will be closed at any moment.
  protected final int maxRetries; // the max. no. of retries for socket connections
  protected final long failureSleep; // Time to sleep before retry on failure.
  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
  protected final boolean tcpKeepAlive; // if T then use keepalives
  protected final Codec codec;
  protected final CompressionCodec compressor;
  protected final boolean fallbackAllowed;

  protected final FailedServers failedServers;

  protected final int connectTO;
  protected final int readTO;
  protected final int writeTO;

  private final PoolMap<ConnectionId, T> connections;

  private final AtomicInteger callIdCnt = new AtomicInteger(0);

  private final ScheduledFuture<?> cleanupIdleConnectionTask;

  private int maxConcurrentCallsPerServer;

  private static final LoadingCache<Address, AtomicInteger> concurrentCounterCache =
    CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS)
      .build(new CacheLoader<Address, AtomicInteger>() {
        @Override
        public AtomicInteger load(Address key) throws Exception {
          return new AtomicInteger(0);
        }
      });

  /**
   * Construct an IPC client for the cluster <code>clusterId</code>.
   * @param conf      configuration
   * @param clusterId the cluster id
   * @param localAddr client socket bind address.
   * @param metrics   the connection metrics
   */
  public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
    MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
    this.userProvider = UserProvider.instantiate(conf);
    this.localAddr = localAddr;
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
    this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
    this.failureSleep =
      conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
    this.cellBlockBuilder = new CellBlockBuilder(conf);

    this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
    this.conf = conf;
    this.connectionAttributes = connectionAttributes;
    this.codec = getCodec();
    this.compressor = getCompressor(conf);
    this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
      IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
    this.failedServers = new FailedServers(conf);
    this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
    this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
    this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
    this.metrics = metrics;
    this.maxConcurrentCallsPerServer =
      conf.getInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
        HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);

    this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));

    this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() {

      @Override
      public void run() {
        cleanupIdleConnections();
      }
    }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
        + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
        + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose="
        + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed="
        + this.fallbackAllowed + ", bind address="
        + (this.localAddr != null ? this.localAddr : "null"));
    }
  }

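  // Illustrative configuration sketch for the idle-connection sweep scheduled above (IDLE_TIME is
  // inherited from the RpcClient interface; 120000 ms is the default applied in the constructor):
  //   conf.setInt(RpcClient.IDLE_TIME, 60000); // e.g. close idle connections after one minute
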
  private void cleanupIdleConnections() {
    long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose;
    synchronized (connections) {
      for (T conn : connections.values()) {
        // Remove the connection if it has not been chosen by anyone for more than maxIdleTime and
        // the connection itself has already been shut down. The latter check is because we may
        // still have pending calls on the connection, so we should not shut it down from the
        // outside. The connection itself will disconnect if there is no pending call for
        // maxIdleTime.
        if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Cleanup idle connection to {}", conn.remoteId().getAddress());
          }
          connections.remove(conn.remoteId(), conn);
          conn.cleanupConnection();
        }
      }
    }
  }

  public static String getDefaultCodec(final Configuration c) {
    // If "hbase.client.default.rpc.codec" is the empty string -- you can't set it to null because
    // Configuration will complain -- then there is no default codec (and we'll pb everything).
    // Else the default is KeyValueCodec.
    return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
  }

  /**
   * Encapsulate the ugly casting and RuntimeException conversion in a helper method.
   * @return Codec to use on this client.
   */
  protected Codec getCodec() {
    // For NO CODEC, "hbase.client.rpc.codec" must be configured with the empty string AND
    // "hbase.client.default.rpc.codec" also -- because the default is to do cell block encoding.
    String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
    if (className == null || className.length() == 0) {
      return null;
    }
    try {
      return Class.forName(className).asSubclass(Codec.class).getDeclaredConstructor()
        .newInstance();
    } catch (Exception e) {
      throw new RuntimeException("Failed getting codec " + className, e);
    }
  }

  @Override
  public boolean hasCellBlockSupport() {
    return this.codec != null;
  }

  // for writing tests that want to throw exception when connecting.
  protected boolean isTcpNoDelay() {
    return tcpNoDelay;
  }

  /**
   * Encapsulate the ugly casting and RuntimeException conversion in a private method.
   * @param conf configuration
   * @return The compressor to use on this client.
   */
  private static CompressionCodec getCompressor(final Configuration conf) {
    String className = conf.get("hbase.client.rpc.compressor", null);
    if (className == null || className.isEmpty()) {
      return null;
    }
    try {
      return Class.forName(className).asSubclass(CompressionCodec.class).getDeclaredConstructor()
        .newInstance();
    } catch (Exception e) {
      throw new RuntimeException("Failed getting compressor " + className, e);
    }
  }

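  // Illustrative configuration sketch for the codec and compressor resolved above (the keys are
  // the ones read by getCodec()/getCompressor(); GzipCodec is just one example of a Hadoop
  // CompressionCodec implementation):
  //   conf.set(HConstants.RPC_CODEC_CONF_KEY, KeyValueCodec.class.getCanonicalName());
  //   conf.set("hbase.client.rpc.compressor", "org.apache.hadoop.io.compress.GzipCodec");
  // Per the comment in getCodec(), setting both "hbase.client.rpc.codec" and
  // "hbase.client.default.rpc.codec" to the empty string disables cell blocks entirely, in which
  // case requests and responses are carried as pure protobuf.
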
  /**
   * Return the pool type specified in the configuration, which must be set to either
   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the
   * former. For applications with many user threads, use a small round-robin pool. For
   * applications with few user threads, you may want to try using a thread-local pool. In any
   * case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed
   * the operating system's hard limit on the number of connections.
   * @param config configuration
   * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
   *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
   */
  private static PoolMap.PoolType getPoolType(Configuration config) {
    return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
      PoolMap.PoolType.RoundRobin);
  }

  /**
   * Return the pool size specified in the configuration, which is applicable only if the pool type
   * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
   * @param config configuration
   * @return the maximum pool size
   */
  private static int getPoolSize(Configuration config) {
    int poolSize = config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);

    if (poolSize <= 0) {
      LOG.warn("{} must be positive. Using default value: 1",
        HConstants.HBASE_CLIENT_IPC_POOL_SIZE);
      return 1;
    } else {
      return poolSize;
    }
  }

  private int nextCallId() {
    int id, next;
    do {
      id = callIdCnt.get();
      next = id < Integer.MAX_VALUE ? id + 1 : 0;
    } while (!callIdCnt.compareAndSet(id, next));
    return id;
  }

  /**
   * Make a blocking call. Throws exceptions if there are network problems or if the remote code
   * threw an exception.
   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
   *               {@link UserProvider#getCurrent()} makes a new instance of User each time, so it
   *               will be a new Connection each time.
   * @return the Message response; cell data, if any, is made available through the controller.
   */
  private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
    Message param, Message returnType, final User ticket, final Address isa)
    throws ServiceException {
    BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
    callMethod(md, hrc, param, returnType, ticket, isa, done);
    Message val;
    try {
      val = done.get();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    if (hrc.failed()) {
      throw new ServiceException(hrc.getFailed());
    } else {
      return val;
    }
  }

  /**
   * Get a connection from the pool, or create a new one and add it to the pool. Connections to a
   * given host/port are reused.
   */
  private T getConnection(ConnectionId remoteId) throws IOException {
    if (failedServers.isFailedServer(remoteId.getAddress())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not trying to connect to " + remoteId.getAddress()
          + " this server is in the failed servers list");
      }
      throw new FailedServerException(
        "This server is in the failed servers list: " + remoteId.getAddress());
    }
    T conn;
    synchronized (connections) {
      if (!running) {
        throw new StoppedRpcClientException();
      }
      conn = connections.getOrCreate(remoteId, () -> createConnection(remoteId));
      conn.setLastTouched(EnvironmentEdgeManager.currentTime());
    }
    return conn;
  }

  /**
   * Not connected.
   */
  protected abstract T createConnection(ConnectionId remoteId) throws IOException;

  private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
    RpcCallback<Message> callback) {
    call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
    if (metrics != null) {
      metrics.updateRpc(call.md, hrc.getTableName(), call.param, call.callStats, call.error);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("CallId: {}, call: {}, startTime: {}ms, callTime: {}ms, status: {}", call.id,
        call.md.getName(), call.getStartTime(), call.callStats.getCallTimeMs(),
        call.error != null ? "failed" : "successful");
    }
    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        hrc.setFailed(call.error);
      } else {
        hrc.setFailed(wrapException(addr, hrc.getRegionInfo(), call.error));
      }
      callback.run(null);
    } else {
      hrc.setDone(call.cells);
      callback.run(call.response);
    }
  }

  private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
    final Message param, Message returnType, final User ticket, final Address addr,
    final RpcCallback<Message> callback) {
    Span span = new IpcClientSpanBuilder().setMethodDescriptor(md).setRemoteAddress(addr).build();
    try (Scope scope = span.makeCurrent()) {
      final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
      cs.setStartTime(EnvironmentEdgeManager.currentTime());

      if (param instanceof ClientProtos.MultiRequest) {
        ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
        int numActions = 0;
        for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
          numActions += regionAction.getActionCount();
        }

        cs.setNumActionsPerServer(numActions);
      }

      final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
      Call call =
        new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, hrc.getCallTimeout(),
          hrc.getPriority(), hrc.getRequestAttributes(), new RpcCallback<Call>() {
            @Override
            public void run(Call call) {
              try (Scope scope = call.span.makeCurrent()) {
                counter.decrementAndGet();
                onCallFinished(call, hrc, addr, callback);
              } finally {
                if (hrc.failed()) {
                  TraceUtil.setError(span, hrc.getFailed());
                } else {
                  span.setStatus(StatusCode.OK);
                }
                span.end();
              }
            }
          }, cs);
      ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
      int count = counter.incrementAndGet();
      try {
        if (count > maxConcurrentCallsPerServer) {
          throw new ServerTooBusyException(addr, count);
        }
        cs.setConcurrentCallsPerServer(count);
        T connection = getConnection(remoteId);
        connection.sendRequest(call, hrc);
      } catch (Exception e) {
        call.setException(toIOE(e));
        span.end();
      }
      return call;
    }
  }

  static Address createAddr(ServerName sn) {
    return Address.fromParts(sn.getHostname(), sn.getPort());
  }

  /**
   * Interrupt the connections to the given ip:port server. This should be called if the server is
   * known to be dead. This will not prevent current operations from being retried, and, depending
   * on their own behavior, they may retry on the same server. This can be a feature, for example
   * at startup. In any case, they're likely to get connection refused (if the process died) or no
   * route to host: i.e. their next retries should be faster and with a safe exception.
   */
  @Override
  public void cancelConnections(ServerName sn) {
    synchronized (connections) {
      for (T connection : connections.values()) {
        ConnectionId remoteId = connection.remoteId();
        if (
          remoteId.getAddress().getPort() == sn.getPort()
            && remoteId.getAddress().getHostName().equals(sn.getHostname())
        ) {
          LOG.info("The server on " + sn.toString() + " is dead - stopping the connection "
            + connection.remoteId);
          connections.remove(remoteId, connection);
          connection.shutdown();
          connection.cleanupConnection();
        }
      }
    }
  }

  /**
   * Configure an HBase RPC controller.
   * @param controller              to configure
   * @param channelOperationTimeout timeout for operation
   * @return configured controller
   */
  static HBaseRpcController configureHBaseRpcController(RpcController controller,
    int channelOperationTimeout) {
    HBaseRpcController hrc;
    if (controller != null && controller instanceof HBaseRpcController) {
      hrc = (HBaseRpcController) controller;
      if (!hrc.hasCallTimeout()) {
        hrc.setCallTimeout(channelOperationTimeout);
      }
    } else {
      hrc = new HBaseRpcControllerImpl();
      hrc.setCallTimeout(channelOperationTimeout);
    }
    return hrc;
  }

  protected abstract void closeInternal();

  @Override
  public void close() {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Stopping rpc client");
    }
    Collection<T> connToClose;
    synchronized (connections) {
      if (!running) {
        return;
      }
      running = false;
      connToClose = connections.values();
      connections.clear();
    }
    cleanupIdleConnectionTask.cancel(true);
    for (T conn : connToClose) {
      conn.shutdown();
    }
    closeInternal();
    for (T conn : connToClose) {
      conn.cleanupConnection();
    }
  }

  @Override
  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
    int rpcTimeout) {
    return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
  }

  @Override
  public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
    return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  PoolMap<ConnectionId, T> getConnections() {
    return connections;
  }

  private static class AbstractRpcChannel {

    protected final Address addr;

    protected final AbstractRpcClient<?> rpcClient;

    protected final User ticket;

    protected final int rpcTimeout;

    protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, Address addr, User ticket,
      int rpcTimeout) {
      this.addr = addr;
      this.rpcClient = rpcClient;
      this.ticket = ticket;
      this.rpcTimeout = rpcTimeout;
    }

    /**
     * Configure an rpc controller
     * @param controller to configure
     * @return configured rpc controller
     */
    protected HBaseRpcController configureRpcController(RpcController controller) {
      HBaseRpcController hrc;
      // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client
      // side. And now we may use ServerRpcController.
      if (controller != null && controller instanceof HBaseRpcController) {
        hrc = (HBaseRpcController) controller;
        if (!hrc.hasCallTimeout()) {
          hrc.setCallTimeout(rpcTimeout);
        }
      } else {
        hrc = new HBaseRpcControllerImpl();
        hrc.setCallTimeout(rpcTimeout);
      }
      return hrc;
    }
  }

  /**
   * Blocking rpc channel that goes via hbase rpc.
   */
  public static class BlockingRpcChannelImplementation extends AbstractRpcChannel
    implements BlockingRpcChannel {

    protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr,
      User ticket, int rpcTimeout) {
      super(rpcClient, addr, ticket, rpcTimeout);
    }

    @Override
    public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
      Message param, Message returnType) throws ServiceException {
      return rpcClient.callBlockingMethod(md, configureRpcController(controller), param,
        returnType, ticket, addr);
    }
  }

  /**
   * Async rpc channel that goes via hbase rpc.
   */
  public static class RpcChannelImplementation extends AbstractRpcChannel implements RpcChannel {

    protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr, User ticket,
      int rpcTimeout) {
      super(rpcClient, addr, ticket, rpcTimeout);
    }

    @Override
    public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
      Message param, Message returnType, RpcCallback<Message> done) {
      HBaseRpcController configuredController = configureRpcController(
        Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call"));
      // This method does not throw any exceptions, so the caller must provide an
      // HBaseRpcController which is used to pass the exceptions.
      this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done);
    }
  }
}