001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.ipc; 020 021import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; 022import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; 023 024import java.io.IOException; 025import java.net.InetSocketAddress; 026import java.net.SocketAddress; 027import java.net.UnknownHostException; 028import java.util.Collection; 029import java.util.concurrent.Executors; 030import java.util.concurrent.ScheduledExecutorService; 031import java.util.concurrent.ScheduledFuture; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicInteger; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.client.MetricsConnection; 038import org.apache.hadoop.hbase.codec.Codec; 039import org.apache.hadoop.hbase.codec.KeyValueCodec; 040import org.apache.hadoop.hbase.security.User; 041import org.apache.hadoop.hbase.security.UserProvider; 042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 043import org.apache.hadoop.hbase.util.PoolMap; 044import org.apache.hadoop.hbase.util.Threads; 045import org.apache.hadoop.io.compress.CompressionCodec; 046import org.apache.hadoop.ipc.RemoteException; 047import org.apache.yetus.audience.InterfaceAudience; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 052import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 053import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 054import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; 055import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; 056import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 057import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; 058import org.apache.hbase.thirdparty.com.google.protobuf.Message; 059import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 060import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 061import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 062import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 063import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 064 065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 066 067/** 068 * Provides the basics for a RpcClient implementation like configuration and Logging. 069 * <p> 070 * Locking schema of the current IPC implementation 071 * <ul> 072 * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating 073 * connection.</li> 074 * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li> 075 * <li>The same for {@link HBaseRpcController} as {@link Call}. And see the comment of 076 * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)} 077 * of how to deal with cancel.</li> 078 * <li>For connection implementation, the construction of a connection should be as fast as possible 079 * because the creation is protected under a lock. Connect to remote side when needed. There is no 080 * forced locking schema for a connection implementation.</li> 081 * <li>For the locking order, the {@link Call} and {@link HBaseRpcController}'s lock should be held 082 * at last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be execute 083 * outside the lock in {@link Call} and {@link HBaseRpcController} which means the implementations 084 * of the callbacks are free to hold any lock.</li> 085 * </ul> 086 * @since 2.0.0 087 */ 088@InterfaceAudience.Private 089public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient { 090 // Log level is being changed in tests 091 public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class); 092 093 protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer( 094 new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").setDaemon(true) 095 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 096 10, TimeUnit.MILLISECONDS); 097 098 private static final ScheduledExecutorService IDLE_CONN_SWEEPER = 099 Executors.newScheduledThreadPool(1, 100 new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").setDaemon(true) 101 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 102 103 private boolean running = true; // if client runs 104 105 protected final Configuration conf; 106 protected final String clusterId; 107 protected final SocketAddress localAddr; 108 protected final MetricsConnection metrics; 109 110 protected final UserProvider userProvider; 111 protected final CellBlockBuilder cellBlockBuilder; 112 113 protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this 114 // time (in ms), it will be closed at any moment. 115 protected final int maxRetries; // the max. no. of retries for socket connections 116 protected final long failureSleep; // Time to sleep before retry on failure. 117 protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm 118 protected final boolean tcpKeepAlive; // if T then use keepalives 119 protected final Codec codec; 120 protected final CompressionCodec compressor; 121 protected final boolean fallbackAllowed; 122 123 protected final FailedServers failedServers; 124 125 protected final int connectTO; 126 protected final int readTO; 127 protected final int writeTO; 128 129 private final PoolMap<ConnectionId, T> connections; 130 131 private final AtomicInteger callIdCnt = new AtomicInteger(0); 132 133 private final ScheduledFuture<?> cleanupIdleConnectionTask; 134 135 private int maxConcurrentCallsPerServer; 136 137 private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache = 138 CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS). 139 build(new CacheLoader<InetSocketAddress, AtomicInteger>() { 140 @Override public AtomicInteger load(InetSocketAddress key) throws Exception { 141 return new AtomicInteger(0); 142 } 143 }); 144 145 /** 146 * Construct an IPC client for the cluster <code>clusterId</code> 147 * @param conf configuration 148 * @param clusterId the cluster id 149 * @param localAddr client socket bind address. 150 * @param metrics the connection metrics 151 */ 152 public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, 153 MetricsConnection metrics) { 154 this.userProvider = UserProvider.instantiate(conf); 155 this.localAddr = localAddr; 156 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); 157 this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT; 158 this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 159 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 160 this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); 161 this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true); 162 this.cellBlockBuilder = new CellBlockBuilder(conf); 163 164 this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes 165 this.conf = conf; 166 this.codec = getCodec(); 167 this.compressor = getCompressor(conf); 168 this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, 169 IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); 170 this.failedServers = new FailedServers(conf); 171 this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); 172 this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ); 173 this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE); 174 this.metrics = metrics; 175 this.maxConcurrentCallsPerServer = conf.getInt( 176 HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 177 HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD); 178 179 this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf)); 180 181 this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() { 182 183 @Override 184 public void run() { 185 cleanupIdleConnections(); 186 } 187 }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS); 188 189 if (LOG.isDebugEnabled()) { 190 LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive=" 191 + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO 192 + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose=" 193 + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed=" 194 + this.fallbackAllowed + ", bind address=" 195 + (this.localAddr != null ? this.localAddr : "null")); 196 } 197 } 198 199 private void cleanupIdleConnections() { 200 long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose; 201 synchronized (connections) { 202 for (T conn : connections.values()) { 203 // Remove connection if it has not been chosen by anyone for more than maxIdleTime, and the 204 // connection itself has already shutdown. The latter check is because we may still 205 // have some pending calls on connection so we should not shutdown the connection outside. 206 // The connection itself will disconnect if there is no pending call for maxIdleTime. 207 if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) { 208 if (LOG.isTraceEnabled()) { 209 LOG.trace("Cleanup idle connection to {}", conn.remoteId().address); 210 } 211 connections.remove(conn.remoteId(), conn); 212 conn.cleanupConnection(); 213 } 214 } 215 } 216 } 217 218 public static String getDefaultCodec(final Configuration c) { 219 // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because 220 // Configuration will complain -- then no default codec (and we'll pb everything). Else 221 // default is KeyValueCodec 222 return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName()); 223 } 224 225 /** 226 * Encapsulate the ugly casting and RuntimeException conversion in private method. 227 * @return Codec to use on this client. 228 */ 229 Codec getCodec() { 230 // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND 231 // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding. 232 String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf)); 233 if (className == null || className.length() == 0) { 234 return null; 235 } 236 try { 237 return (Codec) Class.forName(className).getDeclaredConstructor().newInstance(); 238 } catch (Exception e) { 239 throw new RuntimeException("Failed getting codec " + className, e); 240 } 241 } 242 243 @Override 244 public boolean hasCellBlockSupport() { 245 return this.codec != null; 246 } 247 248 // for writing tests that want to throw exception when connecting. 249 boolean isTcpNoDelay() { 250 return tcpNoDelay; 251 } 252 253 /** 254 * Encapsulate the ugly casting and RuntimeException conversion in private method. 255 * @param conf configuration 256 * @return The compressor to use on this client. 257 */ 258 private static CompressionCodec getCompressor(final Configuration conf) { 259 String className = conf.get("hbase.client.rpc.compressor", null); 260 if (className == null || className.isEmpty()) { 261 return null; 262 } 263 try { 264 return (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance(); 265 } catch (Exception e) { 266 throw new RuntimeException("Failed getting compressor " + className, e); 267 } 268 } 269 270 /** 271 * Return the pool type specified in the configuration, which must be set to either 272 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or 273 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the 274 * former. For applications with many user threads, use a small round-robin pool. For applications 275 * with few user threads, you may want to try using a thread-local pool. In any case, the number 276 * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating 277 * system's hard limit on the number of connections. 278 * @param config configuration 279 * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or 280 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal} 281 */ 282 private static PoolMap.PoolType getPoolType(Configuration config) { 283 return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), 284 PoolMap.PoolType.RoundRobin); 285 } 286 287 /** 288 * Return the pool size specified in the configuration, which is applicable only if the pool type 289 * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}. 290 * @param config configuration 291 * @return the maximum pool size 292 */ 293 private static int getPoolSize(Configuration config) { 294 int poolSize = config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); 295 296 if (poolSize <= 0) { 297 LOG.warn("{} must be positive. Using default value: 1", HConstants.HBASE_CLIENT_IPC_POOL_SIZE); 298 return 1; 299 } else { 300 return poolSize; 301 } 302 } 303 304 private int nextCallId() { 305 int id, next; 306 do { 307 id = callIdCnt.get(); 308 next = id < Integer.MAX_VALUE ? id + 1 : 0; 309 } while (!callIdCnt.compareAndSet(id, next)); 310 return id; 311 } 312 313 /** 314 * Make a blocking call. Throws exceptions if there are network problems or if the remote code 315 * threw an exception. 316 * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. 317 * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a 318 * new Connection each time. 319 * @return A pair with the Message response and the Cell data (if any). 320 */ 321 private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc, 322 Message param, Message returnType, final User ticket, final InetSocketAddress isa) 323 throws ServiceException { 324 BlockingRpcCallback<Message> done = new BlockingRpcCallback<>(); 325 callMethod(md, hrc, param, returnType, ticket, isa, done); 326 Message val; 327 try { 328 val = done.get(); 329 } catch (IOException e) { 330 throw new ServiceException(e); 331 } 332 if (hrc.failed()) { 333 throw new ServiceException(hrc.getFailed()); 334 } else { 335 return val; 336 } 337 } 338 339 /** 340 * Get a connection from the pool, or create a new one and add it to the pool. Connections to a 341 * given host/port are reused. 342 */ 343 private T getConnection(ConnectionId remoteId) throws IOException { 344 if (failedServers.isFailedServer(remoteId.getAddress())) { 345 if (LOG.isDebugEnabled()) { 346 LOG.debug("Not trying to connect to " + remoteId.address 347 + " this server is in the failed servers list"); 348 } 349 throw new FailedServerException( 350 "This server is in the failed servers list: " + remoteId.address); 351 } 352 T conn; 353 synchronized (connections) { 354 if (!running) { 355 throw new StoppedRpcClientException(); 356 } 357 conn = connections.getOrCreate(remoteId, () -> createConnection(remoteId)); 358 conn.setLastTouched(EnvironmentEdgeManager.currentTime()); 359 } 360 return conn; 361 } 362 363 /** 364 * Not connected. 365 */ 366 protected abstract T createConnection(ConnectionId remoteId) throws IOException; 367 368 private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr, 369 RpcCallback<Message> callback) { 370 call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime()); 371 if (metrics != null) { 372 metrics.updateRpc(call.md, call.param, call.callStats); 373 } 374 if (LOG.isTraceEnabled()) { 375 LOG.trace( 376 "Call: " + call.md.getName() + ", callTime: " + call.callStats.getCallTimeMs() + "ms"); 377 } 378 if (call.error != null) { 379 if (call.error instanceof RemoteException) { 380 call.error.fillInStackTrace(); 381 hrc.setFailed(call.error); 382 } else { 383 hrc.setFailed(wrapException(addr, call.error)); 384 } 385 callback.run(null); 386 } else { 387 hrc.setDone(call.cells); 388 callback.run(call.response); 389 } 390 } 391 392 Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, 393 final Message param, Message returnType, final User ticket, final InetSocketAddress addr, 394 final RpcCallback<Message> callback) { 395 final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); 396 cs.setStartTime(EnvironmentEdgeManager.currentTime()); 397 398 if (param instanceof ClientProtos.MultiRequest) { 399 ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param; 400 int numActions = 0; 401 for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) { 402 numActions += regionAction.getActionCount(); 403 } 404 405 cs.setNumActionsPerServer(numActions); 406 } 407 408 final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); 409 Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, 410 hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() { 411 @Override 412 public void run(Call call) { 413 counter.decrementAndGet(); 414 onCallFinished(call, hrc, addr, callback); 415 } 416 }, cs); 417 ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr); 418 int count = counter.incrementAndGet(); 419 try { 420 if (count > maxConcurrentCallsPerServer) { 421 throw new ServerTooBusyException(addr, count); 422 } 423 cs.setConcurrentCallsPerServer(count); 424 T connection = getConnection(remoteId); 425 connection.sendRequest(call, hrc); 426 } catch (Exception e) { 427 call.setException(toIOE(e)); 428 } 429 return call; 430 } 431 432 InetSocketAddress createAddr(ServerName sn) throws UnknownHostException { 433 InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort()); 434 if (addr.isUnresolved()) { 435 throw new UnknownHostException("can not resolve " + sn.getServerName()); 436 } 437 return addr; 438 } 439 440 /** 441 * Interrupt the connections to the given ip:port server. This should be called if the server is 442 * known as actually dead. This will not prevent current operation to be retried, and, depending 443 * on their own behavior, they may retry on the same server. This can be a feature, for example at 444 * startup. In any case, they're likely to get connection refused (if the process died) or no 445 * route to host: i.e. their next retries should be faster and with a safe exception. 446 */ 447 @Override 448 public void cancelConnections(ServerName sn) { 449 synchronized (connections) { 450 for (T connection : connections.values()) { 451 ConnectionId remoteId = connection.remoteId(); 452 if (remoteId.address.getPort() == sn.getPort() 453 && remoteId.address.getHostName().equals(sn.getHostname())) { 454 LOG.info("The server on " + sn.toString() + " is dead - stopping the connection " 455 + connection.remoteId); 456 connections.remove(remoteId, connection); 457 connection.shutdown(); 458 connection.cleanupConnection(); 459 } 460 } 461 } 462 } 463 /** 464 * Configure an hbase rpccontroller 465 * @param controller to configure 466 * @param channelOperationTimeout timeout for operation 467 * @return configured controller 468 */ 469 static HBaseRpcController configureHBaseRpcController( 470 RpcController controller, int channelOperationTimeout) { 471 HBaseRpcController hrc; 472 if (controller != null && controller instanceof HBaseRpcController) { 473 hrc = (HBaseRpcController) controller; 474 if (!hrc.hasCallTimeout()) { 475 hrc.setCallTimeout(channelOperationTimeout); 476 } 477 } else { 478 hrc = new HBaseRpcControllerImpl(); 479 hrc.setCallTimeout(channelOperationTimeout); 480 } 481 return hrc; 482 } 483 484 protected abstract void closeInternal(); 485 486 @Override 487 public void close() { 488 if (LOG.isDebugEnabled()) { 489 LOG.debug("Stopping rpc client"); 490 } 491 Collection<T> connToClose; 492 synchronized (connections) { 493 if (!running) { 494 return; 495 } 496 running = false; 497 connToClose = connections.values(); 498 connections.clear(); 499 } 500 cleanupIdleConnectionTask.cancel(true); 501 for (T conn : connToClose) { 502 conn.shutdown(); 503 } 504 closeInternal(); 505 for (T conn : connToClose) { 506 conn.cleanupConnection(); 507 } 508 } 509 510 @Override 511 public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, 512 int rpcTimeout) throws UnknownHostException { 513 return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout); 514 } 515 516 @Override 517 public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) 518 throws UnknownHostException { 519 return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout); 520 } 521 522 private static class AbstractRpcChannel { 523 524 protected final InetSocketAddress addr; 525 526 protected final AbstractRpcClient<?> rpcClient; 527 528 protected final User ticket; 529 530 protected final int rpcTimeout; 531 532 protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, InetSocketAddress addr, 533 User ticket, int rpcTimeout) { 534 this.addr = addr; 535 this.rpcClient = rpcClient; 536 this.ticket = ticket; 537 this.rpcTimeout = rpcTimeout; 538 } 539 540 /** 541 * Configure an rpc controller 542 * @param controller to configure 543 * @return configured rpc controller 544 */ 545 protected HBaseRpcController configureRpcController(RpcController controller) { 546 HBaseRpcController hrc; 547 // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client 548 // side. And now we may use ServerRpcController. 549 if (controller != null && controller instanceof HBaseRpcController) { 550 hrc = (HBaseRpcController) controller; 551 if (!hrc.hasCallTimeout()) { 552 hrc.setCallTimeout(rpcTimeout); 553 } 554 } else { 555 hrc = new HBaseRpcControllerImpl(); 556 hrc.setCallTimeout(rpcTimeout); 557 } 558 return hrc; 559 } 560 } 561 562 /** 563 * Blocking rpc channel that goes via hbase rpc. 564 */ 565 public static class BlockingRpcChannelImplementation extends AbstractRpcChannel 566 implements BlockingRpcChannel { 567 568 protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient, 569 InetSocketAddress addr, User ticket, int rpcTimeout) { 570 super(rpcClient, addr, ticket, rpcTimeout); 571 } 572 573 @Override 574 public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, 575 Message param, Message returnType) throws ServiceException { 576 return rpcClient.callBlockingMethod(md, configureRpcController(controller), 577 param, returnType, ticket, addr); 578 } 579 } 580 581 /** 582 * Async rpc channel that goes via hbase rpc. 583 */ 584 public static class RpcChannelImplementation extends AbstractRpcChannel implements 585 RpcChannel { 586 587 protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr, 588 User ticket, int rpcTimeout) throws UnknownHostException { 589 super(rpcClient, addr, ticket, rpcTimeout); 590 } 591 592 @Override 593 public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, 594 Message param, Message returnType, RpcCallback<Message> done) { 595 // This method does not throw any exceptions, so the caller must provide a 596 // HBaseRpcController which is used to pass the exceptions. 597 this.rpcClient.callMethod(md, 598 configureRpcController(Preconditions.checkNotNull(controller, 599 "RpcController can not be null for async rpc call")), 600 param, returnType, ticket, addr, done); 601 } 602 } 603}