001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.ipc; 020 021import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; 022import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; 023 024import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 025import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 026import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 027import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; 028import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; 029import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 030import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; 031import org.apache.hbase.thirdparty.com.google.protobuf.Message; 032import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 033import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 034import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 035import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 036 037import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 038 039import java.io.IOException; 040import java.net.InetSocketAddress; 041import java.net.SocketAddress; 042import java.net.UnknownHostException; 043import java.util.Collection; 044import java.util.HashMap; 045import java.util.Map; 046import java.util.concurrent.Executors; 047import java.util.concurrent.ScheduledExecutorService; 048import java.util.concurrent.ScheduledFuture; 049import java.util.concurrent.TimeUnit; 050import java.util.concurrent.atomic.AtomicInteger; 051 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.hbase.HConstants; 054import org.apache.hadoop.hbase.ServerName; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058import org.apache.hadoop.hbase.client.MetricsConnection; 059import org.apache.hadoop.hbase.codec.Codec; 060import org.apache.hadoop.hbase.codec.KeyValueCodec; 061import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; 062import org.apache.hadoop.hbase.security.User; 063import org.apache.hadoop.hbase.security.UserProvider; 064import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.apache.hadoop.hbase.util.PoolMap; 067import org.apache.hadoop.hbase.util.Threads; 068import org.apache.hadoop.io.compress.CompressionCodec; 069import org.apache.hadoop.ipc.RemoteException; 070import org.apache.hadoop.security.token.TokenIdentifier; 071import org.apache.hadoop.security.token.TokenSelector; 072 073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 074 075/** 076 * Provides the basics for a RpcClient implementation like configuration and Logging. 077 * <p> 078 * Locking schema of the current IPC implementation 079 * <ul> 080 * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating 081 * connection.</li> 082 * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li> 083 * <li>The same for {@link HBaseRpcController} as {@link Call}. And see the comment of 084 * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)} 085 * of how to deal with cancel.</li> 086 * <li>For connection implementation, the construction of a connection should be as fast as possible 087 * because the creation is protected under a lock. Connect to remote side when needed. There is no 088 * forced locking schema for a connection implementation.</li> 089 * <li>For the locking order, the {@link Call} and {@link HBaseRpcController}'s lock should be held 090 * at last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be execute 091 * outside the lock in {@link Call} and {@link HBaseRpcController} which means the implementations 092 * of the callbacks are free to hold any lock.</li> 093 * </ul> 094 * @since 2.0.0 095 */ 096@InterfaceAudience.Private 097public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient { 098 // Log level is being changed in tests 099 public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class); 100 101 protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer( 102 Threads.newDaemonThreadFactory("RpcClient-timer"), 10, TimeUnit.MILLISECONDS); 103 104 private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors 105 .newScheduledThreadPool(1, Threads.newDaemonThreadFactory("Idle-Rpc-Conn-Sweeper")); 106 107 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_MUTABLE_COLLECTION_PKGPROTECT", 108 justification="the rest of the system which live in the different package can use") 109 protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS = new HashMap<>(); 110 111 static { 112 TOKEN_HANDLERS.put(Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector()); 113 } 114 115 protected boolean running = true; // if client runs 116 117 protected final Configuration conf; 118 protected final String clusterId; 119 protected final SocketAddress localAddr; 120 protected final MetricsConnection metrics; 121 122 protected final UserProvider userProvider; 123 protected final CellBlockBuilder cellBlockBuilder; 124 125 protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this 126 // time (in ms), it will be closed at any moment. 127 protected final int maxRetries; // the max. no. of retries for socket connections 128 protected final long failureSleep; // Time to sleep before retry on failure. 129 protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm 130 protected final boolean tcpKeepAlive; // if T then use keepalives 131 protected final Codec codec; 132 protected final CompressionCodec compressor; 133 protected final boolean fallbackAllowed; 134 135 protected final FailedServers failedServers; 136 137 protected final int connectTO; 138 protected final int readTO; 139 protected final int writeTO; 140 141 protected final PoolMap<ConnectionId, T> connections; 142 143 private final AtomicInteger callIdCnt = new AtomicInteger(0); 144 145 private final ScheduledFuture<?> cleanupIdleConnectionTask; 146 147 private int maxConcurrentCallsPerServer; 148 149 private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache = 150 CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS). 151 build(new CacheLoader<InetSocketAddress, AtomicInteger>() { 152 @Override public AtomicInteger load(InetSocketAddress key) throws Exception { 153 return new AtomicInteger(0); 154 } 155 }); 156 157 /** 158 * Construct an IPC client for the cluster <code>clusterId</code> 159 * @param conf configuration 160 * @param clusterId the cluster id 161 * @param localAddr client socket bind address. 162 * @param metrics the connection metrics 163 */ 164 public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, 165 MetricsConnection metrics) { 166 this.userProvider = UserProvider.instantiate(conf); 167 this.localAddr = localAddr; 168 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); 169 this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT; 170 this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 171 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 172 this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); 173 this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true); 174 this.cellBlockBuilder = new CellBlockBuilder(conf); 175 176 this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes 177 this.conf = conf; 178 this.codec = getCodec(); 179 this.compressor = getCompressor(conf); 180 this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, 181 IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); 182 this.failedServers = new FailedServers(conf); 183 this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); 184 this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ); 185 this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE); 186 this.metrics = metrics; 187 this.maxConcurrentCallsPerServer = conf.getInt( 188 HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 189 HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD); 190 191 this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf)); 192 193 this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() { 194 195 @Override 196 public void run() { 197 cleanupIdleConnections(); 198 } 199 }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS); 200 201 if (LOG.isDebugEnabled()) { 202 LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive=" 203 + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO 204 + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose=" 205 + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed=" 206 + this.fallbackAllowed + ", bind address=" 207 + (this.localAddr != null ? this.localAddr : "null")); 208 } 209 } 210 211 private void cleanupIdleConnections() { 212 long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose; 213 synchronized (connections) { 214 for (T conn : connections.values()) { 215 // Remove connection if it has not been chosen by anyone for more than maxIdleTime, and the 216 // connection itself has already shutdown. The latter check is because we may still 217 // have some pending calls on connection so we should not shutdown the connection outside. 218 // The connection itself will disconnect if there is no pending call for maxIdleTime. 219 if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) { 220 if (LOG.isTraceEnabled()) LOG.trace("Cleanup idle connection to " + conn.remoteId().address); 221 connections.removeValue(conn.remoteId(), conn); 222 conn.cleanupConnection(); 223 } 224 } 225 } 226 } 227 228 @VisibleForTesting 229 public static String getDefaultCodec(final Configuration c) { 230 // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because 231 // Configuration will complain -- then no default codec (and we'll pb everything). Else 232 // default is KeyValueCodec 233 return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName()); 234 } 235 236 /** 237 * Encapsulate the ugly casting and RuntimeException conversion in private method. 238 * @return Codec to use on this client. 239 */ 240 Codec getCodec() { 241 // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND 242 // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding. 243 String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf)); 244 if (className == null || className.length() == 0) { 245 return null; 246 } 247 try { 248 return (Codec) Class.forName(className).getDeclaredConstructor().newInstance(); 249 } catch (Exception e) { 250 throw new RuntimeException("Failed getting codec " + className, e); 251 } 252 } 253 254 @Override 255 public boolean hasCellBlockSupport() { 256 return this.codec != null; 257 } 258 259 // for writing tests that want to throw exception when connecting. 260 @VisibleForTesting 261 boolean isTcpNoDelay() { 262 return tcpNoDelay; 263 } 264 265 /** 266 * Encapsulate the ugly casting and RuntimeException conversion in private method. 267 * @param conf configuration 268 * @return The compressor to use on this client. 269 */ 270 private static CompressionCodec getCompressor(final Configuration conf) { 271 String className = conf.get("hbase.client.rpc.compressor", null); 272 if (className == null || className.isEmpty()) { 273 return null; 274 } 275 try { 276 return (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance(); 277 } catch (Exception e) { 278 throw new RuntimeException("Failed getting compressor " + className, e); 279 } 280 } 281 282 /** 283 * Return the pool type specified in the configuration, which must be set to either 284 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or 285 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the 286 * former. For applications with many user threads, use a small round-robin pool. For applications 287 * with few user threads, you may want to try using a thread-local pool. In any case, the number 288 * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating 289 * system's hard limit on the number of connections. 290 * @param config configuration 291 * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or 292 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal} 293 */ 294 private static PoolMap.PoolType getPoolType(Configuration config) { 295 return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), 296 PoolMap.PoolType.RoundRobin, PoolMap.PoolType.ThreadLocal); 297 } 298 299 /** 300 * Return the pool size specified in the configuration, which is applicable only if the pool type 301 * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}. 302 * @param config configuration 303 * @return the maximum pool size 304 */ 305 private static int getPoolSize(Configuration config) { 306 return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); 307 } 308 309 private int nextCallId() { 310 int id, next; 311 do { 312 id = callIdCnt.get(); 313 next = id < Integer.MAX_VALUE ? id + 1 : 0; 314 } while (!callIdCnt.compareAndSet(id, next)); 315 return id; 316 } 317 318 /** 319 * Make a blocking call. Throws exceptions if there are network problems or if the remote code 320 * threw an exception. 321 * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. 322 * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a 323 * new Connection each time. 324 * @return A pair with the Message response and the Cell data (if any). 325 */ 326 private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc, 327 Message param, Message returnType, final User ticket, final InetSocketAddress isa) 328 throws ServiceException { 329 BlockingRpcCallback<Message> done = new BlockingRpcCallback<>(); 330 callMethod(md, hrc, param, returnType, ticket, isa, done); 331 Message val; 332 try { 333 val = done.get(); 334 } catch (IOException e) { 335 throw new ServiceException(e); 336 } 337 if (hrc.failed()) { 338 throw new ServiceException(hrc.getFailed()); 339 } else { 340 return val; 341 } 342 } 343 344 /** 345 * Get a connection from the pool, or create a new one and add it to the pool. Connections to a 346 * given host/port are reused. 347 */ 348 private T getConnection(ConnectionId remoteId) throws IOException { 349 if (failedServers.isFailedServer(remoteId.getAddress())) { 350 if (LOG.isDebugEnabled()) { 351 LOG.debug("Not trying to connect to " + remoteId.address 352 + " this server is in the failed servers list"); 353 } 354 throw new FailedServerException( 355 "This server is in the failed servers list: " + remoteId.address); 356 } 357 T conn; 358 synchronized (connections) { 359 if (!running) { 360 throw new StoppedRpcClientException(); 361 } 362 conn = connections.get(remoteId); 363 if (conn == null) { 364 conn = createConnection(remoteId); 365 connections.put(remoteId, conn); 366 } 367 conn.setLastTouched(EnvironmentEdgeManager.currentTime()); 368 } 369 return conn; 370 } 371 372 /** 373 * Not connected. 374 */ 375 protected abstract T createConnection(ConnectionId remoteId) throws IOException; 376 377 private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr, 378 RpcCallback<Message> callback) { 379 call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime()); 380 if (metrics != null) { 381 metrics.updateRpc(call.md, call.param, call.callStats); 382 } 383 if (LOG.isTraceEnabled()) { 384 LOG.trace( 385 "Call: " + call.md.getName() + ", callTime: " + call.callStats.getCallTimeMs() + "ms"); 386 } 387 if (call.error != null) { 388 if (call.error instanceof RemoteException) { 389 call.error.fillInStackTrace(); 390 hrc.setFailed(call.error); 391 } else { 392 hrc.setFailed(wrapException(addr, call.error)); 393 } 394 callback.run(null); 395 } else { 396 hrc.setDone(call.cells); 397 callback.run(call.response); 398 } 399 } 400 401 private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, 402 final Message param, Message returnType, final User ticket, final InetSocketAddress addr, 403 final RpcCallback<Message> callback) { 404 final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); 405 cs.setStartTime(EnvironmentEdgeManager.currentTime()); 406 407 if (param instanceof ClientProtos.MultiRequest) { 408 ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param; 409 int numActions = 0; 410 for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) { 411 numActions += regionAction.getActionCount(); 412 } 413 414 cs.setNumActionsPerServer(numActions); 415 } 416 417 final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); 418 Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, 419 hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() { 420 @Override 421 public void run(Call call) { 422 counter.decrementAndGet(); 423 onCallFinished(call, hrc, addr, callback); 424 } 425 }, cs); 426 ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr); 427 int count = counter.incrementAndGet(); 428 try { 429 if (count > maxConcurrentCallsPerServer) { 430 throw new ServerTooBusyException(addr, count); 431 } 432 cs.setConcurrentCallsPerServer(count); 433 T connection = getConnection(remoteId); 434 connection.sendRequest(call, hrc); 435 } catch (Exception e) { 436 call.setException(toIOE(e)); 437 } 438 } 439 440 private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException { 441 InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort()); 442 if (addr.isUnresolved()) { 443 throw new UnknownHostException("can not resolve " + sn.getServerName()); 444 } 445 return addr; 446 } 447 448 /** 449 * Interrupt the connections to the given ip:port server. This should be called if the server is 450 * known as actually dead. This will not prevent current operation to be retried, and, depending 451 * on their own behavior, they may retry on the same server. This can be a feature, for example at 452 * startup. In any case, they're likely to get connection refused (if the process died) or no 453 * route to host: i.e. their next retries should be faster and with a safe exception. 454 */ 455 @Override 456 public void cancelConnections(ServerName sn) { 457 synchronized (connections) { 458 for (T connection : connections.values()) { 459 ConnectionId remoteId = connection.remoteId(); 460 if (remoteId.address.getPort() == sn.getPort() 461 && remoteId.address.getHostName().equals(sn.getHostname())) { 462 LOG.info("The server on " + sn.toString() + " is dead - stopping the connection " 463 + connection.remoteId); 464 connections.removeValue(remoteId, connection); 465 connection.shutdown(); 466 connection.cleanupConnection(); 467 } 468 } 469 } 470 } 471 /** 472 * Configure an hbase rpccontroller 473 * @param controller to configure 474 * @param channelOperationTimeout timeout for operation 475 * @return configured controller 476 */ 477 static HBaseRpcController configureHBaseRpcController( 478 RpcController controller, int channelOperationTimeout) { 479 HBaseRpcController hrc; 480 if (controller != null && controller instanceof HBaseRpcController) { 481 hrc = (HBaseRpcController) controller; 482 if (!hrc.hasCallTimeout()) { 483 hrc.setCallTimeout(channelOperationTimeout); 484 } 485 } else { 486 hrc = new HBaseRpcControllerImpl(); 487 hrc.setCallTimeout(channelOperationTimeout); 488 } 489 return hrc; 490 } 491 492 protected abstract void closeInternal(); 493 494 @Override 495 public void close() { 496 if (LOG.isDebugEnabled()) { 497 LOG.debug("Stopping rpc client"); 498 } 499 Collection<T> connToClose; 500 synchronized (connections) { 501 if (!running) { 502 return; 503 } 504 running = false; 505 connToClose = connections.values(); 506 connections.clear(); 507 } 508 cleanupIdleConnectionTask.cancel(true); 509 for (T conn : connToClose) { 510 conn.shutdown(); 511 } 512 closeInternal(); 513 for (T conn : connToClose) { 514 conn.cleanupConnection(); 515 } 516 } 517 518 @Override 519 public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, 520 int rpcTimeout) throws UnknownHostException { 521 return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout); 522 } 523 524 @Override 525 public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) 526 throws UnknownHostException { 527 return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout); 528 } 529 530 private static class AbstractRpcChannel { 531 532 protected final InetSocketAddress addr; 533 534 protected final AbstractRpcClient<?> rpcClient; 535 536 protected final User ticket; 537 538 protected final int rpcTimeout; 539 540 protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, InetSocketAddress addr, 541 User ticket, int rpcTimeout) { 542 this.addr = addr; 543 this.rpcClient = rpcClient; 544 this.ticket = ticket; 545 this.rpcTimeout = rpcTimeout; 546 } 547 548 /** 549 * Configure an rpc controller 550 * @param controller to configure 551 * @return configured rpc controller 552 */ 553 protected HBaseRpcController configureRpcController(RpcController controller) { 554 HBaseRpcController hrc; 555 // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client 556 // side. And now we may use ServerRpcController. 557 if (controller != null && controller instanceof HBaseRpcController) { 558 hrc = (HBaseRpcController) controller; 559 if (!hrc.hasCallTimeout()) { 560 hrc.setCallTimeout(rpcTimeout); 561 } 562 } else { 563 hrc = new HBaseRpcControllerImpl(); 564 hrc.setCallTimeout(rpcTimeout); 565 } 566 return hrc; 567 } 568 } 569 570 /** 571 * Blocking rpc channel that goes via hbase rpc. 572 */ 573 @VisibleForTesting 574 public static class BlockingRpcChannelImplementation extends AbstractRpcChannel 575 implements BlockingRpcChannel { 576 577 protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient, 578 InetSocketAddress addr, User ticket, int rpcTimeout) { 579 super(rpcClient, addr, ticket, rpcTimeout); 580 } 581 582 @Override 583 public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, 584 Message param, Message returnType) throws ServiceException { 585 return rpcClient.callBlockingMethod(md, configureRpcController(controller), 586 param, returnType, ticket, addr); 587 } 588 } 589 590 /** 591 * Async rpc channel that goes via hbase rpc. 592 */ 593 public static class RpcChannelImplementation extends AbstractRpcChannel implements 594 RpcChannel { 595 596 protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr, 597 User ticket, int rpcTimeout) throws UnknownHostException { 598 super(rpcClient, addr, ticket, rpcTimeout); 599 } 600 601 @Override 602 public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, 603 Message param, Message returnType, RpcCallback<Message> done) { 604 // This method does not throw any exceptions, so the caller must provide a 605 // HBaseRpcController which is used to pass the exceptions. 606 this.rpcClient.callMethod(md, 607 configureRpcController(Preconditions.checkNotNull(controller, 608 "RpcController can not be null for async rpc call")), 609 param, returnType, ticket, addr, done); 610 } 611 } 612}