001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; 021import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; 022 023import io.opentelemetry.api.trace.Span; 024import io.opentelemetry.api.trace.StatusCode; 025import io.opentelemetry.context.Scope; 026import java.io.IOException; 027import java.net.SocketAddress; 028import java.util.Collection; 029import java.util.concurrent.Executors; 030import java.util.concurrent.ScheduledExecutorService; 031import java.util.concurrent.ScheduledFuture; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicInteger; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.client.MetricsConnection; 038import org.apache.hadoop.hbase.client.trace.IpcClientSpanBuilder; 039import org.apache.hadoop.hbase.codec.Codec; 040import org.apache.hadoop.hbase.codec.KeyValueCodec; 041import org.apache.hadoop.hbase.net.Address; 042import org.apache.hadoop.hbase.security.User; 043import org.apache.hadoop.hbase.security.UserProvider; 044import org.apache.hadoop.hbase.trace.TraceUtil; 045import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 046import org.apache.hadoop.hbase.util.PoolMap; 047import org.apache.hadoop.hbase.util.Threads; 048import org.apache.hadoop.io.compress.CompressionCodec; 049import org.apache.hadoop.ipc.RemoteException; 050import org.apache.yetus.audience.InterfaceAudience; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 055import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 056import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; 057import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; 058import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 059import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 060import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; 061import org.apache.hbase.thirdparty.com.google.protobuf.Message; 062import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 063import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 064import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 065import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 066import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 067 068import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 069 070/** 071 * Provides the basics for a RpcClient implementation like configuration and Logging. 072 * <p> 073 * Locking schema of the current IPC implementation 074 * <ul> 075 * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating 076 * connection.</li> 077 * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li> 078 * <li>The same for {@link HBaseRpcController} as {@link Call}. And see the comment of 079 * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)} 080 * of how to deal with cancel.</li> 081 * <li>For connection implementation, the construction of a connection should be as fast as possible 082 * because the creation is protected under a lock. Connect to remote side when needed. There is no 083 * forced locking schema for a connection implementation.</li> 084 * <li>For the locking order, the {@link Call} and {@link HBaseRpcController}'s lock should be held 085 * at last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be execute 086 * outside the lock in {@link Call} and {@link HBaseRpcController} which means the implementations 087 * of the callbacks are free to hold any lock.</li> 088 * </ul> 089 * @since 2.0.0 090 */ 091@InterfaceAudience.Private 092public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient { 093 // Log level is being changed in tests 094 public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class); 095 096 protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer( 097 new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").setDaemon(true) 098 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 099 10, TimeUnit.MILLISECONDS); 100 101 private static final ScheduledExecutorService IDLE_CONN_SWEEPER = 102 Executors.newScheduledThreadPool(1, 103 new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").setDaemon(true) 104 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 105 106 private boolean running = true; // if client runs 107 108 protected final Configuration conf; 109 protected final String clusterId; 110 protected final SocketAddress localAddr; 111 protected final MetricsConnection metrics; 112 113 protected final UserProvider userProvider; 114 protected final CellBlockBuilder cellBlockBuilder; 115 116 protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this 117 // time (in ms), it will be closed at any moment. 118 protected final int maxRetries; // the max. no. of retries for socket connections 119 protected final long failureSleep; // Time to sleep before retry on failure. 120 protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm 121 protected final boolean tcpKeepAlive; // if T then use keepalives 122 protected final Codec codec; 123 protected final CompressionCodec compressor; 124 protected final boolean fallbackAllowed; 125 126 protected final FailedServers failedServers; 127 128 protected final int connectTO; 129 protected final int readTO; 130 protected final int writeTO; 131 132 private final PoolMap<ConnectionId, T> connections; 133 134 private final AtomicInteger callIdCnt = new AtomicInteger(0); 135 136 private final ScheduledFuture<?> cleanupIdleConnectionTask; 137 138 private int maxConcurrentCallsPerServer; 139 140 private static final LoadingCache<Address, AtomicInteger> concurrentCounterCache = 141 CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS) 142 .build(new CacheLoader<Address, AtomicInteger>() { 143 @Override 144 public AtomicInteger load(Address key) throws Exception { 145 return new AtomicInteger(0); 146 } 147 }); 148 149 /** 150 * Construct an IPC client for the cluster <code>clusterId</code> 151 * @param conf configuration 152 * @param clusterId the cluster id 153 * @param localAddr client socket bind address. 154 * @param metrics the connection metrics 155 */ 156 public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, 157 MetricsConnection metrics) { 158 this.userProvider = UserProvider.instantiate(conf); 159 this.localAddr = localAddr; 160 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); 161 this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT; 162 this.failureSleep = 163 conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 164 this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); 165 this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true); 166 this.cellBlockBuilder = new CellBlockBuilder(conf); 167 168 this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes 169 this.conf = conf; 170 this.codec = getCodec(); 171 this.compressor = getCompressor(conf); 172 this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, 173 IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); 174 this.failedServers = new FailedServers(conf); 175 this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); 176 this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ); 177 this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE); 178 this.metrics = metrics; 179 this.maxConcurrentCallsPerServer = 180 conf.getInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 181 HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD); 182 183 this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf)); 184 185 this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() { 186 187 @Override 188 public void run() { 189 cleanupIdleConnections(); 190 } 191 }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS); 192 193 if (LOG.isDebugEnabled()) { 194 LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive=" 195 + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO 196 + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose=" 197 + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed=" 198 + this.fallbackAllowed + ", bind address=" 199 + (this.localAddr != null ? this.localAddr : "null")); 200 } 201 } 202 203 private void cleanupIdleConnections() { 204 long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose; 205 synchronized (connections) { 206 for (T conn : connections.values()) { 207 // Remove connection if it has not been chosen by anyone for more than maxIdleTime, and the 208 // connection itself has already shutdown. The latter check is because we may still 209 // have some pending calls on connection so we should not shutdown the connection outside. 210 // The connection itself will disconnect if there is no pending call for maxIdleTime. 211 if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) { 212 if (LOG.isTraceEnabled()) { 213 LOG.trace("Cleanup idle connection to {}", conn.remoteId().getAddress()); 214 } 215 connections.remove(conn.remoteId(), conn); 216 conn.cleanupConnection(); 217 } 218 } 219 } 220 } 221 222 public static String getDefaultCodec(final Configuration c) { 223 // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because 224 // Configuration will complain -- then no default codec (and we'll pb everything). Else 225 // default is KeyValueCodec 226 return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName()); 227 } 228 229 /** 230 * Encapsulate the ugly casting and RuntimeException conversion in private method. 231 * @return Codec to use on this client. 232 */ 233 Codec getCodec() { 234 // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND 235 // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding. 236 String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf)); 237 if (className == null || className.length() == 0) { 238 return null; 239 } 240 try { 241 return Class.forName(className).asSubclass(Codec.class).getDeclaredConstructor() 242 .newInstance(); 243 } catch (Exception e) { 244 throw new RuntimeException("Failed getting codec " + className, e); 245 } 246 } 247 248 @Override 249 public boolean hasCellBlockSupport() { 250 return this.codec != null; 251 } 252 253 // for writing tests that want to throw exception when connecting. 254 boolean isTcpNoDelay() { 255 return tcpNoDelay; 256 } 257 258 /** 259 * Encapsulate the ugly casting and RuntimeException conversion in private method. 260 * @param conf configuration 261 * @return The compressor to use on this client. 262 */ 263 private static CompressionCodec getCompressor(final Configuration conf) { 264 String className = conf.get("hbase.client.rpc.compressor", null); 265 if (className == null || className.isEmpty()) { 266 return null; 267 } 268 try { 269 return Class.forName(className).asSubclass(CompressionCodec.class).getDeclaredConstructor() 270 .newInstance(); 271 } catch (Exception e) { 272 throw new RuntimeException("Failed getting compressor " + className, e); 273 } 274 } 275 276 /** 277 * Return the pool type specified in the configuration, which must be set to either 278 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or 279 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the 280 * former. For applications with many user threads, use a small round-robin pool. For applications 281 * with few user threads, you may want to try using a thread-local pool. In any case, the number 282 * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating 283 * system's hard limit on the number of connections. 284 * @param config configuration 285 * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or 286 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal} 287 */ 288 private static PoolMap.PoolType getPoolType(Configuration config) { 289 return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), 290 PoolMap.PoolType.RoundRobin); 291 } 292 293 /** 294 * Return the pool size specified in the configuration, which is applicable only if the pool type 295 * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}. 296 * @param config configuration 297 * @return the maximum pool size 298 */ 299 private static int getPoolSize(Configuration config) { 300 int poolSize = config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); 301 302 if (poolSize <= 0) { 303 LOG.warn("{} must be positive. Using default value: 1", 304 HConstants.HBASE_CLIENT_IPC_POOL_SIZE); 305 return 1; 306 } else { 307 return poolSize; 308 } 309 } 310 311 private int nextCallId() { 312 int id, next; 313 do { 314 id = callIdCnt.get(); 315 next = id < Integer.MAX_VALUE ? id + 1 : 0; 316 } while (!callIdCnt.compareAndSet(id, next)); 317 return id; 318 } 319 320 /** 321 * Make a blocking call. Throws exceptions if there are network problems or if the remote code 322 * threw an exception. 323 * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. 324 * {@link UserProvider#getCurrent()} makes a new instance of User each time so will 325 * be a new Connection each time. 326 * @return A pair with the Message response and the Cell data (if any). 327 */ 328 private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc, 329 Message param, Message returnType, final User ticket, final Address isa) 330 throws ServiceException { 331 BlockingRpcCallback<Message> done = new BlockingRpcCallback<>(); 332 callMethod(md, hrc, param, returnType, ticket, isa, done); 333 Message val; 334 try { 335 val = done.get(); 336 } catch (IOException e) { 337 throw new ServiceException(e); 338 } 339 if (hrc.failed()) { 340 throw new ServiceException(hrc.getFailed()); 341 } else { 342 return val; 343 } 344 } 345 346 /** 347 * Get a connection from the pool, or create a new one and add it to the pool. Connections to a 348 * given host/port are reused. 349 */ 350 private T getConnection(ConnectionId remoteId) throws IOException { 351 if (failedServers.isFailedServer(remoteId.getAddress())) { 352 if (LOG.isDebugEnabled()) { 353 LOG.debug("Not trying to connect to " + remoteId.getAddress() 354 + " this server is in the failed servers list"); 355 } 356 throw new FailedServerException( 357 "This server is in the failed servers list: " + remoteId.getAddress()); 358 } 359 T conn; 360 synchronized (connections) { 361 if (!running) { 362 throw new StoppedRpcClientException(); 363 } 364 conn = connections.getOrCreate(remoteId, () -> createConnection(remoteId)); 365 conn.setLastTouched(EnvironmentEdgeManager.currentTime()); 366 } 367 return conn; 368 } 369 370 /** 371 * Not connected. 372 */ 373 protected abstract T createConnection(ConnectionId remoteId) throws IOException; 374 375 private void onCallFinished(Call call, HBaseRpcController hrc, Address addr, 376 RpcCallback<Message> callback) { 377 call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime()); 378 if (metrics != null) { 379 metrics.updateRpc(call.md, call.param, call.callStats); 380 } 381 if (LOG.isTraceEnabled()) { 382 LOG.trace("CallId: {}, call: {}, startTime: {}ms, callTime: {}ms", call.id, call.md.getName(), 383 call.getStartTime(), call.callStats.getCallTimeMs()); 384 } 385 if (call.error != null) { 386 if (call.error instanceof RemoteException) { 387 call.error.fillInStackTrace(); 388 hrc.setFailed(call.error); 389 } else { 390 hrc.setFailed(wrapException(addr, hrc.getRegionInfo(), call.error)); 391 } 392 callback.run(null); 393 } else { 394 hrc.setDone(call.cells); 395 callback.run(call.response); 396 } 397 } 398 399 private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, 400 final Message param, Message returnType, final User ticket, final Address addr, 401 final RpcCallback<Message> callback) { 402 Span span = new IpcClientSpanBuilder().setMethodDescriptor(md).setRemoteAddress(addr).build(); 403 try (Scope scope = span.makeCurrent()) { 404 final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); 405 cs.setStartTime(EnvironmentEdgeManager.currentTime()); 406 407 if (param instanceof ClientProtos.MultiRequest) { 408 ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param; 409 int numActions = 0; 410 for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) { 411 numActions += regionAction.getActionCount(); 412 } 413 414 cs.setNumActionsPerServer(numActions); 415 } 416 417 final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); 418 Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, 419 hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() { 420 @Override 421 public void run(Call call) { 422 try (Scope scope = call.span.makeCurrent()) { 423 counter.decrementAndGet(); 424 onCallFinished(call, hrc, addr, callback); 425 } finally { 426 if (hrc.failed()) { 427 TraceUtil.setError(span, hrc.getFailed()); 428 } else { 429 span.setStatus(StatusCode.OK); 430 } 431 span.end(); 432 } 433 } 434 }, cs); 435 ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr); 436 int count = counter.incrementAndGet(); 437 try { 438 if (count > maxConcurrentCallsPerServer) { 439 throw new ServerTooBusyException(addr, count); 440 } 441 cs.setConcurrentCallsPerServer(count); 442 T connection = getConnection(remoteId); 443 connection.sendRequest(call, hrc); 444 } catch (Exception e) { 445 call.setException(toIOE(e)); 446 span.end(); 447 } 448 return call; 449 } 450 } 451 452 private static Address createAddr(ServerName sn) { 453 return Address.fromParts(sn.getHostname(), sn.getPort()); 454 } 455 456 /** 457 * Interrupt the connections to the given ip:port server. This should be called if the server is 458 * known as actually dead. This will not prevent current operation to be retried, and, depending 459 * on their own behavior, they may retry on the same server. This can be a feature, for example at 460 * startup. In any case, they're likely to get connection refused (if the process died) or no 461 * route to host: i.e. their next retries should be faster and with a safe exception. 462 */ 463 @Override 464 public void cancelConnections(ServerName sn) { 465 synchronized (connections) { 466 for (T connection : connections.values()) { 467 ConnectionId remoteId = connection.remoteId(); 468 if ( 469 remoteId.getAddress().getPort() == sn.getPort() 470 && remoteId.getAddress().getHostName().equals(sn.getHostname()) 471 ) { 472 LOG.info("The server on " + sn.toString() + " is dead - stopping the connection " 473 + connection.remoteId); 474 connections.remove(remoteId, connection); 475 connection.shutdown(); 476 connection.cleanupConnection(); 477 } 478 } 479 } 480 } 481 482 /** 483 * Configure an hbase rpccontroller 484 * @param controller to configure 485 * @param channelOperationTimeout timeout for operation 486 * @return configured controller 487 */ 488 static HBaseRpcController configureHBaseRpcController(RpcController controller, 489 int channelOperationTimeout) { 490 HBaseRpcController hrc; 491 if (controller != null && controller instanceof HBaseRpcController) { 492 hrc = (HBaseRpcController) controller; 493 if (!hrc.hasCallTimeout()) { 494 hrc.setCallTimeout(channelOperationTimeout); 495 } 496 } else { 497 hrc = new HBaseRpcControllerImpl(); 498 hrc.setCallTimeout(channelOperationTimeout); 499 } 500 return hrc; 501 } 502 503 protected abstract void closeInternal(); 504 505 @Override 506 public void close() { 507 if (LOG.isDebugEnabled()) { 508 LOG.debug("Stopping rpc client"); 509 } 510 Collection<T> connToClose; 511 synchronized (connections) { 512 if (!running) { 513 return; 514 } 515 running = false; 516 connToClose = connections.values(); 517 connections.clear(); 518 } 519 cleanupIdleConnectionTask.cancel(true); 520 for (T conn : connToClose) { 521 conn.shutdown(); 522 } 523 closeInternal(); 524 for (T conn : connToClose) { 525 conn.cleanupConnection(); 526 } 527 } 528 529 @Override 530 public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, 531 int rpcTimeout) { 532 return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout); 533 } 534 535 @Override 536 public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) { 537 return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout); 538 } 539 540 private static class AbstractRpcChannel { 541 542 protected final Address addr; 543 544 protected final AbstractRpcClient<?> rpcClient; 545 546 protected final User ticket; 547 548 protected final int rpcTimeout; 549 550 protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, Address addr, User ticket, 551 int rpcTimeout) { 552 this.addr = addr; 553 this.rpcClient = rpcClient; 554 this.ticket = ticket; 555 this.rpcTimeout = rpcTimeout; 556 } 557 558 /** 559 * Configure an rpc controller 560 * @param controller to configure 561 * @return configured rpc controller 562 */ 563 protected HBaseRpcController configureRpcController(RpcController controller) { 564 HBaseRpcController hrc; 565 // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client 566 // side. And now we may use ServerRpcController. 567 if (controller != null && controller instanceof HBaseRpcController) { 568 hrc = (HBaseRpcController) controller; 569 if (!hrc.hasCallTimeout()) { 570 hrc.setCallTimeout(rpcTimeout); 571 } 572 } else { 573 hrc = new HBaseRpcControllerImpl(); 574 hrc.setCallTimeout(rpcTimeout); 575 } 576 return hrc; 577 } 578 } 579 580 /** 581 * Blocking rpc channel that goes via hbase rpc. 582 */ 583 public static class BlockingRpcChannelImplementation extends AbstractRpcChannel 584 implements BlockingRpcChannel { 585 586 protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr, 587 User ticket, int rpcTimeout) { 588 super(rpcClient, addr, ticket, rpcTimeout); 589 } 590 591 @Override 592 public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, 593 Message param, Message returnType) throws ServiceException { 594 return rpcClient.callBlockingMethod(md, configureRpcController(controller), param, returnType, 595 ticket, addr); 596 } 597 } 598 599 /** 600 * Async rpc channel that goes via hbase rpc. 601 */ 602 public static class RpcChannelImplementation extends AbstractRpcChannel implements RpcChannel { 603 604 protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr, User ticket, 605 int rpcTimeout) { 606 super(rpcClient, addr, ticket, rpcTimeout); 607 } 608 609 @Override 610 public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, 611 Message returnType, RpcCallback<Message> done) { 612 HBaseRpcController configuredController = configureRpcController( 613 Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call")); 614 // This method does not throw any exceptions, so the caller must provide a 615 // HBaseRpcController which is used to pass the exceptions. 616 this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done); 617 } 618 } 619}